I want to set up a program that sends a data stream from Kafka to Flink.
    DataStream<Integer> input =
            env.fromSource(
                    KafkaSource.<Integer>builder()
                            .setBootstrapServers(
                                    parameterTool
                                            .getProperties()
                                            .getProperty(
                                                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
                            .setBounded(OffsetsInitializer.latest())
                            .setDeserializer(
                                    KafkaRecordDeserializationSchema.valueOnly(
                                            IntegerDeserializer.class))
                            .setTopics(parameterTool.getRequired("input-topic"))
                            .build(),
                    WatermarkStrategy.noWatermarks(),
                    "kafka-source");
When I try to compile the code above, the following failure occurs:
Exception in thread "main" java.lang.Error: Unresolved compilation problem:
The method valueOnly(DeserializationSchema<V>) in the type KafkaRecordDeserializationSchema is not applicable for the arguments (Class<NumberDeserializers.IntegerDeserializer>)
There is a problem with the valueOnly call, but I don't understand what the problem is or how to solve it.
Perhaps you picked up the wrong import. The error message mentions NumberDeserializers.IntegerDeserializer, which is the IntegerDeserializer from Jackson; I think you wanted org.apache.kafka.common.serialization.IntegerDeserializer instead. The class you pass to valueOnly should implement org.apache.kafka.common.serialization.Deserializer.
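To illustrate why the compiler rejects a class that merely shares the name IntegerDeserializer, here is a minimal, dependency-free sketch. It does not use Flink, Kafka, or Jackson: the Deserializer interface, the two deserializer classes, and this valueOnly method are hypothetical stand-ins, and the bound Class<? extends Deserializer<V>> is an assumption about how the Class-based valueOnly overload is declared.

```java
import java.nio.ByteBuffer;

public class ValueOnlyBoundSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer. */
    public interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Stand-in for Kafka's IntegerDeserializer: it implements Deserializer<Integer>. */
    public static class KafkaLikeIntegerDeserializer implements Deserializer<Integer> {
        @Override
        public Integer deserialize(String topic, byte[] data) {
            // Read a 4-byte big-endian int; null payloads stay null.
            return data == null ? null : ByteBuffer.wrap(data).getInt();
        }
    }

    /** Stand-in for Jackson's IntegerDeserializer: similar name, unrelated type hierarchy. */
    public static class JacksonLikeIntegerDeserializer {
    }

    /** Assumed shape of the Class-based overload: only Deserializer<V> implementations fit. */
    public static <V> Deserializer<V> valueOnly(Class<? extends Deserializer<V>> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        // Compiles: the class implements Deserializer<Integer>, so V is inferred as Integer.
        Deserializer<Integer> deserializer = valueOnly(KafkaLikeIntegerDeserializer.class);
        byte[] payload = ByteBuffer.allocate(4).putInt(42).array();
        System.out.println(deserializer.deserialize("input-topic", payload));

        // Does not compile, for the same reason as in the question:
        // valueOnly(JacksonLikeIntegerDeserializer.class);
    }
}
```

In the real program the fix should not require any change to the builder chain itself, only replacing the Jackson import with the Kafka one so that IntegerDeserializer.class satisfies the bound.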