I'm trying to use Rx in my Kafka consumer.
public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
Then I have the following code:
var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
        x => GenericEvent += x,
        x => GenericEvent -= x)
    .Select(x => x.EventArgs);
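while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    // Null-conditional invoke avoids a NullReferenceException when nothing has subscribed yet.
    GenericEvent?.Invoke(consumeResult.Topic, consumeResult);
}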
Then, elsewhere, I use it like this:
var subscription = observable.Subscribe(message =>
{
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");
    //_kafkaConsumer.Commit(messages);
});
Is there any way to run a separate thread per topic name (consumeResult.Topic)? When the consumer receives a message, it should be redirected to the thread that corresponds to its topic.
Give this a go:
Observable
    .Interval(TimeSpan.FromSeconds(0.1))
    .Take(20)
    .GroupBy(x => x % 3)
    .SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
    .Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));
This ensures that each inner observable created by the GroupBy operator gets its own EventLoopScheduler, and therefore its own dedicated thread. SelectMany flattens the groups back into a single sequence, but each group's values are still observed on the EventLoopScheduler associated with that group.
In your case, you would GroupBy the consumeResult.Topic property.
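As a rough sketch of what that could look like, the following groups by topic and gives each topic its own EventLoopScheduler. To keep it self-contained, a hypothetical Message record stands in for ConsumeResult<string, string>, and a Subject<Message> stands in for the observable built with FromEventPattern in the question. Neither is part of the original code, and the topic names are made up.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical stand-in for ConsumeResult<string, string>; only Topic and Value are modelled.
public record Message(string Topic, string Value);

public static class Program
{
    public static void Main()
    {
        // Assumption: this Subject replaces the FromEventPattern observable from the question.
        using var source = new Subject<Message>();

        IDisposable subscription = source
            // One inner observable per distinct topic name.
            .GroupBy(m => m.Topic)
            // Each group gets its own EventLoopScheduler (one thread per topic);
            // Using disposes that scheduler when the group's subscription ends.
            .SelectMany(group => Observable.Using(
                () => new EventLoopScheduler(),
                els => group.ObserveOn(els)))
            .Subscribe(m => Console.WriteLine(
                $"{Thread.CurrentThread.ManagedThreadId} ** {m.Topic}: '{m.Value}'"));

        // Made-up topics and payloads, pushed from the main thread.
        source.OnNext(new Message("orders", "a"));
        source.OnNext(new Message("payments", "b"));
        source.OnNext(new Message("orders", "c"));

        // Crude wait so the per-topic threads can print before shutdown.
        Thread.Sleep(500);
        subscription.Dispose();
    }
}
```

Messages that share a topic should print with the same managed thread ID, and messages within one topic keep their arrival order because each EventLoopScheduler processes its work sequentially.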
Do make sure that your source observable ends, because the threads stay alive until it does. Calling Dispose() on the subscription is sufficient to end the observable.
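One possible way to wire that up, assuming you want the pipeline to shut down together with the cancellation token that already drives your consume loop, is to dispose the subscription from a token callback. This is only a sketch: the CancellationTokenSource and the one-second delay are placeholders for your own shutdown logic.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class Program
{
    public static void Main()
    {
        // Placeholder for the token source that controls the consume loop.
        using var cts = new CancellationTokenSource();

        IDisposable subscription = Observable
            .Interval(TimeSpan.FromSeconds(0.1))
            .GroupBy(x => x % 3)
            .SelectMany(xs => Observable.Using(
                () => new EventLoopScheduler(),
                els => xs.ObserveOn(els)))
            .Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));

        // When cancellation is requested, dispose the subscription. That tears down
        // each group's Using resource, which disposes its EventLoopScheduler.
        using var registration = cts.Token.Register(() => subscription.Dispose());

        Thread.Sleep(1000); // stand-in for the application's lifetime
        cts.Cancel();       // no further values are printed after this
    }
}
```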