I have a topic INPUT_TOPIC in Kafka that receives data published by a Kafka Connect connector. I want to take this data and push it to another Kafka topic, called OUTPUT_TOPIC.
First I create a stream, STREAM_A, that reads data from INPUT_TOPIC. It has the two fields I am interested in, timestamp and DIMENSION:
CREATE STREAM STREAM_A (
  `timestamp` BIGINT,
  DIMENSION STRUCT<HEIGHT BIGINT, WIDTH BIGINT>
) WITH (
  KAFKA_TOPIC = 'INPUT_TOPIC',
  PARTITIONS = 6,
  VALUE_FORMAT = 'JSON'
);
After STREAM_A is created, I can see that it is receiving the data from INPUT_TOPIC.
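For instance, a push query like the following shows rows arriving (syntax assumes ksqlDB 5.4 or later, where transient queries need EMIT CHANGES; older KSQL versions omit it):

SELECT `timestamp`, DIMENSION FROM STREAM_A EMIT CHANGES LIMIT 5;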
Since I want this data written to a second topic, OUTPUT_TOPIC, I create a second stream, STREAM_B, that selects from STREAM_A and writes to OUTPUT_TOPIC:
CREATE STREAM STREAM_B WITH (KAFKA_TOPIC = 'OUTPUT_TOPIC') AS
  SELECT `timestamp`, DIMENSION
  FROM STREAM_A;
This works as is, but now there are two streams to deal with. I wonder if, instead of creating the two streams STREAM_A and STREAM_B, I could create a single stream that reads the data from INPUT_TOPIC and writes it to OUTPUT_TOPIC. Is there a way to create such a stream, or do I have to have two?
In other words, is it possible to consolidate the two streams into one, so that this consolidated stream reads data from INPUT_TOPIC and writes to OUTPUT_TOPIC?
STREAM_A provides a namespace over INPUT_TOPIC so that you can query it, and STREAM_B is a derived stream that queries against STREAM_A; there is no other operator in KSQL for "copy topic to topic" (as bytes), so you do need both. That said, SELECT * FROM may work, in theory, rather than listing every field.
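In theory, that variant would look something like this (still a second stream object; it just avoids repeating the field list):

CREATE STREAM STREAM_B WITH (KAFKA_TOPIC = 'OUTPUT_TOPIC') AS
  SELECT * FROM STREAM_A;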
Note: if you simply want to copy one topic into another without any projection (or only to change partitions/replication), then IMO you would be wasting space on the cluster. But you could do the same with just builder.stream("input-topic").to("output-topic") in a JVM Kafka Streams app, or use an alternative tool such as MirrorMaker.
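A minimal sketch of such a JVM app (assuming the Kafka Streams client library; the application id and bootstrap address below are placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class TopicCopier {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topic-copier");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Byte-array serdes: records are copied through untouched, never deserialized.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("INPUT_TOPIC").to("OUTPUT_TOPIC");  // topic-to-topic copy

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The byte-array serdes make this a bytes-level copy, which is exactly the "copy topic to topic (as bytes)" behavior that KSQL lacks.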