scala apache-flink

Flink StreamingFileSink RowFormatBuilder withBucketAssigner returns Any?


Why does this configuration result in the type Any? Because of that, I can't call .build(). My Flink version is 1.10.0 and my Scala version is 2.11.

    val sink = StreamingFileSink
      .forRowFormat(new Path("s3a://123"), csvEncoder)
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(5))
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
          .withMaxPartSize(128 * 1024 * 1024)
          .build()
      )
      .withBucketAssigner(
        new BucketAssigner[UserEvent, String] {
          override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
          override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
        }
      ) // this returns Any!!!
      .build() // can't call .build()

Solution

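  • The problem is caused by Scala's type inference in combination with the self-type idiom used by the StreamingFileSink builders. The builder's self-type parameter ends up inferred as Any, so the value returned by withBucketAssigner no longer exposes build().

    As a quick fix, you can cast the result back to the builder type: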

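    import java.util.concurrent.TimeUnit

    import org.apache.flink.core.fs.Path
    import org.apache.flink.core.io.SimpleVersionedSerializer
    import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

    // UserEvent and csvEncoder are the types/values from the question and are defined elsewhere.
    val sink = StreamingFileSink
      .forRowFormat(new Path("s3a://123"), csvEncoder)
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          // rollover and inactivity intervals are given in milliseconds
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(5))
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
          .withMaxPartSize(128 * 1024 * 1024) // 128 MiB
          .build()
      )
      .withBucketAssigner(
        new BucketAssigner[UserEvent, String] {
          // one bucket (sub-directory) per event type
          override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
          override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
        }
      )
      // restore the builder type that Scala lost, so that build() is visible again
      .asInstanceOf[StreamingFileSink.RowFormatBuilder[UserEvent, String, _]]
      .build()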
    

    The proper fix requires changes to Flink. You can track FLINK-16684 to get notified when the problem is properly fixed.

    Update

    The problem has been fixed in Flink 1.10.1 and 1.11.0, so the cast should no longer be needed after upgrading.
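
For readers unfamiliar with the self-type idiom mentioned in the answer, here is a minimal sketch of it in plain Scala. The names `Sink`, `RowBuilder`, `DefaultRowBuilder` and `SelfTypeDemo` are hypothetical stand-ins, not Flink's actual classes, and the sketch only illustrates the general pattern: each `with...` method returns the builder's own concrete type `T`, so `build()` stays reachable as long as the compiler knows what `T` is. When `T` cannot be recovered, as reported for Flink 1.10.0 in the question, the chain degrades to a type without `build()`.

```scala
// Hypothetical, simplified builder hierarchy; these are NOT Flink's classes.
final case class Sink[IN](assignBucket: IN => String, rolloverMillis: Long)

// T is the concrete builder type, so every with* method can return it.
abstract class RowBuilder[IN, T <: RowBuilder[IN, T]] {
  private var assignBucket: IN => String = (_: IN) => "default"
  private var rolloverMillis: Long = 60000L

  // Implemented once per concrete builder to return `this` typed as T.
  protected def self: T

  def withBucketAssigner(f: IN => String): T = { assignBucket = f; self }
  def withRolloverInterval(millis: Long): T = { rolloverMillis = millis; self }

  def build(): Sink[IN] = Sink(assignBucket, rolloverMillis)
}

// Binding T to a named class keeps the static type precise through the chain.
final class DefaultRowBuilder[IN] extends RowBuilder[IN, DefaultRowBuilder[IN]] {
  override protected def self: DefaultRowBuilder[IN] = this
}

object SelfTypeDemo {
  def main(args: Array[String]): Unit = {
    val sink = new DefaultRowBuilder[String]()
      .withBucketAssigner(_.take(1))        // still a DefaultRowBuilder[String]
      .withRolloverInterval(5 * 60 * 1000L) // still a DefaultRowBuilder[String]
      .build()                              // so build() is visible here
    println(sink.assignBucket("alpha")) // prints "a"
    println(sink.rolloverMillis)        // prints 300000
  }
}
```

Giving the self type a concrete name, as `DefaultRowBuilder` does here, is one way to keep it inferable from Scala; whether this matches the change made under FLINK-16684 is an assumption, not something the answer states.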