
How to read multiple directories with the Table API in PyFlink?

I want to read multiple directories with the Table API in PyFlink:

from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()

    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    table_env \
        .get_config() \
        .get_configuration() \
        .set_string("parallelism.default", "1")

    ddl = """
        CREATE TABLE test (
            a INT,
            b STRING
        ) WITH (
            'connector' = 'filesystem',          
            'path' = '{path}', 
            'format' = 'csv',
            'csv.ignore-first-line' = 'true',
            'csv.ignore-parse-errors' = 'true',
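            'csv.array-element-delimiter' = ';'
        )
    """.format(path='/opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16')
    table_env.execute_sql(ddl)
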

But it fails with the following error:

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16 does not exist or the user running Flink ('root') has insufficient permissions to access it.

I'm sure these three directories exist and that I have permission to access them.

If reading multiple directories is not possible, I will have to create three tables and union them, which is much more verbose.
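
For reference, that verbose workaround could look roughly like the sketch below. The helper names (table_name_for, build_ddl, build_union_query, register_and_union) are hypothetical, the CSV options are trimmed to the bare minimum, and table_env is assumed to be the StreamTableEnvironment created above.

```python
DAYS = ["2021-11-14", "2021-11-15", "2021-11-16"]


def table_name_for(day: str) -> str:
    """Derive a valid table name from a day, e.g. 'test_2021_11_14'."""
    return "test_" + day.replace("-", "_")


def build_ddl(day: str, base_path: str = "/opt/data") -> str:
    """Return the CREATE TABLE statement for a single day directory."""
    return (
        f"CREATE TABLE {table_name_for(day)} (a INT, b STRING) WITH ("
        f"'connector' = 'filesystem', "
        f"'path' = '{base_path}/day={day}', "
        f"'format' = 'csv')"
    )


def build_union_query(days) -> str:
    """Return a query that concatenates the rows of all per-day tables."""
    return " UNION ALL ".join(
        f"SELECT a, b FROM {table_name_for(d)}" for d in days
    )


def register_and_union(table_env, days=DAYS):
    """Create one table per day, then return the unioned rows as a Table.

    table_env is assumed to be a pyflink.table.TableEnvironment.
    """
    for day in days:
        table_env.execute_sql(build_ddl(day))
    return table_env.sql_query(build_union_query(days))
```

Every new day needs another table and another UNION ALL branch, which is what I would like to avoid.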

Any suggestions are appreciated. Thank you.


  • Just using

    'path' = '/opt/data'

    should be sufficient. The filesystem connector can also read the partition field from the directory names and filter on it. For example, you can define the table with this schema:

    CREATE TABLE test (
            a INT,
            b STRING,
            `day` DATE
        ) PARTITIONED BY (`day`) WITH (
            'connector' = 'filesystem',
            'path' = '/opt/data',
            ...
        )

    And then the following query:

    SELECT * FROM test WHERE `day` = '2021-11-14'

    will read only the files under /opt/data/day=2021-11-14.
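
Putting the pieces together for the three days in the question, a minimal sketch might look like this. It assumes 'format' = 'csv' as in the question and leaves the other CSV options out. The helper names (build_partitioned_ddl, build_day_query, read_days) are hypothetical. The backticks around day are there because DAY is a reserved keyword in Flink SQL.

```python
from typing import Iterable


def build_partitioned_ddl(table_name: str, base_path: str) -> str:
    """Return a CREATE TABLE statement for a table partitioned by `day`."""
    return f"""
        CREATE TABLE {table_name} (
            a INT,
            b STRING,
            `day` DATE
        ) PARTITIONED BY (`day`) WITH (
            'connector' = 'filesystem',
            'path' = '{base_path}',
            'format' = 'csv'
        )
    """


def build_day_query(table_name: str, days: Iterable[str]) -> str:
    """Return a SELECT that keeps only rows from the listed partitions."""
    day_list = ", ".join(f"DATE '{d}'" for d in days)
    return f"SELECT a, b, `day` FROM {table_name} WHERE `day` IN ({day_list})"


def read_days(table_env, base_path: str, days: Iterable[str]):
    """Register the partitioned table and return the filtered Table.

    table_env is assumed to be a pyflink.table.TableEnvironment.
    """
    table_env.execute_sql(build_partitioned_ddl("test", base_path))
    return table_env.sql_query(build_day_query("test", days))
```

Calling read_days(table_env, '/opt/data', ['2021-11-14', '2021-11-15', '2021-11-16']) should give one Table covering all three directories, so no per-directory tables or unions are needed. Whether the planner skips the other directories entirely depends on partition pruning being applied to the IN predicate, which is worth confirming in the query plan.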