Search code examples
c#apache-kafkamasstransitsaga

MassTransit. Kafka. How to produce message from saga state machine, during message processing


PublishAsync not work

Example program.cs:

namespace MassTransitKafka
{
    class Program
    {
        private static ServiceProvider _serviceProvider;

        static async Task Main(string[] args)
        {
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.UsingInMemory((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context);
                });
                
                x.AddRider(rider =>
                {
                    rider.AddProducer<Enter1>(nameof(Enter1));
                    rider.AddProducer<Enter2>(nameof(Enter2));
                    rider.AddProducer<Enter3>(nameof(Enter3));
                    rider.AddProducer<EnterEnter>(nameof(EnterEnter));
                    rider.AddSagaStateMachine<TestSaga1StateMachine, TestSaga1State>(typeof(TestSaga1StateDefinition))
                        .InMemoryRepository();
                    
                    rider.UsingKafka((context, k) =>
                    {
                        k.Host("localhost:9092");
                        
                        k.TopicEndpoint<Null, Enter1>(nameof(Enter1), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, Enter2>(nameof(Enter2), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, Enter3>(nameof(Enter3), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, EnterEnter>(nameof(EnterEnter), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                    });
                });
            });
            _serviceProvider = services.BuildServiceProvider();
            var busControl = _serviceProvider.GetRequiredService<IBusControl>();
            var observer = new ReceiveObserver();
            busControl.ConnectReceiveObserver(observer);

            await busControl.StartAsync();
            var tokenSource = new CancellationTokenSource();
            ThreadPool.QueueUserWorkItem(s =>
            {
                Work(busControl, tokenSource.Token).GetAwaiter().GetResult();
            });
            
            while (true)
            {
                var quit = Console.ReadLine();
                if (quit == "quit")
                {
                    tokenSource.Cancel();
                    break;
                }
            }
        }

        private static async Task Work(IPublishEndpoint publisher, CancellationToken token)
        {
            var correlationId = Guid.NewGuid();
            var enter1Producer = _serviceProvider.GetRequiredService<ITopicProducer<Enter1>>();
            
            await enter1Producer.Produce(new {CorrelationId = correlationId, EnteredText = "1"}, token);

            while (token.IsCancellationRequested == false)
            {
                var cancelled = token.WaitHandle.WaitOne(5000);
                if (cancelled)
                    break;
            }
        }
        
        private static Dictionary<string, string> Configuration
        {
            get
            {
                return new Dictionary<string, string>
                {
                    { "bootstrap.servers", "localhost:9092" },
                    { "group.id", "saga.group.id" }
                };
            }
        }
    }
}

Example TestSaga1StateMachine.cs

    public class TestSaga1StateMachine : MassTransitStateMachine<TestSaga1State>
    {
        public TestSaga1StateMachine()
        {
            InstanceState(_ => _.CurrentState);
            Event(() => Enter1Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
            Event(() => Enter2Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
            Event(() => Enter3Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
            Event(() => EnterEnterEvent, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));

            Initially(
                When(Enter1Event)
                    .Then(context => context.Instance.SaveEnter1(context.Data))
// Messages are not sent here
                    .PublishAsync(context => context.Init<Enter2>(new {EnteredText = "2"}))
                    .TransitionTo(Entered1)
                );
            During(Entered1,
                When(Enter2Event)
                    .Then(context => context.Instance.SaveEnter2(context.Data))
// Messages are not sent here
                    .PublishAsync(context => context.Init<Enter3>(new {EnteredText = "3"}))
                    .TransitionTo(Entered2)
                );
            During(Entered2,
                When(Enter3Event)
                    .Then(context => context.Instance.SaveEnter3(context.Data))
// Messages are not sent here
                    .PublishAsync(context => context.Init<EnterEnter>(new {EnteredText = "Enter"}))
                    .TransitionTo(Entered3)
            );
            During(Entered3,
                When(EnterEnterEvent)
                    .Then(context => context.Instance.Print())
                    .TransitionTo(EnteredEnter)
                    .Finalize());

            SetCompletedWhenFinalized();
        }
        
        public State Entered1 { get; set; }
        public State Entered2 { get; set; }
        public State Entered3 { get; set; }
        public State EnteredEnter { get; set; }
        
        public Event<Enter1> Enter1Event { get; set; }
        public Event<Enter2> Enter2Event { get; set; }
        public Event<Enter3> Enter3Event { get; set; }
        public Event<EnterEnter> EnterEnterEvent { get; set; }
    }

This project is just for my learning. I don't understand how to produce message up there The bus configuration is identical to the one in the documentation. The first Enter1 message is published successfully and the saga receives it, but how to send a message to kafka from the saga is not clear


Solution

  • You would need to create a custom state machine activity, with a dependency on the producer interface (setup when Kafka is configured), in order to produce messages to Kafka topics. I recently did a video on this as part of Season 2.

    You can see an example of producer setup in the unit tests

    services.AddMassTransit(x =>
    {
        x.AddRider(rider =>
        {
            rider.AddProducer<KafkaMessage>(Topic);
    
            rider.UsingKafka((context, k) =>
            {
                k.Host("localhost:9092");
            });
        });
    });
    

    Then, in your custom state machine activity, you'd add a constructor dependency on on ITopicProducer<KafkaMessage> and use that to produce the message. It may look similar to this one:

    public class ProduceEnter2Activity :
        Activity<TestSaga1State>
    {
        readonly ITopicProducer<Enter2> _producer;
    
        public ProduceEnter2Activity(ITopicProducer<Enter2> producer)
        {
            _producer = producer;
        }
    
        public void Probe(ProbeContext context)
        {
            context.CreateScope("notifyMember");
        }
    
        public void Accept(StateMachineVisitor visitor)
        {
            visitor.Visit(this);
        }
    
        public async Task Execute(BehaviorContext<TestSaga1State> context, Behavior<TestSaga1State> next)
        {
            await Execute(context);
    
            await next.Execute(context);
        }
    
        public async Task Execute<T>(BehaviorContext<TestSaga1State, T> context, Behavior<TestSaga1State, T> next)
        {
            await Execute(context);
    
            await next.Execute(context);
        }
    
        public Task Faulted<TException>(BehaviorExceptionContext<TestSaga1State, TException> context, Behavior<TestSaga1State> next)
            where TException : Exception
        {
            return next.Faulted(context);
        }
    
        public Task Faulted<T, TException>(BehaviorExceptionContext<TestSaga1State, T, TException> context, Behavior<TestSaga1State, T> next)
            where TException : Exception
        {
            return next.Faulted(context);
        }
    
        async Task Execute(BehaviorContext<TestSaga1State> context)
        {
            await _producer.Produce(new Enter2(...));
        }
    }
    

    Then, in your state machine, you would use:

    .Activity(x => x.OfInstanceType<ProduceEnter2Activity>())