apache-flink, pyflink

Flink: Not able to sink a stream into csv


I am trying to sink a stream to the filesystem in CSV format using PyFlink, but it does not work.

# stream_to_csv.py
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
""")

table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()

To run the script:

$ python stream_to_csv.py

I expect records to go to the /tmp/output folder, but that doesn't happen.

$ ls /tmp/output
(nothing shown here)

Am I missing anything?


Solution

  • I shamelessly copy Dian Fu's reply from http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Not-able-to-sink-a-stream-into-csv-td43105.html.

    You need to set a rolling policy for the filesystem connector. You can refer to the Rolling Policy section [1] for more details.

    Actually there is output: if you run `ls -la /tmp/output/`, you will see several hidden files named ".part-xxx". These in-progress files are only committed (made visible) when a checkpoint completes, which is why you need checkpointing enabled.

    For your job, you need to set execution.checkpointing.interval in the configuration and sink.rolling-policy.rollover-interval in the filesystem connector's properties.

    The job will look like the following:

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment
    
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    table_env = StreamTableEnvironment.create(environment_settings=env_settings)
    # Enable periodic checkpoints so the filesystem sink can commit its part files.
    table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
    
    table_env.execute_sql("""
        CREATE TABLE datagen (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '1'
        )
    """)
    
    table_env.execute_sql("""
        CREATE TABLE print (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '/tmp/output',
            -- roll over to a new part file at least every 10s
            'sink.rolling-policy.rollover-interval' = '10s'
        )
    """)
    
    table_env.execute_sql("""
    INSERT INTO print
    SELECT id, data
    FROM datagen
    """).wait()
    

    [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#rolling-policy
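    To check whether the sink is actually producing data, it helps to separate committed part files from the hidden, dot-prefixed in-progress ones that `ls` hides by default. A minimal sketch (the helper name and default path are illustrative, not part of Flink's API):

    ```python
    import os

    def list_part_files(path="/tmp/output"):
        """Split the sink directory into committed (visible) and
        in-progress (dot-prefixed, e.g. ".part-xxx") files."""
        committed, in_progress = [], []
        for root, _dirs, files in os.walk(path):
            for name in files:
                full = os.path.join(root, name)
                if name.startswith("."):
                    in_progress.append(full)   # not yet committed by a checkpoint
                else:
                    committed.append(full)     # finalized, safe to read
        return committed, in_progress

    if __name__ == "__main__":
        committed, in_progress = list_part_files()
        print(f"committed: {len(committed)}, in progress: {len(in_progress)}")
    ```

    While the job runs without checkpointing enabled, everything stays in the in-progress list; once checkpoints complete, files move to the committed list.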