python, apache-flink, flink-streaming, pyflink

How can you load a CSV into PyFlink as a Streaming Table Source?


I am trying to set up a simple playground environment to use the Flink Python Table API. The jobs I ultimately want to write will feed off of a Kafka or Kinesis queue, but that makes playing around with ideas (and tests) very difficult.

I can happily load from a CSV and process it in batch mode, but I cannot get it to work in streaming mode. How would I do something similar in a StreamExecutionEnvironment (primarily so I can play around with windows)?

I understand that I need to get the system to use event time (because processing time would all arrive at once), but I cannot find any way to set this up. In principle I should be able to set one of the columns of the CSV to be the event time, but it is not clear from the docs how to do this (or whether it is possible).

To get the batch execution tests running I used the code below, which reads from an input.csv and writes to an output.csv.

from pathlib import Path

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, Csv, FileSystem
from pyflink.table.window import Tumble  # for the windowed query I would like to write

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"

# Remove any output left over from a previous run.
try:
    out_path.unlink()
except FileNotFoundError:
    pass

(
    t_env.connect(FileSystem().path(str(root / "input.csv")))
    .with_format(Csv())
    .with_schema(
        Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
    )
    .create_temporary_table("mySource")
)

(
    t_env.connect(FileSystem().path(str(out_path)))
    .with_format(Csv())
    .with_schema(
        Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
    )
    .create_temporary_table("mySink")
)

(
    t_env.from_path("mySource")
    .group_by("word")
    .select("word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

t_env.execute("tutorial_job")

and input.csv is:

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

So my question is: how can I set it up so that it reads from the same CSV, but uses the first column as the event time, allowing me to write code like:

(
    t_env.from_path("mySource")
    .window(Tumble.over("10.minutes").on("time").alias("w"))
    .group_by("w, word")
    .select("w, word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

Any help would be appreciated; I can't work this out from the docs. I am using Python 3.7 and Flink 1.11.1.


Solution

  • If you use the descriptor API, you can specify that a field is the event-time field through the schema:

    .with_schema(  # declare the schema of the table
        Schema()
        .field("rowtime", DataTypes.TIMESTAMP(3))  # rowtime attributes must be TIMESTAMP(3)
        .rowtime(
            Rowtime()  # from pyflink.table.descriptors import Rowtime
            .timestamps_from_field("time")       # take event timestamps from the `time` field
            .watermarks_periodic_bounded(60000)  # watermarks lag the max timestamp by 60s
        )
        .field("a", DataTypes.STRING())
        .field("b", DataTypes.STRING())
        .field("c", DataTypes.STRING())
    )
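    For the question's two-column file, the full source definition might look like the sketch below. This assumes a StreamTableEnvironment named t_env; the resulting table exposes rowtime as its event-time attribute, so the windowed query would use .on("rowtime"). Bear in mind the descriptor-API bugs mentioned below, so this may need tweaking:

        from pyflink.table import DataTypes
        from pyflink.table.descriptors import Schema, Csv, FileSystem, Rowtime

        (
            t_env.connect(FileSystem().path("input.csv"))
            .with_format(Csv())
            .with_schema(
                Schema()
                .field("rowtime", DataTypes.TIMESTAMP(3))
                .rowtime(
                    Rowtime()
                    .timestamps_from_field("time")
                    .watermarks_periodic_bounded(60000)
                )
                .field("word", DataTypes.STRING())
            )
            .create_temporary_table("mySource")
        )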
    

    But I still recommend using DDL: on the one hand it is easier to use, and on the other hand there are some bugs in the existing descriptor API; the community is discussing refactoring it.
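    For example, here is a minimal sketch of the DDL route on Flink 1.11, assuming the same two-column input.csv as in the question. The table names, paths, print sink, and one-minute watermark delay are illustrative choices, not requirements:

        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.table import EnvironmentSettings, StreamTableEnvironment
        from pyflink.table.window import Tumble

        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        t_env = StreamTableEnvironment.create(
            env,
            environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode()
            .use_blink_planner()
            .build(),
        )

        # Declare `time` as the event-time (rowtime) column and emit
        # watermarks that lag the largest seen timestamp by one minute.
        t_env.execute_sql("""
            CREATE TABLE mySource (
                `time` TIMESTAMP(3),
                word STRING,
                WATERMARK FOR `time` AS `time` - INTERVAL '1' MINUTE
            ) WITH (
                'connector' = 'filesystem',
                'path' = 'input.csv',
                'format' = 'csv'
            )
        """)

        # A print sink keeps the sketch self-contained; a filesystem
        # sink works the same way.
        t_env.execute_sql("""
            CREATE TABLE mySink (
                window_start TIMESTAMP(3),
                word STRING,
                `count` BIGINT
            ) WITH (
                'connector' = 'print'
            )
        """)

        # The windowed query from the question now works, because
        # `time` is a proper rowtime attribute.
        (
            t_env.from_path("mySource")
            .window(Tumble.over("10.minutes").on("time").alias("w"))
            .group_by("w, word")
            .select("w.start, word, count(1) as count")
            .insert_into("mySink")
        )
        t_env.execute("streaming_csv_job")

    Note that the timestamps in the CSV may need millisecond precision (e.g. 2000-01-01 00:00:00.000) to parse cleanly as TIMESTAMP(3).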