java apache-beam apache-beam-io

Apache Beam TextIO writer is not writing unbounded source to file


The following code runs without errors on the Beam DirectRunner. The SQS messages are consumed, but they are never written to the destination location.


Options options = PipelineOptionsFactory.fromArgs(CONFIG_STREAMING_SQS_GCS).withValidation().as(Options.class);

// Placeholder credentials; substitute real values.
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("your-key", "your-secret");
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(basicAWSCredentials));
// AwsUtils.setupOptions(options); <- fetches the secret from GCP, but replaced with inline Auth
Pipeline pipeline = Pipeline.create(options);
// Unbounded read from SQS, then convert and log each message.
pipeline.apply("Read messages from Sqs", SqsIO.read().withQueueUrl(options.getInputQueueUrl()))
        .apply("Get message contents", ParDo.of(new SqsMessageToJson()))
        .apply("Print incoming", ParDo.of(new RowPrinter<>("Print incoming")))
        // Group elements into 10-second fixed windows before the windowed write.
        .apply("Create Window", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        // Write one set of shards per window to the destination bucket.
        .apply("Write to GCS", TextIO.write()
            .withWindowedWrites()
            .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getDestinationBucketUrl()))
            .to(new WindowedFilenamePolicy(options.getOutputFilenamePrefix(),
                options.getShardTemplate(),
                options.getOutputFilenameSuffix())
                .withSubDirectoryPolicy(options.getSubDirectoryPolicy()))
            .withNumShards(options.getNumShards()));
PipelineResult run = pipeline.run();
run.waitUntilFinish();

Solution

  • This appears to be an issue in Apache Beam itself. We have opened a support ticket with Google to look into it.
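
For readers trying to reason about the "Create Window" step while the ticket is open, here is a minimal plain-Java sketch of what 10-second fixed windows do to element timestamps, and of the simplified condition under which a window's on-time output is emitted with the default trigger (the watermark reaching the window's end). It does not use or reproduce the Beam API: the class `FixedWindowSketch` and its methods are hypothetical names for illustration, and the firing rule is a simplification that ignores allowed lateness and custom triggers. It is not a diagnosis of the problem above.

```java
// Illustrative only: plain Java, not Beam code. All names here are hypothetical.
final class FixedWindowSketch {
    private FixedWindowSketch() {}

    /** Start (inclusive) of the fixed window that contains timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** End (exclusive) of the fixed window that contains timestampMillis. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    /**
     * Simplified on-time firing rule: the window holding this element can be
     * emitted only once the watermark has reached the end of that window.
     */
    static boolean readyToEmit(long timestampMillis, long sizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowEnd(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L;      // 10-second windows, as in the pipeline above
        long timestamp = 25_000L; // an element stamped 25 s after the epoch

        System.out.println(windowStart(timestamp, size));          // 20000
        System.out.println(windowEnd(timestamp, size));            // 30000
        System.out.println(readyToEmit(timestamp, size, 29_999L)); // false
        System.out.println(readyToEmit(timestamp, size, 30_000L)); // true
    }
}
```

Under this simplified model, an element is held until the watermark catches up with its window, so with an unbounded source it can be worth logging element timestamps alongside the observed output to see whether windows are ever considered complete.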