I am currently using the code below to write Parquet via Avro. It writes to the local file system, but I want to write to S3.
try {
    StopWatch sw = StopWatch.createStarted();
    Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
    final String parquetFile = "parquet/data.parquet";
    final Path path = new Path(parquetFile);
    // try-with-resources closes the writer, which is what writes the Parquet footer.
    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
            .withSchema(avroSchema)
            .withConf(new org.apache.hadoop.conf.Configuration())
            .withWriteMode(Mode.OVERWRITE) // probably not good for prod (overwrites files)
            .build()) {
        for (Map<String, Object> row : message.getTransformedMessage()) {
            StopWatch stopWatch = StopWatch.createStarted();
            final GenericData.Record record = new GenericData.Record(avroSchema);
            row.forEach((k, v) -> {
                record.put(k, v);
            });
            writer.write(record);
        }
    }
    // TODO: Write to S3. We should probably write via the AWS objects. This does not show that.
    System.out.println("Total Time: " + sw);
} catch (Exception e) {
    // Do something here. Retryable? Non-retryable? Wrap this exception in one of these?
}
This writes to a file fine, but how do I get it to stream into the AmazonS3 API? I have found some code on the web that uses the hadoop-aws jar, but that requires some Windows exe files to work and, of course, we want to avoid that. Currently I am using only:
So the question is, is there a way to intercept the output stream on the AvroParquetWriter so I can stream it to S3? The main reason I want to do this is for retries. S3 automagically retries up to 3 times. This would help us out a lot.
This does depend on the hadoop-aws jar, so if you're not willing to use that, I'm not sure I can help you. I am, however, running on a Mac and do not have any Windows exe files, so I'm not sure where those are coming from. AvroParquetWriter already depends on Hadoop, so even if this extra dependency is unacceptable to you, it may not be a big deal to others:
You can use an AvroParquetWriter to stream directly to S3 by passing it a Hadoop Path that is created with a URI parameter and setting the proper configs.
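import java.net.URI
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter

val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)
val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
// credentialsProvider is the fully qualified class name of the provider to use
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)
val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()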
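Since the question is in Java, here is a rough sketch of how the two inputs to the writer setup above (the s3a URI and the fs.s3a.* properties) could be prepared using only the JDK. `S3aTarget`, `uri` and `settings` are hypothetical names made up for illustration and are not part of Hadoop or Parquet. The property keys are the ones set above, and the bucket and key values are placeholders.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Hadoop or Parquet class): it only prepares the
// URI and the property map that the writer setup needs.
final class S3aTarget {

    // Builds s3a://<bucket>/<key>. The multi-argument URI constructor
    // percent-encodes characters that are illegal in a path, such as spaces.
    static URI uri(String bucket, String key) {
        try {
            return new URI("s3a", bucket, "/" + key, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid bucket or key", e);
        }
    }

    // Collects the fs.s3a.* properties. The session token is only added when
    // one is supplied, since it only exists for temporary credentials.
    static Map<String, String> settings(String accessKey, String secretKey, String sessionToken) {
        Map<String, String> s = new LinkedHashMap<>();
        s.put("fs.s3a.access.key", accessKey);
        s.put("fs.s3a.secret.key", secretKey);
        if (sessionToken != null) {
            s.put("fs.s3a.session.token", sessionToken);
        }
        return s;
    }

    public static void main(String[] args) {
        // prints s3a://my-bucket/parquet/data.parquet
        System.out.println(uri("my-bucket", "parquet/data.parquet"));
    }
}
```

In the Java code from the question, the results could then be applied with `new Path(S3aTarget.uri(bucket, key))` and `S3aTarget.settings(key, secret, token).forEach(conf::set)`, where `conf` is the Hadoop `Configuration` passed to `withConf`.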
I used the following dependencies (sbt format):
"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"