Search code examples

How to connect a sink to a external waveform port in REDHAWK?

I'm trying to write a unit test for a REDHAWK waveform. I would like to use stream sources to input data and stream/message sinks to store the output. I have written unit tests for components this way, but wanted to create a test for a waveform as well. I found a solution for connecting a StreamSource to a waveform's port, but have not been able to determine how to connect a sink to a waveform port.

For a source and a component (where self.comp is the component), normally one can use the following to connect them:

src = StreamSource(streamId='strm1', format='short')

For a source and a waveform (where is the waveform), I was able to get the following to work:

src = StreamSource(streamId='strm1', format='short')

However, for a sink I would normally call connect on the component:

sink = StreamSink('short')
self.comp.connect(sink, usesPortName='dataShort_out')

I tried to use a similar approach as for the source case by getting the port from the waveform as below:

sink = StreamSink('short')'dataShort_out').connectPort(sink, 'outputConn')

However, this gives the error:

File "/usr/local/redhawk/core/lib/python/ossie/cf/", line 86, in connectPort
  return self._obj.invoke("connectPort", _0_CF.Port._d_connectPort, args)
BAD_PARAM: CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType, CORBA.COMPLETED_NO, ["Expecting object reference, got <class 'bulkio.sandbox.streamsink.StreamSink'>", "Operation 'connectPort' parameter 0"])

I am not sure how I can get a CORBA obj ref for the sink to use here. Or is there another approach I can use here to connect the port to the sink?

I am using REDHAWK 2.2.2 on Centos 7.


  • I think I have found a solution to my own question. I ended up creating a new class that manages port connections that works for both sinks and sources. I called it ConnectionManager (hopefully it won't be confused with the ossie.utils.model.connection.ConnectionManager class.

    class ConnectionManager:
        def __init__(self):
            self.connections = list()
        def clear(self):
            del self.connections[:]
        def connect(self, usesPort, providesPort, id):
            usesPort.connectPort(providesPort, id)
            self.connections.append( (usesPort, id))
        def disconnectAll(self):
            for port, id in self.connections:

    Here's an example using a StreamSource ( is a ConnectionManager):

    strm = sb.StreamSource(streamID='strm1', format='short')'shortOut'),

    And an example using a StreamSink:

    sink = sb.StreamSink('short')'dataShort_out'),

    My unit test setUp method has a call to and the tearDown method a call to to clean up the connections after each test.

    The only thing I don't understand is the names of the ports for the sink and source classes. Using the {format}{In|Out} names work, but I don't know why.