首页 > 解决方案 > 每个符号一次只有一个异步任务操作

问题描述

SubscribeToKlineUpdatesAsync是一个网络套接字流事件处理程序,每秒从Binance接收数据。我在里面进行了 10 秒的长时间运行操作。它被分成一个任务,因为我不希望它阻止任何传入的价格数据。

问题是它一次产生数千个任务,我希望它一次只产生一个任务每个符号。该符号由 标识data.Symbol。在代码下方,您会找到日志。第二个日志是我想要的样子。

我想使用 TPL Dataflow 或 Rx.NET 之类的东西。我只是不知道该怎么做。我知道的lock方式,BlockingCollectionSemaphoreSlim

下面有一个关于什么是当前的和我想要完成的日志。

代码

using System;
using System.Threading;
using System.Threading.Tasks;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;

namespace SubscribeToCandlesEventFixTest
{
    public class BinanceSocketHandler
    {
        private readonly IBinanceClient _client;
        private readonly IBinanceSocketClient _socketClient;

        public BinanceSocketHandler()
        {
            _client = new BinanceClient(new BinanceClientOptions
            {
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoTimestamp = true,
                AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            });

            _socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
            {
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoReconnect = true,
                ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            });
        }
        
        public Task StartAsync(CancellationToken cancellationToken)
        {
            var symbols = new[] { "TRXUSDT", "BTCUSDT" };
            var interval = KlineInterval.OneMinute;

            return _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbols, interval,
                data =>
                {
                    if (data.Data.Final)
                    {
                        Console.WriteLine($"[{DateTime.UtcNow}] [{data.Symbol}] New final candle | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
                    }
                    else
                    {
                        Console.WriteLine($"[{DateTime.UtcNow}] [{data.Symbol}] Candle update | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");

                        Task.Run(async () =>
                        {
                            Console.WriteLine("Operation taking 10 seconds to execute...");

                            await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
                        }, cancellationToken);
                    }
                });
        }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            var test = new BinanceSocketHandler();
            await test.StartAsync(new CancellationToken()).ConfigureAwait(false);

            Console.ReadLine();
        }
    }
}

日志

它产生了数千个任务。每个符号我只想要一个任务。

[3/7/2021 12:24:53 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:53 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49240.17000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:55 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49245.66000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:57 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49251.02000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:59 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49254.44000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:00 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:01 AM] [BTCUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 49257.99000000
[3/7/2021 12:25:01 AM] [TRXUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
[3/7/2021 12:25:03 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49264.26000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:04 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:05 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49266.60000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:07 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49251.02000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:07 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05050000
Operation taking 10 seconds to execute...

我想要的示例日志

[3/7/2021 12:24:53 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:53 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49240.17000000
Operation taking 10 seconds to execute...
[3/7/2021 12:24:55 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49245.66000000
[3/7/2021 12:24:57 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49251.02000000
[3/7/2021 12:24:59 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 49254.44000000
[3/7/2021 12:25:00 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
[3/7/2021 12:25:01 AM] [BTCUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 49257.99000000
[3/7/2021 12:25:01 AM] [TRXUSDT] New final candle | Timestamp: 3/7/2021 12:24:00 AM | Price: 0.05049000
[3/7/2021 12:25:03 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49264.26000000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:04 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05049000
Operation taking 10 seconds to execute...
[3/7/2021 12:25:05 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49266.60000000
[3/7/2021 12:25:07 AM] [BTCUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 49251.02000000
[3/7/2021 12:25:07 AM] [TRXUSDT] Candle update | Timestamp: 3/7/2021 12:25:00 AM | Price: 0.05050000

标签: c#system.reactive.net-5tpl-dataflowasp.net-core-5.0

解决方案


推荐阅读