Search code examples
node.jsrxjscqrsmediatormediatr

Dispatching CQRS messages in Node.js


I want to do CQRS for a Node app.

I'm not a Node person, I'm from .NET, which has an excellent library called MediatR which dispatches commands/queries to a mediator that can be used to decouple requests and handlers. So it allows very simple and elegant CQRS.

In the Node world I found many libraries/blogs but they always include Event Sourcing too. I'm not interested in ES.

I can model the commands and queries nicely, but then what? They need to be dispatched somewhere, in a decoupled way to avoid a mess.

From what I know so far about the Node platform, a possible solution is to use the observer pattern (via the RxJs library), so a controller can dispatch messages (i.e. CQRS requests) to an observer, which then publishes corresponding events for subscribers (i.e. request handlers). This decouples controllers and services in a DDD-like design. Though I'm not sure how to pass results back to the controller.

Is this how other people are doing it? Is there a better way in Node?


Solution

  • TL:DR : You don't need some fancy framework, especially when you're only doing intra-process communication, to apply CQRS architecture. The native EventEmitter from the events module is enough. If you want inter-process communication servicebus does a really good job. To take a look at an implementation exemple (of the following long version answer) you can dive into the code of this repository : simple node cqrs

    Let's take an example of a very simple Chat application where you can send messages if the Chat is not closed, and like/unlike messages.

    Our main aggregate (or aggregate root, conceptually), is the Chat (writeModel/domain/chat.js):

    const Chat = ({ id, isClosed } = {}) =>
      Object.freeze({
        id,
        isClosed,
      });
    

    Then, we have a Message aggregate (writeModel/domain/message.js) :

    const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
      Object.freeze({
        id,
        chatId,
        userId,
        content,
        sentAt,
        messageLikes,
      });
    

    The behavior for sending a message could be (writeModel/domain/chat.js) :

    const invariant = require('invariant');
    const { Message } = require('./message');
    
    const Chat = ({ id, isClosed } = {}) =>
      Object.freeze({
        id,
        isClosed,
      });
    
    const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
      invariant(!chatState.isClosed, "can't post in a closed chat");
      return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
    };
    

    We now need the commands (writeModel/domain/commands.js) :

    const commands = {
      types: {
        SEND_MESSAGE: '[chat] send a message',
      },
      sendMessage({ chatId, userId, content, sentAt }) {
        return Object.freeze({
          type: commands.types.SEND_MESSAGE,
          payload: {
            chatId,
            userId,
            content,
            sentAt,
          },
        });
      },
    };
    
    module.exports = {
      commands,
    };
    

    Since we're in javascript, we do not have interface to provide abstraction so we use higher order functions (writeModel/domain/getChatOfId.js) :

    const { Chat } = require('./message');
    
    const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
      try {
        const chatState = await getChatOfId(id);
        if (typeof chatState === 'undefined') {
          throw chatState;
        }
        return chatState;
      } catch (e) {
        throw new Error(`chat with id ${id} was not found`);
      }
    };
    
    module.exports = {
      getChatOfId,
    };
    

    (writeModel/domain/saveMessage.js) :

    const { Message } = require('./message');
    
    const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;
    
    module.exports = {
      saveMessage,
    };
    

    We need now to implement our commandHandlers (the application service layer) :

    (writeModel/commandHandlers/handleSendMessage.js)

    const { sendMessage } = require('../domain/chat');
    
    const handleSendMessage = ({
      getChatOfId,
      getNextMessageId,
      saveMessage,
    }) => async sendMessageCommandPayload => {
      const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
      const chat = await getChatOfId(chatId);
      return saveMessage(
        sendMessage({
          chatState: chat,
          messageId: getNextMessageId(),
          userId,
          content,
          sentAt,
        }),
      );
    };
    
    module.exports = {
      handleSendMessage,
    };
    

    Since we don't have interface in javascript, we use higher order functions to apply the Dependency Inversion Principle via injection of the dependencies at runtime.

    We can then implement the composition root of the write model : (`writeModel/index.js) :

    const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
    const { commands } = require('./domain/commands');
    
    const SimpleNodeCQRSwriteModel = ({
      dispatchCommand,
      handleCommand,
      getChatOfId,
      getNextMessageId,
      saveMessage,
    }) => {
      handleCommand(
        commands.types.SEND_MESSAGE,
        handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
      );
    };
    
    module.exports = {
      SimpleNodeCQRSwriteModel,
    };
    

    Your commands and command handler are not tied together, you can then provide an implementation of these functions at runtime, with an in memory database and the node EventEmitter for example (writeModel/infrastructure/inMemory/index.js) :

    const uuid = require('uuid/v1');
    const { saveMessage } = require('../../domain/saveMessage');
    const { getChatOfId } = require('../../domain/getChatOfId');
    const { getNextMessageId } = require('../../domain/getNextMessageId');
    
    const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
      const listeners = [];
    
      const db = {
        ...initialDbState,
      };
    
      const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);
    
      const updateDb = updater => {
        updater();
        listeners.map(listener => listener(db));
      };
    
      const saveMessageInMemory = saveMessage(async messageState => {
        updateDb(() => (db.messages[messageState.id] = messageState));
      });
    
      const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);
    
      const getNextMessageUuid = getNextMessageId(uuid);
    
      return {
        addOnDbUpdatedListener,
        saveMessage: saveMessageInMemory,
        getChatOfId: getChatOfIdFromMemory,
        getNextMessageId: getNextMessageUuid,
      };
    };
    
    module.exports = {
      InMemoryRepository,
    };
    

    And our TestWriteModel tying it all together :

    const EventEmitter = require('events');
    const { SimpleNodeCQRSwriteModel } = require('../writeModel');
    const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');
    
    const TestWriteModel = () => {
      const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
      const commandEmitter = new EventEmitter();
      const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
      const handleCommand = (commandType, commandHandler) => {
        commandEmitter.on(commandType, commandHandler);
      };
      return SimpleNodeCQRSwriteModel({
        dispatchCommand,
        handleCommand,
        getChatOfId,
        getNextMessageId,
        saveMessage,
      });
    };
    

    You can dive into the code (with a very simple read model) in this repository : simple node cqrs