scala, apache-spark, filter

Filter data while reading it from SQL server based on prior dataframe value


I have two massive tables on Azure SQL Server that I am trying to get into my notebook. The first table is filtered on a date, but to filter the second table I need to find the minimum value of its primary key from the first dataframe and use it when importing the second table. Basically, this is what I am trying:



val date = "make_date(2023,06,01)"

val TableA = spark.read.synapsesql("sqlserver.dbo.table1")
  .select("columnA", "columnB", "DateColumn")
  .toDF()
  .filter(expr("DateColumn >= " + date))
  .repartition(20)
  .createOrReplaceTempView("TableA")


val min = spark.sql("""select min(columnA) as mincolA from TableA""".stripMargin)



val TableB = spark.read.synapsesql("sqlserver.dbo.tableB")
  .select("columnA", "columnC")
  .filter(expr("columnA >= " + min))

This is giving me the following error when trying to filter:


org.apache.spark.sql.catalyst.parser.ParseException:
===SQL===

columnA >= [min: bigint]

How can I fix this? Any help is much appreciated! I don't want to pull in a billion records if it can be avoided.


Solution

  • In the provided code, the type of val min is a DataFrame, not a value, which is why concatenating it into the filter expression fails to parse. To get the actual minimum, take the first row and read the value with getAs:

    val min = spark.sql("select min(columnA) as mincolA from TableA")
      .first()
      .getAs[Long]("mincolA")   // columnA is bigint per the error message, so read it as a Long
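
  • With the value extracted as a scalar, it can be plugged into the filter on the second table. A minimal sketch, reusing the table and column names from the question; how much of the predicate gets pushed down to SQL Server depends on the synapsesql connector:

    import org.apache.spark.sql.functions.col

    // min is now a plain Long, so the column can be compared against it directly
    val TableB = spark.read.synapsesql("sqlserver.dbo.tableB")
      .select("columnA", "columnC")
      .filter(col("columnA") >= min)

Building the condition with col also avoids string concatenation, so the original ParseException cannot recur.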