c# .net system.reactive reactivex

Publish/Consume: Wait on Subscribe, filter messages and Dispose


Using Rx.Net 3

Using the Quartz.NET scheduler, I built a workflow manager with an embedded web server that chains jobs (via a Quartz job listener that fires when a job finishes). The application creates a single Subject instance (a singleton).

A web service takes data and starts a workflow, injecting a unique ID that is propagated through the workflow. The job listener delegate detects the end of a specific job and calls OnNext on the injected Subject instance with an object holding the unique ID and a DB table ID.

The idea is that, on each call, the web service subscribes to the Subject, waits for incoming messages, and filters them by the unique ID. Once the matching message arrives, it disposes the subscription, collects the generated data, and returns it to the caller.

How can I make my Subscribe() wait for incoming messages, filter them, and Dispose() without the web service call returning prematurely?


Solution

    // model
    public class AsyncCommunicationObject
    {
        public string Key { get; }
        public string Value { get; }
    
        public AsyncCommunicationObject(string key, string value)
        {
            Key = key;
            Value = value;
        }
    }
    
    // injectable singleton 
    public static Subject<AsyncCommunicationObject> AsyncCommunication { get; set; } = new Subject<AsyncCommunicationObject>();
    
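    // in web service
    // Subscribe before starting the job so the completion message cannot be missed.
    System.Threading.EventWaitHandle waitHandle = new System.Threading.AutoResetEvent(false);
    string yourID = someId; // placeholder: the unique ID injected into the workflow
    string dbId = null;     // set by the subscription when the matching message arrives

    var subscription = _asyncCommunication // the injected Subject singleton
        .Where(x => x.Key == yourID)
        .Take(1)
        .Subscribe(
            x =>
            {
                dbId = x.Value;
                waitHandle.Set();
            }
        );

    _schedulerCore.ExecuteJob(upload.JobId, jobDataMap);

    // Block until the job listener publishes the matching message.
    // An AutoResetEvent resets itself once a waiter is released, so no Reset() call is needed.
    waitHandle.WaitOne();

    subscription.Dispose();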
    
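    // in job listener
    // yourID and someValue are placeholders: the workflow's unique ID and the value to return (e.g. the DB table ID)
    _asyncCommunication.OnNext(new AsyncCommunicationObject(yourID, someValue));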
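
If the web service method can be made async, the same wait-filter-dispose pattern can be sketched without blocking a thread on a wait handle. The following is a minimal sketch, not a drop-in replacement: RunAndWaitAsync, the startJob callback, the "abc" key and the five-second timeout are all hypothetical names and values chosen for illustration, and Task.Run stands in for the Quartz job listener. It relies on FirstAsync, Timeout and ToTask from System.Reactive. ToTask subscribes at the moment it is called, so calling it before the job starts keeps the "subscribe first, then execute" ordering of the solution above.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public class AsyncCommunicationObject
{
    public string Key { get; }
    public string Value { get; }

    public AsyncCommunicationObject(string key, string value)
    {
        Key = key;
        Value = value;
    }
}

public static class Demo
{
    // Hypothetical helper: starts the job, then waits for the first message
    // whose Key matches. Throws TimeoutException if none arrives in time.
    public static async Task<string> RunAndWaitAsync(
        IObservable<AsyncCommunicationObject> messages,
        string key,
        Action startJob,
        TimeSpan timeout)
    {
        // ToTask() subscribes right here, before the job is started,
        // so a fast job cannot publish its message before we listen.
        Task<AsyncCommunicationObject> pending = messages
            .FirstAsync(x => x.Key == key) // filter, take one, then complete
            .Timeout(timeout)              // fail instead of waiting forever
            .ToTask();

        startJob();

        // The subscription is released automatically once the sequence
        // completes or fails, so no explicit Dispose() is needed.
        AsyncCommunicationObject match = await pending;
        return match.Value;
    }

    public static async Task Main()
    {
        var subject = new Subject<AsyncCommunicationObject>();

        string dbId = await RunAndWaitAsync(
            subject,
            "abc",
            () =>
            {
                // Stand-in for the job listener publishing from another thread.
                Task.Run(() =>
                {
                    subject.OnNext(new AsyncCommunicationObject("other", "1"));
                    subject.OnNext(new AsyncCommunicationObject("abc", "42"));
                });
            },
            TimeSpan.FromSeconds(5));

        Console.WriteLine(dbId); // prints "42"
    }
}
```

In the web service, startJob would wrap the _schedulerCore.ExecuteJob(upload.JobId, jobDataMap) call and messages would be the injected _asyncCommunication subject. The timeout is a design choice rather than a requirement: without it, a job that never reports back would leave the request waiting indefinitely.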