apache-spark, parquet

Spark thinks I'm reading DataFrame from a Parquet file


Spark 2.x here. My code:

val query = "SELECT * FROM some_big_table WHERE something > 1"

val df : DataFrame = spark.read
  .option("url",
    s"""jdbc:postgresql://${redshiftInfo.hostnameAndPort}/${redshiftInfo.database}?currentSchema=${redshiftInfo.schema}"""
  )
  .option("user", redshiftInfo.username)
  .option("password", redshiftInfo.password)
  .option("dbtable", query)
  .load()

Produces:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:183)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:183)
    at scala.Option.getOrElse(Option.scala:121)

I'm not reading anything from a Parquet file; I'm reading from a Redshift (RDBMS) table. So why am I getting this error?


Solution

  • If you use the generic load function, you have to specify the format as well:

    // Query has to be subquery 
    val query = "(SELECT * FROM some_big_table WHERE something > 1) as tmp"
    
    ...
      .format("jdbc")
      .option("dbtable", query)
      .load()
    

    Otherwise Spark assumes that you're using the default format, which, in the absence of any specific configuration (the spark.sql.sources.default setting), is Parquet.
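
    Applied to the snippet from the question, a corrected reader could look like the sketch below (assuming the same redshiftInfo object is in scope and the PostgreSQL JDBC driver is on the classpath):

    // Sketch: the reader from the question with the format and subquery fixed
    val query = "(SELECT * FROM some_big_table WHERE something > 1) as tmp"

    val df: DataFrame = spark.read
      .format("jdbc")  // JDBC source instead of the default (Parquet)
      .option("url",
        s"jdbc:postgresql://${redshiftInfo.hostnameAndPort}/${redshiftInfo.database}?currentSchema=${redshiftInfo.schema}")
      .option("user", redshiftInfo.username)
      .option("password", redshiftInfo.password)
      .option("dbtable", query)  // a table name or a parenthesised subquery with an alias
      .load()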

    Also, nothing forces you to use dbtable.

    spark.read.jdbc(
      s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}",
      query,  // the same parenthesised subquery with an alias
      props   // java.util.Properties carrying user, password, etc.
    )
    

    This variant is also valid.
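
    For completeness, props above is a plain java.util.Properties object; a minimal sketch of how it could be built follows (the driver entry is an assumption, only needed if it cannot be inferred from the URL):

    import java.util.Properties

    // Hypothetical construction of the props object used above
    val props = new Properties()
    props.setProperty("user", redshiftInfo.username)
    props.setProperty("password", redshiftInfo.password)
    props.setProperty("driver", "org.postgresql.Driver")  // optional if inferable from the URL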

    And of course, with such a simple query, none of that is needed:

    spark.read.jdbc(
      s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}",
      "some_big_table",  // plain table name, no subquery needed
      props
    ).where("something > 1")  // simple filters like this are pushed down to the JDBC source
    

    will work the same way. If you want to improve performance, you should consider parallel (partitioned) queries, as sketched below, or, even better, try the dedicated Redshift connector.
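
    For reference, here is a minimal sketch of such a parallel (partitioned) JDBC read; the partition column id and its bounds are assumptions and must match a real numeric column in the table:

    val parallelDf = spark.read
      .format("jdbc")
      .option("url",
        s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}")
      .option("user", redshiftInfo.username)
      .option("password", redshiftInfo.password)
      .option("dbtable", "some_big_table")
      .option("partitionColumn", "id")  // numeric column to split on (hypothetical)
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")     // 8 concurrent JDBC queries
      .load()
      .where("something > 1")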