apache-flink, pyflink

How to read multiple directories with the Table API in PyFlink?


I want to read multiple directories with the Table API in PyFlink:

from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.set_parallelism(1)

    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    table_env \
        .get_config() \
        .get_configuration() \
        .set_string("parallelism.default", "1")


    ddl = """
        CREATE TABLE test (
            a INT,
            b STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '{path}',
            'format' = 'csv',
            'csv.ignore-first-line' = 'true',
            'csv.ignore-parse-errors' = 'true',
            'csv.array-element-delimiter' = ';'
        )
    """.format(path='/opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16')

    table_env.execute_sql(ddl)

But it fails with the following error:

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16 does not exist or the user running Flink ('root') has insufficient permissions to access it.

I'm sure these three directories exist and that I have permission to access them:
/opt/data/day=2021-11-14
/opt/data/day=2021-11-15
/opt/data/day=2021-11-16

If reading multiple directories is not possible, I would have to create three tables and union them, which is much more verbose.

Any suggestions are appreciated. Thank you.


Solution

  • Just using

    'path' = '/opt/data'
    

    should be sufficient. The filesystem connector can also read the partition field from the directory names and filter on it. For example, you can define the table with this schema:

    -- day is quoted with backticks because DAY is a reserved keyword in Flink SQL
    CREATE TABLE test (
        a INT,
        b STRING,
        `day` DATE
    ) PARTITIONED BY (`day`) WITH (
        'connector' = 'filesystem',
        'path' = '/opt/data',
        [...]
    )
    

    And then the following query:

    SELECT * FROM test WHERE `day` = '2021-11-14'
    

    will read only the files under the directory /opt/data/day=2021-11-14.
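
To cover the three days from the question with a single table, the partitioned definition can be combined with an IN filter on the partition column. The following is a minimal sketch, not a tested job: the helper names build_ddl, build_query and run are hypothetical, only 'format' = 'csv' is set (the other csv.* options from the question would need to be added back), and it assumes the connector prunes partitions for an IN predicate the same way it does for the equality filter above.

```python
def build_ddl(base_path, table_name="test"):
    """Return a CREATE TABLE statement for a table partitioned by `day`.

    `day` is quoted with backticks because DAY is a reserved keyword
    in Flink SQL.
    """
    return f"""
        CREATE TABLE {table_name} (
            a INT,
            b STRING,
            `day` DATE
        ) PARTITIONED BY (`day`) WITH (
            'connector' = 'filesystem',
            'path' = '{base_path}',
            'format' = 'csv'
        )
    """


def build_query(days, table_name="test"):
    """Return a SELECT restricted to the given partition values."""
    literals = ", ".join(f"DATE '{d}'" for d in days)
    return f"SELECT a, b, `day` FROM {table_name} WHERE `day` IN ({literals})"


def run(base_path, days):
    """Create the table and print the rows of the selected partitions."""
    # Imported here so the two helpers above work without PyFlink installed.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    settings = EnvironmentSettings.new_instance().in_batch_mode().build()
    table_env = TableEnvironment.create(settings)
    table_env.execute_sql(build_ddl(base_path))
    table_env.execute_sql(build_query(days)).print()
```

On a machine with PyFlink installed, calling run('/opt/data', ['2021-11-14', '2021-11-15', '2021-11-16']) would submit the query. The point of the design is that the base directory is passed once as 'path', and the choice of days moves into the WHERE clause instead of the table definition.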