
Apache Flink Process Function errors


I am trying to test a simple process function in Apache Flink using the Java API.

  • IDE: Visual Studio Code 1.87.1

  • Flink: 1.18.1

CountWithTimestamp.java

public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

CountWithTimeoutFunction.java

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Long>, Tuple2<String, Long>>{

    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the per-key state that holds the count and last-modified timestamp.
        state = this.getRuntimeContext().getState(new ValueStateDescriptor<>("My_State", CountWithTimestamp.class));
    }
    
    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, 
        Collector<Tuple2<String, Long>> out) throws Exception {
        // Load the current count for this key, creating it on first use.
        CountWithTimestamp current = state.value();
        
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }
        
        current.count++;
        current.lastModified = ctx.timestamp();
        
        state.update(current);
        
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // Emit the count only if this timer matches the most recent update.
        CountWithTimestamp result = state.value();
        
        if (timestamp == result.lastModified + 60000) {
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

JavaFlinkTest.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class JavaFlinkTest {

    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> ds = env.fromElements(
            "To be, or not to be, that is the question:",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
            ).flatMap(new Splitter())
             .keyBy(value -> value.f0)
             .process(new CountWithTimeoutFunction(), TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){})); // <-- this line fails to compile

        ds.print();

        env.execute("Window WordCount");
        env.close();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Long>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Long>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Long>(word, 1L));
            }
        }
    }
}

However, the call to KeyedStream.process does not compile. The error message is:

The method process(ProcessFunction<Tuple2<String,Long>,R>, TypeInformation<R>) in the type KeyedStream<Tuple2<String,Long>,String> is not applicable for the arguments (CountWithTimeoutFunction, TypeInformation<Tuple2<String,Long>>) Java(67108979)

<R> SingleOutputStreamOperator<R> org.apache.flink.streaming.api.datastream.KeyedStream.process(ProcessFunction<Tuple2<String, Long>, R> processFunction, TypeInformation<R> outputType)
Deprecated. Use KeyedStream.process(KeyedProcessFunction, TypeInformation)

Applies the given ProcessFunction on the input stream, thereby creating a transformed output stream.

The function will be called for every element in the input streams and can produce zero or more output elements. Contrary to the DataStream.flatMap(FlatMapFunction) function, this function can also query the time and set timers. When reacting to the firing of set timers the function can directly emit elements and/or register yet more timers.

Overrides: process(...) in DataStream

Type Parameters:

<R> The type of elements emitted by the ProcessFunction.

Parameters:

processFunction The ProcessFunction that is called for each element in the stream.

outputType TypeInformation for the result type of the function.

Returns:

The transformed DataStream.

According to the error message, KeyedStream.process needs two arguments: the process function and a TypeInformation<R> describing its output type. I passed a TypeInformation for the output type, but I cannot see what is wrong. Could you show an example of calling KeyedStream.process with the output type argument?


Solution

  • This bit of code:

    CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Long>, Tuple2<String, Long>>
    

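    says that your KeyedProcessFunction has a key of type Tuple. But your key selector is .keyBy(value -> value.f0), which returns the first field of the Tuple2<String, Long>, so your key is a String. Because the key types do not match, the compiler cannot apply process(KeyedProcessFunction, TypeInformation) and reports the deprecated ProcessFunction overload instead. Declaring the class as KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> should resolve the error; the TypeInformation argument you already pass is fine as it is, and the unused Tuple import can then be removed.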
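
The key-type constraint described above can be modeled in plain Java, without Flink on the classpath. This is only a sketch: KeyedFn, Keyed, FormatFn and run are hypothetical stand-ins invented for illustration (they are not Flink classes), and Map.Entry<String, Long> plays the role of Tuple2<String, Long>. It shows that the function's first type argument has to be exactly the type the key selector returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the key-type constraint; none of these types are Flink classes.
final class KeyTypeSketch {

    // Hypothetical stand-in for KeyedProcessFunction<K, I, O>.
    interface KeyedFn<K, I, O> {
        void processElement(K currentKey, I value, List<O> out);
    }

    // Hypothetical stand-in for KeyedStream<T, KEY>.
    static final class Keyed<T, KEY> {
        private final List<T> elements;
        private final Function<T, KEY> keySelector;

        Keyed(List<T> elements, Function<T, KEY> keySelector) {
            this.elements = elements;
            this.keySelector = keySelector;
        }

        // The function's first type argument must be exactly KEY,
        // the type returned by the key selector.
        <R> List<R> process(KeyedFn<KEY, T, R> fn) {
            List<R> out = new ArrayList<>();
            for (T element : elements) {
                fn.processElement(keySelector.apply(element), element, out);
            }
            return out;
        }
    }

    // Declares String as its key type, matching the selector used in run().
    static final class FormatFn implements KeyedFn<String, Map.Entry<String, Long>, String> {
        @Override
        public void processElement(String currentKey, Map.Entry<String, Long> value, List<String> out) {
            out.add(currentKey + "=" + value.getValue());
        }
    }

    static List<String> run() {
        // Map.Entry<String, Long> stands in for Tuple2<String, Long>.
        List<Map.Entry<String, Long>> words = List.of(Map.entry("to", 1L), Map.entry("be", 1L));

        // The selector returns a String, so KEY is String.
        Keyed<Map.Entry<String, Long>, String> keyed = new Keyed<>(words, value -> value.getKey());

        // Compiles because FormatFn is a KeyedFn<String, ...>. A KeyedFn<Object, ...>
        // would be rejected here, since generic type arguments must match exactly.
        return keyed.process(new FormatFn());
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [to=1, be=1]
    }
}
```

In the real job the same reasoning applies: keyBy(value -> value.f0) produces a KeyedStream whose key type is String, so only a KeyedProcessFunction whose first type argument is String fits the non-deprecated process overload.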