java, apache-spark, apache-spark-sql, spark-structured-streaming

How to define a join condition in stream-batch streaming join?


I am using Spark SQL 2.4.1 with Java 1.8 and the Kafka connector spark-sql-kafka-0-10_2.11 version 2.4.3.

I am trying to join a static dataframe (i.e. meta-data) with a streaming dataframe, as below:

 Dataset<Row> streamingDs = ...; // read from a Kafka topic
 Dataset<Row> staticDf = ...;    // read from an Oracle meta-data table


Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
                      "c.code = i.industry_code"
                      );

Even though the respective columns are present in both dataframes, it gives the error below.

Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column c.code = i.industry_code cannot be resolved on the left side of the join. The left-side columns: [id, tranasctionDate, companyName,code];

I then tried:

Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
                      "c.code = i.industry_code",
                      "inner"
                      );

This gives the following error:

The method join(Dataset, String) in the type Dataset is not applicable for the arguments (Dataset, String, String)


Solution

  • tl;dr With the two-argument join(Dataset, String), the string c.code = i.industry_code is treated as the name of a single USING column to join on, not as a join expression.


    Change the code to be as follows:

    streamingDs.as("c").join(staticDf.as("i")) // INNER JOIN is the default
      .where("c.code = i.industry_code")