Search code examples
c#.netapache-kafkamasstransit

Send message key inside the EventActivityBinder.Produce method


I'm working on a project using Masstransit with Sagas and Kafka and just stumble upon an issue. I'm trying to use the EventActivityBinder.Produce to produce messages to some Kafka topic. The problem is that the defined producer needs a message key to be specified and I couldn't figure out a way to send the message key using this EventActivityBinder.Produce method so it raises an exception saying that the producer is not registered.

So, my question is: is it possible to send a message key using this produce method mentioned above?

Here is the method that I`ve implemented:

public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
    this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
{
    var @event = binder.Produce(context =>
    {
        context.Saga.UpdatedAt = DateTime.Now;

        var @event = new
        {
            Success = false,
            context.Saga.Reason,
            __Header_Reason = context.Saga.Reason,
        };

        return context.Init<ResponseWrapper<OrderResponseEvent>>(@event);
    });

    return @event;
}

and there's my producer registration

rider.AddProducer<string, ResponseWrapper<OrderResponseEvent>>(kafkaTopics.SourceSystemTopic);

Solution

  • I ended up using this BehaviorContext.GetServiceOrCreateInstance method, passing the registration signature of my producer. Apparently, it worked but I'm absolutely not confident that that is the best approach. I do believe that, ideally, the EventActivityBinder.Produce method should have an option in which we could pass the key, if necessary! But anyways, here goes the code.

    public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
        this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
    {
        Func<BehaviorContext<OrderRequestSagaInstance>, Task> asyncAction = async context =>
        {        
            await context.GetServiceOrCreateInstance<ITopicProducer<string, ErrorMessageEvent>()
                .Produce(
                    Guid.NewGuid().ToString(),
                    context.Saga.NotificationReply!,
                    context.CancellationToken)
                .ConfigureAwait(false);
        };
        
        return binder.Add(new AsyncActivity<OrderRequestSagaInstance>(asyncAction));
    }