Basic task
I have two identical streams in Kafka in Avro format, and I'm trying to do a basic left join on those two streams.
Keys
For the keys in both topics I'm using the timestamp rounded to milliseconds, since both streams carry data originating from IoT devices that generate a measurement exactly every 20 ms, and both devices are synchronized to UTC.
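For illustration, here's a minimal sketch of one way such a key could be derived: flooring an epoch-millisecond timestamp onto the 20 ms sampling grid so that readings from both devices taken in the same interval share a join key. The helper name is hypothetical and not part of the question's code.

```java
import java.time.Instant;

public class KeyRounding {
    // Hypothetical helper: floor an epoch-millisecond timestamp to the 20 ms
    // sampling grid, so readings taken in the same interval share a join key.
    static String roundedKey(long epochMillis) {
        long bucket = (epochMillis / 20) * 20;
        return Instant.ofEpochMilli(bucket).toString();
    }

    public static void main(String[] args) {
        // Two readings 13 ms apart fall into the same bucket and print the same key.
        System.out.println(roundedKey(1_600_000_000_000L));
        System.out.println(roundedKey(1_600_000_000_013L));
    }
}
```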
Done so far
I've been able to produce a Kafka stream that transforms one stream into a topic by following this tutorial, but unfortunately a basic stream-stream join tutorial doesn't exist on the Confluent Developer page.
Avro Java serialization classes
I've generated 3 SpecificAvroSerde classes based on the 2 inputs and the output. Although the input streams are identical, I've created separate schemas/classes in case the streams have different schemas in the future. The Avro Java classes are generated at build time without problems.
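As a side note, SpecificAvroSerde only deserializes correctly if the map handed to configure(...) contains the Schema Registry URL. A minimal sketch, assuming the properties file supplies that setting (the URL below is a placeholder):

```java
// "schema.registry.url" must be present in the config map passed to configure();
// the second argument (false) marks this as a value serde rather than a key serde.
Map<String, String> serdeConfig =
        Map.of("schema.registry.url", "http://localhost:8081");
SpecificAvroSerde<RawPMU_214> serde = new SpecificAvroSerde<>();
serde.configure(serdeConfig, false);
```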
This is the schema of the input, output and joined streams:
{
  "namespace": "pmu.serialization.avro",
  "name": "RawPMU_214",
  "type": "record",
  "fields": [
    {"name": "pmu_id", "type": "int"},
    {"name": "time", "type": "string"},
    {"name": "time_rounded", "type": "string"},
    {"name": "stream_id", "type": "int"},
    {"name": "stat", "type": "string"},
    {"name": "ph_i1_r", "type": "float"},
    {"name": "ph_i1_j", "type": "float"},
    {"name": "ph_i2_r", "type": "float"},
    {"name": "ph_i2_j", "type": "float"},
    {"name": "ph_i3_r", "type": "float"},
    {"name": "ph_i3_j", "type": "float"},
    {"name": "ph_v4_r", "type": "float"},
    {"name": "ph_v4_j", "type": "float"},
    {"name": "ph_v5_r", "type": "float"},
    {"name": "ph_v5_j", "type": "float"},
    {"name": "ph_v6_r", "type": "float"},
    {"name": "ph_v6_j", "type": "float"},
    {"name": "ph_7_r", "type": "float"},
    {"name": "ph_7_j", "type": "float"},
    {"name": "ph_8_r", "type": "float"},
    {"name": "ph_8_j", "type": "float"},
    {"name": "analog", "type": "string"},
    {"name": "digital", "type": "string"},
    {"name": "frequency", "type": "float"},
    {"name": "rocof", "type": "int"},
    {"name": "orderCount", "type": "int"}
  ]
}
Code
The key problem is that I don't know how to properly implement this part with the value joiner:
KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
I tried various answers, but I haven't really found any complete Java example of a stream-stream join with SpecificAvroSerde.
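For reference, the overload being targeted has roughly this shape (abbreviated from the Kafka Streams KStream API; exact modifiers vary by version), which shows how the ValueJoiner's type parameters must line up with the two streams:

```java
// Declared on KStream<K, V>: V is the left value type (the stream join() is
// called on), VO the right value type (the stream passed in), VR the result.
<VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream,
                             ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             JoinWindows windows,
                             Joined<K, V, VO> joined);
```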
Full code at this point:
package io.confluent.developer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;
import java.time.Duration;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class TransformStream_join {
public Topology buildTopology(Properties allProps) {
final StreamsBuilder builder = new StreamsBuilder();
// Define input PMU topics
final String inputPMU_01 = allProps.getProperty("input.topic.pmu1");
final String inputPMU_02 = allProps.getProperty("input.topic.pmu2");
final String outputTopic = allProps.getProperty("output.topic.name");
KStream<String, RawPMU_214> rawPMUs_214 = builder.stream(inputPMU_01);
KStream<String, RawPMU_218> rawPMUs_218 = builder.stream(inputPMU_02);
KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
JoinWindows.of(Duration.ofMillis(20)),
Joined.with(
Serdes.String(),
raw_pmu214AvroSerde(allProps),
raw_pmu218AvroSerde(allProps))
);
joinedPMU.to(outputTopic, Produced.with(Serdes.String(), raw_outAvroSerde(allProps)));
return builder.build();
}
private SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde = new SpecificAvroSerde<>();
raw_pmu214AvroSerde.configure((Map)allProps, false);
return raw_pmu214AvroSerde;
}
private SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde = new SpecificAvroSerde<>();
raw_pmu218AvroSerde.configure((Map)allProps, false);
return raw_pmu218AvroSerde;
}
private SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde = new SpecificAvroSerde<>();
raw_outAvroSerde.configure((Map)allProps, false);
return raw_outAvroSerde;
}
public void createTopics(Properties allProps) {
AdminClient client = AdminClient.create(allProps);
List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
allProps.getProperty("input.topic.pmu1"),
Integer.parseInt(allProps.getProperty("input.topic.partitions")),
Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("input.topic.pmu2"),
Integer.parseInt(allProps.getProperty("input.topic.partitions")),
Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("output.topic.name"),
Integer.parseInt(allProps.getProperty("output.topic.partitions")),
Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));
client.createTopics(topics);
client.close();
}
public Properties loadEnvProperties(String fileName) throws IOException {
Properties allProps = new Properties();
FileInputStream input = new FileInputStream(fileName);
allProps.load(input);
input.close();
return allProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
}
TransformStream_join ts = new TransformStream_join();
Properties allProps = ts.loadEnvProperties(args[0]);
allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
Topology topology = ts.buildTopology(allProps);
ts.createTopics(allProps);
final KafkaStreams streams = new KafkaStreams(topology, allProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
KStream join: I've simplified the joining code since I've created a joiner class:
KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream, pmuJoiner,
JoinWindows.of(Duration.ofMillis(20)),
Joined.with(
Serdes.String(),
raw_pmu214AvroSerde(allProps),
raw_pmu218AvroSerde(allProps))
);
PMUJoiner class
package io.confluent.developer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;
public class PMUJoiner implements ValueJoiner<RawPMU_218, RawPMU_214, RawPMU_Joined> {
public RawPMU_Joined apply(RawPMU_218 pmu218Stream, RawPMU_214 pmu214Stream) {
return RawPMU_Joined.newBuilder()
// PMU 218
.setTimeRounded1(pmu218Stream.getTimeRounded())
.setOrderCount1(pmu218Stream.getOrderCount())
.setPhI1R1(pmu218Stream.getPhI1R())
.setPhI1J1(pmu218Stream.getPhI1J())
.setPhI2R1(pmu218Stream.getPhI2R())
.setPhI2J1(pmu218Stream.getPhI2J())
.setPhI3R1(pmu218Stream.getPhI3R())
.setPhI3J1(pmu218Stream.getPhI3J())
.setPhV4R1(pmu218Stream.getPhV4R())
.setPhV4J1(pmu218Stream.getPhV4J())
.setPhV5R1(pmu218Stream.getPhV5R())
.setPhV5J1(pmu218Stream.getPhV5J())
.setPhV6R1(pmu218Stream.getPhV6R())
.setPhV6J1(pmu218Stream.getPhV6J())
.setPh7R1(pmu218Stream.getPh7R())
.setPh7J1(pmu218Stream.getPh7J())
.setPh8R1(pmu218Stream.getPh8R())
.setPh8J1(pmu218Stream.getPh8J())
//PMU 214
.setTimeRounded2(pmu214Stream.getTimeRounded())
.setOrderCount2(pmu214Stream.getOrderCount())
.setPhI1R2(pmu214Stream.getPhI1R())
.setPhI1J2(pmu214Stream.getPhI1J())
.setPhI2R2(pmu214Stream.getPhI2R())
.setPhI2J2(pmu214Stream.getPhI2J())
.setPhI3R2(pmu214Stream.getPhI3R())
.setPhI3J2(pmu214Stream.getPhI3J())
.setPhV4R2(pmu214Stream.getPhV4R())
.setPhV4J2(pmu214Stream.getPhV4J())
.setPhV5R2(pmu214Stream.getPhV5R())
.setPhV5J2(pmu214Stream.getPhV5J())
.setPhV6R2(pmu214Stream.getPhV6R())
.setPhV6J2(pmu214Stream.getPhV6J())
.setPh7R2(pmu214Stream.getPh7R())
.setPh7J2(pmu214Stream.getPh7J())
.setPh8R2(pmu214Stream.getPh8R())
.setPh8J2(pmu214Stream.getPh8J())
.build();
}
}
Error
...pmuStream01/src/main/java/io/confluent/developer/JoinPMUStreams.java:46: error: no suitable method found for join(org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218>,io.confluent.developer.PMUJoiner,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,pmu.serialization.avro.RawPMU_218>)
    KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream,
                                                            ^
  method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows) is not applicable
    (cannot infer type-variable(s) VO,VR (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>) is not applicable
    (cannot infer type-variable(s) VO,VR (argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
  method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.StreamJoined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>) is not applicable
    (cannot infer type-variable(s) VO,VR (argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
  method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>) is not applicable
    (cannot infer type-variable(s) VT,VR (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VT>) is not applicable
    (cannot infer type-variable(s) VT,VR (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<? super java.lang.String,? super pmu.serialization.avro.RawPMU_214,? extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>) is not applicable
    (cannot infer type-variable(s) GK,GV,RV (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<? super java.lang.String,? super pmu.serialization.avro.RawPMU_214,? extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>,org.apache.kafka.streams.kstream.Named) is not applicable
    (cannot infer type-variable(s) GK,GV,RV (argument mismatch; org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218> cannot be converted to org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>))
I don't know why that's happening, since I believe I've supplied all arguments with the correct return types.
I'd suggest starting with this: a joiner function that accepts two Avro objects and returns a third (optionally Avro) one.
(leftValue, rightValue) -> {
RawPMU_Joined j = new RawPMU_Joined();
j.set...
return j;
}
There are generic Avro examples you can follow in the confluent-examples repo on GitHub; there shouldn't need to be one for specific records, as it's just a different object you're returning — except that it wouldn't be a string.
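As for the compile error itself: pmu214Stream.join(pmu218Stream, ...) requires a ValueJoiner<? super RawPMU_214, ? super RawPMU_218, ? extends VR>, but PMUJoiner declares ValueJoiner<RawPMU_218, RawPMU_214, RawPMU_Joined> — the left and right type parameters are swapped relative to the call. A sketch of the corrected class, with most setters elided as in the question:

```java
import org.apache.kafka.streams.kstream.ValueJoiner;
import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;

// Type parameters follow the call order: left = the stream join() is called
// on (RawPMU_214), right = the stream passed in as an argument (RawPMU_218).
public class PMUJoiner implements ValueJoiner<RawPMU_214, RawPMU_218, RawPMU_Joined> {
    @Override
    public RawPMU_Joined apply(RawPMU_214 pmu214, RawPMU_218 pmu218) {
        return RawPMU_Joined.newBuilder()
                // PMU 218 fields
                .setTimeRounded1(pmu218.getTimeRounded())
                .setOrderCount1(pmu218.getOrderCount())
                // ... remaining PMU 218 setters as in the question ...
                // PMU 214 fields
                .setTimeRounded2(pmu214.getTimeRounded())
                .setOrderCount2(pmu214.getOrderCount())
                // ... remaining PMU 214 setters as in the question ...
                .build();
    }
}
```

With that ordering, the existing call pmu214Stream.join(pmu218Stream, pmuJoiner, JoinWindows..., Joined.with(Serdes.String(), raw_pmu214AvroSerde(allProps), raw_pmu218AvroSerde(allProps))) should type-check, since Joined.with already lists the left serde before the right one.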