I am having trouble resolving this error in Flink (version 1.11.0):
java: no suitable method found for process(com.xyz.myPackage.operators.windowed.ComputeFeatures)
method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>) is not applicable
(cannot infer type-variable(s) R
(argument mismatch; com.xyz.myPackage.operators.windowed.ComputeFeatures cannot be converted to org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>))
method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
(cannot infer type-variable(s) R
(actual and formal argument lists differ in length))
This is how I create a keyed windowed stream:
timestampedStreamElementDataStream
    .keyBy(StreamElement::getId)
    .window(SlidingEventTimeWindows.of(Time.seconds(600), Time.seconds(60)))
    .process(new ComputeFeatures());
And here is what my ComputeFeatures function looks like:
public class ComputeFeatures extends ProcessWindowFunction<
        StreamElement,
        StreamElement,
        Long,
        TimeWindow> {
    @Override
    public void process(Long key,
                        Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {
        System.out.println("In windowed function");
    }
}
StreamElement::getId returns a Long, so the types should all be correct, but Flink still seems to have trouble inferring a type. I am looking for ideas on how to solve this.
NOTE: This issue seems related but it didn't fit my problem: LINK
EDIT 1:
As suggested by David, I tried autogenerating the overridden process function with IntelliJ, but the issue remains the same. With the type parameters specified, the autogenerated code looks like this:
public class ComputeFeatures extends ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow> {
    @Override
    public void process(Long aLong,
                        ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow>.Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {
    }
}
And like this when I omit the type parameters:
public class ComputeFeatures extends ProcessWindowFunction {
    @Override
    public void process(Object o, Context context, Iterable elements, Collector out) throws Exception {
        System.out.println("In windowed function");
    }
}
EDIT 2:
Maybe relevant: when I hover over new ComputeFeatures(), IntelliJ displays this infobox:
Required type:
ProcessWindowFunction
<com.xyz.myPackage.entities.StreamElement,
R,
java.lang.Long,
org.apache.flink.streaming.api.windowing.windows.TimeWindow>
Provided:
ComputeFeatures
reason: no instance(s) of type variable(s) R exist so that ComputeFeatures conforms to ProcessWindowFunction<StreamElement, R, Long, TimeWindow>
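Eh, stupid mistake: the code works as it is. The problem was that IntelliJ had imported the wrong ProcessWindowFunction in ComputeFeatures, namely the Scala variant (org.apache.flink.streaming.api.scala.function.ProcessWindowFunction) instead of the Java one that WindowedStream.process expects (org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction). After changing the import, everything worked as expected.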
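To illustrate why a wrong import produces a "cannot infer type-variable(s) R" error rather than an obvious "wrong class" message, here is a minimal sketch that reproduces the mechanism without any Flink dependency. Everything in it is a hypothetical stand-in: JavaApi and ScalaApi play the role of the two packages, the simplified three-parameter ProcessWindowFunction classes and the static process method only mimic the shape of the real Flink types, and GoodFeatures and WrongFeatures are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ImportMixupDemo {

    // Hypothetical stand-in for the Java API package.
    static class JavaApi {
        abstract static class ProcessWindowFunction<IN, OUT, KEY> {
            abstract void process(KEY key, Iterable<IN> elements, List<OUT> out);
        }
    }

    // Hypothetical stand-in for the Scala API package:
    // same simple class name, but a completely unrelated type.
    static class ScalaApi {
        abstract static class ProcessWindowFunction<IN, OUT, KEY> {
            abstract void process(KEY key, Iterable<IN> elements, List<OUT> out);
        }
    }

    // Mimics the shape of WindowedStream.<R>process(...):
    // R has to be inferred from the argument's supertype.
    static <R> List<R> process(JavaApi.ProcessWindowFunction<String, R, Long> fn) {
        List<R> out = new ArrayList<>();
        fn.process(1L, Arrays.asList("a", "b"), out);
        return out;
    }

    // Extends the expected class, so R is inferred as String.
    static class GoodFeatures extends JavaApi.ProcessWindowFunction<String, String, Long> {
        @Override
        void process(Long key, Iterable<String> elements, List<String> out) {
            for (String e : elements) {
                out.add(key + ":" + e);
            }
        }
    }

    // Looks identical at the use site, but extends the other class.
    static class WrongFeatures extends ScalaApi.ProcessWindowFunction<String, String, Long> {
        @Override
        void process(Long key, Iterable<String> elements, List<String> out) {
        }
    }

    public static void main(String[] args) {
        System.out.println(process(new GoodFeatures())); // prints [1:a, 1:b]

        // Uncommenting the next line fails to compile: no choice of R makes
        // WrongFeatures a JavaApi.ProcessWindowFunction<String, R, Long>.
        // process(new WrongFeatures());
    }
}
```

Because both classes share a simple name, the declaration of the function class reads the same either way; only the import line (here, the JavaApi or ScalaApi qualifier) reveals which one is actually being extended, so that is the first thing to check when this error appears.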