An Apache Beam PTransform can have with_outputs and with_output_types chained onto it. For example,
pcoll | beam.CombinePerKey(sum).with_output_types(typing.Tuple[unicode, int])
and
(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
         .with_outputs('above_cutoff_lengths', 'marked strings',
                       main='below_cutoff_strings')
)
(Both of these examples are taken from the Apache Beam documentation, if you want some context.)
But I cannot find any documentation on how to combine the two. For instance, can I do something like this?
(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
         .with_outputs('above_cutoff_lengths', 'marked strings',
                       main='below_cutoff_strings')
         .with_output_types(str, IndexError, str)
)
Disclaimer: I could be wrong, since you have not described the actual problem/error you ran into. Furthermore, the DirectRunner (which the playground uses) completely ignores any typehints! To verify that this actually solves your issue, it has to be executed on a runner that enforces typehints (e.g., Dataflow).
Assuming the error you encounter is
TypeError: PTransform.with_output_types() takes 2 positional arguments but 4 were given
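The "4 were given" in that message is Python counting self plus your three typehints against a method that accepts only self and one hint. A minimal standalone sketch reproduces it (FakeTransform is illustrative, not Beam's actual class; it only mirrors the signature shape):

```python
class FakeTransform:
    # Mirrors the one-positional-typehint signature of
    # PTransform.with_output_types(self, type_hint).
    def with_output_types(self, type_hint):
        return self

t = FakeTransform()
try:
    # self + 3 hints = 4 positional arguments against a 2-argument method
    t.with_output_types(str, IndexError, str)
except TypeError as e:
    print(e)  # ...takes 2 positional arguments but 4 were given
```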
If you look at the documentation of with_output_types, you will see that it expects a single typehint, whereas you are providing three individual ones. You need to wrap your typehints into a Tuple, e.g.
import apache_beam as beam
from typing import Tuple  # <- this is the important piece

class DoFnWithOutputs(beam.DoFn):
    def process(self, element):
        if element == 1:
            yield "one"
        else:
            yield beam.pvalue.TaggedOutput("not_one", False)

with beam.Pipeline() as pipeline:
    input_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 1])

    being_one, not_being_one = (
        input_data
        | "DoFn" >> beam.ParDo(DoFnWithOutputs())
                        .with_outputs("not_one", main="one")
                        .with_output_types(Tuple[bool, str])  # Note the wrapping within 'Tuple'
    )

    (
        being_one
        | "print 1" >> beam.Map(print)
    )

    (
        not_being_one
        | "print != 1" >> beam.Map(print)
    )
which you can execute directly within the Beam playground.
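A side note on why the wrapping works: Tuple[bool, str] is a single parameterized typing object, so passing it satisfies the one-typehint signature of with_output_types. A quick standalone check (pure typing module, no Beam required):

```python
from typing import Tuple

hint = Tuple[bool, str]  # one object, not two separate arguments
print(hint.__args__)     # (<class 'bool'>, <class 'str'>)
```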