I need to subscribe to the Sync event of a Uniswap pair contract and read the pair reserves. Here is what I tried:
/// <summary>
/// Event DTO for the <c>Sync</c> event; maps its two <c>uint112</c> parameters
/// onto <see cref="BigInteger"/> properties.
/// </summary>
[Event("Sync")]
class PairSyncEventDTO : IEventDTO
{
    /// <summary>Value of the event's <c>reserve0</c> parameter (first parameter).</summary>
    [Parameter("uint112", "reserve0", 1)]
    public virtual BigInteger Reserve0 { get; set; }

    /// <summary>Value of the event's <c>reserve1</c> parameter (second parameter).</summary>
    [Parameter("uint112", "reserve1", 2)]
    public virtual BigInteger Reserve1 { get; set; }
}
/// <summary>
/// Looks up the WETH/DAI pair address through the Uniswap factory's <c>getPair</c> function,
/// subscribes to that pair's <c>Sync</c> event over a WebSocket connection and prints the
/// ratio of the two reserves for every log received. The connection is kept open until a
/// line is read from the console, after which the subscription is cancelled.
/// </summary>
/// <returns>A task that completes once the subscription has been cancelled.</returns>
public async Task Start()
{
    // A local cannot carry the readonly modifier in C#; a plain local is used instead.
    string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";
    string uniSwapFactoryAbi = GetAbi(Resources.IUniswapV2Factory);
    string uniSwapPairAbi = GetAbi(Resources.IUniswapV2Pair);
    var web3 = new Web3("https://mainnet.infura.io/v3/fff");
    Contract uniSwapFactoryContract = web3.Eth.GetContract(uniSwapFactoryAbi, uniSwapFactoryAddress);
    Function uniSwapGetPairFunction = uniSwapFactoryContract.GetFunction("getPair");
    string daiAddress = "0x6b175474e89094c44da98b954eedeac495271d0f";
    string wethAddress = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
    string pairContractAddress = await uniSwapGetPairFunction.CallAsync<string>(wethAddress, daiAddress);
    Contract pairContract = web3.Eth.GetContract(uniSwapPairAbi, pairContractAddress);
    Event pairSyncEvent = pairContract.GetEvent("Sync");

    // Build the filter from the contract-bound event so that it is restricted to the pair
    // address; a filter built from the bare EventABI carries no address and would match
    // Sync logs emitted by any contract.
    NewFilterInput pairSyncFilter = pairSyncEvent.CreateFilterInput();

    using (var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/fff"))
    {
        var subscription = new EthLogsObservableSubscription(client);
        subscription.GetSubscriptionDataResponsesAsObservable().
            Subscribe(log =>
            {
                try
                {
                    EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                    if (decoded != null)
                    {
                        decimal reserve0 = Web3.Convert.FromWei(decoded.Event.Reserve0);
                        decimal reserve1 = Web3.Convert.FromWei(decoded.Event.Reserve1);
                        Console.WriteLine($@"Price={reserve0 / reserve1}");
                    }
                    else
                    {
                        Console.WriteLine(@"Found not standard transfer log");
                    }
                }
                catch (Exception ex)
                {
                    // The exception text is interpolated into the message; passing it as a
                    // format argument to a string without placeholders silently dropped it.
                    Console.WriteLine($"Log Address: {log.Address} is not a standard transfer log: {ex.Message}");
                }
            });

        await client.StartAsync();
        await subscription.SubscribeAsync(pairSyncFilter);

        // Stay inside the using block while waiting for logs: leaving it disposes the
        // WebSocket client straight after subscribing, so no log could ever be delivered.
        Console.ReadLine();
        await subscription.UnsubscribeAsync();
    }
}
/// <summary>
/// Extracts the <c>abi</c> entry from a stored contract JSON document.
/// </summary>
/// <param name="storedContractJson">UTF-8 encoded bytes of the contract JSON object.</param>
/// <returns>The string form of the <c>abi</c> token.</returns>
/// <exception cref="KeyNotFoundException">The JSON object has no <c>abi</c> property.</exception>
string GetAbi(byte[] storedContractJson)
{
    JObject parsedContract = JObject.Parse(Encoding.UTF8.GetString(storedContractJson));

    if (parsedContract.TryGetValue("abi", out JToken abiToken))
    {
        return abiToken.ToString();
    }

    throw new KeyNotFoundException("abi object was not found in stored contract json");
}
It seems to subscribe, but it never enters the Subscribe lambda.
Also, if I try to await subscription.SubscribeAsync(); without any filter, it doesn't enter the Subscribe lambda either.
However, after SubscribeAsync is executed, the process puts a significant load on the CPU.
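How can I get the Subscribe lambda called?
I don't see a major issue with your code, but as I don't have the ABIs, here is an example without them. The "Sync" event does not fire all the time, so that might have been the issue. Note that the example also waits on Console.ReadLine() inside the using block, so the WebSocket client is not disposed right after subscribing.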
using Nethereum.ABI.FunctionEncoding.Attributes;
using Nethereum.Contracts;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Text;
using System.Threading.Tasks;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.RPC.Web3;
using Newtonsoft.Json.Linq;
namespace Nethereum.WSLogStreamingUniswapSample
{
/// <summary>
/// Console sample that queries the Uniswap factory for the DAI/WETH pair address, subscribes
/// to the pair's <c>Sync</c> event over a WebSocket connection and prints the ratio of the
/// two reserves for every log received, until a line is read from the console.
/// </summary>
class Program
{
    /// <summary>
    /// Event DTO for the <c>Sync</c> event; maps its two <c>uint112</c> parameters.
    /// </summary>
    [Event("Sync")]
    class PairSyncEventDTO : IEventDTO
    {
        /// <summary>Value of the event's <c>reserve0</c> parameter.</summary>
        [Parameter("uint112", "reserve0")]
        public virtual BigInteger Reserve0 { get; set; }

        /// <summary>Value of the event's <c>reserve1</c> parameter.</summary>
        [Parameter("uint112", "reserve1", 2)]
        public virtual BigInteger Reserve1 { get; set; }
    }

    /// <summary>Typed message for the <c>getPair</c> function.</summary>
    public partial class GetPairFunction : GetPairFunctionBase { }

    /// <summary>
    /// Parameter mapping for <c>getPair(address tokenA, address tokenB)</c>, declared as
    /// returning an <c>address</c>.
    /// </summary>
    [Function("getPair", "address")]
    public class GetPairFunctionBase : FunctionMessage
    {
        /// <summary>First token address argument.</summary>
        [Parameter("address", "tokenA", 1)]
        public virtual string TokenA { get; set; }

        /// <summary>Second token address argument.</summary>
        [Parameter("address", "tokenB", 2)]
        public virtual string TokenB { get; set; }
    }

    /// <summary>
    /// Entry point: resolves the pair address, opens the streaming client, subscribes to the
    /// pair's <c>Sync</c> logs and unsubscribes after a line is read from the console.
    /// </summary>
    public static async Task Main()
    {
        string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";

        // NOTE(review): the endpoint URLs below embed a project key in source; consider
        // loading it from configuration instead of committing it.
        var web3 = new Web3.Web3("https://mainnet.infura.io/v3/7238211010344719ad14a89db874158c");
        string daiAddress = "0x6b175474e89094c44da98b954eedeac495271d0f";
        string wethAddress = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";

        var pairContractAddress = await web3.Eth.GetContractQueryHandler<GetPairFunction>()
            .QueryAsync<string>(uniSwapFactoryAddress,
                new GetPairFunction() { TokenA = daiAddress, TokenB = wethAddress });

        // The filter is created from an event bound to the pair contract address.
        var filter = web3.Eth.GetEvent<PairSyncEventDTO>(pairContractAddress).CreateFilterInput();

        using (var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/7238211010344719ad14a89db874158c"))
        {
            var subscription = new EthLogsObservableSubscription(client);
            subscription.GetSubscriptionDataResponsesAsObservable().
                Subscribe(log =>
                {
                    try
                    {
                        EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                        if (decoded != null)
                        {
                            decimal reserve0 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve0);
                            decimal reserve1 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve1);
                            Console.WriteLine($@"Price={reserve0 / reserve1}");
                        }
                        else
                        {
                            Console.WriteLine(@"Found not standard transfer log");
                        }
                    }
                    catch (Exception ex)
                    {
                        // The exception text is interpolated into the message; passing it as a
                        // format argument to a string without placeholders silently dropped it.
                        Console.WriteLine($"Log Address: {log.Address} is not a standard transfer log: {ex.Message}");
                    }
                });

            await client.StartAsync();

            // Report the subscription id once the subscribe request has been answered.
            subscription.GetSubscribeResponseAsObservable().Subscribe(id => Console.WriteLine($"Subscribed with id: {id}"));
            await subscription.SubscribeAsync(filter);

            // Keep the client alive (inside the using block) until a line is read.
            Console.ReadLine();
            await subscription.UnsubscribeAsync();
        }
    }
}
To keep the connection alive on Infura, you might want to ping it every so often. Example:
// Keep-alive loop: sends a block-number request through the streaming client roughly every
// 30 seconds and prints each response value.
while (true)
{
    // A new handler is created for every request, exactly as before.
    var handler = new EthBlockNumberObservableHandler(client);
    handler.GetResponseAsObservable().Subscribe(x => Console.WriteLine(x.Value));
    await handler.SendRequestAsync();

    // Asynchronous wait instead of Thread.Sleep, so no thread is blocked between pings.
    await Task.Delay(TimeSpan.FromSeconds(30));
}