typescript, event-sourcing, eventstoredb

Event Sourcing and CQRS with EventStoreDB


I am working with EventStoreDB and event sourcing, but I have doubts regarding projections and CQRS. So far, this is how I define my command and my command handler:

create-user-command:

export class CreateUserCommand implements ICommand {
  constructor(
    public readonly userDto: UserStruct,
  ) {}
}

command-handler:

export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(private readonly publisher: EventPublisher) {}

  async execute(command: CreateUserCommand) {
    const { userDto } = command;
    const user = User.create(userDto);
    console.log(user.value);
    if (user.isLeft()) throw user.value;
    const userPublisher = this.publisher.mergeObjectContext(user.value);
    userPublisher.commit();
  }
}

event:

export class UserCreatedEvent implements IEvent {
  static readonly NAME = "UniFtcIdade/user-registered";
  readonly $name = UserCreatedEvent.NAME;
  readonly $version = 0;
  constructor(
    public readonly aggregateId: string,
    public readonly state: { email: string; name: string },
    public readonly date: Date
  ) {
  }
}

domain:

export class User extends AggregateRoot {
  public readonly name: string;
  public readonly email: string;

  private constructor (guid: string, name: string, email: string) {
    super()
    this.apply(new UserCreatedEvent(guid, {email, name}, new Date()));
  }
  static create(
    dto: UserStruct
  ): Either<InvalidNameError | InvalidEmailError, User> {
    const name: Either<InvalidNameError, Name> = Name.create(dto.name);
    const email: Either<InvalidEmailError, Email> = Email.create(dto.email);
    if (name.isLeft()) return left(name.value);
    if (email.isLeft()) return left(email.value);
    const user = new User(v4(), name.value.value, email.value.value);
    return right(user);
  }
}

But I have doubts about how projections fit into this situation. Is a projection used to get the current state of the aggregate? Should I have a database such as MongoDB to save the current state, so that every time my command handler runs I also update the current state in MongoDB? Is that what EventStoreDB projections are for: saving the current state of the aggregate?


Solution

  • In CQRS with event sourcing on EventStoreDB, your aggregate must be designed so that its state can be restored from events. Events are stored in a stream with a unique name and identifier (guid). Before modifying the aggregate, you must read this stream and apply each event in sequence to restore the current state; only then do you execute the change, which generates more events. To maintain integrity and handle optimistic concurrency, keep a simple version check in your aggregate: the count of old events plus new events gives the latest version number to be persisted.
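
The replay-and-count idea described above can be sketched as a plain fold over the stream. This is only an illustration: the event shapes, `UserState`, `applyEvent`, and `rehydrate` are hypothetical names invented for this sketch and are not taken from the question's code or from any library.

```typescript
// Hypothetical event shapes for illustration only.
type UserEvent =
  | { type: "UserCreated"; aggregateId: string; name: string; email: string }
  | { type: "UserEmailChanged"; aggregateId: string; email: string };

interface UserState {
  exists: boolean;
  name: string;
  email: string;
  version: number; // number of events applied so far
}

const initialState: UserState = { exists: false, name: "", email: "", version: 0 };

// Apply one event to the state and bump the version.
function applyEvent(state: UserState, event: UserEvent): UserState {
  switch (event.type) {
    case "UserCreated":
      return { exists: true, name: event.name, email: event.email, version: state.version + 1 };
    case "UserEmailChanged":
      return { ...state, email: event.email, version: state.version + 1 };
  }
}

// Restore the current state by replaying the stream in order.
function rehydrate(history: UserEvent[]): UserState {
  return history.reduce(applyEvent, initialState);
}

// Events previously read from the aggregate's stream.
const history: UserEvent[] = [
  { type: "UserCreated", aggregateId: "user-1", name: "Ana", email: "ana@example.com" },
  { type: "UserEmailChanged", aggregateId: "user-1", email: "ana@new.example.com" },
];
const current = rehydrate(history);

// New events produced by a command come after the old ones,
// so the version to persist is old count + new count.
const newEvents: UserEvent[] = [
  { type: "UserEmailChanged", aggregateId: "user-1", email: "ana@third.example.com" },
];
const versionToPersist = current.version + newEvents.length;
```

Because the state is derived purely from the events, a command can inspect `current` (for example `current.exists`) before deciding whether it is allowed to produce new events.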

    The issues I see in your code are as follows. Your aggregate has a constructor and a static method that generate events without any validation of the current state. For example: what happens if I call create twice with the same guid?

    this.apply(new UserCreatedEvent(guid, {email, name}, new Date()));

    You are applying state here directly, in the constructor. Instead, you should raise the event inside your Create method.

    this.raiseEvent(new UserCreatedEvent(guid, {email, name}, new Date()));

    raiseEvent should be implemented to do the following:

    • Add the event to a list of uncommitted events
    • Call this.apply to update the aggregate's state
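
As a rough sketch of those two steps, a minimal base class could look like the following. `EventSourcedAggregate`, `getUncommittedEvents`, `markCommitted`, and this simplified `User` are hypothetical names made up for illustration; `raiseEvent` is the name used in this answer, not a library API, and this class is independent of the `AggregateRoot` used in the question.

```typescript
// Hypothetical minimal base class for illustration only.
interface DomainEvent {
  readonly type: string;
}

abstract class EventSourcedAggregate {
  private uncommitted: DomainEvent[] = [];

  // State transition only: no validation and no side effects.
  protected abstract apply(event: DomainEvent): void;

  protected raiseEvent(event: DomainEvent): void {
    this.uncommitted.push(event); // 1. remember it so it can be persisted later
    this.apply(event);            // 2. update the in-memory state
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommitted];
  }

  markCommitted(): void {
    this.uncommitted = [];
  }
}

class UserCreated implements DomainEvent {
  readonly type = "UserCreated";
  constructor(readonly id: string, readonly name: string, readonly email: string) {}
}

class User extends EventSourcedAggregate {
  private created = false;
  name = "";
  email = "";

  create(id: string, name: string, email: string): void {
    // Validation happens against the current state, before any event is raised.
    if (this.created) throw new Error("User already exists");
    this.raiseEvent(new UserCreated(id, name, email));
  }

  protected apply(event: DomainEvent): void {
    if (event instanceof UserCreated) {
      this.created = true;
      this.name = event.name;
      this.email = event.email;
    }
  }
}
```

Keeping validation in `create` and state changes in `apply` means the same `apply` can be reused when replaying old events, where validation must not run again.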

    You should then persist the events to EventStoreDB in your command handler.

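    async execute(command: CreateUserCommand) {
        const { userDto } = command;
        // Assumes the command carries the user's id and that eventRepository is injected.
        // Get replays the user's stream (empty for a new user) to rebuild the aggregate.
        const user = await eventRepository.Get<User>(command.Id);
        user.Create(userDto); // Can now check current state and fail if required.
        // Save appends only the uncommitted events to the user's stream.
        await eventRepository.Save(user);
    }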

    The repository here is simple. Get creates an empty User and applies all of the stream's events in order before returning the user. Save just reads the list of uncommitted events and appends them to the user's stream.
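
A minimal sketch of such a repository follows, using an in-memory `Map` as a stand-in for EventStoreDB streams. Everything here (`InMemoryUserRepository`, `fromHistory`, the `user-<id>` stream naming, the simplified `User`) is an assumption made for illustration. It is kept synchronous for brevity; calls to a real event store client would be asynchronous.

```typescript
// In-memory stand-in for EventStoreDB streams; all names are hypothetical.
interface DomainEvent {
  readonly type: string;
}
type UserCreated = { type: "UserCreated"; id: string; name: string };

class User {
  id = "";
  name = "";
  version = 0; // number of events already persisted for this aggregate
  uncommitted: DomainEvent[] = [];

  // Create an empty User and apply the stored events in order.
  static fromHistory(events: DomainEvent[]): User {
    const user = new User();
    for (const e of events) {
      user.apply(e);
      user.version++;
    }
    return user;
  }

  create(id: string, name: string): void {
    if (this.id !== "") throw new Error("User already exists");
    const event: UserCreated = { type: "UserCreated", id, name };
    this.uncommitted.push(event);
    this.apply(event);
  }

  private apply(event: DomainEvent): void {
    if (event.type === "UserCreated") {
      const e = event as UserCreated;
      this.id = e.id;
      this.name = e.name;
    }
  }
}

class InMemoryUserRepository {
  private streams = new Map<string, DomainEvent[]>();

  get(id: string): User {
    return User.fromHistory(this.streams.get(`user-${id}`) ?? []);
  }

  save(id: string, user: User): void {
    const stream = this.streams.get(`user-${id}`) ?? [];
    // Optimistic concurrency: fail if the stream grew since the aggregate was loaded.
    if (stream.length !== user.version) throw new Error("Concurrency conflict");
    this.streams.set(`user-${id}`, [...stream, ...user.uncommitted]);
    user.version += user.uncommitted.length;
    user.uncommitted = [];
  }
}

const repo = new InMemoryUserRepository();
const newUser = repo.get("42"); // empty stream, so an empty User
newUser.create("42", "Ana");
repo.save("42", newUser);
```

Loading the same id again returns a User whose state was rebuilt from the stored event, so a second `create` on it is rejected.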

    That's the command side done. For the read side, you can use the out-of-the-box category projection ($by_category) to get a single stream of all User events, and write the resulting state to MongoDB to be read by a different API (not your command handler).
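
To make the read side concrete, here is a small sketch of a projector that folds user events into a read model. The `Map` stands in for a MongoDB collection, and `UserProjector`, `UserReadModel`, and the event shapes are hypothetical names for illustration; in a real setup the events would arrive from a subscription to the users category stream rather than from an array.

```typescript
// Hypothetical event and read-model shapes; the Map stands in for a MongoDB collection.
type UserEvent =
  | { type: "UserCreated"; id: string; name: string; email: string }
  | { type: "UserEmailChanged"; id: string; email: string };

interface UserReadModel {
  id: string;
  name: string;
  email: string;
}

class UserProjector {
  readonly users = new Map<string, UserReadModel>();

  // Called once per event, in stream order.
  handle(event: UserEvent): void {
    switch (event.type) {
      case "UserCreated":
        this.users.set(event.id, { id: event.id, name: event.name, email: event.email });
        break;
      case "UserEmailChanged": {
        const existing = this.users.get(event.id);
        if (existing) this.users.set(event.id, { ...existing, email: event.email });
        break;
      }
    }
  }
}

// Events for all users, as they might arrive from the category stream.
const categoryEvents: UserEvent[] = [
  { type: "UserCreated", id: "1", name: "Ana", email: "ana@example.com" },
  { type: "UserCreated", id: "2", name: "Bea", email: "bea@example.com" },
  { type: "UserEmailChanged", id: "1", email: "ana@new.example.com" },
];

const projector = new UserProjector();
categoryEvents.forEach((e) => projector.handle(e));
```

The query API then reads only from this read model. The command side never consults it and keeps rebuilding aggregates from their own streams.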