I'm working on a project using Masstransit with Sagas and Kafka and just stumble upon an issue. I'm trying to use the EventActivityBinder.Produce
to produce messages to some Kafka topic. The problem is that the defined producer needs a message key to be specified and I couldn't figure out a way to send the message key using this EventActivityBinder.Produce
method so it raises an exception saying that the producer is not registered.
So, my question is: is it possible to send a message key using this produce
method mentioned above?
Here is the method that I`ve implemented:
public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
{
var @event = binder.Produce(context =>
{
context.Saga.UpdatedAt = DateTime.Now;
var @event = new
{
Success = false,
context.Saga.Reason,
__Header_Reason = context.Saga.Reason,
};
return context.Init<ResponseWrapper<OrderResponseEvent>>(@event);
});
return @event;
}
and there's my producer registration
rider.AddProducer<string, ResponseWrapper<OrderResponseEvent>>(kafkaTopics.SourceSystemTopic);
I ended up using this BehaviorContext.GetServiceOrCreateInstance
method, passing the registration signature of my producer. Apparently, it worked but I'm absolutely not confident that that is the best approach. I do believe that, ideally, the EventActivityBinder.Produce
method should have an option in which we could pass the key, if necessary! But anyways, here goes the code.
public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
{
Func<BehaviorContext<OrderRequestSagaInstance>, Task> asyncAction = async context =>
{
await context.GetServiceOrCreateInstance<ITopicProducer<string, ErrorMessageEvent>()
.Produce(
Guid.NewGuid().ToString(),
context.Saga.NotificationReply!,
context.CancellationToken)
.ConfigureAwait(false);
};
return binder.Add(new AsyncActivity<OrderRequestSagaInstance>(asyncAction));
}