I am very new to reactive programming and I am stuck at one point.
I am trying to trigger a method when my collection reaches a certain number of elements or a certain amount of time has passed.
Purpose: I process events coming from the database, and I need to mark where I left off after every 200 events, or save the last processed event every 10 seconds, because there may not be enough events to reach 200 for a long time. However, OnNext does not wait for the SaveCheckpoint method to finish and keeps accepting processed items. For the sake of data consistency, I need OnNext to wait until the save has completed.
The code below simulates my design. How can I achieve this?
    Subject<string> events = new Subject<string>();
    events
        .Buffer(TimeSpan.FromSeconds(10), 200)
        .SelectMany(async x => await SaveCheckpoint(x))
        .Subscribe();

    async Task<bool> SaveCheckpoint(IList<string> i)
    {
        var lastProcessedEvent = i.LastOrDefault();
        if (lastProcessedEvent != null)
        {
            // Save checkpoint to db or somewhere else
            await Task.Delay(1000);
            Console.WriteLine($"Saved checkpoint of : {lastProcessedEvent}");
        }
        return true;
    }

    for (int j = 1; j < 10000; j++)
    {
        Console.WriteLine("Event processed");
        // This step needs to wait until the subscriber has completed its task
        events.OnNext($"Event - {j}");
    }

I am looking for a way to wait for the Subscribe handler to complete before the next OnNext call.
In my opinion, you need to run the save synchronously inside the subscriber instead of using async/await in the pipeline, like this:
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;

    class Program
    {
        static async Task Main()
        {
            void TimestampedPrint(object o) => Console.WriteLine($"{DateTime.Now:HH.mm.ss.fff}: {o}");

            Subject<string> events = new Subject<string>();
            using var disposable = events
                .Buffer(TimeSpan.FromSeconds(1), 20)
                // Block the calling thread until the checkpoint has been saved
                .Subscribe(x => SaveCheckpoint(x).GetAwaiter().GetResult());

            async Task SaveCheckpoint(IList<string> i)
            {
                var lastProcessedEvent = i.LastOrDefault();
                if (lastProcessedEvent != null)
                {
                    // Save checkpoint to db or somewhere else
                    await Task.Delay(100);
                    TimestampedPrint($"Saved checkpoint of : {lastProcessedEvent}");
                }
            }

            for (int j = 1; j < 100; j++)
            {
                TimestampedPrint($"Event processed {j}");
                // OnNext blocks here while the subscriber saves a completed batch
                events.OnNext($"Event - {j}");
            }
            TimestampedPrint("End");
            Console.ReadLine();
        }
    }
This produces the following output:
12.54.50.657: Event processed 1
12.54.50.693: Event processed 2
12.54.50.693: Event processed 3
12.54.50.693: Event processed 4
12.54.50.693: Event processed 5
12.54.50.693: Event processed 6
12.54.50.693: Event processed 7
12.54.50.693: Event processed 8
12.54.50.693: Event processed 9
12.54.50.693: Event processed 10
12.54.50.693: Event processed 11
12.54.50.693: Event processed 12
12.54.50.693: Event processed 13
12.54.50.693: Event processed 14
12.54.50.693: Event processed 15
12.54.50.693: Event processed 16
12.54.50.693: Event processed 17
12.54.50.693: Event processed 18
12.54.50.693: Event processed 19
12.54.50.693: Event processed 20
12.54.50.807: Saved checkpoint of : Event - 20
12.54.50.808: Event processed 21
12.54.50.808: Event processed 22
12.54.50.808: Event processed 23
12.54.50.808: Event processed 24
12.54.50.808: Event processed 25
12.54.50.808: Event processed 26
12.54.50.808: Event processed 27
12.54.50.808: Event processed 28
12.54.50.808: Event processed 29
12.54.50.808: Event processed 30
12.54.50.808: Event processed 31
12.54.50.808: Event processed 32
12.54.50.808: Event processed 33
12.54.50.808: Event processed 34
12.54.50.808: Event processed 35
12.54.50.808: Event processed 36
12.54.50.808: Event processed 37
12.54.50.808: Event processed 38
12.54.50.808: Event processed 39
12.54.50.808: Event processed 40
12.54.50.909: Saved checkpoint of : Event - 40
12.54.50.909: Event processed 41
12.54.50.909: Event processed 42
12.54.50.909: Event processed 43
12.54.50.909: Event processed 44
12.54.50.909: Event processed 45
12.54.50.909: Event processed 46
12.54.50.909: Event processed 47
12.54.50.909: Event processed 48
12.54.50.909: Event processed 49
12.54.50.909: Event processed 50
12.54.50.909: Event processed 51
12.54.50.909: Event processed 52
12.54.50.909: Event processed 53
12.54.50.909: Event processed 54
12.54.50.909: Event processed 55
12.54.50.909: Event processed 56
12.54.50.909: Event processed 57
12.54.50.909: Event processed 58
12.54.50.909: Event processed 59
12.54.50.909: Event processed 60
12.54.51.007: Saved checkpoint of : Event - 60
12.54.51.007: Event processed 61
12.54.51.007: Event processed 62
12.54.51.007: Event processed 63
12.54.51.007: Event processed 64
12.54.51.007: Event processed 65
12.54.51.007: Event processed 66
12.54.51.007: Event processed 67
12.54.51.007: Event processed 68
12.54.51.007: Event processed 69
12.54.51.007: Event processed 70
12.54.51.007: Event processed 71
12.54.51.007: Event processed 72
12.54.51.007: Event processed 73
12.54.51.007: Event processed 74
12.54.51.007: Event processed 75
12.54.51.007: Event processed 76
12.54.51.008: Event processed 77
12.54.51.008: Event processed 78
12.54.51.008: Event processed 79
12.54.51.008: Event processed 80
12.54.51.108: Saved checkpoint of : Event - 80
12.54.51.108: Event processed 81
12.54.51.108: Event processed 82
12.54.51.108: Event processed 83
12.54.51.108: Event processed 84
12.54.51.108: Event processed 85
12.54.51.108: Event processed 86
12.54.51.108: Event processed 87
12.54.51.108: Event processed 88
12.54.51.108: Event processed 89
12.54.51.108: Event processed 90
12.54.51.108: Event processed 91
12.54.51.108: Event processed 92
12.54.51.108: Event processed 93
12.54.51.108: Event processed 94
12.54.51.108: Event processed 95
12.54.51.108: Event processed 96
12.54.51.108: Event processed 97
12.54.51.108: Event processed 98
12.54.51.108: Event processed 99
12.54.51.108: End
12.54.52.212: Saved checkpoint of : Event - 99
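
To see why blocking inside the subscriber makes the producer wait, here is a minimal sketch, assuming the System.Reactive package is referenced. `Subject<T>.OnNext` invokes its observers synchronously on the calling thread, so the call does not return until the handler has finished. The 200 ms sleep is only a stand-in for the checkpoint write, not part of the code above.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading;

class OnNextIsSynchronous
{
    static void Main()
    {
        var subject = new Subject<int>();

        // The handler runs on the thread that calls OnNext.
        using var subscription = subject.Subscribe(x =>
        {
            Thread.Sleep(200); // stand-in for a slow, blocking save
            Console.WriteLine($"handled {x}");
        });

        subject.OnNext(1);
        // Reached only after the handler above has returned.
        Console.WriteLine("after OnNext");
    }
}
```

This prints `handled 1` before `after OnNext`. By contrast, the final `Saved checkpoint of : Event - 99` line in the output above appears after `End`, which suggests that the time-triggered flush is delivered from a timer thread once the loop has already finished, so the producer does not wait for that last save.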