c# - 使用 Nethereum 订阅合约事件
问题描述
我需要订阅 Uniswap 交易对(Pair)合约的 Sync 事件,并获取该交易对的储备量。下面是我的尝试:
/// <summary>
/// Event DTO bound to the contract event named "Sync".
/// Exposes the two uint112 event parameters "reserve0" and "reserve1".
/// </summary>
[Event("Sync")]
class PairSyncEventDTO : IEventDTO
{
    /// <summary>Value of the "reserve0" event parameter (first in order).</summary>
    [Parameter("uint112", "reserve0", 1)]
    public virtual BigInteger Reserve0 { get; set; }

    /// <summary>Value of the "reserve1" event parameter (second in order).</summary>
    [Parameter("uint112", "reserve1", 2)]
    public virtual BigInteger Reserve1 { get; set; }
}
/// <summary>
/// Looks up the WETH/DAI pair address through the factory contract's "getPair" function,
/// then opens a websocket log subscription for that pair's "Sync" event and prints
/// reserve0 / reserve1 for every log that decodes into <see cref="PairSyncEventDTO"/>.
/// </summary>
/// <returns>A task that completes once the subscribe request has been sent.</returns>
public async Task Start()
{
    // 'readonly' is not a valid modifier on a local variable, so this is a plain local.
    string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";
    string uniSwapFactoryAbi = GetAbi(Resources.IUniswapV2Factory);
    string uniSwapPairAbi = GetAbi(Resources.IUniswapV2Pair);

    var web3 = new Web3("https://mainnet.infura.io/v3/fff");
    Contract uniSwapFactoryContract = web3.Eth.GetContract(uniSwapFactoryAbi, uniSwapFactoryAddress);
    Function uniSwapGetPairFunction = uniSwapFactoryContract.GetFunction("getPair");

    string daiAddress = "0x6b175474e89094c44da98b954eedeac495271d0f";
    string wethAddress = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
    string pairContractAddress = await uniSwapGetPairFunction.CallAsync<string>(wethAddress, daiAddress);

    Contract pairContract = web3.Eth.GetContract(uniSwapPairAbi, pairContractAddress);
    Event pairSyncEvent = pairContract.GetEvent("Sync");

    // Build the filter from the contract-bound event rather than from the bare EventABI,
    // so the subscription is scoped to the pair contract resolved above.
    NewFilterInput pairSyncFilter = pairSyncEvent.CreateFilterInput();

    using (var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/fff"))
    {
        var subscription = new EthLogsObservableSubscription(client);

        // The handler is attached before the client starts so no early log is missed.
        subscription.GetSubscriptionDataResponsesAsObservable().
            Subscribe(log =>
            {
                try
                {
                    EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                    if (decoded != null)
                    {
                        decimal reserve0 = Web3.Convert.FromWei(decoded.Event.Reserve0);
                        decimal reserve1 = Web3.Convert.FromWei(decoded.Event.Reserve1);
                        Console.WriteLine($@"Price={reserve0 / reserve1}");
                    }
                    else
                    {
                        Console.WriteLine(@"Found not standard transfer log");
                    }
                }
                catch (Exception ex)
                {
                    // The exception text is interpolated into the message; passing it as a
                    // format argument to a string without placeholders would discard it.
                    Console.WriteLine($"Log Address: {log.Address} is not a standard transfer log: {ex.Message}");
                }
            });

        await client.StartAsync();
        await subscription.SubscribeAsync(pairSyncFilter);

        // NOTE(review): the using block ends right after SubscribeAsync returns, which
        // disposes the client. Keep this scope alive if logs are expected to arrive.
    }
}
/// <summary>
/// Extracts the "abi" member from a stored contract JSON document.
/// </summary>
/// <param name="storedContractJson">UTF-8 encoded bytes of the contract JSON.</param>
/// <returns>The string form of the "abi" token.</returns>
/// <exception cref="KeyNotFoundException">The document has no "abi" member.</exception>
string GetAbi(byte[] storedContractJson)
{
    JObject contract = JObject.Parse(Encoding.UTF8.GetString(storedContractJson));

    if (contract.TryGetValue("abi", out JToken abiToken))
    {
        return abiToken.ToString();
    }

    throw new KeyNotFoundException("abi object was not found in stored contract json");
}
看起来订阅成功了,但始终没有进入 Subscribe 的 lambda。此外,即使我不带任何过滤器调用 await subscription.SubscribeAsync();,也同样不会进入 Subscribe 的 lambda。而且在执行 SubscribeAsync 之后,进程的 CPU 占用会明显升高。
- 我究竟做错了什么?为什么 Subscribe 的 lambda 没有被调用?
- 为什么 CPU 占用会升高?
解决方案
我没有发现您的代码有重大问题,但由于我手头没有这些 ABI,下面给出一个不依赖 ABI 的示例。Sync 事件并不会一直触发,所以这可能就是问题所在。
using Nethereum.ABI.FunctionEncoding.Attributes;
using Nethereum.Contracts;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Text;
using System.Threading.Tasks;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.RPC.Web3;
using Newtonsoft.Json.Linq;
namespace Nethereum.WSLogStreamingUniswapSample
{
    /// <summary>
    /// Console sample: resolves a Uniswap V2 pair address with a typed "getPair" query and
    /// streams the pair's "Sync" event logs over a websocket connection.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Event DTO bound to the contract event named "Sync", carrying the two
        /// uint112 parameters "reserve0" and "reserve1".
        /// </summary>
        [Event("Sync")]
        class PairSyncEventDTO : IEventDTO
        {
            [Parameter("uint112", "reserve0", 1)]
            public virtual BigInteger Reserve0 { get; set; }

            [Parameter("uint112", "reserve1", 2)]
            public virtual BigInteger Reserve1 { get; set; }
        }

        /// <summary>Concrete message type for the "getPair" function query.</summary>
        public partial class GetPairFunction : GetPairFunctionBase { }

        /// <summary>
        /// Function message for "getPair", declared with return type "address" and two
        /// address inputs, "tokenA" and "tokenB".
        /// </summary>
        [Function("getPair", "address")]
        public class GetPairFunctionBase : FunctionMessage
        {
            [Parameter("address", "tokenA", 1)]
            public virtual string TokenA { get; set; }

            [Parameter("address", "tokenB", 2)]
            public virtual string TokenB { get; set; }
        }

        /// <summary>
        /// Queries the factory for the DAI/WETH pair address, subscribes to that pair's
        /// "Sync" logs, prints reserve0 / reserve1 for each decoded log, and unsubscribes
        /// once a line is read from the console.
        /// </summary>
        public static async Task Main()
        {
            string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";
            var web3 = new Web3.Web3("https://mainnet.infura.io/v3/7238211010344719ad14a89db874158c");
            string daiAddress = "0x6b175474e89094c44da98b954eedeac495271d0f";
            string wethAddress = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";

            var pairContractAddress = await web3.Eth.GetContractQueryHandler<GetPairFunction>()
                .QueryAsync<string>(uniSwapFactoryAddress,
                    new GetPairFunction() { TokenA = daiAddress, TokenB = wethAddress });

            // The filter is created from an event bound to the pair contract address.
            var filter = web3.Eth.GetEvent<PairSyncEventDTO>(pairContractAddress).CreateFilterInput();

            using (var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/7238211010344719ad14a89db874158c"))
            {
                var subscription = new EthLogsObservableSubscription(client);

                // Attach the log handler before the client is started.
                subscription.GetSubscriptionDataResponsesAsObservable().
                    Subscribe(log =>
                    {
                        try
                        {
                            EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                            if (decoded != null)
                            {
                                decimal reserve0 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve0);
                                decimal reserve1 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve1);
                                Console.WriteLine($@"Price={reserve0 / reserve1}");
                            }
                            else
                            {
                                Console.WriteLine(@"Found not standard transfer log");
                            }
                        }
                        catch (Exception ex)
                        {
                            // Interpolate the exception text; as a format argument to a string
                            // without placeholders it would never be printed.
                            Console.WriteLine($"Log Address: {log.Address} is not a standard transfer log: {ex.Message}");
                        }
                    });

                await client.StartAsync();

                // Report the subscription id returned for the subscribe request.
                subscription.GetSubscribeResponseAsObservable().Subscribe(id => Console.WriteLine($"Subscribed with id: {id}"));
                await subscription.SubscribeAsync(filter);

                // Block here so the client stays open while logs are streamed.
                Console.ReadLine();
                await subscription.UnsubscribeAsync();
            }
        }
    }
}
为了让与 Infura 的连接保持活跃,您可能需要每隔一段时间对其执行一次 ping 操作,例如:
// Keep-alive loop: requests the current block number over the streaming client
// every 30 seconds and prints the returned value.
while (true)
{
    // A new handler is created and subscribed for every request, as in the original sample.
    var handler = new EthBlockNumberObservableHandler(client);
    handler.GetResponseAsObservable().Subscribe(x => Console.WriteLine(x.Value));
    await handler.SendRequestAsync();

    // Task.Delay waits without blocking the thread, unlike Thread.Sleep.
    await Task.Delay(30000);
}
推荐阅读
- postgresql - 为什么我可以在控制器而不是命令中使用实体管理器?
- ionic-framework - 离子搜索栏有取消图标而不是取消文本 ionic 3
- javascript - Jest:测试类型或 null
- r - 如何在 r 中进行列拆分和 dcast
- ruby-on-rails - 为什么 Active Record 发送的数据与 schema.rb 不同?
- redis - Jedis Benchmarking - Jedis 有多快
- javascript - 如何在 JavaScript 中使用 Base64String 值设置 Dynamics CRM/365 字段
- javascript - 为什么我无法写入我的 firebase 数据库?
- java - 运行我的第一个 java 应用程序时出现 InvalidModuleDescriptorException
- linux - 了解系统中可用连续内存量的方法