I have a pipeline with a set of PTransforms and my method is getting very long.
I'd like to write my DoFns and my composite transforms in a separate package and use them from my main method. With Python this is pretty straightforward; how can I achieve it with Scio? I don't see any examples of doing that. :(
withFixedWindows(
  FIXED_WINDOW_DURATION,
  options = WindowOptions(
    trigger = groupedWithinTrigger,
    timestampCombiner = TimestampCombiner.END_OF_WINDOW,
    accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
    allowedLateness = Duration.ZERO
  )
)
.sumByKey
// How can I write this in another file and use it here?
.transform("Format Output") {
  _
    .withWindow[IntervalWindow]
    .withTimestamp
}
If I understand your question correctly, you want to bundle your map, groupBy, ...
transformations in a separate package, and use them in your main pipeline.
One way would be to use applyTransform, but then you would end up using PTransforms, which are not Scala-friendly.
You can simply write a function that receives an SCollection and returns the transformed one, like:
def myTransform(input: SCollection[InputType]): SCollection[OutputType] = ???
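For the "Format Output" step from the question, that could look roughly like the sketch below. The package name com.example.transforms, the object name OutputTransforms, and the (String, Long) element type are all assumptions made for illustration; substitute the key and value types your sumByKey actually produces.

```scala
// Hypothetical file: com/example/transforms/OutputTransforms.scala
package com.example.transforms

import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.windowing.IntervalWindow
import org.joda.time.Instant

object OutputTransforms {
  // Same body as the inline "Format Output" step in the question:
  // pairs each element with its window, then pairs that result with
  // the element timestamp.
  // Assumption: the upstream sumByKey yields (String, Long) pairs.
  def formatOutput(
    input: SCollection[(String, Long)]
  ): SCollection[(((String, Long), IntervalWindow), Instant)] =
    input
      .withWindow[IntervalWindow]
      .withTimestamp
}
```

In the main pipeline you would then import com.example.transforms.OutputTransforms and replace the inline block with .transform("Format Output")(OutputTransforms.formatOutput). Going through transform keeps the named step in the pipeline graph, while the logic itself lives in a plain function that is easy to unit test.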
If you intend to write your own Source/Sink, however, take a look at the ScioIO class.