Tags: spring, domain-driven-design, cqrs, event-sourcing, axon

Value Object as @AggregateIdentifier and @TargetAggregateIdentifier


First, I apologize for the very long post. There is quite some code to show in order to give a detailed understanding of the problem, hence lots of stuff to post... Please be so kind as to read all of it :-)

I am trying to develop an event-sourcing-based application using the Axon framework together with Spring Boot. Below I provide a number of class definitions that make up the aggregate, command and event implementations.

I wrote a very simple test application (Spring) which does no more than send a CreateAppointmentCommand to Axon. This create command uses a manually assigned AppointmentId (which is a subclass of AbstractId) and returns this ID for later use. Nothing wrong with that: the constructor command handler in Appointment is called as expected, and the corresponding AppointmentCreatedEvent is fired and also handled as expected by the Appointment class. So far, so good. The problem arises when I send a ConfirmAppointmentCommand with the ID returned by the create command. Under these circumstances I receive the following error message:

Command 'ConfirmAppointmentCommand' resulted in org.axonframework.commandhandling.CommandExecutionException(Provided id of the wrong type for class Appointment. Expected: class AppointmentId, got class java.lang.String)

I don't understand a few things in this setup related to this error message:

  1. Why do the create command and event work as expected, while they use the same approach (at least to my understanding so far) as the confirm command/event?
  2. Why is Axon complaining about an AppointmentId as the identifier (presumably of the aggregate), while the corresponding code (see below) annotates String fields with @AggregateIdentifier and @TargetAggregateIdentifier?
  3. Am I allowed to store an aggregate directly in a persistent store, using the same class for both the aggregate and the entity (in this case a JPA repository managed by Spring and linked to a relational database), while it is being used by Axon? (I don't think I should use the State-Stored Aggregates approach described in the reference guide, because I still want my solution to be event driven for creating and updating appointments.)
  4. Is this the correct approach to keep the state of the aggregate up to date using the event mechanism, and is it OK to have a separate Spring @Component class which implements a series of @EventHandler methods to do the CRUD operations against the relational database? In the latter, the create event is handled as expected and the appointment gets stored in the database. The confirm events are not being triggered due to the previous error message.
  5. Referring to item 4, I am a bit confused about what will happen if Axon is restarted and starts emitting the different events to the event handler in item 4. Won't this result in a lot of database errors because the appointments are still in place in the database, or in the worst case endless duplicates of the same appointments? In other words, there seems to be something wrong with the approach I am using in this project and in my understanding of event driven applications/services.

Please check the different class definitions below for more detailed information. First I have the root aggregate Appointment, which is used as a JPA entity at the same time.

@Aggregate
@Entity
@Table(name = "t_appointment")
public final class Appointment extends AbstractEntity<AppointmentId> {

    //JPA annotated class members left out for brevity

    @PersistenceConstructor
    private Appointment() {
        super(null);
        //Sets all remaining class members to null.
    }

    @CommandHandler
    private Appointment(CreateAppointmentCommand command) {
        super(command.getAggregateId());
        validateFields(getEntityId(), ...);
        AggregateLifecycle.apply(new AppointmentCreatedEvent(getEntityId(), ...));
    }

    @EventSourcingHandler
    private void on(AppointmentCreatedEvent event) {
        validateFields(event.getAggregateId(), ...);
        initFields(event.getAggregateId(), ...);
    }

    private void validateFields(AppointmentId appointmentId, ...) {
        //Check if all arguments are within the required boundaries.
    }

    private void initFields(AppointmentId appointmentId, ...) {
        //Set all class level variables to passed in value.
    }

    @CommandHandler
    private void handle(ConfirmAppointmentCommand command) {
        AggregateLifecycle.apply(new AppointmentConfirmedEvent(command.getAggregateId()));
    }

    @EventSourcingHandler
    private void on(AppointmentConfirmedEvent event) {
        confirm();
    }

    public void confirm() {
        changeState(State.CONFIRMED);
    }   

    //Similar state changing command/event handlers left out for brevity.

    private void changeState(State newState) {
        switch (state) {
        ...
        }
    }

    //All getter methods left out for brevity. The aggregate does NOT provide any setters.

    @Override
    public String toString() {
        return "Appointment [...]";
    }
}

The AbstractEntity class is a base class for all JPA entities and aggregates. This class has the following definition.

@MappedSuperclass
@SuppressWarnings("serial")
public abstract class AbstractEntity<ENTITY_ID extends AbstractId> implements Serializable{

    @EmbeddedId
    private ENTITY_ID entityId;

    @AggregateIdentifier
    private String targetId;


    protected AbstractEntity(ENTITY_ID id) {
        this.LOG = LogManager.getLogger(getClass());
        this.entityId = id;
        this.targetId = id != null ? id.getId() : null;
    }

    public final ENTITY_ID getEntityId() {
        return entityId;
    }

    @Override
    public final int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((entityId == null) ? 0 : entityId.hashCode());
        return result;
    }

    @Override
    public final boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        AbstractEntity<?> other = (AbstractEntity<?>) obj;
        if (entityId == null) {
            if (other.entityId != null)
                return false;
        } else if (!entityId.equals(other.entityId))
            return false;
        return true;
    }
}

The entityId (which will be used as the primary key for the JPA entities) is a 'complex' value object having the following base class definition.

@MappedSuperclass
@SuppressWarnings("serial")
public abstract class AbstractId implements Serializable{

    @Column(name = "id")
    private String id;


    protected AbstractId() {
        this.id = UUID.randomUUID().toString();
    }

    public final String getId() {
        return id;
    }

    @Override
    public final int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        return result;
    }

    @Override
    public final boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        AbstractId other = (AbstractId) obj;
        if (id == null) {
            if (other.id != null)
                return false;
        } else if (!id.equals(other.id))
            return false;
        return true;
    }

    @Override
    public final String toString() {
        return id;
    }
}

Within the aggregate, a number of commands and events are used. Each command is a subclass of Command.

@SuppressWarnings("serial")
public abstract class Command<AGGREGATE_ID extends AbstractId> implements Serializable{

    private AGGREGATE_ID aggregateId;

    @TargetAggregateIdentifier
    private String targetId;

    protected Command(AGGREGATE_ID aggregateId) {
        if(aggregateId == null) {
            throw new InvalidArgumentException(...);
        }   
        this.aggregateId = aggregateId;
        this.targetId = aggregateId != null ? aggregateId.getId() : null;
    }

    public final AGGREGATE_ID getAggregateId() {
        return aggregateId;
    }   
}

A specific command class (the one which causes difficulties in my approach) is the ConfirmAppointmentCommand, which is actually no more than a concrete implementation of the base Command class. The implementation is therefore very straightforward.

public final class ConfirmAppointmentCommand extends Command<AppointmentId> {
    private static final long serialVersionUID = 6618106729289153342L;

    public ConfirmAppointmentCommand(AppointmentId appointmentId) {
        super(appointmentId);       
    }   
}

The CreateAppointmentCommand is very similar to the ConfirmAppointmentCommand and is defined as follows.

public final class CreateAppointmentCommand extends Command<AppointmentId> {
    private static final long serialVersionUID = -5445719522854349344L;

    //Some additional class members left out for brevity.

    public CreateAppointmentCommand(AppointmentId appointmentId, ...) {
        super(appointmentId);

        //Check to verify the provided method arguments are left out.

        //Set all verified class members to the corresponding values.
    }

    //Getters for all class members, no setters are being implemented.

}

For the different events used in the project, a similar approach is used. All events subclass a base DomainEvent class as defined below.

@SuppressWarnings("serial")
public abstract class DomainEvent<T extends AbstractId> implements Serializable{

    private T aggregateId;


    protected DomainEvent(T aggregateId) {
        if(aggregateId == null) {
            throw new InvalidArgumentException(ErrorCodes.AGGREGATE_ID_MISSING);
        }
        this.aggregateId = aggregateId;
    }

    public final T getAggregateId() {
        return aggregateId;
    }
}

The AppointmentCreatedEvent is pretty straightforward.

public final class AppointmentCreatedEvent extends DomainEvent<AppointmentId> {
    private static final long serialVersionUID = -5265970306200850734L;

    //Class members left out for brevity

    public AppointmentCreatedEvent(AppointmentId appointmentId, ...) {
        super(appointmentId);

        //Check to verify the provided method arguments are left out.

        //Set all verified class members to the corresponding values.
    }

    //Getters for all class members, no setters are being implemented.
}

And finally for completeness, the AppointmentConfirmedEvent.

public final class AppointmentConfirmedEvent extends DomainEvent<AppointmentId> {
    private static final long serialVersionUID = 5415394808454635999L;

    public AppointmentConfirmedEvent(AppointmentId appointmentId) {
        super(appointmentId);       
    }
}

Phew, you made it to the end of the post. Thanks for that in the first place! Could you please advise me on where things are going wrong or what I am doing wrong?

Grateful regards, Kurt


Solution

  • Question 3. From your third question I gather that you do not want to use Axon's State-Stored Aggregate approach, but Event Sourcing instead. On the other hand, you are also storing the Aggregate as a state object by making it an entity.

    What is your intent with this? If it is to return the Appointment to interested parties, then you should know that you are not following CQRS in that respect.

    The @Aggregate annotated class within Axon typically represents the Command Model. It is thus purely used to ingest commands, decide whether a command's expression of intent can be performed, and publish events as a result.
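To make the role of the Command Model concrete, here is a minimal, framework-free sketch in plain Java (no Axon on the classpath). All names in it (SketchEvent, SketchCreated, SketchConfirmed, AppointmentCommandModel, replay, publishedEvents) are hypothetical and only mimic what AggregateLifecycle.apply and @EventSourcingHandler methods do; it is not Axon's API and the identifier is reduced to a String for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified events; the ones in the question carry an AppointmentId.
interface SketchEvent {}

final class SketchCreated implements SketchEvent {
    final String appointmentId;
    SketchCreated(String appointmentId) { this.appointmentId = appointmentId; }
}

final class SketchConfirmed implements SketchEvent {
    final String appointmentId;
    SketchConfirmed(String appointmentId) { this.appointmentId = appointmentId; }
}

// Framework-free stand-in for an event-sourced command model.
final class AppointmentCommandModel {
    private String id;
    private boolean confirmed;
    private final List<SketchEvent> published = new ArrayList<>();

    // Rebuilds state purely from past events, as an event-sourcing repository would on load.
    static AppointmentCommandModel replay(List<SketchEvent> history) {
        AppointmentCommandModel model = new AppointmentCommandModel();
        for (SketchEvent event : history) {
            model.on(event);
        }
        return model;
    }

    // Command side: validate the intent, then publish an event.
    void create(String appointmentId) {
        if (id != null) {
            throw new IllegalStateException("Appointment already exists");
        }
        apply(new SketchCreated(appointmentId));
    }

    void confirm() {
        if (id == null) {
            throw new IllegalStateException("Appointment does not exist yet");
        }
        if (confirmed) {
            throw new IllegalStateException("Appointment already confirmed");
        }
        apply(new SketchConfirmed(id));
    }

    // Plays the role of AggregateLifecycle.apply: update own state, then record the event.
    private void apply(SketchEvent event) {
        on(event);
        published.add(event);
    }

    // Event side: the only place where state changes (compare @EventSourcingHandler).
    private void on(SketchEvent event) {
        if (event instanceof SketchCreated) {
            id = ((SketchCreated) event).appointmentId;
        } else if (event instanceof SketchConfirmed) {
            confirmed = true;
        }
    }

    boolean isConfirmed() { return confirmed; }

    List<SketchEvent> publishedEvents() { return Collections.unmodifiableList(published); }
}
```

The point of the sketch is that the model never needs to be stored as a row: replaying the published events yields the same state, which is why an event-sourced aggregate normally carries no @Entity mapping.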

    In addition, you state you are putting this in a Spring Boot application. From that I assume you are using the axon-spring-boot-starter dependency as well. When using Axon's Spring auto configuration, @Aggregate works as a "Spring Stereotype". On top of this, if the @Aggregate annotated object is also annotated with @Entity, then the auto configuration assumes you want to store the Aggregate as-is. Thus, it'll default to a State-Stored Aggregate, which you state isn't what you want.

    Questions 1 and 2. The create command is likely working because it is the creation point of the Aggregate. Hence, it isn't retrieving an existing Aggregate based on an identifier yet.

    Secondly, the exception you receive, although wrapped in a CommandExecutionException, most likely originates from your database layer rather than from Axon. A quick search for the text "Provided id of the wrong type for class" in Axon's code base doesn't turn up anything. Note that Axon always assumes identifiers to be convertible to String. Hence a dedicated toString() method can be beneficial, so that no undesired information is appended to the String.
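The remark about toString() can be illustrated with a small plain-Java sketch. The two identifier classes and the asAggregateKey helper are hypothetical; the helper merely stands in for "convert the identifier to a String" and is not Axon code. Note that AbstractId in the question already follows the second style.

```java
import java.util.Objects;

// Hypothetical identifier whose toString() decorates the value (typical IDE-generated style).
final class VerboseAppointmentId {
    private final String id;
    VerboseAppointmentId(String id) { this.id = Objects.requireNonNull(id); }
    @Override public String toString() { return "VerboseAppointmentId [id=" + id + "]"; }
}

// Hypothetical identifier whose toString() returns exactly the stored value.
final class PlainAppointmentId {
    private final String id;
    PlainAppointmentId(String id) { this.id = Objects.requireNonNull(id); }
    @Override public String toString() { return id; }
}

final class IdentifierKeys {
    private IdentifierKeys() {}

    // Assumption for illustration: the identifier is turned into a String key via toString().
    static String asAggregateKey(Object identifier) {
        return String.valueOf(identifier);
    }
}
```

With the same raw value, PlainAppointmentId yields that value unchanged as the key, while VerboseAppointmentId yields a key with the class name and brackets mixed in.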

    This is the part Allard is asking for more information about, as it likely has to do with the fact that the Aggregate is in essence State-Stored right now. Thus, the exception bubbles up from the JPA implementation used by the GenericJpaRepository (the repository Axon auto configures for you given the current set up) for the given Aggregate.
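One plausible reading of the error, sketched below in plain Java, is that a state-stored repository loads the entity with the value of the String field annotated with @TargetAggregateIdentifier, while the entity's primary key (the @EmbeddedId) is an AppointmentId. TypedStore and DemoAppointmentId are hypothetical stand-ins, not JPA or Axon classes, and the exception text only imitates the message quoted in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical value-object key, standing in for AppointmentId.
final class DemoAppointmentId {
    private final String id;
    DemoAppointmentId(String id) { this.id = Objects.requireNonNull(id); }
    @Override public boolean equals(Object o) {
        return o instanceof DemoAppointmentId && ((DemoAppointmentId) o).id.equals(id);
    }
    @Override public int hashCode() { return id.hashCode(); }
    @Override public String toString() { return id; }
}

// Hypothetical store that, like a JPA provider, insists on the declared key type when loading.
final class TypedStore<K, V> {
    private final Class<K> keyType;
    private final Map<K, V> rows = new HashMap<>();

    TypedStore(Class<K> keyType) { this.keyType = Objects.requireNonNull(keyType); }

    // Creation path: the key comes from the entity itself, so nothing is looked up.
    void persist(K key, V entity) { rows.put(key, entity); }

    // Load path: the caller supplies the key, so its type is checked first.
    V find(Object key) {
        Objects.requireNonNull(key, "key");
        if (!keyType.isInstance(key)) {
            throw new IllegalArgumentException("Provided id of the wrong type. Expected: "
                    + keyType + ", got " + key.getClass());
        }
        return rows.get(keyType.cast(key));
    }
}
```

In this simulation, persisting never fails, which matches the create command working, whereas find("some-uuid") throws because a String is not a DemoAppointmentId, which matches the confirm command failing. If that reading is right, two directions seem worth trying: remove @Entity from the aggregate so it is event-sourced, or annotate the value object itself rather than a String copy of it. Both are suggestions under that assumption, not verified fixes.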

    Questions 4 and 5. It is indeed completely fine to update your Aggregate through @EventSourcingHandler annotated methods and have a distinct Spring Component within your application which handles the events to update projections. I'd regard this as "the way to go" when doing CQRS through Axon.
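The separate event handling component can be pictured with the following plain-Java sketch. It is a stand-in for a Spring @Component with @EventHandler methods: the events (ProjCreated, ProjConfirmed), the AppointmentProjection class and the in-memory map replacing the JPA repository are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical events, reduced to the identifier as a String.
final class ProjCreated {
    final String id;
    ProjCreated(String id) { this.id = id; }
}

final class ProjConfirmed {
    final String id;
    ProjConfirmed(String id) { this.id = id; }
}

// Stand-in for the projection component; the map replaces the relational table.
final class AppointmentProjection {
    private final Map<String, String> stateById = new HashMap<>();

    // Create event: insert (or overwrite) the row for this appointment.
    void on(ProjCreated event) {
        stateById.put(event.id, "CREATED");
    }

    // Confirm event: update the row only if it exists; unknown ids are ignored.
    void on(ProjConfirmed event) {
        stateById.computeIfPresent(event.id, (id, state) -> "CONFIRMED");
    }

    Optional<String> stateOf(String id) {
        return Optional.ofNullable(stateById.get(id));
    }
}
```

The projection holds only what queries need and is updated exclusively by events, while the aggregate stays responsible for deciding whether a command is allowed.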

    The last concern you're having requires me to make an assumption. I am guessing you haven't configured anything specific around the Event Processor being used. This means Axon will auto configure a TrackingEventProcessor for you. One of the things this implementation does is store its progress of "how far it is with handling events in the event stream" in a Token. These tokens should in turn be stored alongside your projections, as they define how up-to-date your projections are when it comes to the entire event stream.

    If you notice that the event handlers within the Event Handling Component are called for every event on each start up, that signals to me that the token_entry table is either not there or is being cleared on every start up.
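The effect of a stored token can be simulated in a few lines of plain Java. DemoTokenStore and DemoTrackingProcessor are hypothetical names, the event stream is a simple list of Strings, and the position is an int; Axon's real tokens and processors are more involved, so treat this purely as a sketch of the idea.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical durable token store; in a real set-up this would live next to the projections.
final class DemoTokenStore {
    private final Map<String, Integer> positions = new HashMap<>();

    int load(String processorName) {
        return positions.getOrDefault(processorName, 0);
    }

    void save(String processorName, int position) {
        positions.put(processorName, position);
    }
}

// Stand-in for a tracking processor: reads the stream from the stored position onwards.
final class DemoTrackingProcessor {
    private final String name;
    private final DemoTokenStore tokens;
    final List<String> handled = new ArrayList<>();

    DemoTrackingProcessor(String name, DemoTokenStore tokens) {
        this.name = name;
        this.tokens = tokens;
    }

    void catchUp(List<String> eventStream) {
        for (int i = tokens.load(name); i < eventStream.size(); i++) {
            handled.add(eventStream.get(i)); // "handle" the event
            tokens.save(name, i + 1);        // remember progress after each event
        }
    }
}
```

A processor created after a "restart" with the same DemoTokenStore handles nothing it has already seen, whereas one created with a fresh, empty store replays the whole stream. The latter is the situation to suspect when appointments are inserted again on every start up.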

    Concluding. Quite a mouthful here; I definitely hope this helps you out, Kurt! If anything is unclear, please comment on my answer and I'll update my response accordingly.