Search code examples
drake

Is there a way to set multiple outputs from the same function while authoring a LeafSystem in drake?


I am trying to output multiple vector results, computed in a single function, on different output ports of the same system.

I would like to do something like,

class TestSplit(LeafSystem):
    """Sketch from the question: two output ports fed by one shared callback.

    NOTE(review): illustrative only. The text following this snippet says
    this approach does not appear to be possible as written.
    """

    def __init__(self):
        LeafSystem.__init__(self)
        # Both ports are registered with the very same calc callback.
        self.DeclareVectorOutputPort('port-1', BasicVector(1), self.SetOutputReference)
        self.DeclareVectorOutputPort('port-2', BasicVector(1), self.SetOutputReference)

    def SetOutputReference(self, context, output):
        """Compute both results in one call and try to assign one per port."""
        # foo() is not defined in this snippet; it stands in for the shared
        # computation that yields both results.
        res1, res2 = foo()
        # The `output` argument handed to this callback is never used; the
        # values are set on port objects looked up by name instead.
        self.GetOutputPort("port-1").set_value(res1)
        self.GetOutputPort("port-2").set_value(res2)

However, it seems like this is currently not possible. I am aware that I could concatenate res1, res2 and then use the Demultiplexer in the diagram to split the outputs, but I would like to know if there is a way to directly set it somehow.

Seems relevant: https://stackoverflow.com/a/76027180/21652759


EDIT: The comments already answer the question. Adding another way of achieving the same result.

class Option4(LeafSystem):
    """Expose "add" and "sub" output ports backed by one shared computation.

    Option 4 stores the (add, sub) tuple produced by CalcAddSub in a cache
    entry. A single CalcOutput callback, bound to a port index with
    ``partial``, copies the matching tuple element into each output port.
    """

    def __init__(self):
        LeafSystem.__init__(self)
        self._pair = self.DeclareVectorInputPort("pair", BasicVector(2))

        # Cache entry holding the (add, sub) tuple; its calc depends on the
        # "pair" input port.
        producer = ValueProducer(
            allocate=lambda: AbstractValue.Make(tuple()),
            calc=self.CalcAddSub)
        self._add_sub = self.DeclareCacheEntry(
            description="add_sub",
            value_producer=producer,
            prerequisites_of_calc={self._pair.ticket()})

        # Declare "add" (index 0) then "sub" (index 1); each port lists the
        # cache entry as its prerequisite.
        for index, port_name in enumerate(("add", "sub")):
            self.DeclareVectorOutputPort(
                port_name, BasicVector(1),
                partial(self.CalcOutput, port_index=index),
                prerequisites_of_calc={self._add_sub.ticket()})

    def CalcAddSub(self, context, output):
        """Store the (sum, difference) of the two input elements in `output`."""
        values = self.GetInputPort("pair").Eval(context)
        total = np.array([values[0] + values[1]])
        difference = np.array([values[0] - values[1]])
        output.set_value((total, difference))

    def CalcOutput(self, context, output, port_index):
        """Copy the cached element selected by `port_index` into `output`.

        Index 0 selects the sum and index 1 the difference; any other index
        leaves `output` untouched.
        """
        total, difference = self._add_sub.Eval(context)
        if port_index == 0:
            output.set_value(total)
        elif port_index == 1:
            output.set_value(difference)

Solution

  • There are two options.

    The first option is the boring way -- compute the "single function" twice, and throw away half of its results for each output port. It looks like this:

    class Option1(LeafSystem):
        """Expose "add" and "sub" output ports computed by one helper.

        Option 1 uses no caching: each output port's callback runs the whole
        CalcAddSub computation and keeps only the half it needs.
        """

        def __init__(self):
            LeafSystem.__init__(self)
            self.DeclareVectorInputPort("pair", BasicVector(2))
            # Output ports are declared in order: "add" first, then "sub".
            for port_name, callback in (("add", self.CalcAdd),
                                        ("sub", self.CalcSub)):
                self.DeclareVectorOutputPort(
                    port_name, BasicVector(1), callback)

        def CalcAddSub(self, context):
            """Return the (sum, difference) of the two input elements."""
            values = self.GetInputPort("pair").Eval(context)
            total = np.array([values[0] + values[1]])
            difference = np.array([values[0] - values[1]])
            return (total, difference)

        def CalcAdd(self, context, output):
            """Write the sum to `output`, discarding the difference."""
            both = self.CalcAddSub(context)
            output.set_value(both[0])

        def CalcSub(self, context, output):
            """Write the difference to `output`, discarding the sum."""
            both = self.CalcAddSub(context)
            output.set_value(both[1])
    

    The second option stores the intermediate result in a cache entry, which allows the output ports to share the intermediate calculation, computing it only once each time the input port changes:

    class Option2(LeafSystem):
        """Expose "add" and "sub" output ports that share one cached result.

        Option 2 stores the (add, sub) tuple produced by CalcAddSub in a cache
        entry; CalcAdd and CalcSub each read that entry and copy out their own
        element.
        """

        def __init__(self):
            LeafSystem.__init__(self)
            self._pair = self.DeclareVectorInputPort("pair", BasicVector(2))

            # Cache entry holding the (add, sub) tuple; its calc depends on
            # the "pair" input port.
            producer = ValueProducer(
                allocate=lambda: AbstractValue.Make(tuple()),
                calc=self.CalcAddSub)
            self._add_sub = self.DeclareCacheEntry(
                description="add_sub",
                value_producer=producer,
                prerequisites_of_calc={self._pair.ticket()})

            # Both output ports list the cache entry as their prerequisite.
            cache_ticket = self._add_sub.ticket()
            self.DeclareVectorOutputPort(
                "add", BasicVector(1), self.CalcAdd,
                prerequisites_of_calc={cache_ticket})
            self.DeclareVectorOutputPort(
                "sub", BasicVector(1), self.CalcSub,
                prerequisites_of_calc={cache_ticket})

        def CalcAddSub(self, context, output):
            """Store the (sum, difference) of the two input elements."""
            values = self._pair.Eval(context)
            total = np.array([values[0] + values[1]])
            difference = np.array([values[0] - values[1]])
            output.set_value((total, difference))

        def CalcAdd(self, context, output):
            """Copy the cached sum into `output`."""
            cached = self._add_sub.Eval(context)
            output.set_value(cached[0])

        def CalcSub(self, context, output):
            """Copy the cached difference into `output`."""
            cached = self._add_sub.Eval(context)
            output.set_value(cached[1])
    

    Here's another spelling that has fewer functions.

    class Option3(LeafSystem):
        """Provides three outputs from a single CalcOutput function.

        CalcOutput returns a dict with "add", "sub" and "mul" entries. The dict
        is stored in a cache entry, and each output port copies out the entry
        that matches its own name.
        """

        def __init__(self):
            LeafSystem.__init__(self)
            self._input = self.DeclareVectorInputPort("input", BasicVector(2))
            # CalcOutput reads only the input port, so declare that port as
            # the cache entry's sole prerequisite rather than relying on the
            # default dependency on all sources.
            self._cache = self.DeclareCacheEntry(
                description="cache",
                value_producer=ValueProducer(
                    allocate=lambda: AbstractValue.Make(dict()),
                    calc=lambda context, output: output.set_value(
                        self.CalcOutput(context))),
                prerequisites_of_calc={self._input.ticket()})
            # One output port per key of the cached dict, declared in the
            # order "add", "sub", "mul". `key=key` binds the current name when
            # the lambda is created; a plain closure would see only the last
            # value of the loop variable.
            for key in ("add", "sub", "mul"):
                self.DeclareVectorOutputPort(
                    key, BasicVector(1),
                    lambda context, output, key=key: output.set_value(
                        self._cache.Eval(context)[key]),
                    prerequisites_of_calc={self._cache.ticket()})

        def CalcOutput(self, context):
            """Return a dict of the sum, difference and product of the inputs.

            Each value is a one-element numpy array, stored under the key
            "add", "sub" or "mul".
            """
            u = self._input.Eval(context)
            return dict(
                add=np.array([u[0] + u[1]]),
                sub=np.array([u[0] - u[1]]),
                mul=np.array([u[0] * u[1]]),
            )