java, join, streaming, apache-beam, beam-sql

How to fix "Joining unbounded PCollections is currently only supported for non-global windows with triggers" in Apache Beam


I'm trying to join two unbounded sources using the Apache Beam Java SDK. While joining, I'm getting the error message below.

Exception in thread "main" java.lang.UnsupportedOperationException: Joining unbounded PCollections is currently only supported for non-global windows with triggers that are known to produce output once per window, such as the default trigger with zero allowed lateness. In these cases Beam can guarantee it joins all input elements once per window. WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), accumulationMode=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST} is not supported
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.access$1500(BeamJoinRel.java:98)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:330)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:308)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:67)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:48)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:49)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:65)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:100)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:167)
    at xyz.xyz.main(xyz.java:64)

I have tried using both fixed and sliding windows, along with triggering (pastEndOfWindow and pastFirstElementInPane) and zero allowed lateness. I tried both accumulating and discarding fired panes. I get the same error message every time.

Below are two snippets I tried, one with a fixed window and one with a sliding window.

p1.apply("window",
    Window
      .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(5)))
      .triggering(
        Repeatedly
          .forever(
             AfterProcessingTime
               .pastFirstElementInPane()
               .plusDelayOf(Duration.standardMinutes(1))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

I simply want to implement a SQL transform with a sliding window, a trigger with a delay, and allowed lateness. Kindly guide me on how to implement this.

Thanks, Gowtham


Solution

  • From the comment, if I understand it correctly, the desired behavior is:

    • join two streams;
    • emit results every 30 seconds in real-world time;
    • if the data cannot be matched, wait for the corresponding matching record for 30 min max;
    • drop the records after 30 min;

    Basically, it's a kind of continuous sliding match over the last 30 min of data in both streams, with results emitted every 30 seconds.

    The good news is that it should be possible to implement this in Beam Java (and probably in Python as well). The bad news is that it would probably be non-trivial in Java, and I don't think it's possible at all in SQL at the moment.

    What it would probably look like:

    • input should be in global window;
    • have a stateful ParDo that keeps track of all seen elements by storing them in a state cell (rough sketches below):
      • you will probably need to use either a side-input or apply a CoGroupByKey beforehand to have access to elements from both inputs in the same ParDo;
      • side-inputs and CoGroupByKey have different semantics and might not be easy to work with;
    • on each input manually check the state for the matching records;
    • either emit results right away or keep them in another state cell;
    • have a timer that would purge old unmatched records:
      • you might need to manually keep track of timestamps and other things;
    • apply desired window/trigger to the output if needed;
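
    To make the global-window and CoGroupByKey bullets concrete, here is a rough sketch of one way the two inputs could be wired together before the stateful ParDo. It is only an illustration: the left/right names, the String join key and payload, and the per-element trigger are my assumptions, and using a side input instead of CoGroupByKey would be a different design with different trade-offs.

    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;
    import org.joda.time.Duration;

    // The two unbounded inputs, assumed to be already keyed by the join key.
    PCollection<KV<String, String>> left = ...;
    PCollection<KV<String, String>> right = ...;

    // Tags that identify which input an element came from.
    final TupleTag<String> leftTag = new TupleTag<String>() {};
    final TupleTag<String> rightTag = new TupleTag<String>() {};

    // Keep elements in the global window, but fire a pane for every element so
    // the CoGroupByKey output streams through instead of waiting forever.
    Window<KV<String, String>> streamingWindow =
        Window.<KV<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO);

    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple
            .of(leftTag, left.apply("windowLeft", streamingWindow))
            .and(rightTag, right.apply("windowRight", streamingWindow))
            .apply(CoGroupByKey.<String>create());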

    I suggest you read through this example; it does the timer and state part of what you need (it waits for matching records, keeps the unmatched records in state, and clears the state when the timer fires) and uses a CoGroupByKey. You should have a better idea of how it works once you understand that example.
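
    To make the state-and-timer part concrete as well, here is a rough sketch of a stateful DoFn in the same spirit. It buffers unmatched elements from each side in a BagState, emits a joined record when the other side shows up, and uses a processing-time timer to purge whatever is still unmatched. The class name, the String payloads, and the exact matching and expiry semantics (for example, the timer below resets on every new element for a key rather than per element) are assumptions you would adapt to your requirements.

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TupleTag;
    import org.joda.time.Duration;

    // Buffers unmatched elements per key and joins them as the other side arrives.
    class MatchFn extends DoFn<KV<String, CoGbkResult>, KV<String, String>> {

      private final TupleTag<String> leftTag;
      private final TupleTag<String> rightTag;

      MatchFn(TupleTag<String> leftTag, TupleTag<String> rightTag) {
        this.leftTag = leftTag;
        this.rightTag = rightTag;
      }

      // Unmatched elements seen so far for this key, one bag per side.
      @StateId("leftBuffer")
      private final StateSpec<BagState<String>> leftBufferSpec =
          StateSpecs.bag(StringUtf8Coder.of());

      @StateId("rightBuffer")
      private final StateSpec<BagState<String>> rightBufferSpec =
          StateSpecs.bag(StringUtf8Coder.of());

      // Purges anything still unmatched 30 minutes (processing time) after the
      // last element seen for this key.
      @TimerId("expiry")
      private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(
          ProcessContext c,
          @StateId("leftBuffer") BagState<String> leftBuffer,
          @StateId("rightBuffer") BagState<String> rightBuffer,
          @TimerId("expiry") Timer expiry) {
        String key = c.element().getKey();
        CoGbkResult result = c.element().getValue();

        // New left elements: join against buffered right elements, otherwise buffer.
        for (String l : result.getAll(leftTag)) {
          boolean matched = false;
          for (String r : rightBuffer.read()) {
            c.output(KV.of(key, l + "," + r));
            matched = true;
          }
          if (!matched) {
            leftBuffer.add(l);
          }
        }

        // Symmetric handling for new right elements.
        for (String r : result.getAll(rightTag)) {
          boolean matched = false;
          for (String l : leftBuffer.read()) {
            c.output(KV.of(key, l + "," + r));
            matched = true;
          }
          if (!matched) {
            rightBuffer.add(r);
          }
        }

        // Reset the purge deadline; for per-element expiry you would need to
        // track timestamps yourself, as noted above.
        expiry.offset(Duration.standardMinutes(30)).setRelative();
      }

      @OnTimer("expiry")
      public void onExpiry(
          @StateId("leftBuffer") BagState<String> leftBuffer,
          @StateId("rightBuffer") BagState<String> rightBuffer) {
        leftBuffer.clear();
        rightBuffer.clear();
      }
    }

    The grouped collection from the previous sketch would then be run through ParDo.of(new MatchFn(leftTag, rightTag)), and you can apply whatever window/trigger you need to its output afterwards.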