java amazon-sqs temporal-workflow

Temporal Workflow: starting workflows with the same ID in a FIFO queue, and WorkflowTaskTimeout


I'm currently working with the Temporal workflow framework and the Java Temporal SDK. My setup involves an AWS SQS listener that triggers a Temporal workflow. The messages received by the listener contain a sourceId. Based on this sourceId, I need to start Temporal workflows in a queue. However, if there is already a running workflow for the same sourceId, I want to add the new message to a FIFO queue and wait for the previous workflow to complete. Here's an overview of the relevant parts of my code:

@SqsListener(value = QueueName.PROCESSING_QUEUE,
      deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receive(ProcessingMessage message) {
    var workflowId = createWorkflowId(message.getSourceId());
    var options = WorkflowOptions.newBuilder()
        .setTaskQueue(STARTER_WORKFLOW_QUEUE)
        .setWorkflowId(workflowId)
        .setWorkflowTaskTimeout(Duration.of(1, ChronoUnit.MINUTES))
        .build();
    log.info("Starting workflow with id - {}", workflowId);
    var workflow = workflowClient.newUntypedWorkflowStub("RecordStarterWorkflow", options);
    workflow.signalWithStart("putNewEmployee", new Object[]{message}, new Object[]{});
}

And my RecordStarterWorkflow class:

private final LinkedBlockingQueue<ProcessingMessage> employeeMessages
      = new LinkedBlockingQueue<>();

@Override
public void startWorkflow() {
    do {
        var message = employeeMessages.poll();
        var childWorkflow = Workflow.newChildWorkflowStub(SomeWorkflow.class, options);
        try {
            childWorkflow.startWorkflow(message);
        } catch (Exception exception) {
            log.error("Workflow failed. Workflow id:{} | Message:{}", options.getWorkflowId(),
                exception.getMessage());
        }
    } while (!employeeMessages.isEmpty());
    log.info("Processing finished");
}

@Override
@SneakyThrows
public void putNewEmployee(ProcessingMessage message) {
    log.debug("Received signal with message: {}", message);
    this.employeeMessages.add(message);
}

The above code works well when processing two sourceId messages. However, as I attempt to scale it and send three or more messages via AWS SQS, I start encountering Timeout exceptions in my workflow execution. This eventually leads to long-running workflows and performance degradation. I'm seeking advice on how to effectively scale my Temporal workflow processing while avoiding these Timeout exceptions. I suspect that the way I'm handling child workflows and their execution might be causing the issue. If anyone has insights, recommendations, or alternative approaches that could help me address this scaling challenge, I would greatly appreciate your input.


Solution

  • Your Workflow is using a LinkedBlockingQueue.

    That can't work: it blocks the workflow thread entirely, so the Workflow Task cannot complete and eventually times out.

    Here is a pertinent snippet from Temporal's Dev Guide:

    Do not use synchronization, locks, or other standard Java blocking concurrency-related classes besides those provided by the Workflow class. There is no need for explicit synchronization because multi-threaded code inside a Workflow is executed one thread at a time and under a global lock.

    • Call Workflow.sleep instead of Thread.sleep.
    • Use Promise and CompletablePromise instead of Future and CompletableFuture.
    • Use WorkflowQueue instead of BlockingQueue.

    The WorkflowQueue class internally uses Workflow.await rather than Java's synchronization primitives, so waiting on the queue (e.g. in take() or a timed poll()) does not block completion of the current Workflow Task.
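
As a rough illustration of that swap, the workflow from the question could be sketched as below. This is a minimal sketch, not a drop-in fix: ProcessingMessage and SomeWorkflow are hypothetical stand-ins for the asker's own types, the queue capacity of 1024 is an arbitrary assumption, and the types are package-private only so the snippet fits in one file.

```java
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.WorkflowQueue;
import org.slf4j.Logger;

// Hypothetical stand-in for the question's message payload.
class ProcessingMessage {
    public String sourceId;
}

// Hypothetical stand-in for the question's child workflow.
@WorkflowInterface
interface SomeWorkflow {
    @WorkflowMethod
    void startWorkflow(ProcessingMessage message);
}

@WorkflowInterface
interface RecordStarterWorkflow {
    @WorkflowMethod
    void startWorkflow();

    @SignalMethod
    void putNewEmployee(ProcessingMessage message);
}

class RecordStarterWorkflowImpl implements RecordStarterWorkflow {
    private static final Logger log = Workflow.getLogger(RecordStarterWorkflowImpl.class);

    // Workflow-aware queue instead of LinkedBlockingQueue; capacity is an assumption.
    private final WorkflowQueue<ProcessingMessage> employeeMessages =
        Workflow.newWorkflowQueue(1024);

    @Override
    public void startWorkflow() {
        ProcessingMessage message;
        // poll() returns null once the queue is empty, which ends the loop.
        while ((message = employeeMessages.poll()) != null) {
            // Create a fresh child stub for every message.
            SomeWorkflow child = Workflow.newChildWorkflowStub(SomeWorkflow.class);
            try {
                // Synchronous call: waits for the child to finish before the next message.
                child.startWorkflow(message);
            } catch (Exception e) {
                log.error("Child workflow failed. Message: {}", e.getMessage());
            }
        }
        log.info("Processing finished");
    }

    @Override
    public void putNewEmployee(ProcessingMessage message) {
        // If the queue were full, put() would wait through the workflow runtime,
        // not through a Java lock.
        employeeMessages.put(message);
    }
}
```

Because the listener uses signalWithStart, the first signal is queued before startWorkflow() runs, so the first poll() should already see a message. Later signals for the same workflow ID are appended in arrival order and processed one child workflow at a time.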

    Once that is fixed, you may also want to restore WorkflowTaskTimeout to its default value of 10 seconds by removing the setWorkflowTaskTimeout(...) call.
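
For the timeout change, the listener's options could be built as in the sketch below. StarterOptions and its build method are hypothetical names used only for illustration; the task queue and workflow ID are passed in by the caller, as in the question's listener.

```java
import io.temporal.client.WorkflowOptions;

// Hypothetical helper, not part of the Temporal SDK.
class StarterOptions {
    static WorkflowOptions build(String taskQueue, String workflowId) {
        return WorkflowOptions.newBuilder()
            .setTaskQueue(taskQueue)
            .setWorkflowId(workflowId)
            // No setWorkflowTaskTimeout(...) call, so the default timeout applies.
            .build();
    }
}
```

In the listener this would replace the inline builder, for example `StarterOptions.build(STARTER_WORKFLOW_QUEUE, workflowId)`.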