I want to do CQRS for a Node app.
I'm not a Node person, I'm from .NET, which has an excellent library called MediatR which dispatches commands/queries to a mediator that can be used to decouple requests and handlers. So it allows very simple and elegant CQRS.
In the Node world I found many libraries/blogs but they always include Event Sourcing too. I'm not interested in ES.
I can model the commands and queries nicely, but then what? They need to be dispatched somewhere, in a decoupled way to avoid a mess.
From what I know so far about the Node platform, a possible solution is to use the observer pattern (via the RxJs library), so a controller can dispatch messages (i.e. CQRS requests) to an observer, which then publishes corresponding events for subscribers (i.e. request handlers). This decouples controllers and services in a DDD-like design. Though I'm not sure how to pass results back to the controller.
Is this how other people are doing it? Is there a better way in Node?
TL:DR : You don't need some fancy framework, especially when you're only doing intra-process communication, to apply CQRS architecture. The native EventEmitter
from the events
module is enough. If you want inter-process communication servicebus
does a really good job. To take a look at an implementation exemple (of the following long version answer) you can dive into the code of this repository : simple node cqrs
Let's take an example of a very simple Chat application where you can send messages if the Chat is not closed, and like/unlike messages.
Our main aggregate (or aggregate root, conceptually), is the Chat
(writeModel/domain/chat.js
):
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
Then, we have a Message
aggregate (writeModel/domain/message.js
) :
const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
Object.freeze({
id,
chatId,
userId,
content,
sentAt,
messageLikes,
});
The behavior for sending a message could be (writeModel/domain/chat.js
) :
const invariant = require('invariant');
const { Message } = require('./message');
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
invariant(!chatState.isClosed, "can't post in a closed chat");
return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};
We now need the commands (writeModel/domain/commands.js
) :
const commands = {
types: {
SEND_MESSAGE: '[chat] send a message',
},
sendMessage({ chatId, userId, content, sentAt }) {
return Object.freeze({
type: commands.types.SEND_MESSAGE,
payload: {
chatId,
userId,
content,
sentAt,
},
});
},
};
module.exports = {
commands,
};
Since we're in javascript, we do not have interface
to provide abstraction so we use higher order functions
(writeModel/domain/getChatOfId.js
) :
const { Chat } = require('./message');
const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
try {
const chatState = await getChatOfId(id);
if (typeof chatState === 'undefined') {
throw chatState;
}
return chatState;
} catch (e) {
throw new Error(`chat with id ${id} was not found`);
}
};
module.exports = {
getChatOfId,
};
(writeModel/domain/saveMessage.js
) :
const { Message } = require('./message');
const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;
module.exports = {
saveMessage,
};
We need now to implement our commandHandlers
(the application service layer) :
(writeModel/commandHandlers/handleSendMessage.js
)
const { sendMessage } = require('../domain/chat');
const handleSendMessage = ({
getChatOfId,
getNextMessageId,
saveMessage,
}) => async sendMessageCommandPayload => {
const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
const chat = await getChatOfId(chatId);
return saveMessage(
sendMessage({
chatState: chat,
messageId: getNextMessageId(),
userId,
content,
sentAt,
}),
);
};
module.exports = {
handleSendMessage,
};
Since we don't have interface
in javascript, we use higher order functions
to apply the Dependency Inversion Principle via injection of the dependencies at runtime.
We can then implement the composition root of the write model : (`writeModel/index.js) :
const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');
const SimpleNodeCQRSwriteModel = ({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
}) => {
handleCommand(
commands.types.SEND_MESSAGE,
handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
);
};
module.exports = {
SimpleNodeCQRSwriteModel,
};
Your commands
and command handler
are not tied together, you can then provide an implementation of these functions at runtime, with an in memory database and the node EventEmitter
for example (writeModel/infrastructure/inMemory/index.js
) :
const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');
const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
const listeners = [];
const db = {
...initialDbState,
};
const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);
const updateDb = updater => {
updater();
listeners.map(listener => listener(db));
};
const saveMessageInMemory = saveMessage(async messageState => {
updateDb(() => (db.messages[messageState.id] = messageState));
});
const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);
const getNextMessageUuid = getNextMessageId(uuid);
return {
addOnDbUpdatedListener,
saveMessage: saveMessageInMemory,
getChatOfId: getChatOfIdFromMemory,
getNextMessageId: getNextMessageUuid,
};
};
module.exports = {
InMemoryRepository,
};
And our TestWriteModel
tying it all together :
const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');
const TestWriteModel = () => {
const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
const commandEmitter = new EventEmitter();
const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
const handleCommand = (commandType, commandHandler) => {
commandEmitter.on(commandType, commandHandler);
};
return SimpleNodeCQRSwriteModel({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
});
};
You can dive into the code (with a very simple read model
) in this repository : simple node cqrs