Search code examples
c#websocketethereumnethereum

Subscribe to contract events using Nethereum


I need to get subscribe to Uniswap pair contract Sync event and get pair reserves. So here what I tried to do:

[Event("Sync")]
class PairSyncEventDTO : IEventDTO
{
    [Parameter("uint112", "reserve0")]
    public virtual BigInteger Reserve0 { get; set; }

    [Parameter("uint112", "reserve1", 2)]
    public virtual BigInteger Reserve1 { get; set; }
}

public async Task Start()
{
    readonly string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";
    
    string uniSwapFactoryAbi = GetAbi(Resources.IUniswapV2Factory);
    string uniSwapPairAbi    = GetAbi(Resources.IUniswapV2Pair);
    
    var      web3                   = new Web3("https://mainnet.infura.io/v3/fff");
    Contract uniSwapFactoryContract = web3.Eth.GetContract(uniSwapFactoryAbi, uniSwapFactoryAddress);
    Function uniSwapGetPairFunction = uniSwapFactoryContract.GetFunction("getPair");
    
    string daiAddress          = "0x6b175474e89094c44da98b954eedeac495271d0f";
    string wethAddress         = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
    string pairContractAddress = await uniSwapGetPairFunction.CallAsync<string>(wethAddress, daiAddress);
    
    Contract pairContract = web3.Eth.GetContract(uniSwapPairAbi, pairContractAddress);
    
    Event          pairSyncEvent  = pairContract.GetEvent("Sync");
    NewFilterInput pairSyncFilter = pairSyncEvent.EventABI.CreateFilterInput();
    
    using (var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/fff"))
    {
        var subscription = new EthLogsObservableSubscription(client);
        subscription.GetSubscriptionDataResponsesAsObservable().
                     Subscribe(log =>
                               {
                                   try
                                   {
                                       EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                                       if (decoded != null)
                                       {
                                           decimal reserve0 = Web3.Convert.FromWei(decoded.Event.Reserve0);
                                           decimal reserve1 = Web3.Convert.FromWei(decoded.Event.Reserve1);
                                           Console.WriteLine($@"Price={reserve0 / reserve1}");
                                       }
                                       else Console.WriteLine(@"Found not standard transfer log");
                                   }
                                   catch (Exception ex)
                                   {
                                       Console.WriteLine(@"Log Address: " + log.Address + @" is not a standard transfer log:", ex.Message);
                                   }
                               });
    
        await client.StartAsync();
        await subscription.SubscribeAsync(pairSyncFilter);
    }
}

string GetAbi(byte[] storedContractJson)
{
    string  json           = Encoding.UTF8.GetString(storedContractJson);
    JObject contractObject = JObject.Parse(json);

    if (!contractObject.TryGetValue("abi", out JToken abiJson)) throw new KeyNotFoundException("abi object was not found in stored contract json");
    return abiJson.ToString();
}

And it seems to subscribe, but never enters Subscribe lambda. Also if I try to await subscription.SubscribeAsync(); without any filter, it also doesn't enter Subscribe lambda. But after executing SubscribeAsync CPU is significantly loaded by the process.

  1. What am I doing wrong? Why isn't Subscribe lambda called?
  2. Why does it load CPU?

Solution

  • I don't see a major issue with your code, but as I don't have the abis, this is an example without them. The "Sync" event does not fire all the time, so that might have been the issue.

    using Nethereum.ABI.FunctionEncoding.Attributes;
    using Nethereum.Contracts;
    using Nethereum.JsonRpc.WebSocketStreamingClient;
    using Nethereum.RPC.Reactive.Eth.Subscriptions;
    using System;
    using System.Collections.Generic;
    using System.Numerics;
    using System.Text;
    using System.Threading.Tasks;
    using Nethereum.RPC.Eth.DTOs;
    using Nethereum.RPC.Web3;
    using Newtonsoft.Json.Linq;
    
    namespace Nethereum.WSLogStreamingUniswapSample
    {
      class Program
      {
    
        [Event("Sync")]
        class PairSyncEventDTO : IEventDTO
        {
            [Parameter("uint112", "reserve0")]
            public virtual BigInteger Reserve0 { get; set; }
    
            [Parameter("uint112", "reserve1", 2)]
            public virtual BigInteger Reserve1 { get; set; }
        }
    
    
        public partial class GetPairFunction : GetPairFunctionBase { }
    
        [Function("getPair", "address")]
        public class GetPairFunctionBase : FunctionMessage
        {
            [Parameter("address", "tokenA", 1)]
            public virtual string TokenA { get; set; }
            [Parameter("address", "tokenB", 2)]
            public virtual string TokenB { get; set; }
        }
    
       
    
        public static async Task Main()
        {
    
    
            string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";
    
            var web3 = new Web3.Web3("https://mainnet.infura.io/v3/7238211010344719ad14a89db874158c");
    
    
            string daiAddress = "0x6b175474e89094c44da98b954eedeac495271d0f";
            string wethAddress = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
    
            var pairContractAddress = await web3.Eth.GetContractQueryHandler<GetPairFunction>()
                .QueryAsync<string>(uniSwapFactoryAddress,
                    new GetPairFunction() {TokenA = daiAddress, TokenB = wethAddress});
    
            var filter =  web3.Eth.GetEvent<PairSyncEventDTO>(pairContractAddress).CreateFilterInput();
    
            
            using (var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/7238211010344719ad14a89db874158c"))
            {
                var subscription = new EthLogsObservableSubscription(client);
                subscription.GetSubscriptionDataResponsesAsObservable().
                             Subscribe(log =>
                             {
                                 try
                                 {
                                     EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                                     if (decoded != null)
                                     {
                                         decimal reserve0 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve0);
                                         decimal reserve1 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve1);
                                         Console.WriteLine($@"Price={reserve0 / reserve1}");
                                     }
                                     else Console.WriteLine(@"Found not standard transfer log");
                                 }
                                 catch (Exception ex)
                                 {
                                     Console.WriteLine(@"Log Address: " + log.Address + @" is not a standard transfer log:", ex.Message);
                                 }
                             });
    
                await client.StartAsync();
                subscription.GetSubscribeResponseAsObservable().Subscribe(id => Console.WriteLine($"Subscribed with id: {id}"));
                await subscription.SubscribeAsync(filter);
    
                Console.ReadLine();
    
                await subscription.UnsubscribeAsync();
            }
        }
    }
    

    To keep it alive in infura, you might want to ping it every so often Example

     while (true)
     {
          var handler = new EthBlockNumberObservableHandler(client);
          handler.GetResponseAsObservable().Subscribe(x => Console.WriteLine(x.Value));
          await handler.SendRequestAsync();
          Thread.Sleep(30000);
      }