package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:19092")
                .setTopics("1000pepeusdt", "1000pepeusdtt")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(TradeEvent.class)) // <-- the error is on this line
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();

        // Execute the job
        env.execute("Kafka Consumer Example");
    }
}
This is what I'm trying to consume:

{
  "e": "aggTrade",
  "E": 1684547379069,
  "a": 30161410,
  "s": "1000PEPEUSDT",
  "p": "0.0015036",
  "q": "665070",
  "f": 70012793,
  "l": 70012793,
  "T": 1684547378913,
  "m": true
}
You're on the right track. One way to resolve this is by calling:
.setValueOnlyDeserializer(new TradeEventDeserializationSchema())
Your TradeEventDeserializationSchema would be a custom deserializer that uses something like Jackson's ObjectMapper to deserialize each incoming event, e.g.:
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class TradeEventDeserializationSchema extends AbstractDeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    /**
     * For performance reasons it's better to create one ObjectMapper in this open method rather
     * than creating a new ObjectMapper for every record.
     */
    @Override
    public void open(InitializationContext context) {
        // JavaTimeModule is needed for Java 8 date/time (Instant) support
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    /**
     * If our deserialize method needed access to the information in the Kafka headers of a
     * ConsumerRecord, we would have implemented a KafkaRecordDeserializationSchema instead of
     * extending AbstractDeserializationSchema.
     */
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }
}
Event in this case would be your event POJO.
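For reference, here is a minimal sketch of what that Event POJO might look like for the aggTrade payload above. The field names and types are assumptions; Jackson's @JsonProperty maps the single-letter JSON keys (matching is case sensitive by default, so "e" and "E" stay distinct):

import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical POJO for the aggTrade payload shown in the question;
// field names and types are assumptions, not a fixed schema.
public class Event {
    @JsonProperty("e") public String eventType;
    @JsonProperty("E") public long eventTime;
    @JsonProperty("a") public long aggTradeId;
    @JsonProperty("s") public String symbol;
    @JsonProperty("p") public String price;
    @JsonProperty("q") public String quantity;
    @JsonProperty("f") public long firstTradeId;
    @JsonProperty("l") public long lastTradeId;
    @JsonProperty("T") public long tradeTime;
    @JsonProperty("m") public boolean buyerIsMaker;

    // Flink POJOs need a public no-arg constructor
    public Event() {}
}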
You can find more details and an example at https://docs.immerok.cloud/docs/how-to-guides/development/deserializing-json-from-kafka-to-apache-flink-pojo/
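As an aside, if you did need access to the headers (or key, partition, offset) of each ConsumerRecord, a sketch of the KafkaRecordDeserializationSchema variant mentioned in the comment above might look like the following. The class name TradeEventRecordDeserializationSchema is made up for illustration; you'd wire it in with setDeserializer(...) on the KafkaSource builder instead of setValueOnlyDeserializer(...):

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical alternative that exposes the whole ConsumerRecord,
// not just the value bytes.
public class TradeEventRecordDeserializationSchema implements KafkaRecordDeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) throws IOException {
        // record.headers(), record.key(), record.partition(), etc. are all available here
        out.collect(objectMapper.readValue(record.value(), Event.class));
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}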