
Execute a method after some inactivity using Rx .Net


I have a Controller Action like this:

[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
    await _mediator.Send(command);
}

It runs on .NET Core and uses MediatR to process commands.

Now, the UpdateDataCommand has an integer StationId property that identifies the station number. When a client application calls this method with a POST, it updates data in the database. What I want to do with Rx.NET is start a timer after the await _mediator.Send(command) call. The timer will be set to 1 minute. After 1 minute, I want to call another method that sets a flag in the database, but only for this StationId. If someone does a POST with the same StationId in the meantime, the timer should reset.

In pseudo-code, it looks like this:

[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
    int stationId = command.StationId;
    // let's assume stationId==2

    //saves data for stationId==2
    await _mediator.Send(command);

    // Start a timer of 1 min.
    // If the timer fires (meaning 1 minute has passed), call Method2().
    // If the client does another "Post" for stationId==2 in the meantime
    // (say, 20 sec later), reset the timer.
}

How can I do this using Reactive Extensions in .NET?

UPDATE (@Enigmativity): It still doesn't work. I set the timer to 10 sec, and if you look at the output times you'll see that I made a Post at 09:17:49 (which started a 10 sec timer), then a new Post at 09:17:55 (which started another timer, when it should only have reset the old one). Both timers fired: one 10 sec after the first call, and another 10 sec after the second call. (Screenshot: application output)


Solution

  • To start a timer using Rx.NET, we can call:

    var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
        .Subscribe(
            value =>{    /* ... */ }
        );
    

    To cancel the timer, we just need to dispose of this subscription later:

    subscription.Dispose();
    

    The problem is how to keep hold of the subscription between requests. One approach is to create a SubscriptionManager service (registered as a singleton), so that one controller action can call it to schedule a task and another can cancel it later, as below:

    // your controller class
    
        private readonly ILogger<HomeController> _logger;       // injected by DI
        private readonly SubscriptionManager _subscriptionMgr;  // injected by DI
    
    
        public async Task Post(...)
        {
            ...
            // saves data for #stationId
            // Start a timer of 1 min
            this._subscriptionMgr.ScheduleForStationId(stationId);    // schedule a task for #stationId that will run in 60s
        }
    
    
        [HttpPost("/Command2")]
        public async Task Command2(...)
        {
            int stationId =  command.StationId;
            if( shouldCancel ){
                this._subscriptionMgr.CancelForStationId(stationId);  // cancel previous task for #stationId
            }
        }
    

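    If you want to manage the subscriptions in memory, you can use a ConcurrentDictionary to store the subscriptions:

    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Reactive.Linq;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    public class SubscriptionManager : IDisposable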
    {
        private ConcurrentDictionary<string,IDisposable> _dict;
        private readonly IServiceProvider _sp;
        private readonly ILogger<SubscriptionManager> _logger;
    
        public SubscriptionManager(IServiceProvider sp, ILogger<SubscriptionManager> logger)
        {
            this._dict= new ConcurrentDictionary<string,IDisposable>();
            this._sp = sp;
            this._logger = logger;
        }
    
        public IDisposable ScheduleForStationId(int stationId)
        {
            var timeout = 60;
            this._logger.LogWarning($"Task for Station#{stationId} will be executed in {timeout}s") ;
            var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
                .Subscribe(
                    value =>{
                        // if you need to update the db, create a new scope:
                        using(var scope = this._sp.CreateScope()){
                            var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
                            var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
                                .FirstOrDefault();
                            if(station != null){     // the row may not exist
                                station.Note = "updated";
                                dbContext.SaveChanges();
                            }
                        }
                        this._logger.LogWarning($"Task for Station#{stationId} has been executed") ;
                    },
                    e =>{
                        // called only if the timer observable itself fails;
                        // exceptions thrown in the handler above are not routed here
                        this._logger.LogError(e, $"Task for Station#{stationId} failed");
                    }
                );
            this._dict.AddOrUpdate( stationId.ToString(), subscription , (id , sub)=> {
                sub.Dispose();       // dispose the old one
                return subscription;
            });
            return subscription;
        }
    
        public void CancelForStationId(int stationId)
        {
            IDisposable subscription;
            // remove the entry so a canceled timer is not kept in the dictionary
            if(this._dict.TryRemove(stationId.ToString(), out subscription)){
                subscription.Dispose();
                this._logger.LogWarning($"Task for station#{stationId} has been canceled");
            }
    
            // ... if you want to update the db, create a new scope
            using(var scope = this._sp.CreateScope()){
                var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
                var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
                    .FirstOrDefault();
                if(station != null){     // the row may not exist
                    station.Note = "canceled";
                    dbContext.SaveChanges();
                    this._logger.LogWarning("The db has been changed");
                }
            }
        }
    
        public void Dispose()
        {
            foreach(var entry in this._dict){
                entry.Value.Dispose();
            }
        }
    }
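
Because the dictionary lives inside the manager, every request has to see the same instance. Controllers are created per request, so the manager needs a singleton lifetime. A minimal sketch of the registration, assuming the conventional Startup.ConfigureServices method of an ASP.NET Core project (the Startup class is not shown in the question, so its shape here is an assumption):

```csharp
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // ... existing registrations (MVC, MediatR, AppDbContext) stay as they are

        // One shared instance for the whole application, so a later Post
        // can find and dispose the timer started by an earlier Post.
        services.AddSingleton<SubscriptionManager>();
    }
}
```

The container disposes singletons that it created when the application shuts down, which in turn disposes any timers that are still pending.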
    

    Another approach is to add an entry to an external task manager (like cron), but that won't use Rx.NET at all.
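
If the goal is only "run something once a station has been quiet for a while", Rx.NET's Throttle operator (which behaves like a debounce) can do the resetting itself, without tracking subscriptions by hand. The following is a rough sketch under stated assumptions, not a drop-in replacement: the class name StationInactivityWatcher, its NotifyPost method and the onInactive callback are hypothetical names introduced for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper: invokes a callback once a station has received
// no posts for `quietPeriod`.
public sealed class StationInactivityWatcher : IDisposable
{
    private readonly object _gate = new object();
    private readonly Subject<int> _posts = new Subject<int>();
    private readonly IDisposable _subscription;

    public StationInactivityWatcher(TimeSpan quietPeriod, Action<int> onInactive)
    {
        _subscription = _posts
            .GroupBy(stationId => stationId)                    // one inner stream per station
            .SelectMany(group => group.Throttle(quietPeriod))   // emit only after a quiet period
            .Subscribe(onInactive);                             // runs on a timer thread
    }

    // Call after each successful Post; restarts the quiet period for that station.
    public void NotifyPost(int stationId)
    {
        // Rx expects notifications to be serialized, so guard concurrent callers.
        lock (_gate)
        {
            _posts.OnNext(stationId);
        }
    }

    public void Dispose()
    {
        _subscription.Dispose();
        _posts.Dispose();
    }
}
```

With this shape, the controller action would call NotifyPost(command.StationId) after await _mediator.Send(command), and the watcher would be registered as a singleton so that all requests share it. Two caveats: an exception thrown by onInactive ends the whole subscription, and each distinct station id keeps a group alive for the lifetime of the watcher.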