Search code examples
javaapache-sparkapache-spark-sqlspark-structured-streamingapache-pulsar

Feed the result of one query to another in the same Spark Structured Streaming app


I have just started working on Spark Structured Streaming and came up with an implementation question.

So I am using Apache Pulsar to stream data, and wanted to know if it is possible to run different queries in the same program, and either have the results joined, feed the result of one query to the other without putting the result into another topic or sink.

To give an example, for schema,

root
 |-- amount: long (nullable = true)
 |-- cardNum: string (nullable = true)
 |-- category: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userId: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

The processor code

public class CardTransactionStreamProcessor {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {

        SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");

        sparkSession.sparkContext().setLogLevel("error");

        Dataset<Row> lines = sparkSession.readStream()
                .format("pulsar")
                .option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC)
                .option("startingOffsets", "earliest")
                .load();

        lines.printSchema();

        Dataset<CardTransactionDTO> cardTransactionDTODataset = lines.as(ExpressionEncoder.javaBean(CardTransactionDTO.class));

        cardTransactionDTODataset.printSchema();

        // Top spends merchant vise
        cardTransactionDTODataset.groupBy("merchant")
                .agg(sum("amount").alias("amount"))
                .sort("merchant")
                .writeStream().outputMode("complete")
                .format("console")
                .start();

        // Top spends category vise
        cardTransactionDTODataset.groupBy("category")
                .agg(sum("amount").alias("amount"))
                .sort("category")
                .writeStream().outputMode("complete")
                .format("console").start();

        sparkSession.streams().awaitAnyTermination();
    }
}

In the above sample, I'd like to understand, either how to take the output of first query and feed it to the second query, or join the results of both the queries forming a single DataFrame, that I may take to sink, say pulsar topic itself.


Solution

  • Yes, it's possible using Dataset<T>.

    If you want to combine the output of two queries which are in the Dataset<T>, you can combine using union method present in the Dataset.

    Try this updated code:

    package org.example;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.streaming.DataStreamWriter;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import static org.apache.spark.sql.functions.sum;
    import ....
    
    public class CardTransactionStreamProcessor {
    
        public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
        public static final String TOPIC = "spark/tutorial/card-txn";
        public static final String SUB = "my-sub";
    
        public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {
    
            SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");
    
            sparkSession.sparkContext().setLogLevel("error");
    
            Dataset<Row> lines = sparkSession.readStream().format("pulsar").option("service.url", PULSAR_SERVICE_URL)
                    .option("topic", TOPIC).option("startingOffsets", "earliest").load();
    
            lines.printSchema();
    
            // Top spends merchant vise
            Dataset<Row> spendsByMerchantWise = lines.groupBy("merchant").agg(sum("amount").alias("amount"))
                    .sort("merchant");
    
            // Top spends category vise
            Dataset<Row> spendsByCatgoryWise = lines.groupBy("category").agg(sum("amount").alias("amount"))
                    .sort("category");
    
            // union of rows from both Data sets.
            Dataset<Row> union = spendsByMerchantWise.union(spendsByCatgoryWise);
    
            // convert to Dataset<CardTransactionDTO>
            Dataset<CardTransactionDTO> cardTransactionDTODataset = union
                    .as(ExpressionEncoder.javaBean(CardTransactionDTO.class));
    
            cardTransactionDTODataset.writeStream().outputMode("complete").format("console").start();
    
            sparkSession.streams().awaitAnyTermination();
        }
    }