I'm learning Apache Flink and using Python 3.5 to write a simple word count.
Here is my Python code:
#!/usr/bin/python3.5
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
import os
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
input = './input'
output = './output'
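# Remove each file independently, so a missing ./input does not skip ./output
for path in (input, output):
    try:
        os.remove(path)
    except OSError:
        pass  # the file does not exist yet
fd = os.open(input, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o666)
msg = 'aaa\nbbb\nccc'  # <------------- look here
# msg = 'aaa,bbb,ccc'  # <------------- not working as expected
os.write(fd, msg.encode())
os.close(fd)  # release the file descriptor before Flink reads the file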
t_env.connect(FileSystem().path(input)) \
.with_format(OldCsv() # <-------------- look at here
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path(output)) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
t_env.from_path('mySource') \
.group_by('word') \
.select('word, count(1)') \
.insert_into('mySink')
t_env.execute("python_job")
It works as expected. After running this script, I get a file named output:
aaa 1
bbb 1
ccc 1
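As a rough mental model, assuming each input line becomes one row, the query group_by('word') followed by select('word, count(1)') amounts to counting identical rows. The plain-Python sketch that follows is a hypothetical stand-in, not PyFlink's API: word_count is an illustrative name, and the sorting is only there to make the result deterministic (Flink does not promise this order).

```python
from collections import Counter


def word_count(lines):
    """Hypothetical stand-in for group_by('word').select('word, count(1)'):
    each non-empty line is one row, and identical rows are counted."""
    counts = Counter(line for line in lines if line)
    # Format each result as word<TAB>count, mirroring the sink's '\t' delimiter.
    return ['{}\t{}'.format(word, n) for word, n in sorted(counts.items())]


msg = 'aaa\nbbb\nccc'
for row in word_count(msg.split('\n')):
    print(row)
```

Run on the same msg, this prints the three tab-separated lines aaa 1, bbb 1 and ccc 1, matching the output file above.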
As you can see, I write the content of msg, which is aaa\nbbb\nccc, into the file input.
Then I thought: the input file is read as CSV, so I tried changing the variable msg to aaa,bbb,ccc.
However, the output now contains only aaa 1. Everything after the first comma seems to be lost.
I don't understand why. Since it is a CSV file, why can't I use a comma as the delimiter?
Comma is the default field delimiter, so if you delete the line .field_delimiter('\t') you will get what you want, or you could be explicit via .field_delimiter(',').
The OldCsv table source is described in the Flink documentation.
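To see why a comma being the field delimiter matters here, the sketch that follows is a plain-Python analogy, not Flink's CSV parser: records are split on newlines, each record is split into fields on the field delimiter, and only the first field is kept because the source schema declares a single column, word. The function name rows_for_one_field_schema is hypothetical, and whether Flink itself drops or rejects extra fields is an assumption here; the analogy is only meant to be consistent with the output reported in the question.

```python
def rows_for_one_field_schema(text, field_delimiter=','):
    """Hypothetical analogy: split text into records on newlines, split each
    record into fields on field_delimiter, and keep only the first field
    (the single 'word' column declared in the source schema)."""
    words = []
    for record in text.split('\n'):
        if record:  # skip empty records, e.g. after a trailing newline
            words.append(record.split(field_delimiter)[0])
    return words


print(rows_for_one_field_schema('aaa\nbbb\nccc'))  # ['aaa', 'bbb', 'ccc']
print(rows_for_one_field_schema('aaa,bbb,ccc'))    # ['aaa']
```

With aaa,bbb,ccc there is only one record, whose fields are aaa, bbb and ccc, so the single-column source yields just aaa, and the word count then reports aaa 1.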