Tags: apache-beam, google-cloud-dataproc

Apache Beam TextIO does not work with Spark Runner


I am trying to run my Beam code on Spark for a POC, using Google Cloud Dataproc for testing. It is a very simple test: read from a Pub/Sub topic and write the messages to a bucket on Google Cloud Storage. The Dataproc cluster has the right Spark version and is enabled to access the other GCP APIs.

I tried FileIO as well, but that did not work either. Publishing to another Pub/Sub topic instead of writing did work, but that is not my use case. Printing the elements before the TextIO write confirmed that I can read messages from Pub/Sub.

Here is the pipeline:

PCollection<String> messages = pipeline
    .apply(PubsubIO.readStrings().fromSubscription(sub))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));

messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());

pipeline.run();

I don't see anything in the Dataproc job output: no errors, no logs at all. There is no file in the bucket either.


Solution

  • I found that this is a triggering problem. Here is the detailed discussion:
    https://lists.apache.org/thread.html/a831da3cd74159bf0e0f3fe77363b022cde943ba40c6ab68bb33d5bb@%3Cuser.beam.apache.org%3E

    I fixed it by adding an early-firing trigger to my windowing transform:

    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .alignedTo(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.standardSeconds(10))
        .discardingFiredPanes())
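
    For reference, a minimal end-to-end sketch of the corrected pipeline might look like the following. This is not runnable as-is without the Beam SDK and GCP IO dependencies on the classpath; the subscription name, output path, and class name are hypothetical placeholders, not values from the question:

    ```java
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class PubsubToGcs {  // hypothetical class name
      public static void main(String[] args) {
        // Pass --runner=SparkRunner (plus Spark/Dataproc options) on the command line.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // Hypothetical subscription path; substitute your own.
        String sub = "projects/my-project/subscriptions/my-sub";

        PCollection<String> messages = pipeline
            .apply(PubsubIO.readStrings().fromSubscription(sub))
            // Early firings emit panes before the watermark closes the window,
            // which is what unblocks the windowed writes here.
            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                        .alignedTo(Duration.standardSeconds(10))))
                .withAllowedLateness(Duration.standardSeconds(10))
                .discardingFiredPanes());

        // Hypothetical output path; withWindowedWrites() is required for
        // writing an unbounded PCollection with TextIO.
        messages.apply(TextIO.write()
            .to("gs://my-bucket/output")
            .withNumShards(1)
            .withWindowedWrites());

        pipeline.run();
      }
    }
    ```

    Note that discardingFiredPanes() means each early pane contains only new elements, so the output files will not duplicate messages across firings of the same window.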