apache-spark, apache-spark-sql, spark-structured-streaming

Stream-stream join without equality predicate is not supported, but equality predicate is present


I was testing some Spark behavior, but I'm getting an error that I don't understand. This simple piece of code starts two streams: one rate source and one that reads parquet files from HDFS.

    import java.util.concurrent.TimeUnit

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load() // use the value of n here

    val schema = StructType(
      Array(StructField("num", IntegerType), StructField("k", IntegerType))
    )
    df.withColumn("k", lit(10)).createOrReplaceTempView("temp")

    spark.readStream.schema(schema).parquet("core/src/test/resources/hdfs/test").createOrReplaceTempView("temp2")
    spark.sql("select 10 k,5 num").write.mode(SaveMode.Overwrite).parquet("core/src/test/resources/hdfs/test")

    // "scheduler" is a ScheduledExecutorService assumed to be defined elsewhere
    scheduler.schedule(
      () => {
        spark.sql("select 10 k , 10 num").write.mode(SaveMode.Append).parquet("core/src/test/resources/hdfs/test")

        ""
      },
      10,
      TimeUnit.SECONDS
    )

During the join I'm getting: Caused by: org.apache.spark.sql.AnalysisException: Stream-stream join without equality predicate is not supported, even though I'm using a simple inner join with an equality condition:

    import org.apache.spark.sql.streaming.Trigger

    spark
      .sql("""select * from temp t join temp2 t2 on t.k = t2.k """)
      .writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(15000))
      .format("console")
      .start()

What am I doing wrong? Thanks.


Solution

  • The problem was that Spark was optimizing the query plan, transforming the join condition into a simple filter. The final query plan was:

    Join Inner
    :- Project [timestamp#0, value#1L, 10 AS k#4]
    :  +- StreamingDataSourceV2Relation [timestamp#0, value#1L], org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1@51721270, RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default, 0, 0
    +- Filter (isnotnull(k#29) AND (10 = k#29))
       +- Relation [num#28,k#29] parquet
    

    The Filter (isnotnull(k#29) AND (10 = k#29)) has been extracted from the join because "lit(10)" is a constant value: the equality t.k = t2.k collapses to 10 = k#29, which is pushed down as a filter on the parquet relation, leaving the join with no equality predicate between the two streams.
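
    In other words, after this optimization the query behaves roughly as if it had been written like this (an illustrative sketch reusing the temp/temp2 views from the question; written directly, this form would hit the same AnalysisException):

        // effective query once t.k has been folded to the constant 10 and
        // pushed down as a filter on temp2: the join keeps no equality predicate
        spark.sql("select * from temp t join temp2 t2 on 10 = t2.k")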

    To work around that, I replaced lit with a fake random expression:

        df.withColumn("k", when(rand() < 0.5,10).otherwise(10)).createOrReplaceTempView("temp")
    

    This way the expression is no longer foldable (rand() is non-deterministic, even though both branches return 10), so the optimization is not applied and the plan is the expected one, with the equality condition kept inside the join:

    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@56dd845b, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2466/410097454@add5533
    +- StreamingSymmetricHashJoin [k#4], [k#71], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/C:/Users/dbelvedere/AppData/Local/Temp/temporary-b43c4981-b812-4423-b91f-67aa9946cb40/state, runId = d4ac08d3-a7fe-4ba0-9751-82ecaaa4a42f, opId = 0, ver = 1, numPartitions = 1], 0, state cleanup [ left = null, right = null ], 2
       :- *(1) Project [timestamp#0, value#1L, CASE WHEN (rand(-5424392504616971981) < 1.0E-7) THEN 10 ELSE 10 END AS k#4]
       :  +- *(1) Project [timestamp#0, value#1L]
       :     +- MicroBatchScan[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1
       +- Exchange hashpartitioning(k#71, 1), ENSURE_REQUIREMENTS, [id=#266]
          +- *(2) Filter isnotnull(k#71)
             +- *(2) ColumnarToRow
                +- FileScan parquet [num#70,k#71] Batched: true, DataFilters: [isnotnull(k#71)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/dbelvedere/IdeaProjects/bee/core/src/test/resources/hdf..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<num:int,k:int>
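
    For completeness, a minimal sketch (assuming the same temp/temp2 views and the workaround above) of how to check that the equality predicate survived; StreamingQuery.explain() prints the physical plan of the most recent micro-batch execution:

        val query = spark
          .sql("select * from temp t join temp2 t2 on t.k = t2.k")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()

        // once at least one micro-batch has completed, the plan should contain
        // a StreamingSymmetricHashJoin keyed on [k] rather than a plain Filter
        query.explain()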