I was testing some Spark behavior, but I'm getting an error that I don't understand. This simple piece of code starts two streams: a rate stream and one that reads Parquet files from HDFS.
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()

val schema = StructType(
  Array(StructField("num", IntegerType), StructField("k", IntegerType))
)

df.withColumn("k", lit(10)).createOrReplaceTempView("temp")

spark.readStream.schema(schema).parquet("core/src/test/resources/hdfs/test").createOrReplaceTempView("temp2")

spark.sql("select 10 k, 5 num").write.mode(SaveMode.Overwrite).parquet("core/src/test/resources/hdfs/test")

scheduler.schedule(
  () => {
    spark.sql("select 10 k, 10 num").write.mode(SaveMode.Append).parquet("core/src/test/resources/hdfs/test")
    ""
  },
  10,
  TimeUnit.SECONDS
)
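(scheduler is not shown in the snippet; I'm assuming a plain single-threaded executor, used only to append another parquet file to the streamed directory after 10 seconds.)

import java.util.concurrent.{Executors, ScheduledExecutorService}

// Assumed definition of the scheduler used above (not part of the original snippet).
val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()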
During the join I'm getting: Caused by: org.apache.spark.sql.AnalysisException: Stream-stream join without equality predicate is not supported, even though I'm using a simple inner join with an equality condition.
import org.apache.spark.sql.streaming.Trigger

spark
  .sql("""select * from temp t join temp2 t2 on t.k = t2.k""")
  .writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(15000))
  .format("console")
  .start()
What am I doing wrong? Thanks.
The problem was that Spark was optimizing the query plan, transforming the join condition into a simple filter. The final query plan was:
Join Inner
:- Project [timestamp#0, value#1L, 10 AS k#4]
: +- StreamingDataSourceV2Relation [timestamp#0, value#1L], org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1@51721270, RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default, 0, 0
+- Filter (isnotnull(k#29) AND (10 = k#29))
+- Relation [num#28,k#29] parquet
The filter (isnotnull(k#29) AND (10 = k#29)) has been extracted from the join, since lit(10) was a fixed value: the optimizer propagates the constant into the join condition and pushes it down as a filter on the parquet side, so the stream-stream join is left without any equality predicate.
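The same constant propagation can be reproduced with a plain batch query, where it is harmless. A minimal sketch (assuming a local SparkSession named spark with implicits imported; the data is made up just for illustration):

import org.apache.spark.sql.functions.lit
import spark.implicits._

val left  = spark.range(5).withColumn("k", lit(10))   // k is a foldable literal
val right = Seq((10, 5)).toDF("k", "num")

// explain(true) shows the optimized plan: the equality left("k") === right("k")
// is replaced by a filter (k = 10) on the right side, like in the plan above.
left.join(right, left("k") === right("k")).explain(true)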
To work around that, I've replaced lit with a fake random expression:
df.withColumn("k", when(rand() < 0.5,10).otherwise(10)).createOrReplaceTempView("temp")
This way no optimization is applied, and the plan is the expected one, with the inner join condition preserved:
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@56dd845b, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2466/410097454@add5533
+- StreamingSymmetricHashJoin [k#4], [k#71], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/C:/Users/dbelvedere/AppData/Local/Temp/temporary-b43c4981-b812-4423-b91f-67aa9946cb40/state, runId = d4ac08d3-a7fe-4ba0-9751-82ecaaa4a42f, opId = 0, ver = 1, numPartitions = 1], 0, state cleanup [ left = null, right = null ], 2
:- *(1) Project [timestamp#0, value#1L, CASE WHEN (rand(-5424392504616971981) < 1.0E-7) THEN 10 ELSE 10 END AS k#4]
: +- *(1) Project [timestamp#0, value#1L]
: +- MicroBatchScan[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1
+- Exchange hashpartitioning(k#71, 1), ENSURE_REQUIREMENTS, [id=#266]
+- *(2) Filter isnotnull(k#71)
+- *(2) ColumnarToRow
+- FileScan parquet [num#70,k#71] Batched: true, DataFilters: [isnotnull(k#71)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/dbelvedere/IdeaProjects/bee/core/src/test/resources/hdf..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<num:int,k:int>
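The rand() trick works because non-deterministic expressions are not foldable, so the optimizer can no longer substitute the literal into the join condition. A hypothetical alternative along the same lines (a sketch I haven't wired into the test above) is a UDF explicitly marked as non-deterministic:

import org.apache.spark.sql.functions.udf

// A UDF marked non-deterministic cannot be constant-folded, so "k" stays a real
// attribute and the equality predicate survives into the stream-stream join.
val constK = udf(() => 10).asNondeterministic()
df.withColumn("k", constK()).createOrReplaceTempView("temp")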