google-cloud-platform google-cloud-spanner

How do I solve the runtime exception "Attempted to get side input window for GlobalWindow from non-global WindowFn"?


I am struggling to figure out how to resolve an issue I am seeing with this Dataflow job. I found a similar question thread in the Apache Beam archives, but I did not quite understand how to apply that information.

Essentially, data is being streamed into BigQuery (which works). I am trying to write these BigQuery rows to Spanner in the same Dataflow job, which raises the following runtime exception:

    java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
    org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:47) ....

The relevant section of the Dataflow graph is in the linked data flow graph, and the code I am using to write to Spanner is below:

    sensorReports
            // Note: despite the step name, these are 5-minute fixed windows
            .apply("WindowSensorReportByMonth",
                    Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(5)))
                            // Fire once, one minute (processing time) after the first element arrives in the pane
                            .triggering(AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardMinutes(1)))
                            .withAllowedLateness(Duration.ZERO)
                            .discardingFiredPanes())
            // RowToMutationTransform (not shown) converts each TableRow into a Spanner Mutation
            .apply("CreateSensorReportMutation", ParDo.of(new RowToMutationTransform()))
            .apply("Write to Spanner",
                    SpannerIO.write()
                            .withDatabaseId(propertiesUtils.getSpannerDBId())
                            .withInstanceId(propertiesUtils.getSpannerInstanceId())
                            .withProjectId(propertiesUtils.getSpannerProjectId())
                            .withBatchSizeBytes(0));
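For reference, `FixedWindows.of(Duration.standardMinutes(5))` in the code above places each element in a bounded five-minute interval window based on its event timestamp, rather than in the `GlobalWindow`. The stdlib-only Java sketch below models that assignment, assuming Beam's default window offset of zero. It is a simplified model, not Beam's implementation; `FixedWindowSketch`, `Interval` and `assign` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (hypothetical names, not Beam's API) of how FixedWindows.of(size)
// assigns an event timestamp to a window, assuming the default offset of zero.
class FixedWindowSketch {

    // Half-open interval [start, end), analogous to Beam's IntervalWindow.
    static final class Interval {
        final Instant start;
        final Instant end;

        Interval(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Returns the fixed window that contains the given timestamp.
    static Interval assign(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps window starts aligned for timestamps before the epoch too
        long startMs = ts - Math.floorMod(ts, sizeMs);
        return new Interval(Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs));
    }

    public static void main(String[] args) {
        Duration fiveMinutes = Duration.ofMinutes(5);
        // Two events four minutes apart can still fall into different windows
        Instant a = Instant.parse("2018-11-20T10:07:30Z");
        Instant b = Instant.parse("2018-11-20T10:11:30Z");
        System.out.println(assign(a, fiveMinutes));
        System.out.println(assign(b, fiveMinutes));
    }
}
```

A `ParDo` output inherits the window of its input element by default, so the Mutations emitted by `RowToMutationTransform` also sit in these bounded windows when they reach `SpannerIO.write()`.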

Solution

  • SpannerIO.write() internally reads the database schema in the global window and uses it as a side input, so your Mutations, which are in non-global (fixed) windows, clash with it.

    You could put all your Mutations into the global window before passing them to SpannerIO.write():

    .apply("To Global Window", Window.<Mutation>into(new GlobalWindows()))
    

    But in Beam versions 2.5 through 2.8, this results in either an error or nothing ever being written, because SpannerIO in those versions does not support streaming pipelines.

    Edited answer:

    However, SpannerIO in Beam versions 2.5 through 2.8 does not support streaming pipelines. Version 2.4 and earlier did, provided you don't pass a windowed PCollection to it.

    You will be pleased to hear that this is all fixed in version 2.9 (release in progress), where SpannerIO both supports streaming writes and handles windowing correctly.
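The error message and stack trace describe the lookup that fails: an element of a main input in the `GlobalWindow` asks a side input windowed by a `PartitioningWindowFn` (the base class of `FixedWindows`) which side-input window to read, and that `WindowFn` has no window to map the `GlobalWindow` onto. The stdlib-only Java sketch below models that mapping rule. It is a simplified model, not Beam's API: `SideInputMappingSketch`, `SideInputWindowing`, `GLOBAL` and `fixed` are hypothetical names, and the trace does not show which internal SpannerIO step performs the lookup.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

// Simplified model (hypothetical names, not Beam's API) of how a side input's windowing
// maps a main-input window to the side-input window that should be read.
class SideInputMappingSketch {

    interface Window {}

    // The single window of the global windowing strategy.
    static final class GlobalWindow implements Window {
        static final GlobalWindow INSTANCE = new GlobalWindow();
        private GlobalWindow() {}
        @Override public String toString() { return "GlobalWindow"; }
    }

    // A bounded half-open window [start, end), like those produced by fixed windowing.
    static final class IntervalWindow implements Window {
        final Instant start;
        final Instant end;
        IntervalWindow(Instant start, Instant end) { this.start = start; this.end = end; }
        Instant maxTimestamp() { return end.minusMillis(1); }
        @Override public boolean equals(Object o) {
            if (!(o instanceof IntervalWindow)) return false;
            IntervalWindow other = (IntervalWindow) o;
            return start.equals(other.start) && end.equals(other.end);
        }
        @Override public int hashCode() { return Objects.hash(start, end); }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // Given the window of a main-input element, returns the side-input window to read.
    interface SideInputWindowing {
        Window sideInputWindowFor(Window mainWindow);
    }

    // Side input in the global window: every main-input window reads the global window.
    static final SideInputWindowing GLOBAL = mainWindow -> GlobalWindow.INSTANCE;

    // Side input in fixed windows: mimics the check that raises the question's exception.
    static SideInputWindowing fixed(Duration size) {
        return mainWindow -> {
            if (mainWindow instanceof GlobalWindow) {
                throw new IllegalArgumentException(
                        "Attempted to get side input window for GlobalWindow from non-global WindowFn");
            }
            // Otherwise pick the fixed window containing the main window's last timestamp
            long sizeMs = size.toMillis();
            long ts = ((IntervalWindow) mainWindow).maxTimestamp().toEpochMilli();
            long startMs = ts - Math.floorMod(ts, sizeMs);
            return new IntervalWindow(Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs));
        };
    }

    public static void main(String[] args) {
        Duration fiveMinutes = Duration.ofMinutes(5);
        Instant start = Instant.parse("2018-11-20T10:05:00Z");
        IntervalWindow fixedMain = new IntervalWindow(start, start.plus(fiveMinutes));

        // A global side input can be read from fixed-window and global-window main inputs
        System.out.println(GLOBAL.sideInputWindowFor(fixedMain));
        System.out.println(GLOBAL.sideInputWindowFor(GlobalWindow.INSTANCE));
        // A fixed-window side input maps a fixed-window main input to the matching interval
        System.out.println(fixed(fiveMinutes).sideInputWindowFor(fixedMain));
        // ...but it cannot be read from a main input in the global window
        try {
            fixed(fiveMinutes).sideInputWindowFor(GlobalWindow.INSTANCE);
        } catch (IllegalArgumentException e) {
            System.out.println("Error: " + e.getMessage());
        }
    }
}
```

The asymmetry is what the exception reports: a global-window side input can be read from any main-input window, but a fixed-window side input cannot be read from the `GlobalWindow`.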