The following code runs without any issues on the Beam DirectRunner. The SQS messages are consumed, but nothing is written to the destination location.
Options options = PipelineOptionsFactory.fromArgs(CONFIG_STREAMING_SQS_GCS)
        .withValidation()
        .as(Options.class);

// Inline AWS credentials for testing.
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("your-key", "your-secret");
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(basicAWSCredentials));
// AwsUtils.setupOptions(options); // <- fetches the secret from GCP; replaced with inline auth

Pipeline pipeline = Pipeline.create(options);

pipeline.apply("Read messages from Sqs", SqsIO.read().withQueueUrl(options.getInputQueueUrl()))
        .apply("Get message contents", ParDo.of(new SqsMessageToJson()))
        .apply("Print incoming", ParDo.of(new RowPrinter<>("Print incoming")))
        .apply("Create Window", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply("Write to GCS", TextIO.write()
                .withWindowedWrites()
                .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getDestinationBucketUrl()))
                .to(new WindowedFilenamePolicy(options.getOutputFilenamePrefix(),
                        options.getShardTemplate(),
                        options.getOutputFilenameSuffix())
                        .withSubDirectoryPolicy(options.getSubDirectoryPolicy()))
                .withNumShards(options.getNumShards()));

PipelineResult run = pipeline.run();
run.waitUntilFinish();
It looks like this is an issue in Apache Beam itself. We have opened a support ticket with Google to look into it.
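For anyone else hitting this symptom: one thing worth ruling out before blaming the runner is the trigger on the window. By default, a FixedWindows pane only fires when the watermark passes the end of the window, so if the unbounded source's watermark never advances, the windowed TextIO write produces no files at all. Below is a minimal sketch of the "Create Window" step with an explicit processing-time trigger added as a diagnostic workaround; the Repeatedly/AfterProcessingTime trigger and the 10-second delay are assumptions for illustration, not part of the original pipeline.

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Hypothetical replacement for the "Create Window" step above.
// Instead of relying on the default (watermark-based) trigger, fire panes
// repeatedly on processing time so output is emitted even if the SQS
// source's watermark stalls. Values are illustrative, not from the
// original pipeline.
Window<String> windowWithTrigger =
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(10))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes();

If the pipeline starts emitting files with this trigger in place, the problem is a stalled watermark on the source rather than the sink itself.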