Tags: apache-flink, flink-sql

Flink TableAPI: PartitionedBy columns missing in Parquet files


I’m using the filesystem connector to sink data into S3 in Parquet format via the Table API. I noticed that the PARTITIONED BY columns are missing from the resulting Parquet files. Here are the queries I’m using:

CREATE TABLE data_to_sink (
    record_id STRING NOT NULL,
    request_id STRING NOT NULL,
    source_name STRING NOT NULL,
    event_type STRING NOT NULL,
    event_name STRING NOT NULL,
    `date` STRING,
    results_count BIGINT
) PARTITIONED BY (record_id, source_name, `date`) WITH (
    'connector' = 'filesystem',
    'path' = '<S3 path>',
    'format' = 'parquet'
);

INSERT INTO data_to_sink
SELECT record_id, request_id, source_name, event_type, event_name,
       DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date,
       COUNT(*) AS results_count
FROM data_from_source
GROUP BY record_id, request_id, source_name, event_type, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE);

I can see the Parquet files being created, but when I inspected the schema with the parquet-cli tool, it doesn’t include the record_id, source_name, and date fields. I checked Flink’s documentation as well but didn’t find any setting that controls this.

Is there any known issue around this?


Solution

  • I fixed this by cloning the record_id and source_name columns and partitioning by the clones. The columns aren’t dropped by a bug: like Hive, Flink’s filesystem connector encodes partition column values in the directory path (recordId=.../sourceName=.../date=...) and excludes them from the data files themselves, so duplicating a column keeps the original value in the Parquet schema. Note that `date` remains a partition key here, so its value still lives only in the path.

    CREATE TABLE data_to_sink (
        record_id STRING NOT NULL,
        request_id STRING NOT NULL,
        source_name STRING NOT NULL,
        event_type STRING NOT NULL,
        event_name STRING NOT NULL,
        `date` STRING,
        results_count BIGINT,
        recordId STRING,
        sourceName STRING
    ) PARTITIONED BY (recordId, sourceName, `date`) WITH (
        'connector' = 'filesystem',
        'path' = '<S3 path>',
        'format' = 'parquet'
    );
    
    INSERT INTO data_to_sink
    SELECT record_id, request_id, source_name, event_type, event_name,
           DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date,
           COUNT(*) AS results_count,
           record_id AS recordId, source_name AS sourceName
    FROM data_from_source
    GROUP BY record_id, request_id, source_name, event_type, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE);
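
  • A follow-up note: the cloned columns add redundant bytes to every file. If the data is read back through Flink, an alternative is to rely on partition inference instead: a filesystem source declared with the same PARTITIONED BY clause recovers the partition key values from the directory layout, even though they are absent from the files. Below is a minimal sketch assuming the original (non-cloned) sink table; the table name data_from_sink and the filter value are placeholders:

    -- Hypothetical read-side table: same schema and PARTITIONED BY clause
    -- as the sink. The filesystem source derives record_id, source_name and
    -- `date` from the directory names (record_id=.../source_name=.../date=...).
    CREATE TABLE data_from_sink (
        record_id STRING NOT NULL,
        request_id STRING NOT NULL,
        source_name STRING NOT NULL,
        event_type STRING NOT NULL,
        event_name STRING NOT NULL,
        `date` STRING,
        results_count BIGINT
    ) PARTITIONED BY (record_id, source_name, `date`) WITH (
        'connector' = 'filesystem',
        'path' = '<S3 path>',
        'format' = 'parquet'
    );

    -- Partition columns behave like regular columns on read; filtering on
    -- them additionally enables partition pruning.
    SELECT record_id, source_name, `date`, results_count
    FROM data_from_sink
    WHERE `date` = '2024-01-01';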