I am trying to run the following SQL query in pyspark (on Spark 1.5.0):
SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2
This is what the pyspark commands look like:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet")
a.registerTempTable("a")
b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet")
b.registerTempTable("b")
result = sqlCtx.sql("SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect()
But it produces this error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.sql.
: java.lang.RuntimeException: [1.67] failure: ``union'' expected but identifier CROSS found
SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2
^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:175)
at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:175)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:115)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:172)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:172)
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:42)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
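For reference, the [1.67] at the start of the failure message appears to be a line.column position (1-based) as printed by the Scala parser combinators, so the parser gives up at column 67 of line 1. The small sketch below uses a hypothetical helper, point_at_error, which is not part of Spark or pyspark, to put a caret under that position in the query:

```python
import re


def point_at_error(message, query):
    """Return the offending query line with a caret line underneath it.

    Assumes the message contains a position of the form [line.column],
    with both numbers 1-based.
    """
    match = re.search(r"\[(\d+)\.(\d+)\]", message)
    if match is None:
        raise ValueError("no [line.column] position found in message")
    line_no, column = int(match.group(1)), int(match.group(2))
    line = query.splitlines()[line_no - 1]
    # column is 1-based, so pad with column - 1 spaces before the caret
    return line + "\n" + " " * (column - 1) + "^"


message = "[1.67] failure: ``union'' expected but identifier CROSS found"
query = ("SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 "
         "CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2")

marked = point_at_error(message, query)
print(marked)
```

The caret lands under the C of CROSS, so the first subquery and its alias tab1 are parsed fine and it is the CROSS keyword itself that the parser does not accept.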
As far as I know, both CROSS JOIN and subqueries (in FROM) should be supported by Spark SQL. Do you have any suggestions on how to fix this?
Thanks Timo
Using HiveContext instead of SQLContext seems to solve the problem. This version works fine:
from pyspark.sql import HiveContext
sqlCtx = HiveContext(sc)
a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet")
a.registerTempTable("a")
b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet")
b.registerTempTable("b")
result = sqlCtx.sql("SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect()
But as I understand it, this should also work with a plain SQLContext, since Spark SQL is supposed to support both subqueries and cross joins...
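Another possible way around the SQL parser, if switching to HiveContext is not an option, is to build the same result with the DataFrame API. This is only a sketch: the helper name cross_join_limited is made up for illustration, and it assumes that on Spark 1.5 calling join without a join condition yields a Cartesian product (later Spark versions may reject this or need an explicit cross join instead).

```python
def cross_join_limited(a, b, n=10):
    """Cross join the first n 'obj' values of a with the first n of b.

    a and b are expected to be DataFrames that each have an 'obj' column.
    Assumption: join() with no condition performs a Cartesian join
    (believed to hold on Spark 1.5; not verified for other versions).
    """
    # Equivalent of: SELECT obj AS origProperty1 FROM a LIMIT n
    tab1 = a.select(a["obj"].alias("origProperty1")).limit(n)
    # Equivalent of: SELECT obj AS origProperty2 FROM b LIMIT n
    tab2 = b.select(b["obj"].alias("origProperty2")).limit(n)
    # No join condition given, so every row of tab1 pairs with every row of tab2
    return tab1.join(tab2)


# Usage with the DataFrames loaded above (requires a running Spark context):
# result = cross_join_limited(a, b).collect()
```

With 10 rows on each side, the result should contain 100 rows, the same as the SQL query.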