I'm using TPL Dataflow in order to perform one task per symbol at a time. The first two Operation taking...
messages are correct but the next ones are using old data. In other words, it uses the old data marked with green on the screenshot below, instead of the newest data (the blue marked one).
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;
namespace SubscribeToCandlesEventFixTest
{
public class BinanceSocketHandler
{
private readonly IBinanceClient _client;
private readonly IBinanceSocketClient _socketClient;
public BinanceSocketHandler()
{
_client = new BinanceClient(new BinanceClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoTimestamp = true,
AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
_socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoReconnect = true,
ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
}
private Dictionary<string, ActionBlock<IBinanceStreamKlineData>> _ab = new();
public async Task StartAsync(CancellationToken cancellationToken)
{
var symbols = new[] { "TRXUSDT", "BTCUSDT" };
var interval = KlineInterval.OneMinute;
foreach (var symbol in symbols)
{
_ab[symbol] = new ActionBlock<IBinanceStreamKlineData>(
async data =>
{
Console.WriteLine($"Operation taking 10 seconds to execute... | Symbol: {data.Symbol} | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
});
await _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbol, interval,
async data =>
{
if (data.Data.Final)
{
Console.WriteLine(
$"[{DateTime.UtcNow}] [{data.Symbol}] New final candle | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
}
else
{
Console.WriteLine(
$"[{DateTime.UtcNow}] [{data.Symbol}] Candle update | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
// TODO: Use the most up-to-date value
await _ab[symbol].SendAsync(data, cancellationToken).ConfigureAwait(false);
}
}).ConfigureAwait(false);
}
}
public async Task StopAsync()
{
foreach (var symbol in _ab.Keys)
{
_ab[symbol].Complete();
await _ab[symbol].Completion.ConfigureAwait(false);
}
}
}
class Program
{
static async Task Main(string[] args)
{
var test = new BinanceSocketHandler();
await test.StartAsync(new CancellationToken()).ConfigureAwait(false);
Console.ReadLine();
}
}
}
TPL Dataflow will process all items in order; that's what it's made to do. You can try to do a most-recent kind of approach by using a BroadcastBlock
, but since that block is linked to another block, you'll probably end up with one in process, one waiting to be processed, and the third one being the one actually getting overwritten.
If you want it tighter than that (i.e., one in process and one waiting that is also overwritten), then I'd recommend Channels. Specifically, a bounded channel using BoundedChannelFullMode.DropOldest
.