Search code examples
c#asynchronousasync-awaitsystem.reactiverx.net

Run processing in separated thread by group


I'm trying to use Rx in my Kafka consumer.

public static event EventHandler<ConsumeResult<string, string>> GenericEvent;

then I have the following code

var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
     x => GenericEvent += x,
     x => GenericEvent -= x)
  .Select(x => x.EventArgs);

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    GenericEvent(consumeResult.Topic, consumeResult);
}

then somewhere I use it like

var subscription = observable.Subscribe(message =>
{
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");

    //_kafkaConsumer.Commit(messages);
});

Are the any possibility to run separated thread by topic name (consumeResult.Topic)? When consumer receive a message, it should redirect it to the corresponding thread by topic

enter image description here


Solution

  • Give this a go:

    Observable
        .Interval(TimeSpan.FromSeconds(0.1))
        .Take(20)
        .GroupBy(x => x % 3)
        .SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
        .Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));
    

    This ensures that a thread is created in the new EventLoopScheduler() scheduler for each inner observable created by the GroupBy operator. The SelectMany flattens out the group, but keeps the EventLoopScheduler associated for each group.

    In your case you GroupBy the consumeResult.Topic property.

    Do make sure that your source observable ends as the threads live forever until they do. Calling Dispose() on the subscription is sufficient to end the observable.