Tags: python, cluster-computing, distributed-computing, pipeline, ruffus

Ruffus pipeline with internal inputs


I'd like to create a pipeline with the Ruffus package for Python, but I am struggling with its simplest concepts. Two tasks should be executed one after the other, and the second task depends on the output of the first. Everything in the Ruffus documentation is designed around importing from and exporting to external files, whereas I'd like to pass internal data types such as dictionaries.

The problem is that @follows doesn't take inputs and @transform doesn't take dicts. Am I missing something?

def task1():
    # generate dict
    properties = {'status': 'original'}
    return properties

@follows(task1)
def task2(properties):
    # update dict
    properties['status'] = 'updated'
    return properties

Eventually the pipeline should combine a set of functions in a class, each of which updates the class instance on the go.


Solution

  • You should only use the Ruffus decorators when there are input/output files. For example, if task1 generates file1.txt, and this is the input for task2, which generates file2.txt, then you could write the pipeline as follows:

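    from ruffus import originate, follows, transform, suffix, pipeline_run

    @originate('file1.txt')
    def task1(output):
        with open(output, 'w') as out_file:
            # write stuff to out_file (placeholder content)
            out_file.write('status: original\n')

    # @follows is optional here: passing task1 as the input of @transform
    # already makes task2 depend on it.
    @follows(task1)
    @transform(task1, suffix('1.txt'), '2.txt')
    def task2(input_, output):
        with open(input_) as in_file, open(output, 'w') as out_file:
            # read stuff from in_file and write stuff to out_file
            out_file.write(in_file.read().replace('original', 'updated'))

    pipeline_run([task2])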
    

    If you just want to take a dictionary as an input, you don't need Ruffus. Plain Python code runs sequentially, so you can either call the functions in the right order or call task1 from inside task2:

    def task1():
        properties = {'status': 'original'}
        return properties
    
    def task2():
        properties = task1()
        properties['status'] = 'updated'
        return properties
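
The question also mentions combining the steps in a class whose methods update the object as they go. A minimal sketch of that idea in plain Python, without Ruffus, might look like the following. The class name `Record` and the `run` method are hypothetical, chosen only for illustration; they are not part of Ruffus or of the original code.

```python
class Record:
    """Hypothetical container; the name and methods are illustrative only."""

    def __init__(self):
        self.properties = {}

    def task1(self):
        # generate dict
        self.properties = {'status': 'original'}
        return self  # returning self allows the steps to be chained

    def task2(self):
        # update dict
        self.properties['status'] = 'updated'
        return self

    def run(self):
        # Run the steps in a fixed order; each one mutates the same object.
        for step in (self.task1, self.task2):
            step()
        return self.properties


result = Record().run()
print(result)
```

Because each step returns `self`, the steps can also be chained by hand, as in `Record().task1().task2()`, when only part of the sequence is needed.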