
Flink Job design - working with hybrid Kafka topic


I have a Kafka topic that contains a few event types (this is a given).
The events are JSON documents.

Let's call the event types A, B, C, D, and E.

I can tell the type from a field that is present in every event.

I want a Flink job that handles events A & B separately (using a session window), sends C & D to another type of window, and drops event E.

Can I implement such a design in Flink?

Thanks


Solution

  • Since each event carries a field identifying its type, you can take advantage of Flink's support for side outputs to route each distinct type to its own stream and operate on those streams separately (or union them downstream, etc.).

    Basically:

    • Read your data from the Kafka topic (via KafkaSource)
    • Process your data with a ProcessFunction that emits each specific type to its own side output
    • Construct your job graph to align with what you need (union and window downstream, etc.)

    This might look something like:

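    val events = streamEnv
      .fromSource(
        KafkaSource.builder<YourClass>()...build(),           // brokers, topic, deserializer elided
        WatermarkStrategy.forMonotonousTimestamps<YourClass>(),
        "kafka-events")                                       // hypothetical source name
      .process(YourTypeSeparatorOperator())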
    
    // Example: Getting A & B events
    val a = events.getSideOutput(Tags.a)
    val b = events.getSideOutput(Tags.b)
    
    // Union these streams (and act on them via windowing, etc.)
    val ab = a.union(b)
    
    // Likewise for the C & D types, which get a different kind of window
    val cd = events.getSideOutput(Tags.c).union(events.getSideOutput(Tags.d))
    
    // Eventually merge all of these separate streams together if needed
    
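Downstream, the A & B stream would typically be keyed and assigned session windows (in Flink, for example, EventTimeSessionWindows.withGap(...)), while C & D get a different assigner such as TumblingEventTimeWindows.of(...). The plain-Kotlin sketch below does not use Flink; it only applies the two grouping rules to a list of millisecond timestamps so the difference is visible. The function names and gap/size values are hypothetical, and edge cases (such as an event arriving exactly one gap after the previous one) may be handled differently than in Flink.

```kotlin
// Illustrative only: models session vs. tumbling window grouping on plain
// millisecond timestamps, without Flink. Names and values are hypothetical.

/**
 * Session windows: consecutive events stay in the same session while the
 * gap to the previous event is at most `gapMs`; a larger gap starts a new one.
 */
fun sessionWindows(timestamps: List<Long>, gapMs: Long): List<List<Long>> {
    val sessions = mutableListOf<MutableList<Long>>()
    for (ts in timestamps.sorted()) {
        val current = sessions.lastOrNull()
        if (current != null && ts - current.last() <= gapMs) {
            current.add(ts)                 // still within the inactivity gap
        } else {
            sessions.add(mutableListOf(ts)) // gap exceeded: open a new session
        }
    }
    return sessions
}

/**
 * Tumbling windows: fixed, non-overlapping buckets of `sizeMs`; each timestamp
 * maps to the bucket starting at ts - (ts mod sizeMs).
 * Assumes non-negative timestamps and no window offset.
 */
fun tumblingWindows(timestamps: List<Long>, sizeMs: Long): Map<Long, List<Long>> =
    timestamps.sorted().groupBy { ts -> ts - ts % sizeMs }
```

For example, with a 5-second gap, events at 4 s, 6 s and 20 s form two sessions, [4 s, 6 s] and [20 s], whereas 5-second tumbling windows split them into three buckets starting at 0 s, 5 s and 20 s.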

    In the above, YourTypeSeparatorOperator() uses side outputs: based on each event's type, it emits the event to a designated side output:

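    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector
    import org.apache.flink.util.OutputTag

    // Example OutputTags: one per event type that should be kept
    object Tags {
        val a = OutputTag("a", TypeInformation.of(YourClass::class.java))
        val b = OutputTag("b", TypeInformation.of(YourClass::class.java))
        val c = OutputTag("c", TypeInformation.of(YourClass::class.java))
        val d = OutputTag("d", TypeInformation.of(YourClass::class.java))
    }

    // Usage: route each event to the side output matching its type field
    class YourTypeSeparatorOperator : ProcessFunction<YourClass, YourClass>() {
        override fun processElement(
            message: YourClass,
            context: ProcessFunction<YourClass, YourClass>.Context,
            out: Collector<YourClass>
        ) {
            when (message.type) {
                "a" -> context.output(Tags.a, message)
                "b" -> context.output(Tags.b, message)
                "c" -> context.output(Tags.c, message)
                "d" -> context.output(Tags.d, message)
                // Any other type (e.g. "e") matches no branch and is dropped
            }
        }
    }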
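The separator's routing rules can also be checked without a Flink cluster. The plain-Kotlin sketch below is hypothetical: the Route enum and the routeFor and splitByRoute functions are illustrative names, type values are assumed to be lowercase strings as in the when above, and any type other than a, b, c or d (such as e) is assumed to be dropped, which is what the when does by having no branch for it.

```kotlin
// Hypothetical routing table mirroring the requirements: A & B go to a
// session window, C & D to another window type, everything else is dropped.
enum class Route { SESSION_AB, WINDOW_CD, DROP }

fun routeFor(type: String): Route = when (type) {
    "a", "b" -> Route.SESSION_AB
    "c", "d" -> Route.WINDOW_CD
    else -> Route.DROP // e.g. "e", or any unexpected value
}

// Split a batch of (type, payload) pairs the way the side outputs plus the
// A & B and C & D unions would, leaving DROP events out of the result.
fun <T> splitByRoute(events: List<Pair<String, T>>): Map<Route, List<T>> =
    events
        .map { (type, payload) -> routeFor(type) to payload }
        .filter { (route, _) -> route != Route.DROP }
        .groupBy({ it.first }, { it.second })
```

Keeping the routing decision in a pure function like this means it can be unit-tested on its own and then called from processElement, so the Flink operator only maps a Route to the corresponding OutputTag.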
    }