We use a Databricks cluster (13.3 LTS, which includes Apache Spark 3.4.1 and Scala 2.12) that shuts down after 30 minutes of inactivity. My objective is to read a Redshift table and write it to Snowflake. I am using the following code:
from pyspark.sql.functions import col  # needed for col("country_col") below

df = spark.read \
.format("redshift") \
.option("url", jdbc_url) \
.option("user", user) \
.option("password", password) \
.option("dbtable", "schem_name.table_name") \
.option("partitionColumn", "date_col1")\
.option("lowerBound", "2023-11-05")\
.option("upperBound", "2024-03-23")\
.option("numPartitions", "500")\
.load()\
.filter("date_col1>dateadd(month ,-6,current_date)")\
.filter(col("country_col").isin('India', 'China', 'Japan', 'Germany', 'United Kingdom', 'Brazil', 'United States', 'Canada'))
df1 = df.repartition(900)  # Data is skewed on that partition column, so repartition to 1 x the number of cores in the cluster for an even distribution
df1.write.format("snowflake") \
.option("host", host_sfl) \
.option("user", user_sfl) \
.option('role', role_sfl) \
.option("password", password_sfl) \
.option("database", database_sfl) \
.option("sfWarehouse", warehouse_sfl) \
.option("schema",'schema_name')\
.option("dbtable",'target_table_name')\
.mode('Overwrite') \
.save()
It throws the following error, even though I have not used the query option anywhere in my code:
IllegalArgumentException: requirement failed:
Options 'query' and 'partitionColumn' can not be specified together.
Please define the query using `dbtable` option instead and make sure to qualify
the partition columns using the supplied subquery alias to resolve any ambiguity.
Example :
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
When I comment out the repartition and the write to Snowflake and just do a count, it gives me the correct count.
Here is another observation: if, after doing the count, I change .format("redshift") in the code above to .format("jdbc"), the code works.
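To make that change concrete, here is a minimal sketch of the same partitioned read with the format name passed in as a parameter, so that only the value given to .format(...) differs between the failing run and the working run. The helper name build_partitioned_read and its arguments are hypothetical (they are not part of my job), and spark is assumed to be the SparkSession that a Databricks notebook provides.

```python
def build_partitioned_read(spark, source_format, jdbc_url, user, password):
    """Return the partitioned read from the question for a given format name.

    Hypothetical helper for illustration: every option is copied from the
    code above, and only the value passed to .format(...) changes.
    """
    return (
        spark.read
        .format(source_format)  # "redshift" in the failing run, "jdbc" in the working one
        .option("url", jdbc_url)
        .option("user", user)
        .option("password", password)
        .option("dbtable", "schem_name.table_name")
        .option("partitionColumn", "date_col1")
        .option("lowerBound", "2023-11-05")
        .option("upperBound", "2024-03-23")
        .option("numPartitions", "500")
        .load()
    )


# Usage in a notebook, assuming jdbc_url, user and password are already defined:
# df = build_partitioned_read(spark, "jdbc", jdbc_url, user, password)
```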
I don't know what is happening here. The job fails on the first run after every cluster restart, and I have to run a count manually and then switch the format to JDBC to get it to work. Please let me know if I am missing something obvious. I have gone through a lot of documentation and couldn't find what I need.
This is what I ended up doing, and it worked:
count_df = spark.read \
.format("com.databricks.spark.redshift") \
.option("dbtable", tbl1) \
.option("url", url) \
.option("user", user) \
.option("password", pwd) \
.load()\
.limit(1)
count_df.count()
After that, the code in the question started working. The fix was to run a dummy count action with a different format name (com.databricks.spark.redshift) before running the code in the question.
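If the job runs unattended, the same workaround can be wrapped in a small function and called once at the start of the job, before the partitioned read. This is only a sketch of the steps I ran by hand: the name warm_up_redshift is made up for illustration, spark is assumed to be the notebook's SparkSession, and I have not confirmed why the warm-up read makes the later read succeed.

```python
def warm_up_redshift(spark, url, user, password, table):
    """Run a one-row read through com.databricks.spark.redshift.

    Hypothetical helper: it repeats the dummy count from this answer and
    returns the count (0 or 1), so the caller can log it if needed.
    """
    warm_df = (
        spark.read
        .format("com.databricks.spark.redshift")
        .option("dbtable", table)
        .option("url", url)
        .option("user", user)
        .option("password", password)
        .load()
        .limit(1)  # keep the warm-up as cheap as possible
    )
    return warm_df.count()  # the action that actually triggers the read


# Usage at the top of the job, assuming url, user, pwd and tbl1 are defined:
# warm_up_redshift(spark, url, user, pwd, tbl1)
# ...then run the partitioned read and the Snowflake write from the question.
```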