Tags: python, pipeline, neuraxle

How to correctly implement a Neuraxle pipeline step that filters data_inputs?


I am trying to implement a BaseStep in Neuraxle (0.5.2) that filters data_inputs (and expected_outputs accordingly).

# imports for Neuraxle 0.5.x
from neuraxle.base import BaseStep, NonFittableMixin
from neuraxle.steps.output_handlers import InputAndOutputTransformerMixin


class DataFrameQuery(NonFittableMixin, InputAndOutputTransformerMixin, BaseStep):
    def __init__(self, query):
        super().__init__()
        self.query = query

    def transform(self, data_input):
        # with InputAndOutputTransformerMixin, data_input is a
        # (data_inputs, expected_outputs) tuple
        data_input, expected_output = data_input
        # verify that input and output are either pd.DataFrame or pd.Series
        # ... [redacted] ...
        new_data_input = data_input.query(self.query)
        if all(output is None for output in expected_output):
            new_expected_output = [None] * len(new_data_input)
        else:
            # keep only the expected outputs whose index survived the query
            new_expected_output = expected_output.loc[new_data_input.index]
        return new_data_input, new_expected_output

This will naturally (in most cases) change len(data_inputs) (and len(expected_outputs)). With the recent version of Neuraxle, I am getting an AssertionError:

import pandas as pd

from neuraxle.pipeline import Pipeline

data_input = pd.DataFrame([{"A": 1, "B": 1}, {"A": 2, "B": 2}], index=[1, 2])
expected_output = pd.Series([1, 2], index=[1, 2])
pipeline = Pipeline([
    DataFrameQuery("A == 1")
])
pipeline.fit_transform(data_input, expected_output)
AssertionError: InputAndOutputTransformerMixin: 
    Caching broken because there is a different len of current ids, and data inputs. 
    Please use InputAndOutputTransformerWrapper if you plan to change the len of the data inputs.
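For what it's worth, the pandas side of the filter behaves as intended when run in isolation; the assertion comes from Neuraxle's current-id bookkeeping, not from the query itself. A minimal check, using the same toy data as above:

```python
import pandas as pd

data_input = pd.DataFrame([{"A": 1, "B": 1}, {"A": 2, "B": 2}], index=[1, 2])
expected_output = pd.Series([1, 2], index=[1, 2])

# filter the inputs, then align the expected outputs on the surviving index
new_data_input = data_input.query("A == 1")
new_expected_output = expected_output.loc[new_data_input.index]
```

Both objects shrink from length 2 to length 1 and stay index-aligned, which is exactly the length change the assertion complains about.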

From my understanding, this is where Neuraxle's handler methods should come into play. However, so far I have not found one that would let me update the current_ids for the inputs and outputs after the transformation (my guess would be _did_transform, but it does not appear to get called).

Generally:

  • What is the correct way to update current_ids for both inputs and expected outputs after a transformation (in the same step)?
  • What are the aspects to watch out for when applying side-effects on the data_container? Are the identifiers e.g. used to split data for SIMD-parallelism? Are the new identifiers generally expected to be a sequence of integers?

Edit: I have also tried setting the savers and using the InputAndOutputTransformerWrapper as described here. I still get the following error (possibly because I am not sure where to call handle_transform):

AssertionError: InputAndOutputTransformerWrapper: 
    Caching broken because there is a different len of current ids, and data inputs.
    Please resample the current ids using handler methods, or create new ones by setting the wrapped step saver to HashlibMd5ValueHasher using the BaseStep.set_savers method.

Edit: For now I have solved the problem as follows:


# imports for Neuraxle 0.5.x
from neuraxle.base import BaseStep, NonFittableMixin
from neuraxle.data_container import DataContainer
from neuraxle.steps.output_handlers import InputAndOutputTransformerMixin


class OutputShapeChangingStep(NonFittableMixin, InputAndOutputTransformerMixin, BaseStep):
    def __init__(self, idx):
        super().__init__()
        self.idx = idx

    def _update_data_container_shape(self, data_container):
        assert len(data_container.expected_outputs) == len(data_container.data_inputs)
        # re-issue integer current ids to match the new length, then rehash
        data_container.set_current_ids(range(len(data_container.data_inputs)))
        data_container = self.hash_data_container(data_container)
        return data_container

    def _set_data_inputs_and_expected_outputs(self, data_container, new_inputs, new_expected_outputs) -> DataContainer:
        data_container.set_data_inputs(new_inputs)
        data_container.set_expected_outputs(new_expected_outputs)
        data_container = self._update_data_container_shape(data_container)
        return data_container

    def transform(self, data_inputs):
        data_inputs, expected_outputs = data_inputs
        return data_inputs[self.idx], expected_outputs[self.idx]

I am likely overriding _set_data_inputs_and_expected_outputs of InputAndOutputTransformerMixin in the "wrong" way here (would _transform_data_container be the better choice?), but this way updating the current_ids (and rehashing the container) appears to be possible. I would still be interested in how to do this in a way that is more in line with Neuraxle's API expectations.
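As an aside, the indexing in transform above shrinks both arrays in lockstep for any idx that NumPy-style fancy indexing accepts. A small illustration with plain NumPy (the idx value is hypothetical, just to show the shape change):

```python
import numpy as np

data_inputs = np.array([10, 20, 30, 40])
expected_outputs = np.array([1, 2, 3, 4])

idx = [0, 2]  # hypothetical index an OutputShapeChangingStep might receive
new_di = data_inputs[idx]
new_eo = expected_outputs[idx]
```

Both arrays go from length 4 to length 2, which is exactly why the current ids must be regenerated afterwards.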


Solution

  • Personally, my favorite way to do this is to use only handler methods. It is much cleaner in my opinion.

    Usage example with handler methods:

    # imports for Neuraxle 0.5.x
    import numpy as np

    from neuraxle.base import BaseTransformer, ExecutionContext, ForceHandleMixin
    from neuraxle.data_container import DataContainer


    class WindowTimeSeries(ForceHandleMixin, BaseTransformer):
        def __init__(self):
            BaseTransformer.__init__(self)
            ForceHandleMixin.__init__(self)

        def _transform_data_container(self, data_container: DataContainer, context: ExecutionContext) -> DataContainer:
            di = data_container.data_inputs
            new_di, new_eo = np.array_split(np.array(di), 2)

            # returning a fresh DataContainer lets the current ids be recreated
            return DataContainer(
                summary_id=data_container.summary_id,
                data_inputs=new_di,
                expected_outputs=new_eo
            )


    This way, the current ids get recreated and hashed with the default behavior. Note: the summary id is the most important thing; it is created at the beginning and rehashed with the hyperparams. If needed, you could also generate new current ids with a custom saver like the HashlibMd5ValueHasher.
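    For reference, the np.array_split call in the example simply halves the inputs, so the first half becomes the new data inputs and the second half the new expected outputs. A plain NumPy check, outside Neuraxle:

    ```python
    import numpy as np

    di = [1, 2, 3, 4, 5, 6]

    # split into 2 roughly equal parts; for an even length, exact halves
    new_di, new_eo = np.array_split(np.array(di), 2)
    ```

    So a 6-element input yields two 3-element arrays, i.e. the container's length changes, which is precisely the case the handler-method approach handles cleanly.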

    Edit, there was indeed a bug. This is fixed here : https://github.com/Neuraxio/Neuraxle/pull/379

    Usage example:

    step = InputAndOutputTransformerWrapper(WindowTimeSeriesForOutputTransformerWrapper()) \
        .set_hashers([HashlibMd5ValueHasher()])
    step = StepThatInheritsFromInputAndOutputTransformerMixin() \
        .set_hashers([HashlibMd5ValueHasher()])