
Why can't the comma be used as the delimiter in Apache Flink


I'm learning how to use Apache Flink and am using Python 3.5 to write a simple word count.

Here is my Python code:

#!/usr/bin/python3.5

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
import os

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

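input = './input'
output = './output'
# Delete files left over from a previous run. Each file gets its own
# try/except so that a missing input file does not stop the old output
# from being removed.
for path in (input, output):
    try:
        os.remove(path)
    except OSError:
        pass

msg = 'aaa\nbbb\nccc'      # <------------- look at here
# msg = 'aaa,bbb,ccc'      # <------------- not working as expected
with open(input, 'w') as f:  # the context manager closes the file
    f.write(msg)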

t_env.connect(FileSystem().path(input)) \
    .with_format(OldCsv()            # <-------------- look at here
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path(output)) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute("python_job")

It works as expected. After running this script, I get a file named output:

aaa     1
bbb     1
ccc     1
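For readers new to the Table API, the job above (read one word per line, group_by('word'), select count(1), write tab-separated rows) computes the same thing as this plain-Python sketch. It is not Flink code; word_count is a hypothetical helper, and the rows are sorted only to make the output deterministic, since Flink does not promise any particular order.

```python
from collections import Counter
from typing import List


def word_count(text: str) -> List[str]:
    """Plain-Python analogue of the Flink job: one word per line,
    group by word, count, and emit 'word<TAB>count' lines."""
    words = [line for line in text.split('\n') if line]  # '\n' separates rows
    counts = Counter(words)                               # group_by('word') + count(1)
    return ['{}\t{}'.format(w, c) for w, c in sorted(counts.items())]


for row in word_count('aaa\nbbb\nccc'):
    print(row)
```

With the original msg this prints the same three lines as the output file shown above.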

As you can see, I write the content of msg, which is aaa\nbbb\nccc, into the file input.

Since the input file is read with a CSV format, I then tried changing msg to aaa,bbb,ccc.

However, the output is now only aaa 1. Everything after the first comma seems to be lost.

I can't understand why. Since this is a CSV file, why can't I use the comma as the delimiter?


Solution

  • Comma is the default field delimiter, so if you delete this line

    .field_delimiter('\t')
    

    you will get what you want, or you could be explicit via

    .field_delimiter(',')
    

    The OldCsv table source is described in the Flink documentation.
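A plain-Python sketch (not Flink code) may help show what the default delimiters do to each input. It assumes, consistent with the output reported in the question, that OldCsv splits records on the line delimiter (default '\n'), splits each record into fields on the field delimiter (default ','), and that only the first field reaches the single declared 'word' column. Under that model 'aaa,bbb,ccc' is one record with three fields, which would explain why only aaa 1 appears. Note that the .field_delimiter('\t') in the question is set on the mySink descriptor, so it controls how the output file is written rather than how the input is read. If the goal is one word per comma-separated token, the source's OldCsv descriptor also appears to offer a line_delimiter() setter that could be set to ','; that variant is untested here. The helper read_first_field is hypothetical.

```python
from typing import List


def read_first_field(text: str, field_delim: str = ',',
                     line_delim: str = '\n') -> List[str]:
    """Split text into records on line_delim, split each record into
    fields on field_delim, and keep only the first field (assumed to
    mimic a source that declares a single 'word' column)."""
    words = []
    for record in text.split(line_delim):
        if not record:
            continue  # skip empty records, e.g. after a trailing delimiter
        fields = record.split(field_delim)
        words.append(fields[0])  # extra fields are dropped
    return words


print(read_first_field('aaa\nbbb\nccc'))  # three records, one field each
print(read_first_field('aaa,bbb,ccc'))    # one record, three fields
# Treating ',' as the record separator instead (field delimiter moved
# out of the way) yields one word per token again.
print(read_first_field('aaa,bbb,ccc', field_delim='\t', line_delim=','))
```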