Flink streaming Kinesis to Hudi not writing any data

I'm trying out PyFlink to stream data from Kinesis into a Hudi table, but I can't figure out why it is not writing any data. I hope someone can provide some pointers.

Versions: Flink 1.15.4, Python 3.7, Hudi 0.13.0

I use a streaming table environment:

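configuration = Configuration()
# The rest of this statement was cut off in the post; streaming mode is assumed.
env_settings = (EnvironmentSettings.new_instance().in_streaming_mode()
                .with_configuration(configuration).build())
table_env = StreamTableEnvironment.create(environment_settings=env_settings)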

This is my schema

def get_test_table_schema() -> Schema:
    return (Schema.new_builder()
            .column("uuid", DataTypes.STRING().not_null())
            .column("updated_at", DataTypes.TIMESTAMP().not_null())
            .column("shard_key", DataTypes.STRING().not_null())
            .build())

This is how I define the output sink

        .option("path", "file://mypath/hudi/test.table")
        .option("table.primaryKey", "uuid")
        .option("table.preCombineField", "updated_at")

Manually inserting some data works and creates the files in the output path

result = table_env.execute_sql("""insert into test_table values
    ('1',TIMESTAMP '1970-01-01 00:00:01', 'par1'),
    ('2',TIMESTAMP '1970-01-01 00:00:02', 'par1'),
    ('3',TIMESTAMP '1970-01-01 00:00:03', 'par2');
""")

The job finishes in the Flink UI, and the stream_write: test_table task shows that some data was written.

[Screenshot: Flink UI for the manual insert with a few example rows]

However, if I stream the records from a source such as Kinesis:

table_env.create_table("kinesis", get_kinesis_source())
input_table = table_env.from_path("kinesis")

I can see that the records are retrieved, but they are never written to Hudi.

[Screenshot: Kinesis source in the Flink UI]

There are no exceptions, nothing seems broken. I must be missing something obvious.


  • Looks like I was just not pushing enough data through the pipeline. I also reduced write.batch.size from the default 256 MB to 1 MB and started seeing updates in the target path.

            .option("path", "file://mypath/hudi/test.table")
            .option("table.primaryKey", "uuid")
            .option("table.preCombineField", "updated_at")
            .option("write.batch.size", "1")
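
To illustrate why a low-volume stream can appear to write nothing, here is a toy model of size-based buffering in plain Python. It is only a sketch of the general idea and not Hudi's actual writer: the class name SizeBufferedWriter, the 1 KiB record size, and the record count are all made up for the example. As far as I understand, the Hudi Flink writer also flushes its buffers on checkpoints, so checking that checkpointing is enabled may be worthwhile too.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SizeBufferedWriter:
    """Toy writer (hypothetical) that flushes once buffered bytes reach a threshold."""
    batch_size_bytes: int
    buffered_bytes: int = 0
    buffer: List[bytes] = field(default_factory=list)
    flushes: int = 0

    def write(self, record: bytes) -> None:
        # Buffer the record and flush only when the threshold is reached.
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        if self.buffered_bytes >= self.batch_size_bytes:
            self.flush()

    def flush(self) -> None:
        # Count a flush only if there was something to write out.
        if self.buffer:
            self.flushes += 1
        self.buffer.clear()
        self.buffered_bytes = 0


MB = 1024 * 1024
record = b"x" * 1024  # assumed 1 KiB record, purely illustrative

default_like = SizeBufferedWriter(batch_size_bytes=256 * MB)  # like the 256 MB default
small_batch = SizeBufferedWriter(batch_size_bytes=1 * MB)     # like write.batch.size = 1

for _ in range(5000):  # roughly 4.9 MiB of input in total
    default_like.write(record)
    small_batch.write(record)

print(default_like.flushes, small_batch.flushes)  # prints "0 4"
```

With the large threshold nothing is flushed for this amount of input, while the 1 MB threshold flushes four times, which matches what I saw after lowering write.batch.size.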