I want to read multiple directories with the Table API in PyFlink:
from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.set_parallelism(1)

    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    table_env \
        .get_config() \
        .get_configuration() \
        .set_string("table.exec.resource.default-parallelism", "1")

    ddl = """
        CREATE TABLE test (
            a INT,
            b STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '{path}',
            'format' = 'csv',
            'csv.ignore-first-line' = 'true',
            'csv.ignore-parse-errors' = 'true',
            'csv.array-element-delimiter' = ';'
        )
    """.format(path='/opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16')
    table_env.execute_sql(ddl)
But it failed with the following error:
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16 does not exist or the user running Flink ('root') has insufficient permissions to access it.
I'm sure these three directories exist and that I have permission to access them:
/opt/data/day=2021-11-14
/opt/data/day=2021-11-15
/opt/data/day=2021-11-16
If the Table API can't read multiple directories, I'll have to create three tables and union them, which is much more verbose (a sketch of that workaround is below).
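For reference, the workaround I'm trying to avoid would look roughly like this (the per-day table names and the loop are just illustrative):

# One table per directory, then a UNION ALL over all of them.
for day in ('2021-11-14', '2021-11-15', '2021-11-16'):
    table_env.execute_sql("""
        CREATE TABLE test_{suffix} (
            a INT,
            b STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '/opt/data/day={day}',
            'format' = 'csv'
        )
    """.format(suffix=day.replace('-', '_'), day=day))

result = table_env.sql_query("""
    SELECT * FROM test_2021_11_14
    UNION ALL
    SELECT * FROM test_2021_11_15
    UNION ALL
    SELECT * FROM test_2021_11_16
""")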
Any suggestion is appreciated. Thank you!
Just using

'path' = '/opt/data'

should be sufficient. The connector treats the whole path value as a single location, which is why your comma-separated string produced the "does not exist" error. The filesystem connector is also able to read the partition field from the day=... subdirectory names and perform filtering based on it. For example, you can define the table with this schema:
CREATE TABLE test (
    a INT,
    b STRING,
    `day` DATE
) PARTITIONED BY (`day`) WITH (
    'connector' = 'filesystem',
    'path' = '/opt/data',
    [...]
)
Note the backticks around `day`: DAY is a reserved keyword in Flink SQL. Then the following query:

SELECT * FROM test WHERE `day` = DATE '2021-11-14'

will read only the directory /opt/data/day=2021-11-14, thanks to partition pruning.
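Putting it together with the setup from your question, a minimal end-to-end sketch (I kept only the CSV options I'm sure apply; adjust the format options to your files):

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.set_parallelism(1)
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # Point 'path' at the parent directory; the connector discovers the
    # day=... subdirectories and exposes `day` as a partition column.
    table_env.execute_sql("""
        CREATE TABLE test (
            a INT,
            b STRING,
            `day` DATE
        ) PARTITIONED BY (`day`) WITH (
            'connector' = 'filesystem',
            'path' = '/opt/data',
            'format' = 'csv',
            'csv.ignore-parse-errors' = 'true'
        )
    """)

    # Partition pruning: only /opt/data/day=2021-11-14 is scanned.
    table_env.execute_sql(
        "SELECT * FROM test WHERE `day` = DATE '2021-11-14'"
    ).print()

    # Reading all three days needs no union: just drop the filter.
    # table_env.execute_sql("SELECT * FROM test").print()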