apache-kafka, apache-kafka-streams, kafka-join

Kafka stream join with a specific key as input


I have three topics, each with its own Avro schema in the schema registry. I want to stream these topics, join them, and write the result to a single topic. The problem is that the key I want to join on is different from the key I used when writing the data into each topic.

Let's say we have these three Avro schemas:
Alarm:

{
  "type" : "record",
  "name" : "Alarm",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "alarm_id",
    "type" : "string",
    "doc" : "Unique identifier of the alarm."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "Unique identifier of the network element ID that produces the alarm."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the alarm was generated."
  }, {
    "name" : "severity",
    "type" : [ "null", "string" ],
    "doc" : "The severity field is the default severity associated to the alarm ",
    "default" : null
  }]
}

Incident:

{
  "type" : "record",
  "name" : "Incident",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "incident_id",
    "type" : "string",
    "doc" : "Unique identifier of the incident."
  }, {
    "name" : "incident_type",
    "type" : [ "null", "string" ],
    "doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
    "default" : null
  }, {
    "name" : "alarm_source_id",
    "type" : "string",
    "doc" : "Respective Alarm"
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the incident was generated on the node."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "ID of specific network element."
  }]
}

Maintenance:

{
  "type" : "record",
  "name" : "Maintenance",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "maintenance_id",
    "type" : "string",
    "doc" : "The message number is the unique ID for every maintenance"
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "The NE ID is the network element ID on which the maintenance is done."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance starts."
  }, {
    "name" : "end_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance ends."
  }]
}

I have one Kafka topic for each of these Avro schemas (let's say alarm_raw, incident_raw, and maintenance_raw), and whenever I write to these topics I use ne_id as the key (so each topic is partitioned by ne_id). Now I want to join these three topics, produce a new record, and write it to a new topic. The problem is that I want to join Alarm and Incident on alarm_id and alarm_source_id, and Alarm and Maintenance on ne_id. I want to avoid creating a new topic and reassigning a new key. Is there any way to specify the key while joining?


Solution

  • It depends on what kind of join you want to use (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)

    For a KStream-KStream join, there is currently (v0.10.2 and earlier) no other way than setting a new key (e.g., by using selectKey()) and repartitioning the data.
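
To make the re-keying step concrete, here is a minimal plain-Java model of what it achieves. It does not use the Kafka Streams API: the `rekey` helper only plays the role of `selectKey()`, the `Alarm` and `Incident` classes are simplified stand-ins for the Avro records, and join windows are ignored. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class RekeyJoinSketch {
    // Simplified stand-ins for the Avro records; only the join fields are kept.
    static final class Alarm {
        final String alarmId, neId;
        Alarm(String alarmId, String neId) { this.alarmId = alarmId; this.neId = neId; }
    }

    static final class Incident {
        final String incidentId, alarmSourceId, neId;
        Incident(String incidentId, String alarmSourceId, String neId) {
            this.incidentId = incidentId;
            this.alarmSourceId = alarmSourceId;
            this.neId = neId;
        }
    }

    // Plays the role of selectKey(): regroup records under a key read from the value.
    static <V> Map<String, List<V>> rekey(List<V> records, Function<V, String> newKey) {
        Map<String, List<V>> byKey = new HashMap<>();
        for (V r : records) {
            byKey.computeIfAbsent(newKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        return byKey;
    }

    // Inner join on alarm_id = alarm_source_id (windowing deliberately ignored).
    static List<String> joinAlarmsWithIncidents(List<Alarm> alarms, List<Incident> incidents) {
        Map<String, List<Incident>> incidentsByAlarm = rekey(incidents, i -> i.alarmSourceId);
        List<String> joined = new ArrayList<>();
        for (Alarm a : alarms) {
            for (Incident i : incidentsByAlarm.getOrDefault(a.alarmId, List.of())) {
                joined.add(a.alarmId + "/" + i.incidentId);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Alarm> alarms = List.of(new Alarm("a1", "ne7"), new Alarm("a2", "ne7"));
        // The incident carries a different ne_id, so keying by ne_id would never pair it with a2.
        List<Incident> incidents = List.of(new Incident("i1", "a2", "ne9"));
        System.out.println(joinAlarmsWithIncidents(alarms, incidents)); // prints [a2/i1]
    }
}
```

In a real topology the regrouping happens across partitions, which is why a change of key implies repartitioning: records that must meet in the join have to end up in the same partition.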

    For a KStream-KTable join, Kafka 0.10.2 (to be released in the next few weeks) contains a new feature called GlobalKTable (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams). This allows you to do a non-key join against the table (i.e., a KStream-GlobalKTable join), so you do not need to repartition the data in your GlobalKTable.
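
The idea behind such a non-key join can be sketched in plain Java. This is a model, not the Kafka Streams API: the "global table" is an ordinary map that is fully available locally, and the lookup key is computed from each stream value rather than taken from the stream's own key. The sketch assumes the table side is keyed by alarm_id; the class names and the severity values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

class GlobalTableLookupSketch {
    // Simplified stand-in for the Incident record; only the fields used here.
    static final class Incident {
        final String incidentId, alarmSourceId;
        Incident(String incidentId, String alarmSourceId) {
            this.incidentId = incidentId;
            this.alarmSourceId = alarmSourceId;
        }
    }

    // Inner join of each stream value against a fully local table.
    // tableKeyOf derives the table key from the stream value, so the
    // stream's own key (ne_id in the question) plays no part in the lookup.
    static <V, T> List<String> lookupJoin(List<V> stream,
                                          Map<String, T> globalTable,
                                          Function<V, String> tableKeyOf,
                                          BiFunction<V, T, String> joiner) {
        List<String> out = new ArrayList<>();
        for (V value : stream) {
            T match = globalTable.get(tableKeyOf.apply(value));
            if (match != null) {
                out.add(joiner.apply(value, match));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: table contents keyed by alarm_id, value = severity.
        Map<String, String> severityByAlarmId = Map.of("a1", "MINOR", "a2", "MAJOR");
        List<Incident> incidents = List.of(new Incident("i1", "a2"), new Incident("i2", "a9"));
        System.out.println(lookupJoin(incidents, severityByAlarmId,
                i -> i.alarmSourceId,
                (i, severity) -> i.incidentId + ":" + severity)); // prints [i1:MAJOR]
    }
}
```

Because every instance holds the whole table, the stream side does not have to be co-partitioned with it; only the function that derives the lookup key is needed.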

    Note: a KStream-GlobalKTable join has different semantics than a KStream-KTable join. In contrast to the latter, it is not time-synchronized, and thus the join is non-deterministic by design with regard to GlobalKTable updates; i.e., there is no guarantee which KStream record will be the first to "see" a GlobalKTable update and thus join with the updated GlobalKTable record.
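
A small plain-Java model can illustrate this timing dependence. It is only a sketch under simplifying assumptions: events are applied strictly in arrival order with no timestamp synchronization, and the `Event` class and its factory methods are hypothetical helpers, not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GlobalTableTimingSketch {
    // Either a table update (key, value) or a stream record (id, lookup key).
    static final class Event {
        final boolean isTableUpdate;
        final String first, second;
        private Event(boolean isTableUpdate, String first, String second) {
            this.isTableUpdate = isTableUpdate;
            this.first = first;
            this.second = second;
        }
        static Event tableUpdate(String key, String value) { return new Event(true, key, value); }
        static Event streamRecord(String id, String lookupKey) { return new Event(false, id, lookupKey); }
    }

    // Applies events in arrival order; a stream record joins with whatever
    // value the table holds at the moment the record is processed.
    static List<String> process(List<Event> arrivalOrder) {
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Event e : arrivalOrder) {
            if (e.isTableUpdate) {
                table.put(e.first, e.second);
            } else {
                String current = table.get(e.second);
                if (current != null) {
                    joined.add(e.first + ":" + current);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Same three events, two possible arrival orders.
        System.out.println(process(List.of(
                Event.tableUpdate("a1", "MINOR"),
                Event.streamRecord("i1", "a1"),
                Event.tableUpdate("a1", "MAJOR")))); // prints [i1:MINOR]
        System.out.println(process(List.of(
                Event.tableUpdate("a1", "MINOR"),
                Event.tableUpdate("a1", "MAJOR"),
                Event.streamRecord("i1", "a1")))); // prints [i1:MAJOR]
    }
}
```

The same inputs yield different join results depending only on when the table update becomes visible relative to the stream record, which is the behavior the note describes.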

    There are plans to add a KTable-GlobalKTable join, too. This might become available in 0.10.3. There are no plans to add "global" KStream-KStream joins though.