apache-flink, flink-streaming

Error converting Flink DataStream to Table after a ProcessFunction call


I'm implementing a data analysis pipeline in Flink and I have a problem converting a DataStream to a Table. I have this table defined from a join between two Kafka sources:

    Table legalFileEventsTable = legalFilesTable.join(eventsTable)
            .where($("id").isEqual($("id_fascicolo")))
            .select(
                    $("id").as("id_fascicolo"),
                    $("id_evento"),
                    $("giudice"),
                    $("nrg"),
                    $("codice_oggetto"),
                    $("ufficio"),
                    $("sezione"),
                    $("data_evento"),
                    $("evento"),
                    $("data_registrazione_evento")
            );

Then I convert the joined table to a DataStream to apply some computation on the data. Here's the code I'm using:

    DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
            .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
            .process(new PhaseDurationCounterProcessFunction());
    phasesDurationsDataStream.print();

The PhaseDurationCounterProcessFunction emits a Row like this:

    Row outputRow = Row.withNames(RowKind.INSERT);
    outputRow.setField("id_fascicolo", currentState.getId_fascicolo());
    outputRow.setField("nrg", currentState.getNrg());
    outputRow.setField("giudice", currentState.getGiudice());
    outputRow.setField("codice_oggetto", currentState.getCodice_oggetto());
    outputRow.setField("ufficio", currentState.getUfficio());
    outputRow.setField("sezione", currentState.getSezione());
    outputRow.setField("fase", currentState.getPhase());
    outputRow.setField("fase_completata", false);
    outputRow.setField("durata", currentState.getDurationCounter());
    out.collect(outputRow);

After collecting the results from the process function I reconvert the DataStream to a Table and execute the pipeline:

    Table phasesDurationsTable = tEnv.fromChangelogStream(
            phasesDurationsDataStream,
            Schema.newBuilder()
                    .column("id_fascicolo", DataTypes.BIGINT())
                    .column("nrg", DataTypes.STRING())
                    .column("giudice", DataTypes.STRING())
                    .column("codice_oggetto", DataTypes.STRING())
                    .column("ufficio", DataTypes.STRING())
                    .column("sezione", DataTypes.STRING())
                    .column("fase", DataTypes.STRING())
                    .column("fase_completata", DataTypes.BOOLEAN())
                    .column("durata", DataTypes.BIGINT())
                    .primaryKey("id_fascicolo", "fase")
                    .build(),
            ChangelogMode.upsert()
    );
    env.execute();

But during the startup I receive this exception:

Unable to find a field named 'id_fascicolo' in the physical data type derived from the given type information for schema declaration. Make sure that the type information is not a generic raw type. Currently available fields are: [f0]

It seems that the row information (names and types) isn't available yet, so the exception is thrown. I tried invoking env.execute() before the DataStream-to-Table conversion; in that case the job starts, but I get no output when I print the phasesDurationsTable. Any suggestions on how to make this work?


Solution

  • You need to specify the correct type information explicitly, because Flink cannot derive the schema from the generic Row type.

    Here is an example, given our data stream is producing records like this:

    Row exampleRow = Row.ofKind(RowKind.INSERT, "sensor-1", 32L);
    

    We need to define the correct type information the following way:

    TypeInformation<?>[] types = {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO
    };
    

    Using this we can define the row type information:

    RowTypeInfo rowTypeInfo = new RowTypeInfo(
        types,
        new String[]{"sensor_name", "temperature"}
    );
    

    The last step is to specify the return type of this DataStream:

    DataStream<Row> stream = env.fromSource(...).returns(rowTypeInfo);
    

    Note that when using fromChangelogStream you only need to provide a schema if you want one that differs from the type information carried by the DataStream. So tEnv.fromChangelogStream(stream) works just fine and will use sensor_name and temperature as the column names by default.
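
    Applied to the pipeline in the question, a sketch of the fix could look like this. The field names and types mirror the Schema you pass to fromChangelogStream; attach the type information with .returns(...) right after the process() call (the commented lines show where it plugs into your existing code, using your own tEnv, legalFileEventsTable, and PhaseDurationCounterProcessFunction):

    ```java
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;

    // Type information matching the schema declared in fromChangelogStream
    RowTypeInfo phaseRowTypeInfo = new RowTypeInfo(
        new TypeInformation<?>[]{
            BasicTypeInfo.LONG_TYPE_INFO,     // id_fascicolo
            BasicTypeInfo.STRING_TYPE_INFO,   // nrg
            BasicTypeInfo.STRING_TYPE_INFO,   // giudice
            BasicTypeInfo.STRING_TYPE_INFO,   // codice_oggetto
            BasicTypeInfo.STRING_TYPE_INFO,   // ufficio
            BasicTypeInfo.STRING_TYPE_INFO,   // sezione
            BasicTypeInfo.STRING_TYPE_INFO,   // fase
            BasicTypeInfo.BOOLEAN_TYPE_INFO,  // fase_completata
            BasicTypeInfo.LONG_TYPE_INFO      // durata
        },
        new String[]{
            "id_fascicolo", "nrg", "giudice", "codice_oggetto",
            "ufficio", "sezione", "fase", "fase_completata", "durata"
        }
    );

    // Then, in the pipeline from the question, declare the return type so the
    // stream no longer carries a generic raw Row type:
    //
    // DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
    //         .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
    //         .process(new PhaseDurationCounterProcessFunction())
    //         .returns(phaseRowTypeInfo);
    ```

    With the type information attached, fromChangelogStream can match the declared schema columns against the physical fields instead of seeing a single raw field f0.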