I have a Kafka topic that contains a few event types. (This is given.)
The events are JSON documents.
Let's call the event types A, B, C, D, and E.
I can tell the type from a field that is present in every event.
I want a Flink job that handles events A & B separately (using a session window), sends C & D to another type of window, and drops event E.
Can I implement such a design in Flink?
Thanks
Yes. You can take advantage of Flink's support for Side Outputs and use them to map each of the distinct types to its own stream, then operate on those streams separately (or union them downstream, etc.).
Basically, it might look something like this:
val events = streamEnv
    .fromSource(
        KafkaSource.builder<YourClass>()
            /* topic, bootstrap servers, a JSON deserializer for YourClass, ... */
            .build(),
        WatermarkStrategy.noWatermarks(),
        "kafka-events"
    )
    .process(YourTypeSeparatorOperator())

// Example: getting the A & B events
val a = events.getSideOutput(Tags.a)
val b = events.getSideOutput(Tags.b)

// Union this stream (and act on it via windowing, etc.; see the sketch at the end)
val ab = a.union(b)

// Likewise, perform the operations necessary for the C & D types here
// Eventually merge all of these separate streams together if needed
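Since the events are JSON documents, one way to fill in the elided source configuration above is to read the payloads as raw strings and decode them with Jackson. This is just a minimal sketch: the server, topic, group id, and the shape of YourClass are all placeholder assumptions.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

// Read the raw JSON strings off the topic (all names are placeholders)
val rawSource = KafkaSource.builder<String>()
    .setBootstrapServers("localhost:9092")
    .setTopics("your-topic")
    .setGroupId("flink-type-splitter")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(SimpleStringSchema())
    .build()

// Decode each payload into YourClass; this stream is what feeds
// YourTypeSeparatorOperator() above. In real code, share one ObjectMapper
// (e.g. in a RichMapFunction) instead of building one per record.
val decoded = streamEnv
    .fromSource(rawSource, WatermarkStrategy.noWatermarks(), "kafka-events")
    .map { json -> ObjectMapper().readValue(json, YourClass::class.java) }
    .returns(TypeInformation.of(YourClass::class.java)) // helps type extraction with Kotlin lambdas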
In the above, YourTypeSeparatorOperator() would use those side outputs: based on the type of each event, it emits the event to the designated side output:
// Example OutputTags (import org.apache.flink.util.OutputTag)
object Tags {
    val a = OutputTag("a", TypeInformation.of(YourClass::class.java))
    val b = OutputTag("b", TypeInformation.of(YourClass::class.java))
    val c = OutputTag("c", TypeInformation.of(YourClass::class.java))
    val d = OutputTag("d", TypeInformation.of(YourClass::class.java))
}
// Usage: a ProcessFunction that routes each event to its designated tag
// (imports: org.apache.flink.streaming.api.functions.ProcessFunction,
//  org.apache.flink.util.Collector)
class YourTypeSeparatorOperator : ProcessFunction<YourClass, YourClass>() {
    override fun processElement(message: YourClass, ctx: Context, out: Collector<YourClass>) {
        when (message.type) {
            "a" -> ctx.output(Tags.a, message)
            "b" -> ctx.output(Tags.b, message)
            "c" -> ctx.output(Tags.c, message)
            "d" -> ctx.output(Tags.d, message)
            // no branch for "e": those events are never emitted, i.e. dropped
        }
    }
}
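From there, the windowing the question asks about might look like the sketch below. The keys, gap, and window size are assumptions, and processing-time assigners are used so no watermark setup is needed; EventTimeSessionWindows / TumblingEventTimeWindows are drop-in replacements once timestamps and watermarks are configured.

import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// A & B: session windows (the key and the 30-second gap are assumptions)
val abSessions = ab
    .keyBy { it.type } // pick whatever key actually defines a session for you
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
    .reduce { left, _ -> left } // placeholder aggregation

// C & D: a different window type, e.g. one-minute tumbling windows
val cd = events.getSideOutput(Tags.c).union(events.getSideOutput(Tags.d))
val cdWindows = cd
    .keyBy { it.type }
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .reduce { left, _ -> left } // placeholder aggregation

// E never gets a side output in processElement, so those events are already dropped.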