flink-streaming, flink-sql

Flink SQL CSV Streaming Continuously


I am creating two Flink SQL tables: one for a CSV filesystem source and another for Kafka. The goal is to continuously monitor the filesystem folder and push the records from new CSV files to the Kafka topic. But the query I have written below pushes the CSV file records once, after which the Flink job goes into "Finished" mode, and any new files that come into the folder are not processed. Can someone please tell me how to create a continuous Flink SQL streaming job with a CSV filesystem source and Kafka as the target?

Flink SQL to create the source table

CREATE TABLE son_hsb_source_filesystem_csv_bulk(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'filesystem', --Don't Change this
    'path' = 'file:///opt/kafka-python-exec/files/' , -- Change file name alone
    'format' = 'csv', --Don't Change this
    'csv.ignore-parse-errors' = 'true', --Don't Change this
    'csv.allow-comments' = 'true' --Don't Change this
);

Flink SQL to create the target table


CREATE TABLE son_hsb_target_kafka_9092_filesystem_bulk_tests(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'kafka',  --Don't Change this
    'topic' = 'son_hsb_target_kafka_9092_fs_bulk_data_tests',  -- Add any topic name you want
    'scan.startup.mode' = 'earliest-offset',  --Don't Change this
    'properties.bootstrap.servers' = 'localhost:9092', --Don't Change this
    'format' = 'json',  --Don't Change this
    'json.fail-on-missing-field' = 'false', --Don't Change this
    'json.ignore-parse-errors' = 'true' --Don't Change this
);

Flink SQL to create the streaming job (this runs once and then goes into "Finished" mode)

INSERT INTO son_hsb_target_kafka_9092_filesystem_bulk_tests
SELECT file_name, start_time, oss_cell_id, enodeb, dl_payload, rrc_conn_den, rrc_conn_num, pm_array_1
FROM son_hsb_source_filesystem_csv_bulk;

How can I define a streaming job that always stays in the "Running" state and watches for new files? Please suggest.


Solution

  • The documentation indicates that this feature hasn't been implemented yet for streaming file system sources:

    File system sources for streaming is still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring.
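  • If you are on a newer Flink release, it may be worth re-checking the filesystem connector documentation: later versions (1.15 and up, to the best of my knowledge) added a `source.monitor-interval` option that keeps the source in streaming mode and periodically polls the directory for new files. A sketch of the source DDL with that option (the option name and behavior come from the newer connector docs, so verify them against your Flink version before relying on this):

```sql
CREATE TABLE son_hsb_source_filesystem_csv_bulk (
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///opt/kafka-python-exec/files/',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true',
    'csv.allow-comments' = 'true',
    -- Assumed option (available in newer Flink versions): poll the directory
    -- every 10 seconds for new files, which makes the source unbounded and
    -- keeps the INSERT job in the "Running" state instead of finishing.
    'source.monitor-interval' = '10s'
);
```

With the monitor interval set, the same `INSERT INTO … SELECT` job should stay running and pick up files added to the folder later; without it, the source performs a one-time bounded scan, which matches the "Finished" behavior described above.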