I am new to the Kafka universe and I am really stuck here, so any help would be very much appreciated.
I have created a table out of a Kafka stream, using the KSQL statement below:
CREATE TABLE calc AS
SELECT id, datetime, count(*)
FROM streamA
GROUP BY id, datetime
HAVING count(*) = total;
where "streamA" is a stream created from "topicA".
I am currently using:
My pom.xml looks like:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Packaging -->
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<properties>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<!-- Versioning -->
<groupId>some.name</groupId>
<artifactId>kafka.project</artifactId>
<version>2020.2.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.9.RELEASE</version>
<relativePath />
</parent>
<!-- Meta-data -->
<name>[${project.artifactId}]</name>
<description>Kafka Project</description>
<!-- Dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Build settings -->
<build>
<!-- Plugins -->
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
So, I have two questions:
Thank you in advance
UPDATE Thank you for your suggestion Shrey Jakhmola (What's the way of running KSQL from spring boot app), but I have a big dataset which needs to be accessed on a regular basis. I don't think this solution would be ideal.
@Joshua Oliphant, yes this table is generated by a stream which is created from a topic.
- Is there any way to access that table via Kafka Streams API?
Table calc will be backed by a changelog topic called CALC. You are free to consume this topic in your application if you need to, using either the standard consumer or Kafka Streams.
However, if all you want to do is query the current state of the table, then you can do so using ksqlDB's pull queries. These allow you to pull rows back from the table being built by ksqlDB. The functionality is basic, as it's not part of the core streaming SQL that ksqlDB provides, but it meets some use cases.
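As a rough illustration of the pull-query route, the sketch below posts a query to the ksqlDB server's REST `/query` endpoint using only the JDK's HTTP client. It assumes Java 11 or later and a server at `http://localhost:8088`; the class name, the helper names, and the `WHERE ROWKEY = 'some-key'` clause are hypothetical, and the key column name and the predicates that pull queries accept depend on your ksqlDB version.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PullQuerySketch {

    // Wraps a ksqlDB statement in the JSON envelope sent to the /query endpoint.
    static String toRequestBody(String sql) {
        String escaped = sql.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"ksql\":\"" + escaped + "\",\"streamsProperties\":{}}";
    }

    // Builds the POST request; nothing is sent until send() is called.
    static HttpRequest buildRequest(String serverUrl, String sql) {
        return HttpRequest.newBuilder()
                .uri(URI.create(serverUrl + "/query"))
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(toRequestBody(sql)))
                .build();
    }

    // Sends the request and returns the raw JSON response body.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }

    public static void main(String[] args) {
        // Hypothetical server address and key value; adjust both for your setup.
        HttpRequest request = buildRequest("http://localhost:8088",
                "SELECT * FROM CALC WHERE ROWKEY = 'some-key';");
        // Only prints what would be sent; call send(request) to actually run the query.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Calling `send(request)` from a scheduled job would let the application poll the table without consuming the changelog topic itself, at the cost of one HTTP round trip per lookup.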
If you need something beyond this, then there are other options open to you:
- Use CREATE SINK CONNECTOR to export the data to an external store such as Postgres.
- Could I do something similar (e.g. creating that table) through my application instead of KSQL?
If you want to cut ksqlDB out of the equation, then yes. ksqlDB uses Kafka Streams internally, so anything you can do with ksqlDB, you can also do directly with Kafka Streams.
SQL like:
CREATE TABLE calc AS
SELECT id, datetime, count(*)
FROM streamA
GROUP BY id, datetime
HAVING count(*) = total;
Would map to something like (rough code):
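StreamsBuilder builder = new StreamsBuilder();
builder
    // builder.stream() takes a topic name; topicA is the topic behind streamA
    .stream("topicA", Consumed.with(<appropriate serdes>))
    .groupBy(<a mapper that returns id and datetime as new key>)
    .count()
    // count() yields a KTable; the predicate receives (key, count)
    .filter(<filter>)
    .toStream()
    // counts are Long values, so the output needs a matching value serde
    .to("CALC", Produced.with(<key serde>, Serdes.Long()));
new KafkaStreams(builder.build(), props, clients).start();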
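To make the group, count and filter steps concrete without a running cluster, here is a minimal plain-Java sketch of the same logic over an in-memory list. It involves no Kafka at all: the `Event` class, the `"|"` key separator, and treating `total` as a fixed number passed in by the caller are all assumptions made for illustration, since the question does not show the real schema of streamA.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CalcSemanticsSketch {

    // Hypothetical record shape; the real fields of streamA are not shown in the question.
    static final class Event {
        final String id;
        final String datetime;

        Event(String id, String datetime) {
            this.id = id;
            this.datetime = datetime;
        }
    }

    // Groups events by (id, datetime), counts each group,
    // and keeps only the groups whose count equals total.
    static Map<String, Long> calc(List<Event> events, long total) {
        return events.stream()
                .collect(Collectors.groupingBy(
                        e -> e.id + "|" + e.datetime,   // composite key, like the groupBy mapper
                        Collectors.counting()))          // like count()
                .entrySet().stream()
                .filter(entry -> entry.getValue() == total)  // like HAVING count(*) = total
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

The difference in a real topology is that the counts are updated continuously as records arrive, so a key can enter and leave the filtered result over time rather than being computed once.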