Tags: c#, domain-driven-design, microservices, cqrs, enterprise-integration

CQRS for streamed data


I am using CQRS, and it works very well for transactional commands or request-response between one node and a remote node.

I have a use case where a command is issued to a remote node and results in a "stream" of data (much like running a remote command, with the server sending us text updates as it progresses):

// Sent from the requesting node to the remote node to initiate the stream
public class LongRunningCommand : ICommand
{
    public Guid Session { get; set; } // the session ID to use
    public string CommandLine { get; set; } // the command the remote node will run
}

This data is then sent in a number of packets over a period of time from the remote node to the requesting node:

// Sent from the remote node to the requestor in multiple updates over time
public class UpdateProgress : ICommand
{
    public Guid Session { get; set; } // allows multiplexing sessions
    public int Sequence { get; set; } // for de-duplicating/resequencing out-of-order packets (lower QoS)
    public byte[] Payload { get; set; } // the data to be passed to the application
}

This is not really a command, nor is it a request-reply (as there are multiple replies) - it is a long running session of sorts, but I am not sure how this fits in with CQRS.

What would be the best way to model this? Could my requesting node have a command handler like the one below (where UpdateProgress is the "command" being handled)?

public class UpdateProgressCommandHandler : ICommandHandler<UpdateProgress>
{
   public async Task HandleAsync(UpdateProgress message)
   {
      // resequence in handler or chained infrastructure - omitted for brevity
      var window = GetWindowForSession(message.Session);
      var updateFromServer = System.Text.Encoding.UTF8.GetString(message.Payload);
      await window.WriteLine(updateFromServer);
   }
}

The above works (and I think fairly well), but the terminology seems a bit funky: the command name UpdateProgress describes more of an event than a command.

Or am I better off dropping the notion of commands/queries altogether and going full event bus? If I did, how would I handle the initial request? It is not an event but more of a command, which wouldn't make sense semantically on an event bus that deals with events rather than commands or queries.

Or am I getting caught up in naming conventions only? Since this is my first time doing this, I would appreciate a best-practice view on the above use case.
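
For reference, the resequencing step that the handler above omits could be sketched roughly as follows. This is only a minimal illustration, not the actual implementation: the Resequencer class and its Accept method are hypothetical names, and it assumes one instance per session with sequence numbers starting at 0 and increasing by 1.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the original post): buffers out-of-order
// packets for a single session and releases them in sequence order.
public sealed class Resequencer
{
    private readonly Dictionary<int, byte[]> _pending = new Dictionary<int, byte[]>();
    private int _next; // next expected sequence number (assumed to start at 0)

    // Accepts one packet and returns every payload that is now deliverable in order.
    public IReadOnlyList<byte[]> Accept(int sequence, byte[] payload)
    {
        var ready = new List<byte[]>();

        // De-dupe: ignore packets already delivered or already buffered.
        if (sequence < _next || _pending.ContainsKey(sequence))
            return ready;

        _pending.Add(sequence, payload);

        // Release the contiguous run that starts at the expected sequence number.
        while (_pending.TryGetValue(_next, out var nextPayload))
        {
            ready.Add(nextPayload);
            _pending.Remove(_next);
            _next++;
        }

        return ready;
    }
}
```

Inside the handler, each payload returned by Accept(message.Sequence, message.Payload) would then be decoded and written to the window in order. A real implementation would probably also need a timeout or a buffer limit for packets that never arrive.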


Solution

  • I'm not sure I understood you correctly, but a command that needs to talk to a remote node does not appear to be part of your domain. This is not to say it is not a command; you can still define it as one, just not within your domain, in my opinion. You could look into integration events here.

    Without having a full understanding of your domain, here is how you could define your process:

    • Execute Command to modify your domain (something like Status: Pending)

    • Raise an integration event from your command handler to a separate worker/service bus

    • The separate worker completes the process and then raises another integration event

    • Your worker subscribes to that event and updates the relevant pieces of your domain (e.g. Status: Completed).
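
The four steps above could be sketched roughly like this. It is only an illustration under stated assumptions: InMemoryBus is a synchronous, in-process stand-in for a real service bus, and JobRequested, JobCompleted, JobStore, StartJobHandler and Wiring are hypothetical names that do not come from the question.

```csharp
using System;
using System.Collections.Generic;

public enum JobStatus { Pending, Completed }

// Integration events (hypothetical names).
public sealed class JobRequested { public Guid Session { get; set; } public string CommandLine { get; set; } }
public sealed class JobCompleted { public Guid Session { get; set; } }

// Synchronous in-memory stand-in for a service bus (assumption: a real bus
// would be asynchronous and cross process boundaries).
public sealed class InMemoryBus
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<T>(Action<T> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
            _handlers[typeof(T)] = list = new List<Action<object>>();
        list.Add(e => handler((T)e));
    }

    public void Publish<T>(T evt)
    {
        if (_handlers.TryGetValue(typeof(T), out var list))
            foreach (var handle in list) handle(evt);
    }
}

// Stand-in for the domain state that tracks each session's status.
public sealed class JobStore
{
    public Dictionary<Guid, JobStatus> Status { get; } = new Dictionary<Guid, JobStatus>();
}

// Steps 1 and 2: the command handler marks the job Pending, then raises an integration event.
public sealed class StartJobHandler
{
    private readonly JobStore _store;
    private readonly InMemoryBus _bus;

    public StartJobHandler(JobStore store, InMemoryBus bus) { _store = store; _bus = bus; }

    public void Handle(Guid session, string commandLine)
    {
        _store.Status[session] = JobStatus.Pending;
        _bus.Publish(new JobRequested { Session = session, CommandLine = commandLine });
    }
}

public static class Wiring
{
    public static void Configure(JobStore store, InMemoryBus bus)
    {
        // Step 3: the separate worker does the work (elided here) and raises a completion event.
        bus.Subscribe<JobRequested>(e => bus.Publish(new JobCompleted { Session = e.Session }));

        // Step 4: a subscriber updates the domain when the completion event arrives.
        bus.Subscribe<JobCompleted>(e => store.Status[e.Session] = JobStatus.Completed);
    }
}
```

Because this stand-in bus is synchronous, the status moves from Pending to Completed within the same call to Handle. With a real bus the job would stay Pending until the worker's event arrived, and streamed progress updates could be modelled as further integration events between the two.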