I am new to Apache Flink. I am trying to read a stream from Kafka, process it in Flink, and publish the result to another Kafka topic.
Below are the dependencies I added:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.14.6</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Code
public class App
{
    public static void main( String[] args ) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("webapp-analytics")
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        DataStream<String> finalStream = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.contains("clicked");
            }
        });
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        finalStream.sinkTo(sink);
        env.execute("Kafka");
    }
}
The issue I am facing is that finalStream.sinkTo() does not accept the KafkaSink object as an argument. It expects an argument of type Sink<String, ?, ?, ?>.
What am I missing here?
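From your dependencies, it looks like you're mixing Flink 1.14.6 (flink-streaming-java_2.11) with Flink 1.17.1 (flink-core and flink-connector-kafka). The DataStream.sinkTo() you are compiling against comes from 1.14.6 and expects the older Sink<T, ?, ?, ?> interface, which the 1.17.1 KafkaSink does not implement. Make everything 1.17.1, and you should be fine. Note that since Flink 1.15 the artifact is named flink-streaming-java, without the Scala suffix.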
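For illustration, an aligned dependency section might look roughly like the sketch below. The flink.version property name is my own choice, not something from your pom, and the flink-clients entry is an assumption on my part: it is usually needed to run a job directly from the IDE, but leave it out if your setup already provides it. Your junit test dependency is omitted here and can stay as it is.

```xml
<properties>
    <!-- Hypothetical property name; a single place to pin the Flink version -->
    <flink.version>1.17.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- No _2.11 suffix on this artifact from Flink 1.15 onward -->
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- Assumption: only needed when executing the job locally, e.g. from the IDE -->
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

Keeping the version in one property makes it harder to end up with mismatched Flink artifacts again. After changing the pom, running mvn dependency:tree is a quick way to check that no 1.14.6 artifact is still being pulled in.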