apache-kafka ksqldb

KsqlDB Converting from TextNode to Array


TL;DR: In ksqlDB, is there a way to convert a TextNode into an ARRAY<VARCHAR> so that EXPLODE can be performed without error?

Brand new to ksqlDB and running into an odd issue. I'm ETLing from Debezium into ksqlDB and the data is flowing, which is great. The problem is that when I use the EXPLODE function, it fails to parse, because the field I want to be an ARRAY is actually a TextNode. Here is a simplified data structure coming off Postgres, where data is a JSONB column:

{
  "id": "b5b55e07-15d7-4559-8319-18a67205ea4d",
  "data": [
        "d728fef0-9eec-4dec-b9b6-04b5444431f6",
        "7a475d25-ec73-41c3-9fbc-0a62e96d887a"
  ]
}

My Debezium connector is using KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

Setup and outputs (the topic output includes extra fields that I ignore):

SET 'auto.offset.reset' = 'earliest';

CREATE SOURCE CONNECTOR forms_reader WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'db',
    'database.port' = '5432',
    'database.user' = 'master',
    'database.password' = 'secret',
    'database.dbname' = 'forms',
    'database.server.name' = 'forms',
    'table.whitelist' = 'public.response_version',
    'transforms' = 'unwrap',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite'
);

print 'forms.public.response_version' from beginning;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/12/17 21:42:29.879 Z, key: [Struct{id=b5b55e07-15d7-4559-8319-18a67@3616449210317300861/-], value: {"id":"b5b55e07-15d7-4559-8319-18a67205ea4d","response_id":"403fc75f-97fa-4f06-9f66-bebff6b458c7","data":"[\"d728fef0-9eec-4dec-b9b6-04b5444431f6\", \"7a475d25-ec73-41c3-9fbc-0a62e96d887a\"]","created_by":"b5166a61-71bb-4e50-b445-afcc64d46b5e","created_at":1608217959510550,"previous_response_version_id":null,"form_version_id":"6b4f9c86-4984-4d05-9e7e-8e51b97189b3","__deleted":"false"}
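The key detail in that output is that data arrives as a JSON-encoded string (`"data":"[\"...\"]"`), not as a JSON array. A quick Python check, purely a sketch outside ksqlDB using a truncated copy of the printed value, illustrates why the deserializer reports a TextNode for this field:

```python
import json

# Value payload as printed from the topic, truncated to the relevant fields.
value = ('{"id":"b5b55e07-15d7-4559-8319-18a67205ea4d",'
         '"data":"[\\"d728fef0-9eec-4dec-b9b6-04b5444431f6\\", '
         '\\"7a475d25-ec73-41c3-9fbc-0a62e96d887a\\"]"}')

record = json.loads(value)

# The JSONB column came through as a string containing JSON, not as a JSON
# array -- this is the TextNode that ksqlDB cannot coerce to ARRAY<VARCHAR>.
print(type(record["data"]))  # <class 'str'>

# The string itself is valid JSON and decodes to the expected list.
inner = json.loads(record["data"])
print(inner)
```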

Create streams:

CREATE STREAM response_versions(
    id VARCHAR KEY, 
    data ARRAY<String>)
    WITH(kafka_topic='forms.public.response_version', value_format='JSON');

CREATE STREAM response_fields
    WITH(value_format='JSON')
    AS SELECT id AS rv_id,
           EXPLODE(data) AS data_field
        FROM response_versions EMIT CHANGES;

Error in logs on second stream creation:

org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: forms.public.response_version. Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA

Anyone have a solution to this? Ideally I'd like to get the TextNode into an ARRAY, but I'm not sure how.

Thanks, Patrick


Solution

  • The error is caused by the response_versions stream, which cannot read the string into an array. To do the conversion, declare the column as a string in that stream, then convert the string to an array in a derived stream.

    For the string -> array conversion, I had to remove the brackets with REGEXP_REPLACE, then split the string into an array with REGEXP_SPLIT_TO_ARRAY.

    CREATE STREAM response_versions(
        id VARCHAR KEY, 
        data STRING)
        WITH(kafka_topic='forms.public.response_version', value_format='JSON');
    
    CREATE STREAM response_fields
        WITH(value_format='JSON')
        AS SELECT 
           id AS rv_id, 
           EXPLODE(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(data, '\\[|\\]', ''), '\\s*,\\s*')) AS data_field
        FROM response_versions EMIT CHANGES;
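To sanity-check the regex logic outside ksqlDB, here is a small Python sketch of the same two steps (the bracket-stripping replace, then the comma split). Note that the split elements still carry their embedded double quotes; depending on how the values come through, the replace pattern may need extending to strip those as well:

```python
import re

# The DATA column as ksqlDB sees it after JSON deserialization: a plain string.
data = '["d728fef0-9eec-4dec-b9b6-04b5444431f6", "7a475d25-ec73-41c3-9fbc-0a62e96d887a"]'

# Step 1: REGEXP_REPLACE(data, '\[|\]', '') -- strip the square brackets.
no_brackets = re.sub(r'\[|\]', '', data)

# Step 2: REGEXP_SPLIT_TO_ARRAY(..., '\s*,\s*') -- split on commas
# surrounded by optional whitespace.
parts = re.split(r'\s*,\s*', no_brackets)
print(parts)  # elements still carry the embedded double quotes

# If the quotes should go too, widen the replace pattern to cover them,
# e.g. REGEXP_REPLACE(data, '[\[\]"]', '') on the ksqlDB side.
clean = [p.strip('"') for p in parts]
print(clean)
```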