Observable.Interval executes a method twice at the same time


ExecuteSellAsync is being called twice at the same time, as you can see in the logs below. It works fine when I use a 15-second interval, i.e. Observable.Interval(TimeSpan.FromSeconds(15)). How can I prevent this? Maybe with some kind of locking?

2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: New | Price: 0.06783960 | Last filled price: 0.00000000 | Stop price: 0.00000000 | Quantity: 0.00000000 | Quote quantity: 0.00000000 | Commission: 0 
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: Filled | Price: 0.06783960 | Last filled price: 0.06784260 | Stop price: 0.00000000 | Quantity: 5420.00000000 | Quote quantity: 367.70689200 | Commission: 0.00201210 BNB
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Sell order was filled | Close date: 2021/02/12 17:04:09 | Close rate (price): 0.06784260
2021-02-12 19:04:13 [9] INFO  Wallets Wallets synced.
2021-02-12 19:04:14 [10] DEBUG LiveTradeManager Timer triggered. Price: 0.06783910 | Timestamp: 2/12/2021 5:03:00 PM | Close: 0.06790680
2021-02-12 19:04:17 [9] DEBUG BinanceSpotClient Limit sell order has failed | Error code: -2010 | Error message: Account has insufficient balance for requested action. | Pair: DOGEUSDT | Quantity: 0.00000000 | Price: 0.06782540

_throttlerObservable = Observable.Interval(TimeSpan.FromSeconds(5))
   .SelectMany(_ => Observable.FromAsync(async () =>
   {
       var lastCandle = _candles.Last();

       _logger.Debug($"Timer triggered. Price: {_ticker.LastPrice} | Open time: {lastCandle.Timestamp} | Close: {lastCandle.Close}");

       if (_orderSide == OrderSide.Sell)
       {
           var trade = _trades.FirstOrDefault(e => e.Pair.Equals(_tradeOptions.Pair) && e.IsOpen);

           if (trade.NotNull())
           {
               var shouldSell = _tradingStrategy.ShouldSell(trade, _ticker.LastPrice, _tradeAdvice);

               if (shouldSell.SellFlag)
               {
                   await ExecuteSellAsync(trade, lastCandle.Timestamp, shouldSell.SellType).ConfigureAwait(false);
               }
           }
       }
   }))
   .Subscribe();

Edit

I see what the problem is. _tradingStrategy.ShouldSell takes a few seconds to execute, and the next tick starts its own check while the previous one is still running. Can I use a lock around that logic?

The flag below works around it, but I need to lock the entire check:

bool test = false;
public (bool SellFlag, SellType SellType) ShouldSell(Trade trade, decimal rate, TradeAdvice tradeAdvice, decimal? low = null, decimal? high = null)
{
    if (!test)
    {
        test = true;

        // my logic is here. It takes a few seconds.

        test = false;
    }

    return (false, SellType.None);
}

Edit2

Here is a runnable repro. Observable.Interval fires every second and ShouldSellAsync takes 5 seconds to run. Once _completed becomes true, the message is no longer printed. The message is printed 5 times, whereas I expect it only once.

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace RxNETDispose
{
    class Program
    {
        private static bool _completed = false;

        public static async Task ShouldSellAsync()
        {
            if (!_completed)
            {
                await Task.Delay(5000);

                Console.WriteLine($"{DateTime.UtcNow} - ShouldSell called");

                _completed = true;
            }
        }

        static void Main(string[] args)
        {
            Observable.Interval(TimeSpan.FromSeconds(1))
                .SelectMany(_ => Observable.FromAsync(async () =>
                {
                    await ShouldSellAsync().ConfigureAwait(false);
                }))
            .Subscribe();

            Console.ReadLine();
        }
    }
}

Solution

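  • SelectMany does indeed introduce concurrency: Observable.Interval keeps ticking whether or not the previous call has finished, and SelectMany subscribes to each inner observable as soon as it is produced, so a slow call overlaps with the next one. We want to control that concurrency, so the answer here is to roll your own operator that guarantees a fixed gap between the end of one call to ExecuteSellAsync and the start of the next.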

    Thankfully there's a beautiful, but non-obvious, way to do this with the Rx schedulers.

    The method we're looking for is this:

    public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
    

    To use this call we need to define the Func<IScheduler, CancellationToken, Task<IDisposable>> action to be recursive, so that it calls itself to reschedule once the call to ExecuteSellAsync is complete.

    So, to leave a 2.0 second gap after each call completes, for example, we do this:

    Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
    handler = async (s, ct) =>
    {
        await ExecuteSellAsync();
        return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
    };
    

    We can kick it off by calling this:

    IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
    

    Of course, like all good Rx operations we can call subscription.Dispose() to stop it running.

    Here's a complete example:

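    // Namespaces needed: System, System.Reactive.Concurrency, System.Threading, System.Threading.Tasks
    async Task Main()
    {
        Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
        handler = async (s, ct) =>
        {
            // Run one call to completion, then schedule the next one 2 seconds later.
            await ExecuteSellAsync();
            return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
        };
        
        // Start immediately; the returned IDisposable stops the loop.
        IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
    
        await Task.Delay(TimeSpan.FromSeconds(9.0));
        
        subscription.Dispose();
    }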
    
    private DateTime then = DateTime.Now;
    private int __counter = 0;
    
    async Task ExecuteSellAsync()
    {
        var counter = Interlocked.Increment(ref __counter);
        Console.WriteLine($"ExecuteSellAsync() Start {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
        await Task.Delay(TimeSpan.FromSeconds(2.0));
        Console.WriteLine($"ExecuteSellAsync() End {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
    }
    

    When I run this I get this output:

    ExecuteSellAsync() Start 1 - 0.0019952
    ExecuteSellAsync() End 1 - 2.0095866
    ExecuteSellAsync() Start 2 - 4.0185182
    ExecuteSellAsync() End 2 - 6.0199157
    ExecuteSellAsync() Start 3 - 8.0303588
    ExecuteSellAsync() End 3 - 10.0417079
    

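    Note that the third call still ends at about 10 seconds even though the subscription was disposed at 9 seconds: ExecuteSellAsync() does not cooperatively cancel, so a call that is already in flight runs to completion. It's not hard to change it to async Task ExecuteSellAsync(CancellationToken ct) and allow it to cancel cooperatively.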
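
    A minimal sketch of that cooperative variant follows. It assumes the token passed to the handler is cancelled when the returned IDisposable is disposed, which is my understanding of ScheduleAsync but worth verifying against your Rx version. The CancellableLoop class, its Start method and the Started/Finished counters are hypothetical names used only for illustration, and the 2 second delay stands in for the real sell logic.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableLoop
{
    // Hypothetical counters so a caller can see how many calls started and how many finished.
    public static int Started;
    public static int Finished;

    // Stand-in for the real sell logic; the delay observes the token.
    public static async Task ExecuteSellAsync(CancellationToken ct)
    {
        Interlocked.Increment(ref Started);
        // Throws an OperationCanceledException if ct is cancelled while waiting.
        await Task.Delay(TimeSpan.FromSeconds(2.0), ct);
        Interlocked.Increment(ref Finished);
    }

    // Starts the loop on the given scheduler; dispose the result to stop it.
    public static IDisposable Start(IScheduler scheduler, TimeSpan gap)
    {
        Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
        handler = async (s, ct) =>
        {
            try
            {
                await ExecuteSellAsync(ct);
            }
            catch (OperationCanceledException)
            {
                // Cancelled part-way through: stop here and do not reschedule.
                return Disposable.Empty;
            }
            // Completed normally: schedule the next call after the gap.
            return s.ScheduleAsync(gap, handler);
        };
        return scheduler.ScheduleAsync(TimeSpan.Zero, handler);
    }
}
```

    With this version, disposing the result of CancellableLoop.Start(Scheduler.Default, TimeSpan.FromSeconds(2.0)) while a call is in flight should end that call at its next await on the token rather than letting it run to completion. The explicit catch is a deliberate choice so that the sketch does not rely on how Rx itself treats a cancelled handler.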

    Now, this can be extended to make it into a nice observable.

    Try this:

    IObservable<Unit> query =
        Observable.Create<Unit>(o =>
        {
            Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
            handler = async (s, ct) =>
            {
                if (ct.IsCancellationRequested)
                {
                    o.OnCompleted();
                }
                else
                {
                    await ExecuteSellAsync();
                    o.OnNext(Unit.Default);
                }
                return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
            };
        
            return Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
        });
    
    IDisposable subscription = query.Take(3).Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));
    
    await Task.Delay(TimeSpan.FromSeconds(11.0));
    
    subscription.Dispose();
    

    This has the following output:

    ExecuteSellAsync() Start 1 - 0.0009972
    ExecuteSellAsync() End 1 - 2.0115375
    U
    ExecuteSellAsync() Start 2 - 4.0128375
    ExecuteSellAsync() End 2 - 6.0282818
    U
    ExecuteSellAsync() Start 3 - 8.0370135
    ExecuteSellAsync() End 3 - 10.0521106
    U
    C
    

    Note that it completes: Take(3) ends the sequence after the third value. If you call subscription.Dispose(); before it naturally completes then it behaves properly and doesn't issue the OnCompleted notification.

    Let's wrap this in a nice set of extension methods:

    public static class ObservableEx
    {
        public static IObservable<Unit> IntervalAsync(TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
            TimerAsync(period, period, actionAsync, scheduler);
    
        public static IObservable<T> IntervalAsync<T>(TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
            TimerAsync(period, period, functionAsync, scheduler);
    
        public static IObservable<Unit> TimerAsync(TimeSpan dueTime, TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
            Observable.Create<Unit>(o =>
            {
                Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
                handler = async (s, ct) =>
                {
                    if (ct.IsCancellationRequested)
                    {
                        o.OnCompleted();
                    }
                    else
                    {
                        await actionAsync();
                        o.OnNext(Unit.Default);
                    }
                    return s.ScheduleAsync(period, handler);
                };
                return scheduler.ScheduleAsync(dueTime, handler);
            });
    
        public static IObservable<T> TimerAsync<T>(TimeSpan dueTime, TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
            Observable.Create<T>(o =>
            {
                Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
                handler = async (s, ct) =>
                {
                    if (ct.IsCancellationRequested)
                    {
                        o.OnCompleted();
                    }
                    else
                    {
                        o.OnNext(await functionAsync());
                    }
                    return s.ScheduleAsync(period, handler);
                };
                return scheduler.ScheduleAsync(dueTime, handler);
            });
    }
    

    Now, clearly there are a bunch of overloads that I didn't write - ones that use a default scheduler and ones that allow for cooperative cancellation - but I hope you get the idea.
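
    As a rough idea of what two of those missing overloads might look like, here is a sketch of a cancellation-aware TimerAsync plus IntervalAsync overloads, one of which falls back to Scheduler.Default. The class name ObservableExSketch is hypothetical and kept separate from ObservableEx so the block compiles on its own. It assumes the handler's token is cancelled when the subscription is disposed, and it is as untested as the methods above.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ObservableExSketch
{
    // Cancellation-aware variant: the action receives the token tied to the subscription.
    public static IObservable<Unit> TimerAsync(TimeSpan dueTime, TimeSpan period, Func<CancellationToken, Task> actionAsync, IScheduler scheduler) =>
        Observable.Create<Unit>(o =>
        {
            Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
            handler = async (s, ct) =>
            {
                try
                {
                    await actionAsync(ct);
                }
                catch (OperationCanceledException)
                {
                    // The subscription went away mid-call: stop without rescheduling.
                    return Disposable.Empty;
                }
                o.OnNext(Unit.Default);
                // Wait for the period only after the call has finished.
                return s.ScheduleAsync(period, handler);
            };
            return scheduler.ScheduleAsync(dueTime, handler);
        });

    // Interval form: the first call is also delayed by the period.
    public static IObservable<Unit> IntervalAsync(TimeSpan period, Func<CancellationToken, Task> actionAsync, IScheduler scheduler) =>
        TimerAsync(period, period, actionAsync, scheduler);

    // Default-scheduler overload.
    public static IObservable<Unit> IntervalAsync(TimeSpan period, Func<CancellationToken, Task> actionAsync) =>
        IntervalAsync(period, actionAsync, Scheduler.Default);
}
```

    A call such as ObservableExSketch.IntervalAsync(TimeSpan.FromSeconds(2.0), ct => ExecuteSellAsync(ct)) would then pass the token through, provided ExecuteSellAsync accepts one. Exceptions other than cancellation are not forwarded to OnError in this sketch.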

    Now with these extension methods I can do this:

    IDisposable subscription =
        ObservableEx
            .IntervalAsync(TimeSpan.FromSeconds(2.0), () => ExecuteSellAsync(), Scheduler.Default)
            .Take(3)
            .Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));
    

    I get the same output as before.

    I haven't fully tested the extension methods. They might require a little more love and attention.