Search code examples
bigdataapache-flinkflink-streaming

In Flink is it possible to have a DataStream<Tuple> where Tuple is the base class of all known Tulples like Tuple2, Tuple3 etc?


I am creating a Flink application that reads strings from a Kafka topic for example "2 5 9" is a value. Then split the string with " " delimiter and create map it to a tuple. In this case the result of the map function would be a DataStream<Tuple3<Integer,Integer,Integer>> which is simple. The problem is that I want my app to be parameterised, meaning that sometimes the data that it will read have 3 dimensions (like "2 5 9"), and another time maybe 2 dimensions so in this case I would need Tuple2.

I thought that I could use the Tuple base class like I'm showing but things didn't work

DataStream<String> strData = env.fromSource(...)

DataStream<Tuple> tupleData = inputData.map(new MapFunction<String, Tuple>() {
    @Override
    public Tuple map(String s) throws Exception {

        String[] tokens = s.split(" ");
        int numOfDimensions = tokens.length;

        Tuple tuple = Tuple.newInstance(numOfDimensions);

        for(int i=0; i<numOfDimensions; i++){
            tuple.setField(Integer.valueOf(tokens[i]), i);
        }

        return tuple;
    }
});


Im getting this error:

InvalidTypesException: Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.

So this solution doesn't seem to work. Is there any alternative for this purpose or maybe I am missing something here?

Thanks

Edit due to Bartosz Mikulski comment:

1.The input data are always integers.

2.I am planning to run one job at a time that receives standard length input. For example today I want to run a job with input length of 2, so the parameter that defines the input length is 2. Tomorrow maybe I want to run a job with input length=3 so I will run the job with input length parameter=3.


Solution

  • Thank you for your clarification. In this case, you need to have a variable-length data structure that always holds Integers. The simplest solution would be to use List<Integer> instead of Tuple. So your map function can look like this:

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStream<String> s1 = env.fromElements("1 2 3", "4 5 6", "8 9 10 11 12");
            DataStream<List<Long>> s2 = s1.map(text -> Arrays.stream(text.split(" "))
                            .map(Long::parseLong)
                            .collect(Collectors.toList()),
                    new TypeHint<List<Long>>() {}.getTypeInfo()
            );
    
            s2.print();
            env.execute("Flink Java API Skeleton");
    

    General information on handling stream elements

    How to handle homogenous data (variable or fixed length collection of the same type) - your case

    In this case, use any Java collection that can be serialized. For example List (both ArrayList and LinkedList are serializable). Reasons for using List<Integer> instead of Tuple:

    • Lists are used for storing data of the same type. In your case Integer.
    • Lists can be adapted to any input length
    • You can use .size() method to check the length

    How to handle fixed-length heterogeneous data (can store different types)

    You can use tuples, but probably custom POJO (Plain Old Java Object) would be better. Tuples are fixed-length storage for heterogeneous types. This means that Tuple2<Integer, Integer> is a different type than Tuple3<Integer, Integer, Integer> or even Tuple2<Integer, Long> (of course in the case of JVM we have type erasure so both Tuple2 are of "the same" type).

    Flink disallows the usage of bare Tuple as a stream element because it has no semantic meaning. What operator could be applied for DataStream<Tuple>? Maybe one that accepts Tuple, but there can be 25 tuples of different lengths and then each tuple field can have different types.

    So why do we have tuples in Flink? Just for convenience. If you need to return for example 2 or 3 elements in an ad hoc manner from a function use a tuple. You don't need to create your own class to handle that case. However, if you model some data type (for example a student record) then I would just use POJO instead (so implement class Student for this scenario).

    The best approach I find with Flink is to model data as classes on the highest level as it is more redable: DataStream<StudentGrade> then DataStream<Tuple3<String, String, Float>>, where StudentGrade stores name, subject, and grade itself.

    How to handle variable-length heterogeneous data of known subtypes

    Let's say that you know that you can allow different types of events in your application (different subtypes). For example your application reads sensor data from the stream and you accept events like SensorValue(sensorId, value) and SensorMean(sensorId, n). SensorValue is information about readout from the sensor and SensorMean is a command that will calculate the mean of the last n sensor readings and output it to the sink. In that case we know that we have only two possible messages and let's say they are modeled as: value 123 0.34 for SensorValue and mean 123 5 for SensorMean.

    For this scenario the best approach would be to use Algebraic Data Type available in Scala. In Java we can model it with interface and inheritance (less powerful, but it will work), so we would have interface SensorEvent, class SensorValue implements SensorEvent, and class SensorMean implements SensorEvent. Then we can have DataStream<SensorEvent> and Flink will not complain, like in the tuple case.

    How to handle variable-length heterogeneous data of unknown subtypes

    In this case, the easiest solution would be to represent that element as some kind of dynamic object. For example Jackson ObjectNode or Json4s JValue. Try not to use List<Object> or Map<String, Object> as this will be harder to manage down the line.