I have 3 different topics with 3 Avro files in schema registry, I want to stream these topics and join them together and write them into one topic. the problem is the key I want to join is different with the key I write the data into each topic.
Let's say we have these 3 Avro files:
Alarm:
{
"type" : "record",
"name" : "Alarm",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "alarm_id",
"type" : "string",
"doc" : "Unique identifier of the alarm."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "Unique identifier of the network element ID that produces the alarm."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "is the timestamp when the alarm was generated."
}, {
"name" : "severity",
"type" : [ "null", "string" ],
"doc" : "The severity field is the default severity associated to the alarm ",
"default" : null
}]
}
Incident:
{
"type" : "record",
"name" : "Incident",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "incident_id",
"type" : "string",
"doc" : "Unique identifier of the incident."
}, {
"name" : "incident_type",
"type" : [ "null", "string" ],
"doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
"default" : null
}, {
"name" : "alarm_source_id",
"type" : "string",
"doc" : "Respective Alarm"
}, {
"name" : "start_time",
"type" : "long",
"doc" : "is the timestamp when the incident was generated on the node."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "ID of specific network element."
}]
}
Maintenance:
{
"type" : "record",
"name" : "Maintenance",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "maintenance_id",
"type" : "string",
"doc" : "The message number is the unique ID for every maintenance"
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "The NE ID is the network element ID on which the maintenance is done."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "The timestamp when the maintenance start."
}, {
"name" : "end_time",
"type" : "long",
"doc" : "The timestamp when the maintenance start."
}]
}
I have 3 topics in my Kafka for each of these Avro (ley's say alarm_raw, incident_raw, maintenance_raw) and whenever I wanted to write into these topics I am using ne_id as a key (so the topic partitioned by ne_id). now I want to join these 3 topics and get a new record and write it into a new topic. The problem is I want to join Alarm and Incident based on alarm_id and alarm_source_id and join alarm and maintenance based on ne_id. I want to avoid creating a new topic and re-assign a new key. Is there anyway that I specify the key while I am joining?
It depends what kind of join you want to use (c.f. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
For KStream-KStream join, there is currently (v0.10.2
and earlier) no other way than setting a new key (e.g., by using selectKey()
) and do a repartitioning.
For KStream-KTable join, Kafka 0.10.2
(will be released in the next weeks) contains a new feature called GlobalKTables
(c.f. https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams). This allows you to do a non-key join on the KTable (i.e., a KStream-GlobalKTable join and thus you do not need to repartition the data in you GlobalKTable).
Note: a KStream-GlobalKTable join has different semantics than a KStream-KTable join. It is not time synchronized in contrast to the later, and thus, the join is non-deterministic by design with regard to GlobalKTable updates; i.e., there is no guarantee what KStream record will be the first to "see" a GlobalKTable updates and thus join with the updated GlobalKTable record.
There are plans to add a KTable-GlobalKTable join, too. This might become available in 0.10.3
. There are no plans to add "global" KStream-KStream joins though.