I have this table
/** mode('streaming')*/
CREATE OR REPLACE TABLE eoj_table (
`tenantId` string,
`id` string,
`name` string,
`headers` MAP<STRING, BYTES> METADATA ,
`hard_deleted` boolean,
`kafka_key` STRING,
`ts` timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
`procTime` AS PROCTIME(),
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'kafka:29092',
'properties.group.id' = 'group_id_1',
'topic-pattern' = '^topic(_backfill)?$',
'value.format' = 'json',
'format' = 'json',
'key.format' = 'raw',
'key.fields' = 'kafka_key',
'value.fields-include' = 'EXCEPT_KEY',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'ISO-8601',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
I am trying to run this query:
with latest as (SELECT s.*
FROM (SELECT s.*, ROW_NUMBER() OVER (PARTITION BY cast(`headers`['tenantId'] as varchar), id ORDER BY ts desc) AS row_num
FROM eoj_entitlement_source s) s
WHERE row_num = 1
)
select *
from latest
I am trying to examine the state that this deduplication generates. I can see the db
folder that gets generated, which contains the SST files.
I have RocksDB installed locally. When I run
./rocksdb_sst_dump --file=/location/db --command=scan --read_num=5 --output_hex
I get output like this:
'35000000180000000000000000742D310000000083653800000000008200' seq:2, type:1 => 0000005800000000000000002800000030000000742D312365380086E5F326E894010000742D310000000083653800000000008200040000000000006931000000000082080000002000000000000000000000004944454E54495459
'53000000180000000000000000742D310000000083653700000000008200' seq:1, type:1 => 0000005800000000000000002800000030000000742D312365360086492525E894010000742D310000000083653700000000008200040000000000006931000000000082080000002000000000000000000000004944454E54495459
'7E000000180000000000000000742D310000000083653900000000008200' seq:3, type:1 => 0000005800000000000000002800000030000000742D312365390086CB0527E894010000742D310000000083653900000000008200040000000000006931000000000082080000002000000000000000000000004944454E54495459
All of this looks like hex data. I have tried to deserialize it, but I only get partial results: some of the bytes do not decode to printable characters.
Question: Which serializer does Flink use internally when it writes this data to state?
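The deduplication operators store the row itself as the state value, so the bytes you see are a serialized row. In the Table/SQL runtime that state is a ValueState<RowData> written by RowDataSerializer (the internal counterpart of RowSerializer and ValueState<Row>), which emits a 4-byte big-endian length followed by the row in the BinaryRowData layout. The format is described in the implementation.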
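To make the layout concrete, here is a minimal Python sketch that decodes the first line of the dump above. It is not Flink code, and it rests on assumptions read off the bytes rather than taken from the question: the value is a 4-byte big-endian length followed by a BinaryRowData-style row (8 bytes of header and null bits, then one 8-byte little-endian slot per field); strings of up to 7 bytes sit inline in their slot with the length in the low 7 bits of the last byte; longer values are stored as (offset << 32 | size); the RocksDB key is a 1-byte key group, the serialized key row, then a 1-byte namespace. The field counts and types (2 key fields, 5 value fields, 3 nested fields) are guesses from the dump, because the serialized bytes do not carry the schema. All helper names are hypothetical.

```python
import struct

def null_bits_size(arity):
    # 8 header bits plus one null bit per field, rounded up to whole 8-byte words
    return ((arity + 63 + 8) // 64) * 8

def is_null(row, index):
    bit = index + 8  # null bits start after the 8 header bits
    return (row[bit // 8] >> (bit % 8)) & 1 == 1

def read_long(row, arity, index):
    # Fixed-width fields live directly in their 8-byte slot (little-endian assumed)
    return struct.unpack_from("<q", row, null_bits_size(arity) + 8 * index)[0]

def read_bytes(row, arity, index):
    off = null_bits_size(arity) + 8 * index
    slot = row[off:off + 8]
    if slot[7] & 0x80:  # inline value: length is in the low 7 bits of the last byte
        return slot[:slot[7] & 0x7F]
    packed = struct.unpack("<Q", slot)[0]
    start, size = packed >> 32, packed & 0xFFFFFFFF  # offset relative to the row start
    return row[start:start + size]

def unwrap(raw, prefix_len):
    # Skip prefix_len bytes, read a big-endian int length, return (row bytes, trailing bytes)
    (length,) = struct.unpack_from(">i", raw, prefix_len)
    start = prefix_len + 4
    return raw[start:start + length], raw[start + length:]

# First key/value pair from the sst_dump output above
key_hex = "35000000180000000000000000742D310000000083653800000000008200"
value_hex = (
    "0000005800000000000000002800000030000000742D312365380086"
    "E5F326E894010000742D3100000000836538000000000082"
    "00040000000000006931000000000082080000002000000000000000"
    "000000004944454E54495459"
)

key_raw = bytes.fromhex(key_hex)
key_group = key_raw[0]                    # assumed 1-byte key-group prefix
key_row, namespace = unwrap(key_raw, 1)   # trailing byte is assumed to be the namespace
key_fields = [read_bytes(key_row, 2, i) for i in range(2)]

value_row, _ = unwrap(bytes.fromhex(value_hex), 0)
nested = read_bytes(value_row, 5, 0)      # field 0 appears to be a nested row
value_fields = {
    "f1": read_bytes(value_row, 5, 1),
    "f2": read_long(value_row, 5, 2),     # looks like epoch milliseconds
    "f3": read_bytes(value_row, 5, 3),
    "f4": read_bytes(value_row, 5, 4),
}
nested_fields = [
    None if is_null(nested, i) else read_bytes(nested, 3, i) for i in range(3)
]

print(key_group, key_fields, namespace)   # 53 [b't-1', b'e8'] b'\x00'
print(value_fields)
print(nested_fields)                      # [b'i1', b'IDENTITY', None]
```

Under these assumptions the "unprintable" bytes are the length prefix, the null-bit word, the inline-length markers such as 0x83, and the little-endian numeric slots, which is why a plain hex-to-text conversion only recovers fragments like t-1 and IDENTITY. For anything beyond inspection, deserializing with the operator's actual serializer is safer than hand parsing.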