c# - 每个符号一次只有一个异步任务操作
问题描述
SubscribeToKlineUpdatesAsync
是一个网络套接字流事件处理程序,每秒从Binance接收数据。我在里面进行了 10 秒的长时间运行操作。它被分成一个任务,因为我不希望它阻止任何传入的价格数据。
问题是它一次产生数千个任务,我希望它一次只产生一个任务每个符号。该符号由 标识data.Symbol
。在代码下方,您会找到日志。第二个日志是我想要的样子。
我想使用 TPL Dataflow 或 Rx.NET 之类的东西。我只是不知道该怎么做。我知道的lock
方式,BlockingCollection
和SemaphoreSlim
。
下面有一个关于什么是当前的和我想要完成的日志。
代码
using System;
using System.Threading;
using System.Threading.Tasks;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;
namespace SubscribeToCandlesEventFixTest
{
public class BinanceSocketHandler
{
private readonly IBinanceClient _client;
private readonly IBinanceSocketClient _socketClient;
public BinanceSocketHandler()
{
_client = new BinanceClient(new BinanceClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoTimestamp = true,
AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
_socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoReconnect = true,
ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
}
public Task StartAsync(CancellationToken cancellationToken)
{
var symbols = new[] { "TRXUSDT", "BTCUSDT" };
var interval = KlineInterval.OneMinute;
return _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbols, interval,
data =>
{
if (data.Data.Final)
{
Console.WriteLine($"[{DateTime.UtcNow}] [{data.Symbol}] New final candle | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
}
else
{
Console.WriteLine($"[{DateTime.UtcNow}] [{data.Symbol}] Candle update | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
Task.Run(async () =>
{
Console.WriteLine("Operation taking 10 seconds to execute...");
await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
}, cancellationToken);
}
});
}
}
class Program
{
static async Task Main(string[] args)
{
var test = new BinanceSocketHandler();
await test.StartAsync(new CancellationToken()).ConfigureAwait(false);
Console.ReadLine();
}
}
}
日志
它产生了数千个任务。每个符号我只想要一个任务。
[3/7/2021 12:24:53 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:53 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49240.17000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:55 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49245.66000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:57 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49251.02000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:59 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49254.44000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:00 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:01 AM] [BTCUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 49257.99000000
[3/7/2021 12:25:01 AM] [TRXUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
[3/7/2021 12:25:03 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49264.26000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:04 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:05 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49266.60000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:07 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49251.02000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:07 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05050000
Operation taking 10 seconds to execute...
我想要的示例日志
[3/7/2021 12:24:53 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:53 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49240.17000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:55 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49245.66000000
[3/7/2021 12:24:57 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49251.02000000
[3/7/2021 12:24:59 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49254.44000000
[3/7/2021 12:25:00 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
[3/7/2021 12:25:01 AM] [BTCUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 49257.99000000
[3/7/2021 12:25:01 AM] [TRXUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
[3/7/2021 12:25:03 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49264.26000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:04 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:05 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49266.60000000
[3/7/2021 12:25:07 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49251.02000000
[3/7/2021 12:25:07 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05050000
解决方案
推荐阅读
- python - 在 python3.9 类型模块中,NewType 函数允许使用新数据类型创建派生
- ruby-on-rails - 如何解决来自 nylas 的“无法在只读日历上创建事件”的错误?
- sql - 使用 SQL 查询每天自动化元数据库中的仪表板
- python-3.x - 获取在 top.gg 上支持服务器的用户
- python - 有没有办法在不使用双 for 循环的情况下迭代 Python 中的嵌套字典?
- graphics - 通过 R 中边上的概率生成无向图
- sql - 如何在sql语句中同时使用like和in?
- python - 您可以根据特征值在另一个数据框中出现的次数来复制 pandas 数据框中的行吗?
- ffmpeg - 如何将整个 flac 文件目录转换为 alac 并使用 ffmpeg 保存元数据?
- date - MS Access 和系统时间设置关系