We are using the Axon Framework to implement the Saga pattern in Java. Axon uses two tables (ASSOCIATION_VALUE_ENTRY and SAGA_ENTRY) to store all the necessary information after each step of the saga. At the end of the process (either it completed successfully or, in case of error, all the compensations have been executed), it deletes those rows.
If, for any reason, the compensations cannot be executed after an error, we are able to resume the execution at the point where it failed, based on the stored information. So far, so good.
The issue came when we wanted to improve the resilience of the process and we checked what happened if the service died during the execution of a saga. According to the above, we expected the information of the execution to be persisted in the tables, but they were empty: the information only appeared when the process couldn't continue due to an error in a compensation (and no final delete action was executed).
Analyzing the source code of Axon's JpaSagaStore class, we saw that the interactions with the database (insert, update and delete) are persisted with a flush instead of a commit. The global commit is managed in the AbstractUnitOfWork class (as far as we understand). And this is where we have our doubts:
- [...] READ_UNCOMMITTED state. The only way to see them in the database would be to activate the READ_UNCOMMITTED isolation level, with the problem of 'dirty reads', right? Would there be any additional consideration/issue to take into account?
- [...] READ_UNCOMMITTED mode (due to internal policies).

EDIT:
To summarize heavily, it all starts with this method:
public void startSaga(SagaWorkflow sagaWorkflow, Serializable sagaInput) {
    StartSagaEvt startSagaEvt = StartSagaEvt.builder().sagaWorkflow(sagaWorkflow).sagaInput(sagaInput).build();
    eventBus.publish(GenericEventMessage.asEventMessage(startSagaEvt));
}
Where:
After this, Axon performs all its 'magic' and finally arrives at the internal code:
AnnotatedSagaRepository.doCreateInstance
--> AnnotatedSagaRepository.storeSaga
--> [...] --> JpaSagaStore.insertSaga
public void insertSaga(Class<?> sagaType, String sagaIdentifier, Object saga, Set<AssociationValue> associationValues) {
    EntityManager entityManager = entityManagerProvider.getEntityManager();
    AbstractSagaEntry<?> entry = createSagaEntry(saga, sagaIdentifier, serializer);
    entityManager.persist(entry);
    for (AssociationValue associationValue : associationValues) {
        storeAssociationValue(entityManager, sagaType, sagaIdentifier, associationValue);
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Storing saga id {} as {}", sagaIdentifier, serializedSagaAsString(entry));
    }
    if (useExplicitFlush) {
        entityManager.flush();
    }
}
The same applies to the update and delete phases. As far as I know, all the commit/rollback handling is performed in the AbstractUnitOfWork class, which intervenes only at the end of the complete saga flow.
This leads me to the following considerations/questions: what is the point of keeping the transaction open during the whole process instead of committing after each step? If, for any reason, the process fails, the service goes down, the database is not accessible, etc., all the saved information is lost.
There must be a design reason for this behavior, but I'm not able to see it. Or maybe there is a configuration to change it (hopefully, although I doubt it).
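To make the concern concrete, here is a toy model of the difference between flushing inside one long transaction and committing after each step. It is only a sketch: it does not use Axon or JPA, and every name in it (ToyTable, runAndCrash, the step labels) is hypothetical and invented for illustration. It assumes that a crash rolls back whatever the open transaction had flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT Axon or JPA code. All names here are hypothetical.
class FlushVsCommitDemo {

    /** Minimal stand-in for a table as seen through a single transaction. */
    static class ToyTable {
        private final List<String> committed = new ArrayList<>(); // durable, visible to other sessions
        private final List<String> pending = new ArrayList<>();   // flushed, but not yet committed

        void flush(String row) { pending.add(row); }
        void commit() { committed.addAll(pending); pending.clear(); }
        void crash() { pending.clear(); } // assumption: the open transaction is rolled back
        List<String> visibleToOthers() { return new ArrayList<>(committed); }
    }

    /** Flushes every step, optionally committing each one, then crashes before any final commit. */
    static List<String> runAndCrash(boolean commitPerStep, String... steps) {
        ToyTable table = new ToyTable();
        for (String step : steps) {
            table.flush(step);
            if (commitPerStep) {
                table.commit();
            }
        }
        table.crash(); // the service dies before the unit of work commits
        return table.visibleToOthers();
    }

    public static void main(String[] args) {
        System.out.println(runAndCrash(false, "step1", "step2")); // prints []
        System.out.println(runAndCrash(true, "step1", "step2"));  // prints [step1, step2]
    }
}
```

In this model the single-transaction run leaves nothing behind after the crash, which matches the empty tables we observed, while the per-step commit keeps every completed step.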
Thanks in advance for any comment!
EDIT 2
Indeed, we are using it as a kind of state machine, where the saga flow is a sequence of steps, each one with an action and a compensation, and we jump from one to another until we reach an "END" status.
@Saga
class GenericSaga {
    private EventBus eventBus;
    private CustomCommandGateway commandGateway;
    [...]
    @StartSaga
    @SagaEventHandler(associationProperty = "sagaId")
    public void startStep(StartSagaEvt startSagaEvt) {
        // Initializes the GenericSaga and associates several properties with SagaLifecycle.associateWith(key, value);
        [...]
        // Transition to the next (first) step
        eventBus.publish(GenericEventMessage.asEventMessage(new StepSagaEvt(startSagaEvt)));
    }
    @SagaEventHandler(associationProperty = "sagaId")
    public void nextStep(StepSagaEvt stepSagaEvt) {
        // Identifies the next step in the defined flow, considering whether it should be executed sequentially or concurrently, or whether it is the end of the flow, in which case it calls SagaLifecycle.end()
        [...]
        // Also checks whether it has to execute the compensation logic of the step
        [...]
        // Execute
        Serializable actionOutput = commandGateway.sendAndWaitEx(stepAction.getActionInput());
    }
    @SagaEventHandler(associationProperty = "sagaId")
    public void resumeSaga(ResumeSagaEvt resumeSagaEvt) {
        // Recover information from the execution that we want to resume
        [...]
        // Transition to the next step
        eventBus.publish(GenericEventMessage.asEventMessage(new StepSagaEvt(resumeSagaEvt)));
    }
}
As you can see, we don't have an @EndSaga annotation, and maybe that's the problem. But in our current situation we have pushed ahead and defined our own custom implementation of the JpaSagaStore, in order to force a local transaction in the insertSaga and updateSaga methods.
Based on my understanding, I think you are somehow misusing the Saga component from Axon Framework. I assume from your question that you are trying to build a form of 'state machine' using your own SagaWorkflow object. If that is the case, I have to say this is not how Axon intends Sagas to be used.
To add to that, let me give you a pseudo-sample of what a Saga should look like.
@Saga
class SagaWorkflow {
    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "yourProperty")
    public void on(SagaInputEvent event) {
        // validate, associate with another property and fire a command
        SagaLifecycle.associateWith("associationPropertyKey", "associationPropertyValue");
        commandGateway.send(new GivenCommand());
    }

    // associationProperty names the event property, so it matches the key used in associateWith above
    @SagaEventHandler(associationProperty = "associationPropertyKey")
    public void on(AnotherEvent event) {
        // validate and fire a command or finish the saga
        SagaLifecycle.end();
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "anyProperty")
    public void on(FinishSagaEvent event) {
        // check if you need to fire extra commands to tell others it's finished or just do it silently
    }
}
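As an aside on the association calls in the sample above, the lookup can be pictured as an index from (key, value) pairs to saga identifiers. The following is only a toy model under that assumption, not Axon's implementation; the class, its methods and the "orderId" values are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: NOT Axon's implementation. All names are hypothetical.
class AssociationLookupDemo {

    // "key=value" -> identifiers of the saga instances associated with that pair
    private final Map<String, Set<String>> index = new HashMap<>();

    private static String entry(String key, String value) {
        return key + "=" + value;
    }

    /** Plays the role of SagaLifecycle.associateWith(key, value) for one saga instance. */
    void associate(String sagaId, String key, String value) {
        index.computeIfAbsent(entry(key, value), k -> new HashSet<>()).add(sagaId);
    }

    /** Finds the sagas that should receive an event whose property has the given value. */
    Set<String> findSagas(String associationProperty, String eventPropertyValue) {
        return index.getOrDefault(entry(associationProperty, eventPropertyValue), Set.of());
    }

    public static void main(String[] args) {
        AssociationLookupDemo demo = new AssociationLookupDemo();
        demo.associate("saga-1", "orderId", "42");
        demo.associate("saga-2", "orderId", "43");
        System.out.println(demo.findSagas("orderId", "42")); // prints [saga-1]
        System.out.println(demo.findSagas("orderId", "99")); // prints []
    }
}
```

An event is only delivered to an existing saga when both the property name and its value match an association that the saga registered earlier.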
- The @Saga annotation will make sure Axon Framework handles the whole Saga process for you, storing (serializing) it to the database each time a (Saga)EventHandler is executed
- @SagaEventHandler will make sure the 'Event Handling method' reacts to a given Event only if it contains the associationProperty as part of the Event (to understand it better, I will share our docs link)
- @EndSaga will tell Axon Framework to finalize the Saga after the execution of the method (finalizing means deleting it from the database)
- SagaLifecycle provides several utility methods to interact with the Saga's lifecycle and associations
- CommandGateway is transient because the Saga is serialized and stored in the database. You would not want Axon to serialize any external component, like the gateway, as well

Of course, there is more to it. You can check Axon's docs for that. But I hope this gives you enough material and ideas to use Sagas within Axon Framework better!
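To illustrate the point about transient, here is a small sketch that uses plain Java serialization as a stand-in for whichever serializer Axon is configured with (an assumption; Axon's own serializers are not used here). ToySaga and FakeGateway are hypothetical names made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch only: plain Java serialization standing in for the saga serializer.
class TransientFieldDemo {

    /** Hypothetical infrastructure component; deliberately not Serializable. */
    static class FakeGateway { }

    /** Hypothetical saga: state is stored, the gateway is skipped. */
    static class ToySaga implements Serializable {
        private static final long serialVersionUID = 1L;
        String orderId = "42";
        transient FakeGateway gateway = new FakeGateway();
    }

    /** Serializes the saga to bytes and reads it back, as a store/load cycle would. */
    static ToySaga roundTrip(ToySaga saga) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(saga);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ToySaga) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ToySaga restored = roundTrip(new ToySaga());
        System.out.println(restored.orderId);         // prints 42
        System.out.println(restored.gateway == null); // prints true
    }
}
```

Without the transient keyword the round trip above would fail, because FakeGateway is not serializable. After loading, the field has to be filled in again from outside, which as far as I know is what Axon's resource injection does for components like the gateway.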
KR