Tags: scala, apache-spark, greenplum

How to specify a subquery in the "dbtable" option in a Spark JDBC application while reading data from a table on Greenplum?


I am trying to read data from a table on Greenplum into HDFS using Spark. I passed a subquery in the options to read the Greenplum table, as below.

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from dbanscience.xx_lines where year=2017 and month=12) as xx_lines_periodYear"

println("ExecQuery: " + execQuery)

val dataDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", conUrl)
                     .option("dbtable", execQuery)
                     .option("user", devUsrName).option("password", devPwd)
                     .option("partitionColumn","id")
                     .option("lowerBound", 165512)
                     .option("upperBound", 11521481695656862L)
                     .option("numPartitions",300).load()

When I run the code, I see the following exception:

Exec query: (select je_header_id,source_system_name,je_line_num,last_update_date,last_updated_by,last_updated_by_name,ledger_id,code_combination_id,balancing_segment,cost_center_segment,period_name,period_year,period_num,effective_date,status,creation_date,created_by,created_by_name,entered_dr,entered_cr,entered_amount,accounted_dr,accounted_cr,accounted_amount,description,sap_document_number,sap_fiscal_year,sap_document_date,sap_posting_date,sap_period,sap_reference,sap_document_header_text,sap_exchange_rate,sap_reference_key,sap_line_item,sap_account_number,sap_cost_element,sap_profit_center,sap_segment,sap_trading_partner,sap_co_order,sap_geography,sap_reference_code,sap_product_line,sap_sender_cost_center,usd_mor_activity_amount::character varying as usd_mor_activity_amount_text, 0 as del_flag from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.chunk$.prepareDF$1(chunk.scala:153)
at com.partition.source.chunk$.main(chunk.scala:184)
at com.partition.source.chunk.main(chunk.scala)

The exception shows public as the dbname and the subquery (execQuery) as the table name.

I also tried giving the exec query as:

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear"

or

val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12 as xx_gl_je_lines_periodYear"

Neither of them works. I am using the jar greenplum-spark_2.11-1.4.0.jar to read data from Greenplum. Below is the spark-submit I tried to use:

SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.chunk --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --conf spark.jars=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/ibusr/ibusr.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,connections.properties --name Splinter --conf spark.executor.extraClassPath=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar splinter_2.11-0.1.jar

I wrote the code by referring to the instructions in the Greenplum documentation: https://greenplum-spark.docs.pivotal.io/100/read_from_gpdb.html

I am unable to identify the mistake I made here. Could anyone let me know how I can fix the issue?


Solution

  • The option to pass a subquery as dbtable is a feature of the built-in JDBC data source. However, the Greenplum Spark Connector doesn't seem to provide such a capability.

    Specifically, the source is identified by dbschema and dbtable, where the latter should be (emphasis mine):

    The name of the Greenplum Database table. When reading from Greenplum Database, this table must reside in the Greenplum Database schema identified in the dbschema option value.

    This explains the exception you get.
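
    For comparison, the built-in JDBC source does accept a parenthesized subquery aliased as a table in dbtable. The sketch below (a placeholder PostgreSQL-style URL, with your execQuery and credentials) only illustrates that source; it pulls all data through the master's JDBC endpoint rather than using the connector's segment-parallel transfer, so it is not an equivalent replacement:

    // Sketch only: the built-in JDBC source (format "jdbc"), not the Greenplum Spark Connector.
    // The URL is a placeholder; execQuery is your aliased subquery from above.
    val jdbcDF = spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://gpdb-master-host:5432/dbname")  // placeholder connection URL
      .option("dbtable", execQuery)                 // a "(subquery) alias" is accepted here
      .option("user", devUsrName)
      .option("password", devPwd)
      .option("partitionColumn", "id")              // plain JDBC range partitioning
      .option("lowerBound", "165512")
      .option("upperBound", "11521481695656862")
      .option("numPartitions", "300")
      .load()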

    At the same time, nothing in the code you've shared indicates that you actually need such a feature. Since you don't apply any database-specific logic, the process can simply be rewritten as:

    import org.apache.spark.sql.functions.{col, lit}

    // Columns to keep; fill in with the actual column names.
    val allColumns: Seq[String] = ???

    val dataDF = spark.read.format("greenplum")
      .option("url", conUrl)
      .option("dbtable", "xx_lines")
      .option("dbschema", "dbanscience")
      .option("partitionColumn", "id")
      .option("user", devUsrName)
      .option("password", devPwd)
      .load()
      .where("year = 2017 and month = 12")    // replaces the WHERE clause of your subquery
      .select(allColumns.map(col): _*)        // projects the requested columns
      .withColumn(flagCol, lit(0))            // adds the constant flag column

    Please note that the other options you use (upperBound, lowerBound, numPartitions) are neither supported nor required here.

    According to the official documentation:

    Greenplum Database stores table data across segments. A Spark application using the Greenplum-Spark Connector to load a Greenplum Database table identifies a specific table column as a partition column. The Connector uses the data values in this column to assign specific table data rows on each Greenplum Database segment to one or more Spark partitions.

    So, as you can see, the distribution mechanism is completely different from that of the built-in JDBC source.

    The connector also provides an additional partitionsPerSegment option, which sets:

    The number of Spark partitions per Greenplum Database segment. Optional, the default value is 1 partition.
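
    If one Spark partition per segment is not enough parallelism, the option can simply be added to the reader shown earlier. A minimal sketch, assuming the same connection variables; the value 4 is an arbitrary illustration, not a recommendation:

    // Sketch only: the reader above extended with partitionsPerSegment.
    val dataDF = spark.read.format("greenplum")
      .option("url", conUrl)
      .option("dbtable", "xx_lines")
      .option("dbschema", "dbanscience")
      .option("partitionColumn", "id")
      .option("partitionsPerSegment", "4")    // 4 Spark partitions per Greenplum segment
      .option("user", devUsrName)
      .option("password", devPwd)
      .load()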