Tags: apache-flink, flink-streaming, flink-sql

How to read the state generated by Flink SQL code


I have this table

/** mode('streaming')*/
CREATE OR REPLACE TABLE eoj_table (
   `tenantId` string,
   `id` string,
   `name` string,
   `headers` MAP<STRING, BYTES> METADATA ,
   `hard_deleted` boolean,
   `kafka_key` STRING,
   `ts` timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
   `procTime` AS PROCTIME(),
    WATERMARK FOR ts AS ts
) WITH (
   'connector' = 'kafka',
   'properties.bootstrap.servers' = 'kafka:29092',
   'properties.group.id' = 'group_id_1',
   'topic-pattern' = '^topic(_backfill)?$',
   'value.format' = 'json',
   'format' = 'json',
   'key.format' = 'raw',
   'key.fields' = 'kafka_key',
   'value.fields-include' = 'EXCEPT_KEY',
   'scan.startup.mode' = 'earliest-offset',
   'json.timestamp-format.standard' = 'ISO-8601',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
);

I am trying to run this query

with latest as (SELECT s.*
               FROM (SELECT s.*, ROW_NUMBER() OVER (PARTITION BY cast(`headers`['tenantId'] as varchar), id ORDER BY ts desc) AS row_num
                     FROM eoj_entitlement_source s) s
               WHERE row_num = 1
    )

select *
from latest

I am trying to examine the state that this deduplication generates. I can see the db folder that gets generated, and it contains SST files.

I have RocksDB installed locally. When I run

./rocksdb_sst_dump --file=/location/db --command=scan --read_num=5 --output_hex

I get the output like

'35000000180000000000000000742D310000000083653800000000008200' seq:2, type:1 => 0000005800000000000000002800000030000000742D312365380086E5F326E894010000742D310000000083653800000000008200040000000000006931000000000082080000002000000000000000000000004944454E54495459
'53000000180000000000000000742D310000000083653700000000008200' seq:1, type:1 => 0000005800000000000000002800000030000000742D312365360086492525E894010000742D310000000083653700000000008200040000000000006931000000000082080000002000000000000000000000004944454E54495459
'7E000000180000000000000000742D310000000083653900000000008200' seq:3, type:1 => 0000005800000000000000002800000030000000742D312365390086CB0527E894010000742D310000000083653900000000008200040000000000006931000000000082080000002000000000000000000000004944454E54495459

All of this looks like hex-encoded binary data. I have tried to deserialize it, but I only get partial results: some of the bytes do not decode to printable characters.

Question: Which serializer does Flink use internally when it writes this data to state?


Solution

  • The deduplication operators use the RowDataSerializer, since they store their data in a ValueState<RowData>. The binary format is described in the implementation (see RowDataSerializer and the BinaryRowData class it writes).
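
As a rough illustration, the three keys in the sst dump above look consistent with Flink's binary row layout (BinaryRowData). The Python sketch below decodes them under assumptions inferred from the bytes themselves, not from any documented contract: a 1-byte key-group prefix, a 4-byte big-endian row length, an 8-byte row header, one 8-byte little-endian slot per field, two non-null STRING key fields (matching the two PARTITION BY expressions), and a trailing 1-byte namespace. The function names are hypothetical helpers, not part of Flink or RocksDB.

```python
import struct

HEADER_BITS = 8  # assumption: the first 8 header bits hold the row kind


def null_bitset_bytes(arity: int) -> int:
    """Header size (row kind + null bits), rounded up to whole 8-byte words."""
    return ((arity + 63 + HEADER_BITS) // 64) * 8


def decode_string_slot(row: bytes, slot_offset: int) -> str:
    """Decode one 8-byte STRING slot, read as a little-endian long."""
    (slot,) = struct.unpack_from("<Q", row, slot_offset)
    if slot >> 63:
        # Highest bit set: a short value stored inline, length in the top byte.
        length = (slot >> 56) & 0x7F
        data = row[slot_offset:slot_offset + length]
    else:
        # Otherwise the slot holds (offset << 32) | size, relative to the row start.
        offset, size = slot >> 32, slot & 0xFFFFFFFF
        data = row[offset:offset + size]
    return data.decode("utf-8")


def decode_key(hex_key: str, arity: int = 2, key_group_bytes: int = 1) -> list[str]:
    """Decode a RocksDB key from the dump; null fields are not handled here."""
    raw = bytes.fromhex(hex_key)
    # The row length follows the key-group prefix as a big-endian int.
    (row_len,) = struct.unpack_from(">i", raw, key_group_bytes)
    start = key_group_bytes + 4
    row = raw[start:start + row_len]
    fixed = null_bitset_bytes(arity)
    return [decode_string_slot(row, fixed + 8 * i) for i in range(arity)]


if __name__ == "__main__":
    keys = [
        "35000000180000000000000000742D310000000083653800000000008200",
        "53000000180000000000000000742D310000000083653700000000008200",
        "7E000000180000000000000000742D310000000083653900000000008200",
    ]
    for key in keys:
        print(decode_key(key))  # first key prints ['t-1', 'e8']
```

The values appear to follow the same pattern: each starts with a 4-byte big-endian length (00000058, i.e. 88 bytes) followed by a binary row, so the same slot decoding should apply once the field types of the stored row are known. Non-string fields (timestamps, nested rows, maps) would need their own decoding, which this sketch does not attempt.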