I am trying to run my Beam code on Spark for a POC, using Google Cloud Dataproc for testing. It is a very simple test: read from a PubSub topic and write the messages to a bucket on Google Cloud Storage. The Dataproc cluster has the right Spark version and is enabled to access the other GCP APIs.
I tried FileIO as well, but that did not work either. I tried publishing to another PubSub topic instead of writing to GCS, and that worked, but that is not my use case. I also tried printing the messages before writing with TextIO, which confirmed that I can read messages from PubSub (see the debug sketch below).
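For reference, the debug step was roughly this; the "DebugPrint" name and log text are just illustrative and not part of the real pipeline:

messages.apply("DebugPrint", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Print each message so it shows up in the job logs, then pass it through unchanged.
        System.out.println("PubSub message: " + c.element());
        c.output(c.element());
    }
}));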
Here is the pipeline:
// Read from the PubSub subscription and window the messages into 1-second fixed windows
PCollection<String> messages = pipeline
    .apply(PubsubIO.readStrings().fromSubscription(sub))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));

// Write each window to GCS as a single shard
messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());

pipeline.run();
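For context, the pipeline itself is created for the Spark runner along these lines (the project and subscription names below are placeholders, not my real values):

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true); // streaming mode, since PubsubIO is an unbounded source

Pipeline pipeline = Pipeline.create(options);
String sub = "projects/<project-id>/subscriptions/<subscription-id>"; // placeholder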
I don't see any logs in the Dataproc job output, no errors, nothing at all. There is no file in the bucket either.
I found that this is a triggering problem. Here is the detailed discussion on the Beam user mailing list:
https://lists.apache.org/thread.html/a831da3cd74159bf0e0f3fe77363b022cde943ba40c6ab68bb33d5bb@%3Cuser.beam.apache.org%3E
I fixed this by adding an early-firing trigger to my windowing transform, so that panes are also emitted based on processing time instead of only when the watermark passes the end of the window:
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
    // Besides the default on-time firing at the watermark, fire early panes
    // aligned to 10-second processing-time boundaries.
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .alignedTo(Duration.standardSeconds(10))))
    .withAllowedLateness(Duration.standardSeconds(10))
    // Each fired pane contains only the elements that arrived since the previous firing.
    .discardingFiredPanes())
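Putting it together, the working pipeline looks roughly like this (the write is unchanged; only the windowing transform differs from the original):

PCollection<String> messages = pipeline
    .apply(PubsubIO.readStrings().fromSubscription(sub))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .alignedTo(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.standardSeconds(10))
        .discardingFiredPanes());

messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());

pipeline.run();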