Tags: apache-kafka, apache-kafka-streams, ksqldb

How to create Kafka Stream that reads from one topic and writes to another


I have INPUT_TOPIC in Kafka that gets data published by some Kafka Connector.

I want to take this data and push it to another Kafka topic, called OUTPUT_TOPIC.

First I create STREAM_A, which reads data from INPUT_TOPIC. It has the two fields I am interested in, the timestamp and the dimension:

CREATE STREAM STREAM_A (`timestamp` BIGINT, DIMENSION STRUCT<HEIGHT BIGINT, WIDTH BIGINT>) WITH (KAFKA_TOPIC = 'INPUT_TOPIC', PARTITIONS=6, VALUE_FORMAT = 'JSON');

After STREAM_A was created I see that it is receiving the data from INPUT_TOPIC.

Since I want this data written to OUTPUT_TOPIC, I create a second stream, STREAM_B, that selects from STREAM_A and writes to OUTPUT_TOPIC:

CREATE STREAM STREAM_B WITH (KAFKA_TOPIC='OUTPUT_TOPIC') AS SELECT `timestamp`, DIMENSION FROM STREAM_A;

This works as is, but there are now two streams to deal with. I wonder whether, instead of creating both STREAM_A and STREAM_B, I could create a single stream that reads the data from INPUT_TOPIC and writes it to OUTPUT_TOPIC.

Is there a way to create such a stream, or do I have to keep two streams?


Solution

  • STREAM_A provides a namespace to query INPUT_TOPIC

    STREAM_B is a derived stream that queries STREAM_A. KSQL has no separate operator for copying one topic to another as raw bytes, so you need both streams. In theory, SELECT * FROM STREAM_A may work instead of listing every field.

    Note: If you simply want to copy one topic into another without any projection, or only to change the partition count or replication, then IMO you'd be wasting space on the cluster. You could do the same with builder.stream("input-topic").to("output-topic") (where builder is a Kafka Streams StreamsBuilder) in a JVM app, or use an alternative such as MirrorMaker.
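
The SELECT * remark above could look like the statement below. This is an untested sketch that reuses the stream and topic names from the question. It assumes STREAM_A has already been created, since that stream supplies the schema the query reads from.

```sql
-- Assumes STREAM_A already exists and is registered over INPUT_TOPIC.
-- SELECT * forwards every column declared on STREAM_A,
-- so the fields do not need to be listed one by one.
CREATE STREAM STREAM_B
  WITH (KAFKA_TOPIC = 'OUTPUT_TOPIC')
  AS SELECT * FROM STREAM_A;
```

Only columns declared in STREAM_A's schema reach OUTPUT_TOPIC. Any JSON fields in INPUT_TOPIC that are not declared there are dropped, so this is a projection rather than a byte-for-byte copy.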