Tags: python, linux, amazon-web-services, boto, amazon-swf

Using Amazon SWF to communicate between servers


How can I use Amazon SWF to pass messages between servers?

  1. On server A, I want to run script A.
  2. When that has finished, I want to send a message to server B to run script B.
  3. If that completes successfully, I want the job cleared from the workflow queue.

I’m having a really hard time working out how to use boto and SWF together to do this. I’m not after complete code; what I’m after is an explanation of what is involved.

  • How do I actually tell server B to check for the completion of script A?
  • How do I make sure server A won’t pick up the completion of script A and try to run script B (since server B should run it)?
  • How do I actually notify SWF that script A has completed? Is there a flag, or a message, or what?

I’m pretty confused about all of this. What design should I use?


Solution

  • I think you ask some very good questions, which highlight how helpful SWF can be as a service. In short, you don't tell your servers to coordinate work between themselves. Your decider orchestrates all of this for you, with the help of the SWF service.

    The implementation of your workflow will go as follows:

    1. Register your domain, workflow and activities with the service (a one-off).
    2. Implement the decider and workers.
    3. Let your workers and deciders run.
    4. Start a new workflow.
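    To see why server A never picks up its own completion, it helps to look at task lists. The sketch below is a toy, in-memory model in plain Python: no boto, no AWS calls, and the queues and functions are hypothetical stand-ins rather than the SWF API. Each worker polls only its own task list, and a finished activity is reported back to the workflow, which hands the next step to the decider rather than to any worker:

```python
from collections import deque

# Toy stand-ins for SWF task lists (NOT the SWF API): whoever polls a list
# receives the tasks scheduled on it.
task_lists = {"default_tasks": deque(), "a_tasks": deque(), "b_tasks": deque()}
log = []


def decider(event):
    """Route work by scheduling the next activity on a specific task list."""
    if event == "WorkflowExecutionStarted":
        task_lists["a_tasks"].append("ServerAActivity")
    elif event == "ServerAActivity completed":
        task_lists["b_tasks"].append("ServerBActivity")
    elif event == "ServerBActivity completed":
        log.append("workflow complete")


def worker(name, task_list):
    """Poll ONE task list; report completion back to the decider's list."""
    if task_lists[task_list]:
        activity = task_lists[task_list].popleft()
        log.append("%s ran %s" % (name, activity))
        # In SWF the completion lands in the history and triggers a decision task.
        task_lists["default_tasks"].append(activity + " completed")


# Starting the workflow puts the first decision task on the decider's list.
task_lists["default_tasks"].append("WorkflowExecutionStarted")
while any(task_lists.values()):
    if task_lists["default_tasks"]:
        decider(task_lists["default_tasks"].popleft())
    worker("server A", "a_tasks")
    worker("server B", "b_tasks")

print(log)
```

    In real SWF, the worker reports completion with RespondActivityTaskCompleted (`self.complete(result=...)` in boto's layer2). The service records it in the workflow history and schedules a decision task on the decider's task list; that is the "flag" you are looking for.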

    There are several ways to supply credentials to boto.swf. For the purposes of this exercise, I recommend exporting them as environment variables before running the code below:

    export AWS_ACCESS_KEY_ID=<your access key>
    export AWS_SECRET_ACCESS_KEY=<your secret key>
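    If boto later fails to authenticate, a quick check that both variables actually reached the Python process can save some head-scratching. This is an optional, hypothetical helper (`missing_credentials` is not part of boto); since boto can also read credentials from its configuration files, it only covers the environment-variable route:

```python
import os

# The two variables exported above.
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_credentials(environ=os.environ):
    """Return the required credential variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


missing = missing_credentials()
if missing:
    print("Not set: " + ", ".join(missing))
else:
    print("AWS credentials found in the environment.")
```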
    

    1) To register the domain, the workflow type and the activity types, run the following once:

    # ab_setup.py
    import boto.swf.layer2 as swf
    
    DOMAIN = 'stackoverflow'
    ACTIVITY1 = 'ServerAActivity'
    ACTIVITY2 = 'ServerBActivity'
    VERSION = '1.0'
    
    swf.Domain(name=DOMAIN).register()
    swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
    swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
    swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
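    Registration only needs to happen once; if you run ab_setup.py a second time, the service rejects the duplicate domain and types. One way to make the script safe to re-run is to treat "already exists" as success. The helper below is a hypothetical sketch that takes the exception class to ignore as a parameter, so it runs here without boto or AWS access:

```python
def register_once(register, already_exists):
    """Run a zero-argument register callable, treating "already exists" as success.

    Returns True if something new was registered, False if it already existed.
    Any other exception propagates unchanged.
    """
    try:
        register()
    except already_exists:
        return False
    return True


# Stand-in exception and callables so the sketch runs without boto.
class AlreadyRegistered(Exception):
    pass


def fresh_registration():
    pass  # pretend the service accepted the registration


def repeated_registration():
    raise AlreadyRegistered("type already exists")


first_run = register_once(fresh_registration, AlreadyRegistered)
second_run = register_once(repeated_registration, AlreadyRegistered)
print(first_run, second_run)  # True False
```

    With boto you would pass the bound register methods, e.g. `register_once(swf.Domain(name=DOMAIN).register, SWFDomainAlreadyExistsError)`. I believe boto 2 defines `SWFDomainAlreadyExistsError` and `SWFTypeAlreadyExistsError` in `boto.swf.exceptions`, but check the version you have installed.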
    

    2) Implement the decider and workers.

    # ab_decider.py
    import time
    import boto.swf.layer2 as swf
    
    DOMAIN = 'stackoverflow'
    ACTIVITY1 = 'ServerAActivity'
    ACTIVITY2 = 'ServerBActivity'
    VERSION = '1.0'
    
    class ABDecider(swf.Decider):
    
        domain = DOMAIN
        task_list = 'default_tasks'
        version = VERSION
    
        def run(self):
            history = self.poll()
            # Print history to familiarize yourself with its format.
            print(history)
            if 'events' in history:
                # Get a list of non-decision events to see what event came in last.
                workflow_events = [e for e in history['events']
                                   if not e['eventType'].startswith('Decision')]
                decisions = swf.Layer1Decisions()
                # Record latest non-decision event.
                last_event = workflow_events[-1]
                last_event_type = last_event['eventType']
                if last_event_type == 'WorkflowExecutionStarted':
                    # At the start, get the worker to fetch the first assignment.
                    decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
                       ACTIVITY1, VERSION, task_list='a_tasks')
                elif last_event_type == 'ActivityTaskCompleted':
                    # Take decision based on the name of activity that has just completed.
                    # 1) Get activity's event id.
                    last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                    completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                    # 2) Extract its name.
                    activity_data = history['events'][completed_activity_id]
                    activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                    activity_name = activity_attrs['activityType']['name']
                    # 3) Optionally, get the result from the activity.
                    result = last_event['activityTaskCompletedEventAttributes'].get('result')
    
                    # Take the decision.
                    if activity_name == ACTIVITY1:
                        # Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
                        decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
                            ACTIVITY2, VERSION, task_list='b_tasks', input=result)
                    elif activity_name == ACTIVITY2:
                        # Server B completed activity. We're done.
                        decisions.complete_workflow_execution()
    
                self.complete(decisions=decisions)
                return True
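    Most of ABDecider.run() is bookkeeping over the history dictionaries. Pulling that logic into a plain function makes it testable without SWF. The sketch below is a hypothetical refactoring (`next_decision` is not part of boto) that mirrors the decider's rules. It assumes the whole history arrived in one page, which holds for short workflows like this one; SWF paginates long histories with a `nextPageToken`.

```python
ACTIVITY1 = "ServerAActivity"
ACTIVITY2 = "ServerBActivity"


def next_decision(events):
    """Return the next step for the A-then-B workflow.

    Returns ("schedule", activity_name, task_list, input), ("complete", result),
    or None when there is nothing to decide.
    """
    # Ignore the Decision* bookkeeping events, as ABDecider.run() does.
    workflow_events = [e for e in events if not e["eventType"].startswith("Decision")]
    if not workflow_events:
        return None
    last = workflow_events[-1]
    if last["eventType"] == "WorkflowExecutionStarted":
        return ("schedule", ACTIVITY1, "a_tasks", None)
    if last["eventType"] == "ActivityTaskCompleted":
        attrs = last["activityTaskCompletedEventAttributes"]
        # Event IDs start at 1, so event N sits at list index N - 1.
        scheduled = events[attrs["scheduledEventId"] - 1]
        name = scheduled["activityTaskScheduledEventAttributes"]["activityType"]["name"]
        result = attrs.get("result")
        if name == ACTIVITY1:
            # Hand server A's result to server B as its input.
            return ("schedule", ACTIVITY2, "b_tasks", result)
        if name == ACTIVITY2:
            return ("complete", result)
    return None


# Hand-written history in the shape SWF returns (most attributes trimmed).
history = [
    {"eventId": 1, "eventType": "WorkflowExecutionStarted"},
    {"eventId": 2, "eventType": "DecisionTaskScheduled"},
    {"eventId": 3, "eventType": "DecisionTaskStarted"},
    {"eventId": 4, "eventType": "DecisionTaskCompleted"},
    {"eventId": 5, "eventType": "ActivityTaskScheduled",
     "activityTaskScheduledEventAttributes": {"activityType": {"name": ACTIVITY1, "version": "1.0"}}},
    {"eventId": 6, "eventType": "ActivityTaskStarted"},
    {"eventId": 7, "eventType": "ActivityTaskCompleted",
     "activityTaskCompletedEventAttributes": {"scheduledEventId": 5, "result": "1361132270.0"}},
    {"eventId": 8, "eventType": "DecisionTaskScheduled"},
    {"eventId": 9, "eventType": "DecisionTaskStarted"},
]

print(next_decision(history[:3]))  # ('schedule', 'ServerAActivity', 'a_tasks', None)
print(next_decision(history))      # ('schedule', 'ServerBActivity', 'b_tasks', '1361132270.0')
```

    Inside ABDecider.run() you would translate the returned tuple into `decisions.schedule_activity_task(...)` or `decisions.complete_workflow_execution()`, keeping the SWF calls in one place and the routing rules in another.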
    

    The workers are much simpler. You don't need to use inheritance if you don't want to.

    # ab_workers.py
    import os
    import time
    import boto.swf.layer2 as swf
    
    DOMAIN = 'stackoverflow'
    ACTIVITY1 = 'ServerAActivity'
    ACTIVITY2 = 'ServerBActivity'
    VERSION = '1.0'
    
    class MyBaseWorker(swf.ActivityWorker):
    
        domain = DOMAIN
        version = VERSION
        task_list = None
    
        def run(self):
            activity_task = self.poll()
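            print(activity_task)
            if 'activityId' in activity_task:
                # Run this worker's activity on the task's input. On success,
                # activity() reports the result itself via self.complete().
                try:
                    self.activity(activity_task.get('input'))
                except Exception as error:
                    # Report the failure to SWF, then re-raise with the original traceback.
                    self.fail(reason=str(error))
                    raise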
    
                return True
    
        def activity(self, activity_input):
            raise NotImplementedError
    
    class WorkerA(MyBaseWorker):
        task_list = 'a_tasks'
    
        def activity(self, activity_input):
            result = str(time.time())
            print('worker a reporting time: %s' % result)
            self.complete(result=result)
    
    class WorkerB(MyBaseWorker):
        task_list = 'b_tasks'
    
        def activity(self, activity_input):
            result = str(os.getpid())
            print('worker b returning pid: %s' % result)
            self.complete(result=result)
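    The workers above only report the time or their PID, whereas in the question server A and server B each need to run a script. Here is a minimal sketch of how an activity could do that, using a hypothetical `run_script` helper and assuming Python 3.5+ for `subprocess.run` (the answer's own code targets Python 2). A non-zero exit status raises `CalledProcessError`, which MyBaseWorker.run() already turns into `self.fail(...)`, so SWF records the activity as failed instead of completed:

```python
import subprocess
import sys


def run_script(command, timeout=None):
    """Run an external script and return its stripped stdout.

    Raises subprocess.CalledProcessError on a non-zero exit status and
    subprocess.TimeoutExpired if the timeout (in seconds) is exceeded.
    """
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode output to str
        timeout=timeout,
        check=True,
    )
    return completed.stdout.strip()


# Stand-in for "script A": a one-line Python program.
output = run_script([sys.executable, "-c", "print('script A done')"])
print(output)  # script A done

# A failing script surfaces as an exception the worker can report via fail().
try:
    run_script([sys.executable, "-c", "import sys; sys.exit(3)"])
except subprocess.CalledProcessError as error:
    failure = "exit status %d" % error.returncode
print(failure)  # exit status 3
```

    WorkerA.activity() would then call something like `self.complete(result=run_script(['/path/to/script_a.sh']))`, where the path is a placeholder. Keep the returned output short: the SWF API reference lists a maximum length for an activity result (32,768 characters).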
    

    3) Run your decider and workers. They can run on separate hosts or all on the same machine. Open four terminals, one for each actor plus one for starting the workflow, and run your actors:

    First, your decider:

    $ python -i ab_decider.py 
    >>> while ABDecider().run(): pass
    ... 
    

    Then worker A, which you could run from server A:

    $ python -i ab_workers.py 
    >>> while WorkerA().run(): pass
    

    Then worker B, possibly from server B, although running everything from one laptop works just as well:

    $ python -i ab_workers.py 
    >>> while WorkerB().run(): pass
    ... 
    

    4) Finally, kick off the workflow.

    $ python
    Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
    [GCC 4.4.3] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import boto.swf.layer2 as swf
    >>> workflows = swf.Domain(name='stackoverflow').workflows()
    >>> workflows
    [<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
    >>> execution = workflows[0].start(task_list='default_tasks')
    >>> 
    

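    Switch back to the other terminals to see what your actors are doing. Each poll is a long poll that waits up to about a minute for a task; if none arrives, the poll comes back empty and the `while ... run()` loop may end. If that happens, press up-arrow and Enter to re-enter the polling loop.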
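    Rather than pressing up-arrow after every idle minute, you can wrap an actor in a loop that treats an empty poll as "try again". This hypothetical `poll_forever` helper is not part of boto; it relies only on the classes above, whose run() falls through and returns None when the poll result has no 'events' (decider) or 'activityId' (worker):

```python
import itertools


def poll_forever(make_actor, max_polls=None):
    """Poll repeatedly, treating an empty poll as "try again" rather than "stop".

    make_actor: zero-argument callable returning an object with run(),
    e.g. ABDecider, WorkerA or WorkerB. Exceptions from run() propagate.
    max_polls: stop after this many polls (handy for testing); None = never stop.
    Returns the number of polls that handled a task.
    """
    handled = 0
    polls = itertools.count() if max_polls is None else range(max_polls)
    for _ in polls:
        if make_actor().run():
            handled += 1
    return handled


class FakeActor:
    """Stand-in actor whose every other poll 'times out' with no task."""
    calls = 0

    def run(self):
        FakeActor.calls += 1
        return FakeActor.calls % 2 == 0 or None


handled_polls = poll_forever(FakeActor, max_polls=6)
print(handled_polls)  # 3
```

    On server A this would be `poll_forever(WorkerA)`. It still stops on an exception, such as the one MyBaseWorker.run() re-raises after reporting a failure, which is usually what you want while experimenting.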

    You can now open the SWF panel of the AWS Management Console to check how the executions are doing and view their history. Alternatively, you can query the history from the Python prompt:

    >>> execution.history()
    [{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
    'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
    'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
    'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
    '1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
    'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
    {'startToCloseTimeout': '300', 'taskList': {'name': ...
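    The raw history is hard to read at a glance. Below are two small hypothetical helpers (plain Python, not part of boto) that reduce it to one line per event and check whether the execution has finished. They only assume the list-of-dicts shape shown above:

```python
def event_trail(history):
    """Summarise an execution history as 'eventId eventType' strings."""
    return ["%d %s" % (event["eventId"], event["eventType"]) for event in history]


def is_completed(history):
    """True once a WorkflowExecutionCompleted event appears in the history."""
    return any(event["eventType"] == "WorkflowExecutionCompleted" for event in history)


# The first two events from the output above, trimmed to the fields used here.
sample = [
    {"eventId": 1, "eventType": "WorkflowExecutionStarted"},
    {"eventId": 2, "eventType": "DecisionTaskScheduled"},
]
print(event_trail(sample))   # ['1 WorkflowExecutionStarted', '2 DecisionTaskScheduled']
print(is_completed(sample))  # False
```

    At the prompt, `for line in event_trail(execution.history()): print(line)` gives a compact trail. Once worker B has finished and the decider has called complete_workflow_execution(), `is_completed(execution.history())` should return True.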
    

    That's just an example of a workflow that executes activities serially, but the decider can also schedule and coordinate activities that run in parallel.
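    For the parallel case, the decider's first decision can schedule both activities at once (one on a_tasks, one on b_tasks) and then wait until every scheduled activity has completed before closing the workflow. The sketch below shows that fan-out/fan-in rule as a hypothetical pure function (`parallel_decision` is not part of boto). For brevity it counts events instead of tracking activity IDs and ignores failures and timeouts, which a real decider must handle:

```python
def parallel_decision(events, activities):
    """Fan out to every activity at once, then complete when all have finished.

    events: SWF-style history dicts (only "eventType" is read here).
    activities: activity name -> task list.
    Returns a list of decisions: ("schedule", name, task_list) tuples,
    [("complete",)], or [] when the decider should wait.
    """
    types = [e["eventType"] for e in events]
    scheduled = types.count("ActivityTaskScheduled")
    if scheduled == 0:
        # First decision: schedule every activity in one response.
        return [("schedule", name, task_list) for name, task_list in activities.items()]
    completed = types.count("ActivityTaskCompleted")
    if completed == scheduled:
        # Fan-in: every scheduled activity has reported success.
        return [("complete",)]
    # Some activities are still running; make no decisions yet.
    return []


ACTIVITIES = {"ServerAActivity": "a_tasks", "ServerBActivity": "b_tasks"}
start = [{"eventType": "WorkflowExecutionStarted"}]
fanned_out = start + [{"eventType": "ActivityTaskScheduled"}] * 2
one_done = fanned_out + [{"eventType": "ActivityTaskCompleted"}]
both_done = one_done + [{"eventType": "ActivityTaskCompleted"}]

print(parallel_decision(start, ACTIVITIES))
print(parallel_decision(one_done, ACTIVITIES))   # []
print(parallel_decision(both_done, ACTIVITIES))  # [('complete',)]
```

    In a boto decider, each ("schedule", ...) tuple would become one `decisions.schedule_activity_task(...)` call on the same Layer1Decisions object, so both tasks are handed out in a single decision response.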

    I hope this will at least get you started. For a slightly more complex example of a serial workflow, I recommend looking at this.