I have an old version of Flink and want to update it to the latest stable version. BucketingSink is deprecated in the latest version, so I am trying to replace it with StreamingFileSink. To initialize it I use StreamingFileSink.forBulkFormat, but I get this error:
type arguments [T] do not conform to method forSpecificRecord's type parameter bounds [T <: org.apache.avro.specific.SpecificRecordBase]
[ERROR] .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
Also, I can't find how to set the bucketer (DateTimeBucketer[T]), the inactiveBucketThreshold, and the writer (Writer[T]).
Could you help me find the right way?
Old code:
trait Runner[T <: SpecificRecordBase] extends Serializable {
  def createHdfsSink(conf: FlinkConfig, path: String): BucketingSink[T] = {
    val bucketer = new DateTimeBucketer[T]
    val sink = new BucketingSink[T](s"${conf.output}/$path")
    sink
      .setBatchSize(toBytes(conf.batchSize))
      .setBucketer(bucketer)
      .setInactiveBucketThreshold(toMillis(conf.inactiveBucketThreshold))
      .setWriter(writer)
      .setPendingPrefix(pendingPrefix)
      .setBatchRolloverInterval(conf.fileOpenIntervalTime)
  }
}
New code with errors:
def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
  val sink = StreamingFileSink
    .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
    .build()
  // TODO: .withOutputFileConfig()
  sink
}
The error occurs because forSpecificRecord requires T <: SpecificRecordBase, while your new method only declares T : ClassTag. I think you should use the forReflectRecord(Class<T> type) method instead, which uses reflection to create the schema for the type and writes the records with that schema. The old Writer[T] is replaced by the writer factory passed to forBulkFormat. The custom bucket assigner is configured during StreamingFileSink setup via the .withBucketAssigner(BucketAssigner<IN, ID> assigner) method call.
So, finally, your StreamingFileSink builder will look as follows:
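def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
  // classOf[T] does not compile for a type parameter, so take the class from the ClassTag.
  val recordClass = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
  // For time-based buckets, DateTimeBucketAssigner takes the place of the old DateTimeBucketer.
  val bucketAssigner = new DateTimeBucketAssigner[T]()
  StreamingFileSink
    .forBulkFormat(new Path(s"${conf.output}/$path"), AvroWriters.forReflectRecord(recordClass))
    .withBucketAssigner(bucketAssigner)
    .build()
}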
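
If you would rather keep forSpecificRecord, the other option is probably to repeat the type bound on the new method, the way the old Runner trait did. The sketch below shows only the Scala typing rule involved, without Avro or Flink on the classpath. RecordBase, UserRecord, WriterFactory, Writers and Sinks are hypothetical stand-ins, not Flink or Avro classes; Writers.forSpecificRecord and Writers.forReflectRecord only imitate the shape of the AvroWriters methods with the same names.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical stand-ins so the sketch runs without Avro or Flink.
abstract class RecordBase                    // plays the role of SpecificRecordBase
final class UserRecord extends RecordBase    // plays the role of a generated Avro class

// Hypothetical result type: records which factory was used and for which class.
final case class WriterFactory[T](kind: String, recordClass: Class[T])

object Writers {
  // Bounded, like forSpecificRecord: the caller must prove T <: RecordBase.
  def forSpecificRecord[T <: RecordBase](tpe: Class[T]): WriterFactory[T] =
    WriterFactory("specific", tpe)

  // Unbounded, like forReflectRecord: any T is accepted.
  def forReflectRecord[T](tpe: Class[T]): WriterFactory[T] =
    WriterFactory("reflect", tpe)
}

object Sinks {
  // classOf[T] is rejected for a type parameter, so recover the class from the ClassTag.
  private def runtimeClassOf[T: ClassTag]: Class[T] =
    classTag[T].runtimeClass.asInstanceOf[Class[T]]

  // Only a ClassTag is known about T here, so only the unbounded factory type-checks.
  def reflectSink[T: ClassTag]: WriterFactory[T] =
    Writers.forReflectRecord(runtimeClassOf[T])

  // Repeating the bound on the method makes the bounded factory usable again.
  def specificSink[T <: RecordBase : ClassTag]: WriterFactory[T] =
    Writers.forSpecificRecord(runtimeClassOf[T])
}
```

Calling Sinks.specificSink[UserRecord] compiles because UserRecord satisfies the bound. Swapping Writers.forSpecificRecord into reflectSink should reproduce a "do not conform to ... type parameter bounds" error of the same kind as the one in the question.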