Tags: json, apache-kafka, apache-flink, pyflink, flink-table-api

Consuming JSON messages


I created a Flink application using the Table API to ingest data from a Kafka topic that I populate myself. The dataset is the YouTube stats dataset from Kaggle. In Confluent's UI I can see that the topic receives the messages from my producer, and they look correct. I also ran one of the messages through an online JSON validator, and it was valid.

The Flink application should apply some transformations and write the results to another Kafka topic, but it fails to deserialize the JSON payload coming from the source. Here is one example of the exception:

Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = youtube_stats, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1675701408811, serialized key size = 12, serialized value size = 1955, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@3dd62069, value = [B@2aa4839f).
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
        ... 14 more
Caused by: java.io.IOException: Failed to deserialize JSON '{"video_id": "2kyS6SvSYSE", "trending_date": "17.14.11", "title": "WE WANT TO TALK ABOUT OUR MARRIAGE", "channel_title": "CaseyNeistat", "category_id": 22, "publish_time": "2017-11-13T17:13:01.000Z", "tags": "SHANtell martin", "views": 748374, "likes": 57527, "dislikes": 2966, "comment_count": 15954, "thumbnail_link": "https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg", "comments_disabled": false, "ratings_disabled": false, "video_error_or_removed": false, "description": "SHANTELL'S CHANNEL - https://www.youtube.com/shantellmartin\\nCANDICE - https://www.lovebilly.com\\n\\nfilmed this video in 4k on this -- http://amzn.to/2sTDnRZ\\nwith this lens -- http://amzn.to/2rUJOmD\\nbig drone - http://someurl.com/h4ft3oy\\nOTHER GEAR ---  http://amzn.to/2o3GLX5\\nSony CAMERA http://amzn.to/2nOBmnv\\nOLD CAMERA; http://amzn.to/2o2cQBT\\nMAIN LENS; http://amzn.to/2od5gBJ\\nBIG SONY CAMERA; http://amzn.to/2nrdJRO\\nBIG Canon CAMERA; http://someurl.com/jn4q4vz\\nBENDY TRIPOD THING; http://someurl.com/gw3ylz2\\nYOU NEED THIS FOR THE BENDY TRIPOD; http://someurl.com/j8mzzua\\nWIDE LENS; http://someurl.com/jkfcm8t\\nMORE EXPENSIVE WIDE LENS; http://someurl.com/zrdgtou\\nSMALL CAMERA; http://someurl.com/hrrzhor\\nMICROPHONE; http://someurl.com/zefm4jy\\nOTHER MICROPHONE; http://someurl.com/jxgpj86\\nOLD DRONE (cheaper but still great);http://someurl.com/zcfmnmd\\n\\nfollow me; on http://instagram.com/caseyneistat\\non https://www.facebook.com/cneistat\\non https://twitter.com/CaseyNeistat\\n\\namazing intro song by https://soundcloud.com/discoteeth\\n\\nad disclosure.  THIS IS NOT AN AD.  not selling or promoting anything.  but samsung did produce the Shantell Video as a 'GALAXY PROJECT' which is an initiative that enables creators like Shantell and me to make projects we might otherwise not have the opportunity to make.  hope that's clear.  
if not ask in the comments and i'll answer any specifics.", "event_time": "2023-02-06 16:36:48"}'.
        at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:122)
        at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:51)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
        ... 15 more
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x17a2276 (above 0x0010ffff) at char #1, byte #7)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:255)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2389)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:677)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
        at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserializeToJsonNode(JsonRowDataDeserializationSchema.java:127)
        at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:116)
        ... 19 more
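The innermost cause is the telling part: the trace goes through Jackson's UTF32Reader, so the parser decoded the payload as UTF-32 and failed on the second 4-byte unit (byte #7), before it ever reached any JSON. JSON parsers typically guess the encoding from the first bytes, and a payload that starts with 0x00 0x00 looks like UTF-32 big-endian. That is consistent with the Confluent wire format, which prepends a 0x00 magic byte and a 4-byte big-endian schema ID to the payload, assuming self.json_serializer in the producer was Confluent's schema-registry JSONSerializer. The sketch below reproduces the effect with Python's standard json module, which applies a similar leading-bytes heuristic to bytes input; the helper names and the schema ID of 1 are hypothetical.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte of every message in the Confluent wire format


def frame_like_confluent(record: dict, schema_id: int) -> bytes:
    """Hypothetical helper: magic byte + 4-byte big-endian schema ID + UTF-8 JSON."""
    header = struct.pack(">bI", MAGIC_BYTE, schema_id)
    return header + json.dumps(record).encode("utf-8")


def parses_as_json(message: bytes) -> bool:
    """True if the standard json module accepts the raw bytes as-is."""
    try:
        json.loads(message)  # bytes input: the encoding is guessed from the first bytes
        return True
    except ValueError:  # covers both JSONDecodeError and UnicodeDecodeError
        return False


record = {"video_id": "2kyS6SvSYSE", "category_id": 22}
plain = json.dumps(record).encode("utf-8")
framed = frame_like_confluent(record, schema_id=1)

print(framed[:6])              # b'\x00\x00\x00\x00\x01{'
print(parses_as_json(plain))   # True
print(parses_as_json(framed))  # False: the leading 0x00 bytes make it guess UTF-32
print(json.loads(framed[5:]) == record)  # True once the 5-byte header is skipped
```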

Some parts of the code:

fields = [
    ("video_id", "STRING"),
    ("trending_date", "STRING"),
    ("title", "STRING"),
    ("channel_title", "STRING"),
    ("category_id", "STRING"),
    ("publish_time", "STRING"),
    ("tags", "STRING"),
    ("`views`", "BIGINT"),
    ("likes", "BIGINT"),
    ("dislikes", "BIGINT"),
    ("comment_count", "BIGINT"),
    ("thumbnail_link", "STRING"),
    ("comments_disabled", "BOOLEAN"),
    ("ratings_disabled", "BOOLEAN"),
    ("video_error_or_removed", "BOOLEAN"),
    ("description", "STRING"),
    ("event_time", "TIMESTAMP(3)"),
    # Not a column: the watermark tolerates events up to 10 seconds out of order
    ("WATERMARK FOR event_time AS event_time - INTERVAL '10' second", ""),
]

# Must be an f-string, otherwise the {...} placeholders end up verbatim in the SQL
table_ddl = f"""
CREATE TABLE {kwargs['table_name']} (
    {', '.join(f'{name} {sql_type}' for name, sql_type in fields)}
) WITH (
    'connector' = 'kafka',
    'topic' = '{kwargs['topic']}',
    'properties.bootstrap.servers' = '{kwargs['brokers']}',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = '{kwargs['value_format']}',
    'value.json.fail-on-missing-field' = 'false',
    'value.json.ignore-parse-errors' = 'true'
)
"""
table_env.execute_sql(table_ddl)
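Because the DDL is assembled by string interpolation, it is easy to send literal {kwargs[...]} placeholders to Flink (for example if the f prefix is missing). A minimal sketch, assuming the same (name, type) layout for fields, that renders the statement so it can be printed and inspected before calling table_env.execute_sql; the function name build_source_ddl and the sample table name and broker address are hypothetical.

```python
def build_source_ddl(table_name: str, topic: str, brokers: str,
                     value_format: str, fields: list) -> str:
    """Hypothetical helper that renders the Kafka source DDL from (name, type) pairs."""
    # The WATERMARK entry has an empty type, so strip the trailing space it leaves.
    columns = ",\n    ".join(f"{name} {sql_type}".strip() for name, sql_type in fields)
    return f"""CREATE TABLE {table_name} (
    {columns}
) WITH (
    'connector' = 'kafka',
    'topic' = '{topic}',
    'properties.bootstrap.servers' = '{brokers}',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = '{value_format}',
    'value.json.fail-on-missing-field' = 'false',
    'value.json.ignore-parse-errors' = 'true'
)"""


fields = [
    ("video_id", "STRING"),
    ("`views`", "BIGINT"),
    ("event_time", "TIMESTAMP(3)"),
    ("WATERMARK FOR event_time AS event_time - INTERVAL '10' second", ""),
]
ddl = build_source_ddl("youtube_stats", "youtube_stats", "localhost:9092", "json", fields)
print(ddl)  # inspect the SQL before passing it to table_env.execute_sql(ddl)
```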

# Sink table DDL: pretty similar to the source table (omitted here)

from pyflink.table import DataTypes
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

# Tumbling event-time window of agg_every_n_secs seconds over event_time
tumble_window = (
    Tumble.over(lit(self.agg_every_n_secs).seconds)
    .on(col("event_time"))
    .alias("w")
)
stats_table = (
    table_env.from_path(table_name)
    .window(tumble_window)
    .group_by(col("w"), col("channel_title"))
    .select(
        col("channel_title"),
        col("likes").max.alias("likes"),
        col("dislikes").max.alias("dislikes"),
        (
            col("likes").cast(DataTypes.FLOAT()).max
            / col("dislikes").if_null(1).cast(DataTypes.FLOAT()).max
        ).alias("like_dislike_ratio"),
        col("views").sum.alias("total_views"),
        col("w").end.cast(DataTypes.TIMESTAMP(3)).alias("proctime"),  # window end
    )
)
stats_table.execute_insert(sink_table_name).wait()
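For reference, a tumbling window over event_time assigns each row to exactly one fixed-size, non-overlapping interval (aligned to the epoch by default), and the select then aggregates per window and channel. The plain-Python sketch below mirrors that aggregation on in-memory rows so the expected sink output can be checked by hand. It is an illustration of the semantics, not PyFlink code: it ignores watermarks and late data, and the function name tumbling_stats is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_stats(rows, size_secs):
    """Plain-Python mirror of the tumbling-window query (illustration only)."""
    groups = defaultdict(list)
    for row in rows:
        secs = int((row["event_time"] - EPOCH).total_seconds())
        start = secs - secs % size_secs  # windows are aligned to the epoch
        window_end = EPOCH + timedelta(seconds=start + size_secs)
        groups[(window_end, row["channel_title"])].append(row)

    results = []
    for (window_end, channel), members in sorted(groups.items()):
        max_likes = max(r["likes"] for r in members)
        # MAX ignores NULLs; it is NULL (None) only if every value is NULL
        non_null = [r["dislikes"] for r in members if r["dislikes"] is not None]
        max_dislikes = max(non_null) if non_null else None
        # if_null(1) is applied per row before the MAX in the denominator
        denominator = max(1 if r["dislikes"] is None else r["dislikes"] for r in members)
        # Python floats are 64-bit (Flink FLOAT is 32-bit); this sketch returns inf on 0
        ratio = max_likes / denominator if denominator else float("inf")
        results.append({
            "channel_title": channel,
            "likes": max_likes,
            "dislikes": max_dislikes,
            "like_dislike_ratio": ratio,
            "total_views": sum(r["views"] for r in members),
            "proctime": window_end,  # the window end, aliased "proctime" in the query
        })
    return results


rows = [
    {"channel_title": "CaseyNeistat", "likes": 57527, "dislikes": 2966,
     "views": 748374, "event_time": datetime(2023, 2, 6, 16, 36, 48)},
]
for stat in tumbling_stats(rows, size_secs=60):
    print(stat)
```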

With 'value.json.ignore-parse-errors' = 'true' set in the table DDL, the exception went away, but nothing was written to the sink topic.
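That outcome fits what the option does: the Flink JSON format documentation describes ignore-parse-errors as skipping fields and rows with parse errors instead of failing. If every record carries the same unparseable prefix, every row is skipped, the window never receives data, and the sink stays empty. The toy model below illustrates this under the assumption that a whole-row parse failure means the row is dropped; deserialize_all is a hypothetical name, not Flink's API.

```python
import json
import struct


def deserialize_all(messages, ignore_parse_errors):
    """Toy model of a JSON source: parse each raw value, dropping failures if asked to."""
    rows = []
    for raw in messages:
        try:
            rows.append(json.loads(raw))
        except ValueError:
            if not ignore_parse_errors:
                raise  # without the option, the first bad record fails the job
            # with the option enabled, the bad record is silently skipped
    return rows


good = json.dumps({"channel_title": "CaseyNeistat", "likes": 57527}).encode("utf-8")
# Same payload behind a 5-byte header (0x00 magic byte + schema ID 1)
framed = struct.pack(">bI", 0, 1) + good

print(len(deserialize_all([good, good], ignore_parse_errors=True)))      # 2
print(len(deserialize_all([framed, framed], ignore_parse_errors=True)))  # 0: nothing reaches the sink
```

While debugging, leaving ignore-parse-errors at its default (false) keeps the real deserialization error visible instead of producing an empty sink.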


Solution

  • Even though the Confluent UI showed the messages being produced and they looked correct, reading the topic with the console consumer revealed a weird character at the beginning of each message. So I realized that I needed to update my producer to produce the messages like this:

    self.producer.produce(
                topic=self.topic,
                key=self.string_serializer(data["channel_title"]),
                value=self.string_serializer(json.dumps(data), SerializationContext(self.topic, MessageField.VALUE)),
            )
    

    This replaces what I did before, shown below, where the dict was passed directly to the JSON serializer:

    # value=self.json_serializer(data, SerializationContext(self.topic, MessageField.VALUE))
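The console consumer is what exposed the stray leading bytes. To check raw record values programmatically instead (for example values read with any Kafka client), a small classifier like the hypothetical describe_payload below can tell plain JSON apart from JSON behind the 5-byte Confluent header. Plain JSON is what the fixed producer should write, assuming self.string_serializer is a confluent_kafka StringSerializer with its default UTF-8 codec; the header layout (0x00 magic byte, then a 4-byte big-endian schema ID) is the Confluent wire format that a schema-registry serializer adds.

```python
import json
import struct


def describe_payload(value: bytes) -> str:
    """Classify a raw Kafka record value as plain JSON, Confluent-framed JSON, or other."""
    try:
        json.loads(value.decode("utf-8"))
        return "plain JSON"
    except ValueError:  # UnicodeDecodeError is a subclass of ValueError
        pass
    if len(value) > 5 and value[0] == 0:  # possible magic byte
        (schema_id,) = struct.unpack(">I", value[1:5])  # 4-byte big-endian schema ID
        try:
            json.loads(value[5:].decode("utf-8"))
            return f"Confluent-framed JSON (schema ID {schema_id})"
        except ValueError:
            pass
    return "not JSON"


plain = json.dumps({"video_id": "2kyS6SvSYSE"}).encode("utf-8")
print(describe_payload(plain))                              # plain JSON
print(describe_payload(struct.pack(">bI", 0, 1) + plain))   # Confluent-framed JSON (schema ID 1)
```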