java, spring-boot, apache-kafka, stream, ksqldb

Is there a way to access a table created via KSQL (kafka) through spring-boot?


I am new to the Kafka universe and I am really stuck here, so any help would be very much appreciated.

I have created a table from a Kafka stream using the KSQL statement below:

CREATE TABLE calc AS 
SELECT id, datetime, count(*) 
FROM streamA 
GROUP BY id, datetime 
HAVING count(*) = total;

where "streamA" is a stream created from "topicA".

I am currently using:

  • Java 8
  • Spring Boot v2.2.9

My pom.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Packaging -->
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>

    <properties>
        <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    </properties>
    <!-- Versioning -->
    <groupId>some.name</groupId>
    <artifactId>kafka.project</artifactId>
    <version>2020.2.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.9.RELEASE</version>
        <relativePath />
    </parent>

    <!-- Meta-data -->
    <name>[${project.artifactId}]</name>
    <description>Kafka Project</description>

    <!-- Dependencies -->
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <!-- Build settings -->
    <build>
        <!-- Plugins -->
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

So, I have two questions:

  1. Is there any way to access that table via the Kafka Streams API?
  2. Could I do something similar (e.g. creating that table) through my application instead of KSQL?

Thank you in advance.

UPDATE: Thank you for your suggestion, Shrey Jakhmola (What's the way of running KSQL from spring boot app), but I have a big dataset that needs to be accessed on a regular basis, so I don't think this solution would be ideal.

@Joshua Oliphant, yes, this table is generated from a stream, which in turn is created from a topic.


Solution

    1. Is there any way to access that table via the Kafka Streams API?

    Table calc will be backed by a changelog topic called CALC. You are free to consume this topic in your application if you need to, either using the standard consumer or Kafka Streams.
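
    The standard-consumer route can be sketched roughly as follows: read CALC with a plain KafkaConsumer and keep the latest value per key in memory. This is a sketch under assumptions, not a tested recipe: the broker address (localhost:9092) and the group id calc-reader are placeholders, and keys and values are assumed to be readable as strings (e.g. a JSON value format; Avro would need a different deserializer).

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CalcTopicReader {

    /** Applies one CALC record to the local copy: a null value (tombstone) deletes the row. */
    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumptions: broker address and group id are placeholders
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "calc-reader");
        // Never commit offsets, so every start re-reads CALC from the beginning
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // The whole table is held in memory by this single instance
        Map<String, String> table = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("CALC"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    apply(table, record.key(), record.value());
                }
                System.out.println("rows in local copy: " + table.size());
            }
        }
    }
}
```

    Because offsets are never committed, each restart rebuilds the map from the start of the topic. That keeps the example simple, but it is only practical while the table fits comfortably in one process.
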

    However, if all you want to do is query the current state of the table, you can use ksqlDB's pull queries. These allow you to pull rows back from the table being built by ksqlDB. The functionality is basic, as it's not part of the core streaming SQL that ksqlDB provides, but it meets some use cases.
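
    A pull query can also be issued from Java over ksqlDB's REST API without any extra libraries. The sketch below POSTs to the /query endpoint with Java 8's HttpURLConnection and returns the raw JSON response. The server URL (http://localhost:8088) and the WHERE clause are assumptions: which key columns a pull query can filter on depends on your ksqlDB version and on how the GROUP BY key was built, so treat the query string as a placeholder.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CalcPullQuery {

    /** Builds the JSON body for ksqlDB's POST /query endpoint. */
    static String buildQueryBody(String sql) {
        // Minimal JSON string escaping: backslashes first, then double quotes
        String escaped = sql.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"ksql\":\"" + escaped + "\",\"streamsProperties\":{}}";
    }

    /** Sends a pull query and returns the raw JSON response body. */
    static String runPullQuery(String serverUrl, String sql) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(serverUrl + "/query").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8");
        conn.setRequestProperty("Accept", "application/vnd.ksql.v1+json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(buildQueryBody(sql).getBytes(StandardCharsets.UTF_8));
        }
        StringBuilder response = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                response.append(line).append('\n');
            }
        } finally {
            conn.disconnect();
        }
        return response.toString();
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical query: adjust the WHERE clause to the key column(s) of CALC
        String sql = "SELECT * FROM CALC WHERE ID = 'some-id';";
        System.out.println(runPullQuery("http://localhost:8088", sql));
    }
}
```

    Each call is a one-off request against the table's current state, so this suits occasional lookups better than bulk reads of a large dataset.
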

    If you need something beyond this, then there are other options open to you:

    1. You can push the result into a more traditional SQL system of your choice, e.g. Postgres, and query that. (You can use ksqlDB's CREATE SINK CONNECTOR statement to export the data to Postgres.)
    2. You can consume the data in your own app using the standard Kafka client, though this only works well if each instance of your app can hold all the data in the table.
    3. You can use Kafka Streams within your app to consume the table. This has the benefit that multiple instances of your app can cluster together, so that each one only consumes a portion of the table's data. You may then want to use Kafka Streams Interactive Queries to access the current state of the table.

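    The Kafka Streams option can be sketched like this: materialize CALC as a KTable in a named local store and read it with an Interactive Query. Assumptions: string keys and values, placeholder application id and broker address, and a hypothetical store name calc-store. The sketch uses the StoreQueryParameters overload from Kafka 2.5+; with older clients (which a Spring Boot 2.2.x build probably pulls in) the equivalent call is streams.store(STORE, QueryableStoreTypes.keyValueStore()).

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CalcTableQuery {

    static final String STORE = "calc-store"; // hypothetical store name

    /** Materializes the CALC topic as a KTable backed by a queryable local store. */
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.table("CALC",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Assumptions: application id and broker address are placeholders
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calc-table-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();

        // The local store can only be queried once the instance is RUNNING
        while (streams.state() != KafkaStreams.State.RUNNING) {
            Thread.sleep(100);
        }

        // Interactive query against this instance's share of the table (Kafka 2.5+ API)
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType(STORE, QueryableStoreTypes.<String, String>keyValueStore()));
        System.out.println("some-key -> " + store.get("some-key")); // hypothetical key
    }
}
```

    Each instance only holds the partitions assigned to it, so with several instances a lookup has to be routed to the instance that owns the key, for example with KafkaStreams#queryMetadataForKey (Kafka 2.5+) together with the application.server setting.
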
    2. Could I do something similar (e.g. creating that table) through my application instead of KSQL?

    If you want to cut ksqlDB out of the equation, then yes: ksqlDB uses Kafka Streams internally, so anything you can do with ksqlDB you can also do directly with Kafka Streams.

    SQL like:

    CREATE TABLE calc AS 
       SELECT id, datetime, count(*) 
       FROM streamA 
       GROUP BY id, datetime 
       HAVING count(*) = total;
    

    Would map to something like (rough code):

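    StreamsBuilder builder = new StreamsBuilder();
    
    builder
       // Kafka Streams reads topics, so consume topicA (the topic behind streamA)
       .stream("topicA", Consumed.with(<appropriate serde>))
       // GROUP BY id, datetime: re-key each record, then group on the new key
       .groupBy(<a mapper that returns id and datetime as new key>, Grouped.with(<appropriate serde>))
       // count(*)
       .count()
       // HAVING count(*) = total
       .filter(<filter>)
       .toStream()
       .to("CALC", Produced.with(<appropriate serde>, Serdes.Long()));
    
    new KafkaStreams(builder.build(), props).start();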
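
    A runnable version of the rough code above, under loudly stated assumptions: topicA is assumed to carry plain-string values of the hypothetical form id,datetime,total, and total is taken from the latest record in each group. Because count() alone would drop total, this sketch uses aggregate() to carry a "count,total" pair and then applies the HAVING filter. It is one possible translation, not necessarily what ksqlDB generates internally; the application id and broker address are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class CalcTopology {

    /** Hypothetical input "id,datetime,total" -> GROUP BY key "id|datetime". */
    static String groupKey(String value) {
        String[] f = value.split(",");
        return f[0] + "|" + f[1];
    }

    /** Aggregate "count,total": increments count(*) and keeps the latest total. */
    static String aggregate(String value, String agg) {
        long count = Long.parseLong(agg.split(",")[0]) + 1;
        String total = value.split(",")[2];
        return count + "," + total;
    }

    /** HAVING count(*) = total */
    static boolean having(String agg) {
        String[] f = agg.split(",");
        return Long.parseLong(f[0]) == Long.parseLong(f[1]);
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("topicA", Consumed.with(Serdes.String(), Serdes.String()))
               // GROUP BY id, datetime: re-key each record (this triggers a repartition)
               .groupBy((key, value) -> groupKey(value), Grouped.with(Serdes.String(), Serdes.String()))
               // count(*), carrying total along in the aggregate
               .aggregate(() -> "0,0",
                          (key, value, agg) -> aggregate(value, agg),
                          Materialized.with(Serdes.String(), Serdes.String()))
               // HAVING count(*) = total
               .filter((key, agg) -> having(agg))
               .toStream()
               // Emit only the count; filtered-out rows arrive as null tombstones
               .mapValues(agg -> agg == null ? null : agg.split(",")[0])
               .to("CALC", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumptions: application id and broker address are placeholders
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calc-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

    KTable.filter forwards a tombstone when a row stops matching, which is why the final mapValues passes null through instead of parsing it. Keeping the per-group logic in small static methods also makes it easy to unit-test without a broker.
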