Search code examples
nipype

How to connect two simple functions using NiPype?


I am trying to replace our internal pipeline infrastructure with NiPype.

As a test, I want to connect to simple functions where the first gets an input and after some calculation returns an output which gets used for the second function as an input:

import numpy
import nipype.pipeline.engine as pe
from nipype.interfaces.utility import Function
from pipyTestWheels import *


def Connectivity( dtiVolume ):

  print "  CONNECTIVITY1::START"

  print "reading dti Volume " + dtiVolume

  print ">>> performing connectivity *nom*nom*nom*"

  a = numpy.zeros( ( 1000, 1000 ) )

  print "  CONNECTIVITY1::END"

  trkFile = 'another trk file'

  return trkFile

def RegisterFibers( trkFile ):

  print "  REGISTERFIBERS::START"

  print "reading trk File " + trkFile

  print ">>> performing trkFile registration"

  trkFileRegistered = '/tmp/tracksRegistered.trk'

  print "  REGISTERFIBERS::END"

  return trkFileRegistered


def test():

  ConnectivityNode = pe.Node( name='connectivity', interface=Function( function=Connectivity, input_names=['dtiVolume'], output_names=['trkFile'] ) )
  RegisterFibersNode = pe.Node( name='registerFibers', interface=Function( function=RegisterFibers, input_names=['trkFile'], output_names=['trkFileRegistered'] ) )

  pipeline = pe.Workflow( name='testWf' )
  pipeline.add_nodes( [ConnectivityNode, RegisterFibersNode] )
  pipeline.run( dtiVolume='safsafa' )

I get the following error:

Traceback (most recent call last):
  File "pipyTestNipype.py", line 65, in <module>
    test()
  File "pipyTestNipype.py", line 46, in test
    pipeline.run( dtiVolume='safsafa' )
TypeError: run() got an unexpected keyword argument 'dtiVolume'

If I just run pipeline.run(), then I get this error:

INFO:workflow:['execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node registerFibers in dir: /tmp/tmpxpW0lR/testWf/registerFibers
ERROR:workflow:['Node registerFibers failed to run on host ipmi.']
INFO:workflow:Saving crash info to /net/pretoria/local_mount/space/pretoria/2/chb/users/daniel.haehn/Projects/scripts/crash-20120215-101717-daniel.haehn-registerFibers.npz
INFO:workflow:Traceback (most recent call last):
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/plugins/linear.py", line 35, in run
    node.run(updatehash=updatehash)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/engine.py", line 1141, in run
    self._run_interface(execute=True)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/engine.py", line 1161, in _run_interface
    self._result = self._run_command(execute)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/engine.py", line 1253, in _run_command
    result = self._interface.run()
  File "/usr/lib/pymodules/python2.7/nipype/interfaces/base.py", line 775, in run
    runtime = self._run_interface(runtime)
  File "/usr/lib/pymodules/python2.7/nipype/interfaces/utility.py", line 382, in _run_interface
    out = function_handle(**args)
TypeError: RegisterFibers() takes exactly 1 argument (0 given)
Interface Function failed to run. 

INFO:workflow:Executing node connectivity in dir: /tmp/tmptN84LP/testWf/connectivity
ERROR:workflow:['Node connectivity failed to run on host ipmi.']
INFO:workflow:Saving crash info to /net/pretoria/local_mount/space/pretoria/2/chb/users/daniel.haehn/Projects/scripts/crash-20120215-101717-daniel.haehn-connectivity.npz
INFO:workflow:Traceback (most recent call last):
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/plugins/linear.py", line 35, in run
    node.run(updatehash=updatehash)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/engine.py", line 1141, in run
    self._run_interface(execute=True)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/engine.py", line 1161, in _run_interface
    self._result = self._run_command(execute)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/engine.py", line 1253, in _run_command
    result = self._interface.run()
  File "/usr/lib/pymodules/python2.7/nipype/interfaces/base.py", line 775, in run
    runtime = self._run_interface(runtime)
  File "/usr/lib/pymodules/python2.7/nipype/interfaces/utility.py", line 382, in _run_interface
    out = function_handle(**args)
TypeError: Connectivity() takes exactly 1 argument (0 given)
Interface Function failed to run. 

INFO:workflow:***********************************
ERROR:workflow:could not run node: testWf.registerFibers
INFO:workflow:crashfile: /net/pretoria/local_mount/space/pretoria/2/chb/users/daniel.haehn/Projects/scripts/crash-20120215-101717-daniel.haehn-registerFibers.npz
ERROR:workflow:could not run node: testWf.connectivity
INFO:workflow:crashfile: /net/pretoria/local_mount/space/pretoria/2/chb/users/daniel.haehn/Projects/scripts/crash-20120215-101717-daniel.haehn-connectivity.npz
INFO:workflow:***********************************
Traceback (most recent call last):
  File "pipyTestNipype.py", line 65, in <module>
    test()
  File "pipyTestNipype.py", line 46, in test
    pipeline.run()
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/engine.py", line 467, in run
    runner.run(execgraph, updatehash=updatehash, config=self.config)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/plugins/linear.py", line 49, in run
    report_nodes_not_run(notrun)
  File "/usr/lib/pymodules/python2.7/nipype/pipeline/plugins/base.py", line 81, in report_nodes_not_run
    raise RuntimeError('Workflow did not execute cleanly. Check log for details')
RuntimeError: Workflow did not execute cleanly. Check log for details

What shall I do?

Thanks!


Solution

  • Try this:

    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import Function
    
    
    def Connectivity(dtiVolume):
        import numpy
        print "  CONNECTIVITY1::START"
        print "reading dti Volume " + dtiVolume
        print ">>> performing connectivity *nom*nom*nom*"
        a = numpy.zeros((1000, 1000))
        print "  CONNECTIVITY1::END"
        trkFile = 'another trk file'
        return trkFile
    
    
    def RegisterFibers(trkFile):
        print "  REGISTERFIBERS::START"
        print "reading trk File " + trkFile
        print ">>> performing trkFile registration"
        trkFileRegistered = '/tmp/tracksRegistered.trk'
        print "  REGISTERFIBERS::END"
        return trkFileRegistered
    
    
    def test():
        ConnectivityNode = pe.Node(name='connectivity', interface=Function(function=Connectivity, input_names=['dtiVolume'], output_names=['trkFile']))
        ConnectivityNode.inputs.dtiVolume = 'safsafa'
        RegisterFibersNode = pe.Node(name='registerFibers', interface=Function(function=RegisterFibers, input_names=['trkFile'], output_names=['trkFileRegistered']))
    
        pipeline = pe.Workflow(name='testWf')
        pipeline.connect(ConnectivityNode, 'trkFile', RegisterFibersNode, 'trkFile')
        pipeline.run()
    

    Basically you did not connect the two nodes together and you were trying to set inputs as a parameter of the run() method instead of setting them at the node level. Hope this helps.