I want to create my own Sink using scala for Flink and for that I need to extend the interface SinkFunction.
But I can't override the following invoke method.
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
This is my code:
class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
override def invoke(elements: List[GenericRecord], context: Context) { ... }
This gives the following error:
Type Context takes type parameters
If I change the code to add any type parameter:
class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
override def invoke(elements: List[GenericRecord], context: Context[Xpto]) { ... }
The error messages is different:
Method `invoke` overrides nothing.
I'm new to Scala, is there something to fix this that I'm missing ?
All the examples that I see in scala uses the following deprecated method:
/**
* @deprecated Use {@link #invoke(Object, Context)}.
*/
@Deprecated
default void invoke(IN value) throws Exception {}
And StreamingFileSink.java implements using this:
...
@Public // Interface might be extended in the future with additional methods.
interface Context<T> {
/** Returns the current processing time. */
long currentProcessingTime();
/** Returns the current event-time watermark. */
long currentWatermark();
/**
* Returns the timestamp of the current input record or {@code null} if the element does not
* have an assigned timestamp.
*/
Long timestamp();
}
Is this <T>
wrong placed since it isn't used in anywhere in SinkFunction.Context ?
In this case You can simply go with:
override def invoke(elements: List[GenericRecord], context: SinkFunction.Context[_]) { ... }
And it should work like a charm.