Tags: apache-spark, pyspark, apache-spark-sql, spark-jdbc

PySpark pyspark.sql.DataFrameReader.jdbc() doesn't accept datetime type upperbound argument as document says


The documentation for the jdbc function in PySpark 3.0.1 at https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader says:

column – the name of a column of numeric, date, or timestamp type that will be used for partitioning.

I thought it would accept a datetime column to partition the query.

So I tried this on EMR-6.2.0 (PySpark 3.0.1):

sql_conn_params = get_spark_conn_params()  # my function
sql_conn_params['column'] = 'EVENT_CAPTURED'
sql_conn_params['numPartitions'] = 8
# sql_conn_params['lowerBound'] = datetime.strptime('2016-01-01', '%Y-%m-%d')  # another trial
# sql_conn_params['upperBound'] = datetime.strptime('2016-01-10', '%Y-%m-%d')
sql_conn_params['lowerBound'] = '2016-01-01 00:00:00'
sql_conn_params['upperBound'] = '2016-01-10 00:00:00'
df = (spark.read.jdbc(
    table=tablize(sql),
    **sql_conn_params
))
df.show()

I got this error:

invalid literal for int() with base 10: '2016-01-01 00:00:00'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 625, in jdbc
    return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
ValueError: invalid literal for int() with base 10: '2016-01-01 00:00:00'

I looked at the source code at https://github.com/apache/spark/blob/master/python/pyspark/sql/readwriter.py#L865 and found that it doesn't support a datetime-type column the way the documentation says.
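
For illustration, the failure can be reproduced outside Spark: according to the traceback above, the Python wrapper casts both bounds with int() before handing them to the JVM, which only works for integral values (the values below are just examples):

int('20160101')               # fine, the string is an integral literal
int('2016-01-01 00:00:00')    # ValueError: invalid literal for int() with base 10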

My question is:

As the code shows, PySpark's jdbc() doesn't support a datetime-type partition column, so why does the documentation say it does?

Thanks,

Yan


Solution

  • It is supported.

    The issue here is that the spark.read.jdbc method currently only accepts integer values for the upperBound and lowerBound parameters, i.e. it only supports bounds for integral-type partition columns.

    But you can use the load method together with DataFrameReader.option to specify upperBound and lowerBound for other column types (date/timestamp):

    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://server/db") \
        .option("dbtable", "table_name") \
        .option("user", "user") \
        .option("password", "xxxx") \
        .option("partitionColumn", "EVENT_CAPTURED") \
        .option("lowerBound", "2016-01-01 00:00:00") \
        .option("upperBound", "2016-01-10 00:00:00") \
        .option("numPartitions", "8") \
        .load()
    

    Or by passing a dict of options:

    df = spark.read.format("jdbc") \
        .options(**sql_conn_params) \
        .load()
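
    For this to work, the keys of sql_conn_params must be the JDBC data source option names (partitionColumn rather than column, plus url, dbtable, and so on). A minimal sketch of such a dict, with placeholder values:

    sql_conn_params = {
        "url": "jdbc:mysql://server/db",
        "dbtable": "table_name",
        "user": "user",
        "password": "xxxx",
        "partitionColumn": "EVENT_CAPTURED",
        "lowerBound": "2016-01-01 00:00:00",
        "upperBound": "2016-01-10 00:00:00",
        "numPartitions": "8",
    }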
    

    You can see all available options and examples here: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
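
    As a quick sanity check that the read was actually split up (assuming the df from the examples above), you can inspect the number of partitions:

    df.rdd.getNumPartitions()  # should report 8 for numPartitions = 8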