I found the following in the documentation for the jdbc function in PySpark 3.0.1 at
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader:
column – the name of a column of numeric, date, or timestamp type that will be used for partitioning.
I thought it would accept a datetime column to partition the query.
So I tried this on EMR-6.2.0
(PySpark 3.0.1):
sql_conn_params = get_spark_conn_params()  # my function
sql_conn_params['column'] = 'EVENT_CAPTURED'
sql_conn_params['numPartitions'] = 8
# sql_conn_params['lowerBound'] = datetime.strptime('2016-01-01', '%Y-%m-%d')  # another attempt, with datetime objects
# sql_conn_params['upperBound'] = datetime.strptime('2016-01-10', '%Y-%m-%d')
sql_conn_params['lowerBound'] = '2016-01-01 00:00:00'
sql_conn_params['upperBound'] = '2016-01-10 00:00:00'
df = (spark.read.jdbc(
    table=tablize(sql),
    **sql_conn_params
))
df.show()
I got this error:
invalid literal for int() with base 10: '2016-01-01 00:00:00'
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 625, in jdbc
return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
ValueError: invalid literal for int() with base 10: '2016-01-01 00:00:00'
I looked at the source code here https://github.com/apache/spark/blob/master/python/pyspark/sql/readwriter.py#L865 and found that it doesn't support the datetime type, contrary to what the documentation says.
My question is: the code shows that a datetime-type partition column isn't supported in PySpark, so why does the documentation say it is?
Thanks,
Yan
It does support it.
The issue here is that the spark.read.jdbc method only accepts integer values for the lowerBound and upperBound parameters (they are passed through int() before being handed to the JVM).
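For comparison, here is a minimal sketch of what spark.read.jdbc does accept, partitioning on a numeric column with integer bounds (the URL, table, column name and credentials below are placeholders):
df = spark.read.jdbc(
    url="jdbc:mysql://server/db",    # placeholder connection URL
    table="table_name",              # placeholder table
    column="ID",                     # a numeric column
    lowerBound=1,                    # must be convertible by int()
    upperBound=1000000,
    numPartitions=8,
    properties={"user": "user", "password": "xxxx"}
)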
But you can use the load method together with DataFrameReader.option to specify upperBound and lowerBound for other column types (date/timestamp):
df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://server/db") \
    .option("dbtable", "table_name") \
    .option("user", "user") \
    .option("password", "xxxx") \
    .option("partitionColumn", "EVENT_CAPTURED") \
    .option("lowerBound", "2016-01-01 00:00:00") \
    .option("upperBound", "2016-01-10 00:00:00") \
    .option("numPartitions", "8") \
    .load()
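Note that partitionColumn, lowerBound, upperBound and numPartitions must all be specified together, and the bounds are only used to decide the partition stride; they do not filter any rows out of the result.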
Or by passing a dict of options (note the double asterisk, since options takes keyword arguments):
df = spark.read.format("jdbc") \
    .options(**sql_conn_params) \
    .load()
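For that to work, the dict keys have to be the JDBC data source option names (partitionColumn rather than column, dbtable rather than table). A sketch of such a dict, with placeholder connection values:
sql_conn_params = {
    "url": "jdbc:mysql://server/db",      # placeholder connection URL
    "dbtable": "table_name",
    "user": "user",
    "password": "xxxx",
    "partitionColumn": "EVENT_CAPTURED",  # not "column" as in the jdbc() method
    "lowerBound": "2016-01-01 00:00:00",
    "upperBound": "2016-01-10 00:00:00",
    "numPartitions": "8",
}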
You can see all available options and examples here: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html