apache-flink, debezium, flink-sql, pyflink

Flink failed to deserialize JSON produced by Debezium


I'm trying to use Flink to consume the change event log produced by Debezium. A sample message looks like this:

{
    "schema":{

    },
    "payload":{
        "before":null,
        "after":{
            "team_config_id":3800,
            "team_config_team_id":"team22bcb26e-499a-41e6-8746-b7d980e79e04",
            "team_config_sfdc_account_id":null,
            "team_config_sfdc_account_url":null,
            "team_config_business_type":5,
            "team_config_dpsa_status":0,
            "team_config_desc":null,
            "team_config_company_id":null,
            "team_config_hm_count_stages":null,
            "team_config_assign_credits_times":null,
            "team_config_real_renew_date":null,
            "team_config_action_date":null,
            "team_config_last_action_date":null,
            "team_config_business_tier_notification":"{}",
            "team_config_create_date":1670724933000,
            "team_config_update_date":1670724933000,
            "team_config_rediscovery_tier":0,
            "team_config_rediscovery_tier_notification":"{}",
            "team_config_sfdc_industry":null,
            "team_config_sfdc_market_segment":null,
            "team_config_unterminated_note_id":0
        },
        "source":{

        },
        "op":"c",
        "ts_ms":1670724933149,
        "transaction":null
    }
}

I've tried two ways to declare the input schema.

The first was to parse the JSON data directly:

create table team_config_source (
      `payload` ROW <
        `after` ROW <
          ...
          team_config_create_date timestamp(3),
          team_config_update_date timestamp(3),
          ...
        >
      >
    ) WITH (
    'connector' = 'kafka',
    ...
    'format' = 'json'
    )

But Flink throws an error: org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail to deserialize at field: team_config_create_date, caused by java.time.format.DateTimeParseException: Text '1670724933000' could not be parsed at index 0. Doesn't Flink support timestamps in this format?

I've also tried another way, using the built-in debezium format:

create table team_config_source (
      team_config_create_id int,
      ...
    ) WITH (
    'connector' = 'kafka',
    ...
    'format' = 'debezium-json'
    )

But Flink comes up with another error: java.io.IOException: Corrupt Debezium JSON message, caused by java.lang.NullPointerException. I read that an update event shouldn't have null as its before value, but this message was a create event.

Could anyone help to check my DDL?


Solution

  • I am not a Flink expert, but TIMESTAMP in Flink is not epoch time; the JSON format expects a datetime string, not a number.

    In this case you can declare the raw fields as BIGINT and derive the timestamps as computed columns:

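    -- physical columns keep the JSON field names so the format can map them
    team_config_create_date BIGINT,
    team_config_update_date BIGINT,
    ...
    -- FROM_UNIXTIME expects seconds, but Debezium emits milliseconds, so divide by 1000
    -- (the *_ts column names are only illustrative)
    team_config_create_ts AS TO_TIMESTAMP(FROM_UNIXTIME(team_config_create_date / 1000)),
    team_config_update_ts AS TO_TIMESTAMP(FROM_UNIXTIME(team_config_update_date / 1000))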
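
To check the units, here is a minimal Python sketch (standard library only, not Flink code) that decodes the value from the sample event. It assumes the field holds milliseconds since the Unix epoch, which matches the 13-digit value in the question. The helper name `millis_to_utc` is made up for illustration.

```python
from datetime import datetime, timezone

# Value of team_config_create_date in the sample event from the question.
EPOCH_MILLIS = 1670724933000


def millis_to_utc(ms: int) -> datetime:
    """Interpret ms as milliseconds since 1970-01-01T00:00:00Z (assumed unit)."""
    # Split into whole seconds and leftover milliseconds to avoid float rounding.
    seconds, millis = divmod(ms, 1000)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base.replace(microsecond=millis * 1000)


if __name__ == "__main__":
    # Prints 2022-12-11T02:15:33+00:00
    print(millis_to_utc(EPOCH_MILLIS).isoformat())
```

Read as seconds, the same number would land tens of thousands of years in the future. That is why a function taking epoch seconds needs the value divided by 1000 first.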