apache-kafka, apache-flink

Flink - KafkaSink not writing data to Kafka topic


I'm trying to read JSON events from Kafka, aggregate them by eventId and category, and write the results to a different Kafka topic through Flink. The program is able to read messages from Kafka, but the KafkaSink is not writing the data back to the other Kafka topic. I'm not sure what mistake I'm making. Can someone please check and let me know where I'm wrong? Here is the code I'm using.

KafkaSource<EventMessage> source = KafkaSource.<EventMessage>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setTopics(INPUT_KAFKA_TOPIC)
        .setGroupId(LOCAL_GROUP)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new InputDeserializationSchema())
        .build();

WindowAssigner<Object, TimeWindow> windowAssigner = TumblingEventTimeWindows.of(WINDOW_SIZE);

DataStream<EventMessage> eventStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Event Source");

DataStream<EventSummary> events = eventStream
        .keyBy(eventMessage -> eventMessage.getCategory() + eventMessage.getEventId())
        .window(windowAssigner)
        .aggregate(new EventAggregator())
        .name("EventAggregator test >> ");

KafkaSink<EventSummary> sink = KafkaSink.<EventSummary>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_KAFKA_TOPIC)
                .setValueSerializationSchema(new OutputSummarySerializationSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

events.sinkTo(sink);

These are the POJO's I've created for input message and output.

// EventMessage POJO
public class EventMessage implements Serializable {
    private Long timestamp;
    private int eventValue;
    private String eventId;
    private String category;

    public EventMessage() { }

    public EventMessage(Long timestamp, int eventValue, String eventId, String category) {
        this.timestamp = timestamp;
        this.eventValue = eventValue;
        this.eventId = eventId;
        this.category = category;
    }
    .....
}

// EventSummary POJO
public class EventSummary {

    public EventMessage eventMessage;
    public int sum;
    public int count;

    public EventSummary() { }
    ....
}

These are the deserialization and serialization schemas I'm using.

public class InputDeserializationSchema implements DeserializationSchema<EventMessage> {

    static ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public EventMessage deserialize(byte[] bytes) throws IOException {

        return objectMapper.readValue(bytes, EventMessage.class);
    }

    @Override
    public boolean isEndOfStream(EventMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation<EventMessage> getProducedType() {
        return TypeInformation.of(EventMessage.class);
    }
}

public class OutputSummarySerializationSchema implements SerializationSchema<EventSummary> {

    static ObjectMapper objectMapper = new ObjectMapper();

    Logger logger = LoggerFactory.getLogger(OutputSummarySerializationSchema.class);

    @Override
    public byte[] serialize(EventSummary eventSummary) {
        if (objectMapper == null) {
            // Create the mapper first, then make private fields visible to Jackson.
            objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        }
        try {
            String json = objectMapper.writeValueAsString(eventSummary);
            return json.getBytes();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to parse JSON", e);
        }
        return new byte[0];
    }
}

I'm using this aggregator to aggregate the JSON messages.

public class EventAggregator implements AggregateFunction<EventMessage, EventSummary, EventSummary> {

    private static final Logger log = LoggerFactory.getLogger(EventAggregator.class);
    @Override
    public EventSummary createAccumulator() {
        return new EventSummary();
    }

    @Override
    public EventSummary add(EventMessage eventMessage, EventSummary eventSummary) {
        eventSummary.eventMessage = eventMessage;
        eventSummary.count += 1;
        eventSummary.sum += eventMessage.getEventValue();

        return eventSummary;
    }

    @Override
    public EventSummary getResult(EventSummary eventSummary) {
        return eventSummary;
    }

    @Override
    public EventSummary merge(EventSummary summary1, EventSummary summary2) {
        return new EventSummary(null,
                summary1.sum + summary2.sum,
                summary1.count + summary2.count);
    }
}

Can someone help me with this?

Thanks in advance.


Solution

  • In order for event time windowing to work, you must specify a proper WatermarkStrategy. Otherwise, the windows will never close, and no results will be produced.

    The role watermarks play is to mark a place in the stream and indicate that the stream is, at that point, complete up to some specific timestamp. Until receiving this indicator of completeness, windows keep waiting for more events to be assigned to them.

    To simplify debugging the watermarks, you might switch to a PrintSink until you get the watermarking working properly. Or, to simplify debugging the KafkaSink, you could switch to processing time windows until the sink is working. A minimal watermark strategy and both debugging variants are sketched below.
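
    A minimal sketch of such a strategy, assuming the timestamp field of EventMessage holds epoch milliseconds and that the elided getters include getTimestamp() (the 10-second out-of-orderness bound and the 1-minute idleness timeout are placeholder values):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // The watermark trails the highest timestamp seen so far by 10 seconds;
    // events arriving later than that are treated as late.
    WatermarkStrategy<EventMessage> watermarkStrategy = WatermarkStrategy
            .<EventMessage>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            // Tell Flink which field carries the event-time timestamp (epoch millis).
            .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp())
            // Without idleness handling, an empty Kafka partition stalls the watermark
            // and the windows never close.
            .withIdleness(Duration.ofMinutes(1));

    DataStream<EventMessage> eventStream =
            env.fromSource(source, watermarkStrategy, "Event Source");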
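
    And a rough sketch of the two debugging shortcuts (PrintSink ships with recent Flink versions; on older versions events.print() serves the same purpose):

    // Variant 1: debug the watermarking -- keep the event-time windows, but send the
    // results to stdout instead of Kafka to verify that the windows actually fire.
    events.sinkTo(new PrintSink<>());

    // Variant 2: debug the KafkaSink -- switch to processing-time windows, which close
    // on wall-clock time and need no watermarks, and keep the KafkaSink unchanged.
    DataStream<EventSummary> events = eventStream
            .keyBy(eventMessage -> eventMessage.getCategory() + eventMessage.getEventId())
            .window(TumblingProcessingTimeWindows.of(WINDOW_SIZE))
            .aggregate(new EventAggregator());
    events.sinkTo(sink);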