Search code examples
javaamazon-web-servicesamazon-swf

How to add second activity in Amazon SWF hello_sample example


I've successfully implemented the simple Java Amazon SWF example called hello_sample. I created the ActivityWorker executable that polls SWF for activity tasks to process, the WorkflowWorker executable that polls SWF for decision tasks, and a WorkflowStarter executable that kicks off the workflow execution. It works as advertised. What I don't understand is how to configure and add a second activity that runs after the first activity completes.
WorkflowWorker:

/**
 * Decider for the hello_sample workflow.
 *
 * <p>Repeatedly polls Amazon SWF for decision tasks on {@code Constants.TASKLIST}
 * in {@code Constants.DOMAIN}, replays the events delivered with each task and
 * answers with either a "schedule activity" decision, a "complete workflow"
 * decision, or no decision at all.
 */
public class WorkflowWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();

    /**
     * Polls for decision tasks forever. A failure while handling one task is
     * printed and the loop carries on with the next poll.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TaskList decisionTaskList = new TaskList().withName(Constants.TASKLIST);
        PollForDecisionTaskRequest pollRequest = new PollForDecisionTaskRequest()
            .withDomain(Constants.DOMAIN)
            .withTaskList(decisionTaskList);

        for (;;) {
            System.out.println(
                    "WorkflowWorker is polling for a decision task from the tasklist '" +
                    Constants.TASKLIST + "' in the domain '" +
                    Constants.DOMAIN + "'.");

            DecisionTask polled = swf.pollForDecisionTask(pollRequest);
            String token = polled.getTaskToken();
            if (token == null) {
                // No task token on this response: nothing to decide, poll again.
                continue;
            }

            try {
                executeDecisionTask(token, polled.getEvents());
            }
            catch (Throwable failure) {
                failure.printStackTrace();
            }
        }
    }

    /**
     * Replays the given history events, derives the next decision (if any) and
     * reports it back to SWF for the given task token.
     *
     * <p>NOTE(review): only the events passed in are inspected; if the history
     * can span several pages, later pages are not fetched here - confirm whether
     * that matters for this workflow.
     *
     * @param taskToken token of the decision task being answered
     * @param events    history events delivered with the decision task
     * @throws Throwable anything raised while building or sending the decisions
     */
    private static void executeDecisionTask(String taskToken, List<HistoryEvent> events) throws Throwable {
        String workflowInput = null;
        String activityResult = null;
        boolean sawCompletion = false;
        int pendingScheduled = 0;   // scheduled but not yet started
        int running = 0;            // started but not yet closed

        System.out.println("WorkflowWorker is executing the decision task for the history events: [");
        for (HistoryEvent historyEvent : events) {
            System.out.println("  " + historyEvent);
            switch (historyEvent.getEventType()) {
                case "WorkflowExecutionStarted":
                    workflowInput = historyEvent.getWorkflowExecutionStartedEventAttributes().getInput();
                    break;
                case "ActivityTaskScheduled":
                    pendingScheduled += 1;
                    break;
                case "ScheduleActivityTaskFailed":
                    pendingScheduled -= 1;
                    break;
                case "ActivityTaskStarted":
                    // The activity moves from "scheduled" to "running".
                    pendingScheduled -= 1;
                    running += 1;
                    break;
                case "ActivityTaskCompleted":
                    running -= 1;
                    sawCompletion = true;
                    activityResult = historyEvent.getActivityTaskCompletedEventAttributes().getResult();
                    break;
                case "ActivityTaskFailed":
                case "ActivityTaskTimedOut":
                    running -= 1;
                    break;
                default:
                    // Every other event type is irrelevant to this decider.
                    break;
            }
        }
        System.out.println("]");

        List<Decision> decisions = new ArrayList<Decision>();
        if (sawCompletion) {
            // Any completed activity ends the workflow with that activity's result.
            decisions.add(completeWorkflowDecision(activityResult));
        }
        else if (running == 0 && pendingScheduled == 0) {
            // Nothing scheduled or running: schedule the activity with the workflow input.
            decisions.add(scheduleActivityDecision(workflowInput));
        }
        // Otherwise an activity is already scheduled or running; respond with no
        // decisions and wait for the next decision task.

        System.out.println("WorkflowWorker is exiting the decision task with the decisions " + decisions);
        RespondDecisionTaskCompletedRequest response = new RespondDecisionTaskCompletedRequest()
            .withTaskToken(taskToken)
            .withDecisions(decisions);
        swf.respondDecisionTaskCompleted(response);
    }

    /** Builds the decision that closes the workflow execution with the given result. */
    private static Decision completeWorkflowDecision(String result) {
        CompleteWorkflowExecutionDecisionAttributes completion =
            new CompleteWorkflowExecutionDecisionAttributes().withResult(result);
        return new Decision()
            .withDecisionType(DecisionType.CompleteWorkflowExecution)
            .withCompleteWorkflowExecutionDecisionAttributes(completion);
    }

    /** Builds the decision that schedules the activity under a fresh random activity id. */
    private static Decision scheduleActivityDecision(String input) {
        ActivityType activityType = new ActivityType()
            .withName(Constants.ACTIVITY)
            .withVersion(Constants.ACTIVITY_VERSION);
        ScheduleActivityTaskDecisionAttributes scheduling = new ScheduleActivityTaskDecisionAttributes()
            .withActivityType(activityType)
            .withActivityId(UUID.randomUUID().toString())
            .withInput(input);
        return new Decision()
            .withDecisionType(DecisionType.ScheduleActivityTask)
            .withScheduleActivityTaskDecisionAttributes(scheduling);
    }
}

ActivityWorker:

/**
 * Activity worker for the hello_sample workflow.
 *
 * <p>Polls Amazon SWF for activity tasks on {@code Constants.TASKLIST} in
 * {@code Constants.DOMAIN}, runs each task through
 * {@link #executeActivityTask(String)} and reports the outcome (completed or
 * failed) back to SWF. A JVM shutdown hook asks the poll loop to stop and waits
 * up to 60 seconds for it to finish.
 */
public class ActivityWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    /** Counted down by {@link #main(String[])} once polling has ended; awaited by the shutdown hook. */
    private static final CountDownLatch waitForTermination = new CountDownLatch(1);
    /** Set by the shutdown hook; checked by the poll loop before each new poll. */
    private static volatile boolean terminate = false;

    /**
     * Prints a greeting for the input and writes the input to the file
     * {@code g_species.txt} in the current working directory.
     *
     * <p>I/O failures are no longer swallowed: they propagate to the caller so
     * that the task is reported to SWF as failed instead of being reported as
     * completed although the file was not written.
     *
     * @param g_species activity input; echoed back as the activity result
     * @return the unmodified input
     * @throws Throwable any failure while writing the file (for example an
     *         {@link java.io.IOException})
     */
    private static String executeActivityTask(String g_species) throws Throwable {
        String s = "   ********   Hello, " + g_species + "!";
        System.out.println(s);

        Path filePath = Paths.get(".").toAbsolutePath().normalize().resolve("g_species.txt");

        // try-with-resources closes (and thereby flushes) the writer on every path.
        try (BufferedWriter output = new BufferedWriter(new FileWriter(filePath.toFile()))) {
            output.write(g_species);
        }

        return g_species;
    }

    /**
     * Installs the shutdown hook and runs the poll loop until termination is
     * requested.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    terminate = true;
                    System.out.println("ActivityWorker is waiting for the current poll request to return before shutting down.");
                    waitForTermination.await(60, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                    // Restore the interrupt status instead of silently dropping it.
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            pollAndExecute();
        }
        finally {
            // Release the shutdown hook whether polling ended normally or by exception.
            waitForTermination.countDown();
        }
    }

    /**
     * Polls for activity tasks until {@code terminate} is set. Each received
     * task is executed and answered with either a completion (carrying the
     * result) or a failure (carrying the error's simple class name as reason and
     * its message as details).
     */
    public static void pollAndExecute() {
        while (!terminate) {
            System.out.println("ActivityWorker is polling for an activity task from the tasklist '"
                    + Constants.TASKLIST + "' in the domain '" + Constants.DOMAIN + "'.");

            ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest()
                .withDomain(Constants.DOMAIN)
                .withTaskList(new TaskList().withName(Constants.TASKLIST)));

            String taskToken = task.getTaskToken();

            // A response without a task token carries no work; poll again.
            if (taskToken != null) {
                String result = null;
                Throwable error = null;

                try {
                    System.out.println("ActivityWorker is executing the activity task with input '" + task.getInput() + "'.");
                    result = executeActivityTask(task.getInput());
                }
                catch (Throwable th) {
                    error = th;
                }

                if (error == null) {
                    System.out.println("The activity task succeeded with result '" + result + "'.");
                    swf.respondActivityTaskCompleted(
                        new RespondActivityTaskCompletedRequest()
                            .withTaskToken(taskToken)
                            .withResult(result));
                }
                else {
                    System.out.println("The activity task failed with the error '"
                            + error.getClass().getSimpleName() + "'.");
                    swf.respondActivityTaskFailed(
                        new RespondActivityTaskFailedRequest()
                            .withTaskToken(taskToken)
                            .withReason(error.getClass().getSimpleName())
                            .withDetails(error.getMessage()));
                }
            }
        }
    }
}

WorkflowStarter that kicks it all off:

/**
 * Command-line entry point that starts one execution of the hello_sample
 * workflow and prints the run id returned by SWF.
 */
public class WorkflowStarter {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    /** Workflow id used for every execution started by this class. */
    public static final String WORKFLOW_EXECUTION = "HelloWorldWorkflowExecution";

    /**
     * Starts the workflow execution.
     *
     * @param args optional; {@code args[0]} replaces the default input "Amazon SWF"
     */
    public static void main(String[] args) {
        final String input = (args.length > 0) ? args[0] : "Amazon SWF";

        System.out.println("Starting the workflow execution '" + WORKFLOW_EXECUTION +
                "' with input '" + input + "'.");

        // The start-to-close timeout "90" is passed through as given;
        // presumably seconds - confirm against the SWF API reference.
        StartWorkflowExecutionRequest startRequest = new StartWorkflowExecutionRequest()
            .withDomain(Constants.DOMAIN)
            .withWorkflowType(new WorkflowType()
                .withName(Constants.WORKFLOW)
                .withVersion(Constants.WORKFLOW_VERSION))
            .withWorkflowId(WORKFLOW_EXECUTION)
            .withInput(input)
            .withExecutionStartToCloseTimeout("90");

        Run started = swf.startWorkflowExecution(startRequest);

        System.out.println("Workflow execution started with the run id '" +
                started.getRunId() + "'.");
    }
}


Solution

  • I would recommend not reinventing the wheel and instead using the AWS Flow Framework for Java, which is officially supported by Amazon. It already implements all the low-level details and lets you focus directly on the business logic of your workflow.

    Here is an example workflow that uses three activities (taken from the developer guide).

    Activities interface:

    import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
    import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;
    
    /**
     * Activities used by the greeter workflow: obtain a name, build a greeting
     * for it, and output a piece of text.
     *
     * <p>Registered as version "1.0" with a default schedule-to-start timeout of
     * 300 seconds and a default start-to-close timeout of 10 seconds.
     */
    @Activities(version="1.0")
    @ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 300,
                                 defaultTaskStartToCloseTimeoutSeconds = 10)
    public interface GreeterActivities {
        /** @return the name to greet */
        String getName();

        /**
         * @param name the name to greet
         * @return the greeting text for {@code name}
         */
        String getGreeting(String name);

        /** @param what the text to output */
        void say(String what);
    }
    

    Activities implementation:

    /**
     * Straightforward implementation of {@code GreeterActivities}: a fixed name,
     * a "Hello "-prefixed greeting, and output to standard out.
     */
    public class GreeterActivitiesImpl implements GreeterActivities {
        private static final String DEFAULT_NAME = "World";
        private static final String GREETING_PREFIX = "Hello ";

        /** Always returns "World". */
        @Override
        public String getName() {
            return DEFAULT_NAME;
        }

        /** Returns "Hello " followed by {@code name}. */
        @Override
        public String getGreeting(String name) {
            String greeting = GREETING_PREFIX + name;
            return greeting;
        }

        /** Prints {@code what} on its own line to standard out. */
        @Override
        public void say(String what) {
            System.out.println(what);
        }
    }
    

    Workflow interface:

    import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
    import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
    import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;
    
    /**
     * Workflow contract for the greeter example. Registered with a default
     * execution start-to-close timeout of 3600 seconds.
     */
    @WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 3600)
    @Workflow
    public interface GreeterWorkflow {
        /** Workflow entry point, version "1.0". */
        @Execute(version = "1.0")
        void greet();
    }
    

    Workflow implementation:

    import com.amazonaws.services.simpleworkflow.flow.core.Promise;
    
    public class GreeterWorkflowImpl implements GreeterWorkflow {
       private GreeterActivitiesClient operations = new GreeterActivitiesClientImpl();
    
       public void greet() {
         Promise<String> name = operations.getName();
         Promise<String> greeting = operations.getGreeting(name);
         operations.say(greeting);
       }
    }
    

    The worker that hosts both of them. Obviously it can be broken into separate activity and workflow workers:

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
    import com.amazonaws.services.simpleworkflow.flow.ActivityWorker;
    import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;
    
    /**
     * Hosts both the activity worker and the workflow worker for the greeter
     * example in a single process, polling the same task list.
     */
    public class GreeterWorker  {
        private static final String DOMAIN = "helloWorldWalkthrough";
        private static final String TASK_LIST = "HelloWorldList";

        /**
         * Creates the SWF client, then starts an activity worker and a workflow
         * worker on the shared task list.
         *
         * @param args unused
         * @throws Exception if setting up or starting either worker fails
         */
        public static void main(String[] args) throws Exception {
            AmazonSimpleWorkflow service = createSwfClient();

            ActivityWorker activityWorker = new ActivityWorker(service, DOMAIN, TASK_LIST);
            activityWorker.addActivitiesImplementation(new GreeterActivitiesImpl());
            activityWorker.start();

            WorkflowWorker workflowWorker = new WorkflowWorker(service, DOMAIN, TASK_LIST);
            workflowWorker.addWorkflowImplementationType(GreeterWorkflowImpl.class);
            workflowWorker.start();
        }

        /**
         * Builds an SWF client for the us-east-1 endpoint using credentials read
         * from the AWS_ACCESS_KEY_ID and AWS_SECRET_KEY environment variables.
         */
        private static AmazonSimpleWorkflow createSwfClient() {
            // 70 * 1000 - presumably milliseconds, i.e. a 70 second socket timeout; confirm.
            ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70*1000);

            AWSCredentials credentials = new BasicAWSCredentials(
                System.getenv("AWS_ACCESS_KEY_ID"),
                System.getenv("AWS_SECRET_KEY"));

            AmazonSimpleWorkflow client = new AmazonSimpleWorkflowClient(credentials, clientConfig);
            client.setEndpoint("https://swf.us-east-1.amazonaws.com");
            return client;
        }
    }
    

    The workflow starter:

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
    
    /**
     * Starter for the greeter example: obtains an external workflow client and
     * invokes the workflow's greet() entry point.
     */
    public class GreeterMain {
        private static final String DOMAIN = "helloWorldWalkthrough";

        /**
         * Creates the SWF client and starts the greeter workflow.
         *
         * @param args unused
         * @throws Exception if creating the client or starting the workflow fails
         */
        public static void main(String[] args) throws Exception {
            AmazonSimpleWorkflow service = createSwfClient();

            // The *ClientExternalFactory types are not defined in this snippet;
            // presumably generated from GreeterWorkflow by the Flow Framework - confirm.
            GreeterWorkflowClientExternalFactory clientFactory =
                new GreeterWorkflowClientExternalFactoryImpl(service, DOMAIN);
            GreeterWorkflowClientExternal workflowClient = clientFactory.getClient("someID");
            workflowClient.greet();
        }

        /**
         * Builds an SWF client for the us-east-1 endpoint using credentials read
         * from the AWS_ACCESS_KEY_ID and AWS_SECRET_KEY environment variables.
         */
        private static AmazonSimpleWorkflow createSwfClient() {
            // 70 * 1000 - presumably milliseconds, i.e. a 70 second socket timeout; confirm.
            ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70*1000);

            AWSCredentials credentials = new BasicAWSCredentials(
                System.getenv("AWS_ACCESS_KEY_ID"),
                System.getenv("AWS_SECRET_KEY"));

            AmazonSimpleWorkflow client = new AmazonSimpleWorkflowClient(credentials, clientConfig);
            client.setEndpoint("https://swf.us-east-1.amazonaws.com");
            return client;
        }
    }