apache-flink, flink-streaming

Apache Flink: What does "Class X does not contain a setter for field Y" mean?


I'm playing around with Flink (1.6, 1.7) for the first time, using data from the GitHub Archive at https://www.gharchive.org/ as a streaming data source.

My simple example counts all events per user over a daily window, and I'm trying to replicate the same example using the TableEnvironment and SQL support instead.
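
For reference, the working DataStream version boils down to something like this (a sketch; it keys the Tuple4 stream built below by user and sums the count field):

stream.keyBy(0)                                          // key by user_id
        .window(TumblingEventTimeWindows.of(Time.days(1))) // daily event-time window
        .sum(1)                                            // sum the kount field
        .print();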

However, I am encountering the following error:

class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime, as shown below:

18-12-04 14:17:02:115  INFO main exploration.StreamingTableApp:32 - Starting Streaming Table Flink App Example...
18-12-04 14:17:02:174  INFO main typeutils.TypeExtractor:1818 - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
18-12-04 14:17:02:176  INFO main typeutils.TypeExtractor:1857 - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
18-12-04 14:17:02:937  INFO main exploration.StreamingTableApp:74 - Finished...

I'm reading the source files as a DataStream and using Gson to parse out bits of each JSON line, mapping those attributes to a Tuple.

Does anyone have any ideas or experience with this?

main method:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Mapped in docker compose file too.
DataStreamSource<String> input = env.readTextFile("/some/path/github/");

// Setup the stream
DataStream<Tuple4<String, Integer, String, Long>> stream = input.map(new GithubTupleConverter())
    .assignTimestampsAndWatermarks(new TupleTimestampExtractor());

// Convert the stream into a Table with named fields.
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
Table tableFromStream = tEnv.fromDataStream(stream, "user_id, kount, basic_date, event_date");
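
From here, the daily per-user count I'm aiming for would look roughly like this (a sketch, not my working code; note that event_date would have to be declared as a rowtime attribute, e.g. "event_date.rowtime", for the event-time window to apply):

// Register the table and aggregate over a tumbling one-day event-time window.
tEnv.registerTable("github_events", tableFromStream);

Table result = tEnv.sqlQuery(
    "SELECT user_id, " +
    "       TUMBLE_START(event_date, INTERVAL '1' DAY) AS window_start, " +
    "       SUM(kount) AS events " +
    "FROM github_events " +
    "GROUP BY user_id, TUMBLE(event_date, INTERVAL '1' DAY)");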

TupleTimestampExtractor

public class TupleTimestampExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, Integer, String, Long>> {
    private static final long serialVersionUID = 3737675541965835242L;

    public TupleTimestampExtractor() {
        // Allow events to arrive up to 30 seconds out of order.
        super(Time.seconds(30L));
    }

    @Override
    public long extractTimestamp(Tuple4<String, Integer, String, Long> element) {
        // The event-time timestamp (epoch millis) is the fourth tuple field.
        return element.getField(3);
    }
}

GithubTupleConverter.java

public class GithubTupleConverter implements MapFunction<String, Tuple4<String, Integer, String, Long>> {

    private static final Gson g = new Gson();

    @Override
    public Tuple4<String, Integer, String, Long> map(String value) throws Exception {
        // Take each line as Json.
        JsonObject o = g.fromJson(value, JsonObject.class);

        // Extract the user id
        String userId = o.get("actor").getAsJsonObject().get("login").getAsString();

        // Extract the event type (commit, pull request, fork event).
        // Note: parsed but not currently used in the output tuple.
        String type = o.get("type").getAsString();

        // Get the event date time
        String dateTime = o.get("created_at").getAsString();

        // Parse date string to Typed type.
        LocalDateTime eventTime = LocalDateTime.parse(dateTime, DateTimeFormatter.ISO_DATE_TIME);

        // Format the date so it can be used in the output.
        DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE;

        return Tuple4.of(userId, 1, formatter.format(eventTime), eventTime.toInstant(ZoneOffset.UTC).toEpochMilli());
    }
} 
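
For a quick sanity check, this is roughly how the converter behaves on a trimmed sample line (the values are made up; the field layout follows GH Archive events):

// Trimmed sample event with only the fields the converter reads.
String line = "{\"type\":\"PushEvent\","
        + "\"actor\":{\"login\":\"octocat\"},"
        + "\"created_at\":\"2018-12-04T14:17:02Z\"}";

Tuple4<String, Integer, String, Long> t = new GithubTupleConverter().map(line);
// t.f0 = "octocat", t.f1 = 1, t.f2 = "2018-12-04",
// t.f3 = epoch millis for 2018-12-04T14:17:02 UTC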

Solution

  • The logs you shared do not show an error. They are at INFO level, and no exception is thrown (at least not in the logs you provided).

    The log entry just says that the class TimestampedFileInputSplit cannot be treated as a POJO. In general, this message indicates that serialization performance may not be optimal, but in this particular case it is not a problem (Flink's POJO rules are sketched after this answer).

    Do you get any other error message?
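
For reference, here is a minimal sketch of the rules Flink's TypeExtractor applies when deciding whether a class is a POJO (the class and field names are hypothetical):

// A class qualifies as a Flink POJO if it is public, has a public
// no-argument constructor, and every non-static, non-transient field
// is either public or exposed through a getter/setter pair.
public class UserEvent {

    public String userId;          // public field: valid as-is

    private long modificationTime; // private field: needs BOTH accessors

    public UserEvent() {}          // required public no-arg constructor

    public long getModificationTime() {
        return modificationTime;
    }

    // Without this setter, Flink logs the "does not contain a setter for
    // field modificationTime" INFO message and falls back to treating the
    // class as a GenericType, serialized with Kryo rather than the faster
    // POJO serializer.
    public void setModificationTime(long modificationTime) {
        this.modificationTime = modificationTime;
    }
}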