
In Flink, is it possible to use state with a non-keyed stream?


Let's assume that I have an input DataStream and want to implement some functionality that requires "memory", so I need a ProcessFunction that gives me access to state. Is it possible to do this directly on the DataStream, or is the only way to keyBy the initial stream and work in a keyed context?

I'm thinking that one solution would be to keyBy the stream with a hardcoded unique key, so the whole input stream ends up in the same group. Then technically I have a KeyedStream and can use keyed state normally, as I'm showing below with keyBy(x -> 1). But is this a good solution?

DataStream<Integer> inputStream = env.fromSource(...);

DataStream<Integer> outputStream = inputStream
        .keyBy(x -> 1)
        .process(...); // I've got access to state here
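For example, the process(...) step could be filled in with something like the following minimal sketch. The running-sum logic and the RunningSum name are placeholders I'm assuming for illustration; the point is only that keyed state works once everything is on key 1:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder stateful logic: emit a running sum over the whole stream.
// Because every element is mapped to the same key (1), this single piece
// of keyed state sees every record.
public class RunningSum extends KeyedProcessFunction<Integer, Integer, Integer> {

    private transient ValueState<Integer> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Integer.class));
    }

    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out)
            throws Exception {
        Integer current = sum.value();
        int updated = (current == null ? 0 : current) + value;
        sum.update(updated);
        out.collect(updated); // running total after this element
    }
}
```

which would then be used as inputStream.keyBy(x -> 1).process(new RunningSum()).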

As I understand it, that's not a common use case, because the main purpose of Flink is to partition the stream, process the partitions separately, and then merge the results. In my scenario that's exactly what I'm doing, but the problem is that the merge step requires state to produce the final "global" result. What I actually want to do is something like this:

DataStream<Integer> inputStream = env.fromElements(1,2,3,4,5,6,7,8,9);

// two groups: group1 = [1,2,3,4] and group2 = [5,6,7,8,9]
DataStream<Integer> partialResult = inputStream
        .keyBy(val -> val / 5)
        .process(<..stateful processing..>);

// Can't do stateful processing here because partialResult is not a KeyedStream
DataStream<Integer> outputStream = partialResult
        .process(<..stateful processing..>);

outputStream.print();

But Flink doesn't seem to allow me to do the final "merge partial results" operation, because I can't get access to state in the process function: partialResult is not a KeyedStream.
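The only way I've found so far is to apply the same hardcoded-key workaround to the merge step, something like the sketch below (with "keep the running maximum" as placeholder merge logic), but that brings me back to the same question of whether it's a good idea:

```java
// Same hardcoded-key trick applied to the merge step: every partial result
// is routed to key 0, so a single sub-task holds the "global" state.
// Placeholder merge logic: emit the value whenever the global maximum improves.
DataStream<Integer> outputStream = partialResult
        .keyBy(x -> 0)
        .process(new KeyedProcessFunction<Integer, Integer, Integer>() {

            private transient ValueState<Integer> max;

            @Override
            public void open(Configuration parameters) {
                max = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("max", Integer.class));
            }

            @Override
            public void processElement(Integer value, Context ctx,
                                       Collector<Integer> out) throws Exception {
                Integer current = max.value();
                if (current == null || value > current) {
                    max.update(value);
                    out.collect(value); // new global maximum
                }
            }
        });
```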

I'm a beginner with Flink, so I hope what I'm writing makes sense. In general, I haven't found a good way to do the "merging" step, especially when it comes to complex logic. I hope someone can give me some info or tips, or correct me if I'm missing something.

Thank you for your time


Solution

    1. Is "keyBy the stream with a hardcoded unique key" a good idea? Well, normally no, since it forces all data to flow through a single sub-task, so you get no benefit from the full parallelism in your Flink cluster.
    2. If you want to get a global result (e.g. the "best" 3 results out of all results generated in the preceding step), then yes, you'll have to run all records through a single sub-task. So you could use a fixed key value and a global window. But note (as the docs state) that you need to come up with some kind of "trigger condition"; otherwise, with a streaming workflow, you never know when you really have the best N results, and thus you'd never emit any final result.
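A sketch of that second point could look like the following, assuming "best 3" means the three largest integers, and using a count trigger that fires every 10 elements as an arbitrary example of a trigger condition (both are assumptions; pick whatever fits your use case):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Fixed key + global window; the CountTrigger fires the window function
// every 10 elements, so "best 3 so far" is emitted periodically.
DataStream<Integer> best = partialResult
        .keyBy(x -> 0)
        .window(GlobalWindows.create())
        .trigger(CountTrigger.of(10))
        .process(new ProcessWindowFunction<Integer, Integer, Integer, GlobalWindow>() {
            @Override
            public void process(Integer key, Context ctx,
                                Iterable<Integer> elements,
                                Collector<Integer> out) {
                // Emit the three largest values seen so far in the window.
                List<Integer> sorted = new ArrayList<>();
                elements.forEach(sorted::add);
                sorted.sort(Collections.reverseOrder());
                sorted.stream().limit(3).forEach(out::collect);
            }
        });
```

Since CountTrigger fires without purging, the global window keeps accumulating, so each firing sees every element received so far on that key.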