Tags: java, apache-kafka, apache-flink

Flink - How to serialize a POJO to Kafka Sink


I'm trying to write my DataStream to Kafka with a KafkaSink, using the code below:

    KafkaSink<Events> sink = KafkaSink.<Events>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_KAFKA_TOPIC)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

Here, SimpleStringSchema is not suitable, since the stream elements are Events POJOs rather than strings. This is the POJO I've been using:

    import java.util.Date;

    public class Events {

        private Date windowStart;
        private Date windowEnd;
        private String metric;
        private String eventId;
        private long count;

        // No-arg constructor, required by Flink's POJO rules and by Jackson deserialization
        public Events() {
        }

        public Events(Date windowStart, Date windowEnd,
                      String metric, String eventId,
                      long count) {
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.metric = metric;
            this.eventId = eventId;
            this.count = count;
        }

        public Date getWindowStart() {
            return windowStart;
        }

        public void setWindowStart(Date windowStart) {
            this.windowStart = windowStart;
        }

        public Date getWindowEnd() {
            return windowEnd;
        }

        public void setWindowEnd(Date windowEnd) {
            this.windowEnd = windowEnd;
        }

        public String getMetric() {
            return metric;
        }

        public void setMetric(String metric) {
            this.metric = metric;
        }

        public String getEventId() {
            return eventId;
        }

        public void setEventId(String eventId) {
            this.eventId = eventId;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("Events{");
            sb.append("windowStart=").append(windowStart);
            sb.append(", windowEnd=").append(windowEnd);
            sb.append(", metric=").append(metric);
            sb.append(", eventId=").append(eventId);
            sb.append(", count=").append(count);
            sb.append("}");
            return sb.toString();
        }
    }

For this POJO, I'm not able to come up with a SerializationSchema that can be used here. I tried the following:

    public class EventsSerializationSchema implements DeserializationSchema<Events>, SerializationSchema<Events> {

        private static final ObjectMapper objectMapper = new ObjectMapper();
        private String topic;

        public EventsSerializationSchema(){
        }

        public EventsSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                final Events events, @Nullable final Long timestamp) {
            try {
                //if topic is null, default topic will be used
                return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(events));
            } catch (JsonProcessingException e) {
                throw new IllegalArgumentException("Could not serialize record: " + events, e);
            }
        }
    }

But this is not working, as I'm not sure how the serialization should be done. Can someone please help with this? P.S.: I'm using Flink 1.14, in which FlinkKafkaProducer is deprecated.

Thanks in advance.


Solution

  • EventsSerializationSchema implements the wrong interface: the class declares DeserializationSchema and SerializationSchema, but the serialize method shown matches neither of them. You want to implement either SerializationSchema or KafkaRecordSerializationSchema, depending on whether you'd rather implement

    byte[] serialize(T element)

    or

    ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp)

    See KafkaProducerJob.java and UsageRecordSerializationSchema.java for an example.
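
    To make the first option concrete, here is a minimal sketch of a Jackson-based SerializationSchema for the Events POJO from the question. The class name EventsJsonSerializationSchema is invented for this example, and it assumes the Jackson databind dependency is on the classpath (the question's code already uses ObjectMapper, so it should be). The mapper is created in open(), which runs on the task managers after the schema instance has been shipped there:

        import com.fasterxml.jackson.core.JsonProcessingException;
        import com.fasterxml.jackson.databind.ObjectMapper;
        import org.apache.flink.api.common.serialization.SerializationSchema;

        public class EventsJsonSerializationSchema implements SerializationSchema<Events> {

            // Initialized in open() on the task manager, so it is not shipped from the client
            private transient ObjectMapper objectMapper;

            @Override
            public void open(InitializationContext context) {
                objectMapper = new ObjectMapper();
            }

            @Override
            public byte[] serialize(Events events) {
                try {
                    // Encode the POJO as a JSON byte array for the Kafka record value
                    return objectMapper.writeValueAsBytes(events);
                } catch (JsonProcessingException e) {
                    throw new IllegalArgumentException("Could not serialize record: " + events, e);
                }
            }
        }

    With that in place, the sink from the question only needs its value serialization schema swapped out:

        KafkaSink<Events> sink = KafkaSink.<Events>builder()
            .setBootstrapServers(LOCAL_KAFKA_BROKER)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(OUTPUT_KAFKA_TOPIC)
                    .setValueSerializationSchema(new EventsJsonSerializationSchema())
                    .build())
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    One caveat: a default ObjectMapper writes java.util.Date fields as epoch-millisecond numbers; if ISO-8601 strings are wanted instead, the mapper has to be configured accordingly in open().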