amazon-s3 apache-flink flink-streaming

Streaming Sink to S3


I am trying to create an S3 sink for my streaming output. I figured a BucketingSink would work, since it is used for HDFS, but it seems that an S3 URL is not recognized as HDFS. I get the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Cannot support file system for 's3' via Hadoop, because Hadoop is not in the classpath, or some classes are missing from the classpath.

Is there a way to make S3 work for BucketingSink, or is there another option other than BucketingSink that I can use? I am currently running 1.5.2. Would be happy to provide any additional information.

Thank you!

Edit:

My sink creation and usage look like the following:

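import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val s3Sink = new BucketingSink[String]("s3://s3bucket/sessions")
// Bucket directory names are derived from the current time, at minute granularity
s3Sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
s3Sink.setWriter(new StringWriter[String]())
s3Sink.setBatchSize(200) // maximum part file size, in bytes
s3Sink.setPendingPrefix("sessions-")
s3Sink.setPendingSuffix(".csv")

// Create stream and do stuff here
stream.addSink(s3Sink)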

Solution

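  • You probably need to add the hadoop-aws jar to your Flink job. The exception says that Hadoop is not on the classpath (or that some classes are missing), so Flink cannot load a Hadoop file system for the 's3' scheme. This page explains how to provide the dependency: https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/aws.html#provide-s3-filesystem-dependency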
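
As a rough sketch of what adding the dependency could look like, assuming the job is built with sbt (the question does not say which build tool is used). The Hadoop version below is a placeholder and should be replaced with the Hadoop version your Flink distribution was built against. Depending on the setup, the jars may instead need to go on Flink's own classpath, as the linked page describes.

```scala
// build.sbt fragment (sketch, not a verified configuration)

// Assumption: placeholder version, match it to your Flink/Hadoop build
val hadoopVersion = "2.8.3"

libraryDependencies ++= Seq(
  // Provides BucketingSink, DateTimeBucketer and StringWriter
  "org.apache.flink" %% "flink-connector-filesystem" % "1.5.2",
  // Provides Hadoop's S3 file system implementations (e.g. S3AFileSystem)
  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
)
```

With hadoop-aws, S3AFileSystem serves the s3a:// scheme. Using s3:// URLs as in the question would additionally require mapping that scheme to an implementation in the Hadoop configuration, which the linked page covers.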