
Create my own sink with SinkFunction in Scala


I want to create my own sink for Flink in Scala, and for that I need to extend the SinkFunction interface.

However, I can't override the following invoke method:

default void invoke(IN value, Context context) throws Exception {
    invoke(value);
}

This is my code:

class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
    override def invoke(elements: List[GenericRecord], context: Context) { ... }
}

This gives the following error:

Type Context takes type parameters

If I change the code to add an arbitrary type parameter:

class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
    override def invoke(elements: List[GenericRecord], context: Context[Xpto]) { ... }
}

The error message is different:

Method `invoke` overrides nothing.

I'm new to Scala. Is there something I'm missing that would fix this?

All the Scala examples I have seen use the following deprecated method:

/**
 * @deprecated Use {@link #invoke(Object, Context)}.
 */
@Deprecated
default void invoke(IN value) throws Exception {}

And StreamingFileSink.java implements using this:

...

@Public // Interface might be extended in the future with additional methods.
interface Context<T> {

    /** Returns the current processing time. */
    long currentProcessingTime();

    /** Returns the current event-time watermark. */
    long currentWatermark();

    /**
     * Returns the timestamp of the current input record or {@code null} if the element does not
     * have an assigned timestamp.
     */
    Long timestamp();
}

Is this <T> misplaced, since it isn't used anywhere in SinkFunction.Context?


Solution

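  • In this case you can simply use a wildcard type argument:

    override def invoke(elements: List[GenericRecord], context: SinkFunction.Context[_]): Unit = { ... }

    The Java method is declared with the raw type Context, which Scala sees as Context[_]. A concrete type argument such as Context[Xpto] does not match that signature, which is why the compiler reports that invoke overrides nothing. With the wildcard, the override should compile and work as expected.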
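
For reference, here is a minimal sketch of a complete sink that uses the wildcard signature. It assumes a Flink version in which SinkFunction.Context still declares a type parameter, as in the excerpt quoted in the question; I believe later releases dropped the parameter, in which case plain SinkFunction.Context would be needed. PrintingSink and its prefix parameter are hypothetical names, and String stands in for List[GenericRecord] so that the example does not need Avro.

```scala
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical sink that prints every element; a stand-in for MySinkFunction.
class PrintingSink(prefix: String) extends SinkFunction[String] {

  // The Java signature uses the raw type Context, which Scala sees as
  // Context[_], so the override must use the wildcard as well.
  // Assumption: Context is generic in the Flink version on the classpath.
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    // timestamp() returns a boxed java.lang.Long that may be null,
    // so wrap it in Option before using it.
    val ts = Option(context.timestamp())
    println(s"$prefix value=$value timestamp=${ts.getOrElse("n/a")}")
  }
}
```

With the Scala DataStream API, such a sink would typically be attached with something like stream.addSink(new PrintingSink("out")), where stream is a DataStream[String].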