Search code examples
c#.netreactive-programmingsystem.reactiverx.net

How to wait subscriber to complete before onNext?


I am very new to reactive programming and I stuck at one point.

I try to implement if my collection has reaches a certain number element or certain time has paseed i need to trigger some method.

Purpose of use: I will process the events coming from the database, but I need to mark where I left off after every 200 events OR save last processed event every 10 seconds because maybe there won't be an event to reach 200 for a long time. But OnNext do not wait finish saveCheckPoint method and continue acepting processed item. I need to wait in before OnNext to sake of data consistency.

To simulate my design you can check this code, how can i achieve my requirement?

Subject<string> events = new Subject<string>();

events
    .Buffer(TimeSpan.FromSeconds(10), 200)
    .SelectMany(async x => await SaveCheckpoint(x)).Subscribe();


async Task<bool> SaveCheckpoint(IList<string> i)
{
    var lastProcessedEvent = i.LastOrDefault();

    if (lastProcessedEvent != null)
    {
        //Save checkpoint to db or somewhere else
        await Task.Delay(1000);
        Console.WriteLine($"Saved checkpoint of : {lastProcessedEvent}");
    }
   
    return await Task.FromResult(true);
}


for (int j = 1; j < 10000; j++)
{
    Console.WriteLine("Event processed");
    // I need this step must be wait until subscriber to comlete task
    events.OnNext($"Event - {j}");
}

I am expecting to find a way to wait complete of Subscribe method before onNext


Solution

  • As for me you need to do it without async/await execution, like this:

    static async Task Main()
    {
        void TimestampedPrint(object o) => Console.WriteLine($"{DateTime.Now:HH.mm.ss.fff}: {o}");
        Subject<string> events = new Subject<string>();
    
        using var disposable = events
            .Buffer(TimeSpan.FromSeconds(1), 20)
            .Subscribe(x => SaveCheckpoint(x).GetAwaiter().GetResult());
    
    
        async Task SaveCheckpoint(IList<string> i)
        {
            var lastProcessedEvent = i.LastOrDefault();
    
            if (lastProcessedEvent != null)
            {
                //Save checkpoint to db or somewhere else
                await Task.Delay(100);
                TimestampedPrint($"Saved checkpoint of : {lastProcessedEvent}");
            }
        }
    
    
        for (int j = 1; j < 100; j++)
        {
            TimestampedPrint($"Event processed {j}");
            // This do not wait subscriber to finish batch
            events.OnNext($"Event - {j}");
        }
    
        TimestampedPrint("End");
    
        Console.ReadLine();
    
    }
    

    This produces the following output:

    12.54.50.657: Event processed 1
    12.54.50.693: Event processed 2
    12.54.50.693: Event processed 3
    12.54.50.693: Event processed 4
    12.54.50.693: Event processed 5
    12.54.50.693: Event processed 6
    12.54.50.693: Event processed 7
    12.54.50.693: Event processed 8
    12.54.50.693: Event processed 9
    12.54.50.693: Event processed 10
    12.54.50.693: Event processed 11
    12.54.50.693: Event processed 12
    12.54.50.693: Event processed 13
    12.54.50.693: Event processed 14
    12.54.50.693: Event processed 15
    12.54.50.693: Event processed 16
    12.54.50.693: Event processed 17
    12.54.50.693: Event processed 18
    12.54.50.693: Event processed 19
    12.54.50.693: Event processed 20
    12.54.50.807: Saved checkpoint of : Event - 20
    12.54.50.808: Event processed 21
    12.54.50.808: Event processed 22
    12.54.50.808: Event processed 23
    12.54.50.808: Event processed 24
    12.54.50.808: Event processed 25
    12.54.50.808: Event processed 26
    12.54.50.808: Event processed 27
    12.54.50.808: Event processed 28
    12.54.50.808: Event processed 29
    12.54.50.808: Event processed 30
    12.54.50.808: Event processed 31
    12.54.50.808: Event processed 32
    12.54.50.808: Event processed 33
    12.54.50.808: Event processed 34
    12.54.50.808: Event processed 35
    12.54.50.808: Event processed 36
    12.54.50.808: Event processed 37
    12.54.50.808: Event processed 38
    12.54.50.808: Event processed 39
    12.54.50.808: Event processed 40
    12.54.50.909: Saved checkpoint of : Event - 40
    12.54.50.909: Event processed 41
    12.54.50.909: Event processed 42
    12.54.50.909: Event processed 43
    12.54.50.909: Event processed 44
    12.54.50.909: Event processed 45
    12.54.50.909: Event processed 46
    12.54.50.909: Event processed 47
    12.54.50.909: Event processed 48
    12.54.50.909: Event processed 49
    12.54.50.909: Event processed 50
    12.54.50.909: Event processed 51
    12.54.50.909: Event processed 52
    12.54.50.909: Event processed 53
    12.54.50.909: Event processed 54
    12.54.50.909: Event processed 55
    12.54.50.909: Event processed 56
    12.54.50.909: Event processed 57
    12.54.50.909: Event processed 58
    12.54.50.909: Event processed 59
    12.54.50.909: Event processed 60
    12.54.51.007: Saved checkpoint of : Event - 60
    12.54.51.007: Event processed 61
    12.54.51.007: Event processed 62
    12.54.51.007: Event processed 63
    12.54.51.007: Event processed 64
    12.54.51.007: Event processed 65
    12.54.51.007: Event processed 66
    12.54.51.007: Event processed 67
    12.54.51.007: Event processed 68
    12.54.51.007: Event processed 69
    12.54.51.007: Event processed 70
    12.54.51.007: Event processed 71
    12.54.51.007: Event processed 72
    12.54.51.007: Event processed 73
    12.54.51.007: Event processed 74
    12.54.51.007: Event processed 75
    12.54.51.007: Event processed 76
    12.54.51.008: Event processed 77
    12.54.51.008: Event processed 78
    12.54.51.008: Event processed 79
    12.54.51.008: Event processed 80
    12.54.51.108: Saved checkpoint of : Event - 80
    12.54.51.108: Event processed 81
    12.54.51.108: Event processed 82
    12.54.51.108: Event processed 83
    12.54.51.108: Event processed 84
    12.54.51.108: Event processed 85
    12.54.51.108: Event processed 86
    12.54.51.108: Event processed 87
    12.54.51.108: Event processed 88
    12.54.51.108: Event processed 89
    12.54.51.108: Event processed 90
    12.54.51.108: Event processed 91
    12.54.51.108: Event processed 92
    12.54.51.108: Event processed 93
    12.54.51.108: Event processed 94
    12.54.51.108: Event processed 95
    12.54.51.108: Event processed 96
    12.54.51.108: Event processed 97
    12.54.51.108: Event processed 98
    12.54.51.108: Event processed 99
    12.54.51.108: End
    12.54.52.212: Saved checkpoint of : Event - 99