apache-flink pyflink

Why does the Flink FileSystem sink split its output into multiple files?


I want to use Flink to read from an input file, do some aggregation, and write the result to an output file. The job is in batch mode. See wordcount.py below:

from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e  "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output

Before running python wordcount.py, I run echo -e "flink\npyflink\nflink" > /tmp/input so that /tmp/input contains data. However, after the run, /tmp/output is a directory containing two files:

> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0 part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2

I expected a single file, /tmp/output, with the following content:

pyflink,1
flink,2

I actually derived the program above from the following version, which does produce a single file, /tmp/output:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()

Running this version writes a single file, /tmp/output. Note that its fields are separated by tabs rather than commas:

> cat /tmp/output
flink   2
pyflink 1

Any idea why? Thanks!


Solution

  • In the first version you never specified the parallelism, so the job ran with the default parallelism, which is greater than 1 (probably 4 or 8, depending on how many cores your computer has). The second version calls exec_env.set_parallelism(1), so a single sink task writes a single file.

    Flink is designed to be scalable, and to achieve that, the parallel instances of an operator, such as a sink, are decoupled from one another. Imagine, for example, a large cluster with hundreds or thousands of nodes. For this to work well, each sink instance needs to write to its own file, which is why you see one part file per sink task.

    The second version's output is tab-separated rather than comma-separated because its sink format specifies .field_delimiter('\t').
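
To make the point about parallelism concrete, here is a minimal sketch of two ways to end up with one result file from the SQL version. The helper names use_single_sink_task and merge_part_files are hypothetical and not part of PyFlink. The first helper assumes table_env is the TableEnvironment from wordcount.py and sets the parallelism.default option to 1. Even then, the filesystem connector will most likely still create /tmp/output as a directory holding a single part file, so the second helper (plain Python, no Flink involved) concatenates whatever part-* files exist into one file.

```python
from pathlib import Path


def use_single_sink_task(table_env):
    """Set the job-wide default parallelism to 1 so only one sink task writes.

    Assumption: `table_env` is the PyFlink TableEnvironment from wordcount.py.
    """
    config = table_env.get_config().get_configuration()
    config.set_string("parallelism.default", "1")


def merge_part_files(output_dir, merged_path):
    """Concatenate every part-* file in output_dir into merged_path.

    Returns the number of part files that were merged.
    """
    # Collect the part files first so the merged file is never read back in.
    parts = sorted(p for p in Path(output_dir).glob("part-*") if p.is_file())
    with open(merged_path, "w", encoding="utf-8") as merged:
        for part in parts:
            text = part.read_text(encoding="utf-8")
            merged.write(text)
            # Keep rows from different part files on separate lines.
            if text and not text.endswith("\n"):
                merged.write("\n")
    return len(parts)
```

In wordcount.py this would mean calling use_single_sink_task(table_env) right after BatchTableEnvironment.create(...), before any execute_sql call. If the job is left at its default parallelism instead, merge_part_files('/tmp/output', '/tmp/output.csv') could be run after the job finishes; the target path /tmp/output.csv is only an illustrative choice. Row order in the merged file follows the sorted part file names, not any order defined by the query.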