Rebus: Multiple sagas handling same message -OR- best practice for request/reply in saga


I'm processing a message in a saga and sometimes need more information, so I send a request message and expect a reply.

The issue is that the reply contains nothing unique to the current saga. It has an ID that I would like to correlate on, but multiple sagas might be requesting this ID at the same time, and Rebus disallows that (exception: Correlation property 'Bla' has value '86' in existing saga data with ID 2d12a863-12ed-4632-82d8-290e041c4eed).

If there is no way to have a single message handled by multiple sagas, the alternative for me would be to match the reply to the requesting saga. As far as I can tell there used to be support for this, but it was removed in a later version.

I've tried implementing this using the rbs2-corr-id header, and it works in my tests, however it feels like a hack.

Is there a better way to do it? Without modifying the messages?

I've considered using another saga to act as a sort of proxy by correlating on the ID that might be shared and having a list of correlation IDs for the original saga. I worry however that there might be concurrency issues causing the original saga to wait for the proxy saga.

The following code should show the problem:

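using System;
using System.Threading.Tasks;
using Rebus.Bus;
using Rebus.Handlers;
using Rebus.Sagas;

public class Message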
{
    public Guid Id { get; set; }
    public int OtherId { get; set; }
}

public class Request
{
    public int OtherId { get; set; }
}

public class Response
{
    public int OtherId { get; set; }
    public string MissingInfo { get; set; }
}

public class SagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    public Guid MessageId { get; set; }
    public int OtherId { get; set; }
}

public class MySaga : Saga<SagaData>, IAmInitiatedBy<Message>, IHandleMessages<Response>
{
    IBus _bus;

    public MySaga(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(Message message)
    {
        Data.OtherId = message.OtherId;

        // Send a Request, expecting the receiver to bus.Reply(new Response { OtherId = ..., ... })
        await _bus.Send(new Request { OtherId = message.OtherId });
    }

    public Task Handle(Response message)
    {
        // Do something with message.MissingInfo
        return Task.CompletedTask;
    }

    protected override void CorrelateMessages(ICorrelationConfig<SagaData> config)
    {
        config.Correlate((Message m) => m.Id, s => s.MessageId);

        // This works as long as only one saga has this ID
        config.Correlate((Response m) => m.OtherId, s => s.OtherId);
    }
}

Solution

  • I've tried implementing this using the rbs2-corr-id header, and it works in my tests, however it feels like a hack.

    Well... it's a clever hack. 😅 I think this is actually the best you can do: Take advantage of the fact that the correlation ID of the request is under your control, and the reply will carry back the same correlation ID.

    How about setting the correlation ID of the request to the ID you'd like to see again when you receive the reply?

    You can then correlate your reply with something like:

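    // Placeholder names: InviteToTeamByEmail sits where your saga data type goes,
    // YourReply is your reply message type, and Bla is the saga data property
    // that holds the correlation ID.
    protected override void CorrelateMessages(ICorrelationConfig<InviteToTeamByEmail> config)
    {
        config.CorrelateHeader<YourReply>(Headers.CorrelationId, d => d.Bla);
    }
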
    (assuming that the name of the correlation property was actually Bla... 😆)
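
Applied to the types from the question, the whole round trip might look roughly like the sketch below. It is not tested against any particular Rebus version. The RequestCorrelationId property is a hypothetical addition to the saga data, kept as a string so it compares directly with the header value. The sketch assumes the receiver answers with bus.Reply, so that the correlation ID travels back on the response as described above, and that an explicitly supplied rbs2-corr-id header is not overwritten when the request is sent.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Rebus.Bus;
using Rebus.Handlers;
using Rebus.Messages;
using Rebus.Sagas;

// Message types as in the question
public class Message { public Guid Id { get; set; } public int OtherId { get; set; } }
public class Request { public int OtherId { get; set; } }
public class Response { public int OtherId { get; set; } public string MissingInfo { get; set; } }

public class SagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    public Guid MessageId { get; set; }
    public int OtherId { get; set; }

    // Hypothetical property: the correlation ID this saga instance put on its request
    public string RequestCorrelationId { get; set; }
}

public class MySaga : Saga<SagaData>, IAmInitiatedBy<Message>, IHandleMessages<Response>
{
    readonly IBus _bus;

    public MySaga(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(Message message)
    {
        Data.MessageId = message.Id;
        Data.OtherId = message.OtherId;

        // A value unique to this saga instance, so two sagas asking about
        // the same OtherId never share a correlation value
        Data.RequestCorrelationId = Guid.NewGuid().ToString();

        var headers = new Dictionary<string, string>
        {
            { Headers.CorrelationId, Data.RequestCorrelationId }
        };

        await _bus.Send(new Request { OtherId = message.OtherId }, headers);
    }

    public Task Handle(Response message)
    {
        // Do something with message.MissingInfo
        return Task.CompletedTask;
    }

    protected override void CorrelateMessages(ICorrelationConfig<SagaData> config)
    {
        config.Correlate<Message>(m => m.Id, d => d.MessageId);

        // Match the reply on the header instead of on the shared OtherId
        config.CorrelateHeader<Response>(Headers.CorrelationId, d => d.RequestCorrelationId);
    }
}
```

Because each saga instance correlates on its own generated value, several sagas can wait for a Response with the same OtherId without hitting the "Correlation property ... in existing saga data" exception, and Request and Response stay unmodified.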