sql-server, apache-spark, jdbc, azure-databricks

Pushdown query in (Spark and) Databricks doesn't work for more complex SQL queries?


I'm new to Databricks, so I hope my question is not too far off. I'm trying to run the following SQL pushdown queries in a Databricks notebook to get data from an on-premises SQL Server, using the following Python code:

pushdown_query1 = """(select * from dbo.myTabel X
                        INNER JOIN
                            (select Interval_Time, max(Run_Time) as Run_Time
                            from dbo.myTabel
                            group by Interval_Time) Y
                                on X.Interval_Time = Y.Interval_Time
                                and X.Run_Time = Y.Run_Time
                    WHERE RUNTYPE='TYPE_A') df"""

pushdown_query2 = """(select * from dbo.myTabel where RUNTYPE='TYPE_A') df"""

# connection information
jdbc_hostname = '<...>'
database = '<..>'
port = '1433'
username = '<...>'
password = '<...>'

# connection properties; note this dict is not used by the reader below,
# which passes the credentials as individual options instead
connection_details = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(jdbc_hostname, port, database)
# load the data with the pushdown query
df = spark.read.format("jdbc") \
          .option("url", jdbc_url) \
          .option("dbtable", pushdown_query1) \
          .option("user", username) \
          .option("password", password) \
          .load()

df.show(100, truncate=False)

pushdown_query2 works without any issue and returns what I want, but pushdown_query1 raises an error. I'm not sure whether it fails because the statement contains JOINs and subqueries, or whether there is some other issue.

The error raised when using pushdown_query1:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-2825826808916915> in <module>
     18             .option("dbtable", query)\
     19             .option("user", username) \
---> 20             .option("password", password) \
     21             .load()
     22 

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185 
    186     @since(1.4)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o586.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: The column 'Interval_Time' was specified multiple times for 'df'.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1632)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:600)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:522)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7225)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3053)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:247)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:222)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:444)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:61)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:373)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

Solution

  • You are getting the error because you are joining the table with itself and using '*' in the outer select statement. To resolve the schema of the derived table, Spark issues a query of the form SELECT * FROM (<your query>) df WHERE 1=0 against SQL Server (the JDBCRDD$.resolveTable frame in the stack trace), and SQL Server requires every column of the derived table 'df' to have a unique name. If you specify the columns explicitly, based on the aliases you gave each side of the join, you won't see this error.

    In your case the column Interval_Time is duplicated: both X and the subquery Y select it, so 'select *' returns it twice. Specify the columns explicitly and it should work.
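
    For example, here is a minimal sketch of the fix (the pushdown_query1_fixed name is just for illustration): select only the outer table's columns with X.*, so that Y's Interval_Time and Run_Time appear solely in the join condition, then load it with the same reader options as in the question.

    # sketch of the fix: only X's columns are selected, so the derived
    # table 'df' contains no duplicate column names
    pushdown_query1_fixed = """(select X.* from dbo.myTabel X
                            INNER JOIN
                                (select Interval_Time, max(Run_Time) as Run_Time
                                from dbo.myTabel
                                group by Interval_Time) Y
                                    on X.Interval_Time = Y.Interval_Time
                                    and X.Run_Time = Y.Run_Time
                        WHERE X.RUNTYPE='TYPE_A') df"""

    df = spark.read.format("jdbc") \
              .option("url", jdbc_url) \
              .option("dbtable", pushdown_query1_fixed) \
              .option("user", username) \
              .option("password", password) \
              .load()

    Since Y contributes only the grouping key and max(Run_Time), and both equal X's values on every matched row, selecting X.* loses no information. If you do need a column from both sides, give one of them an alias (e.g. Y.Run_Time as Max_Run_Time) rather than relying on *.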