Search code examples
apache-sparkamazon-emr

Spark 2.3.1 AWS EMR not returning data for some columns yet works in Athena/Presto and Spectrum


I am using PySpark on Spark 2.3.1 on AWS EMR (Python 2.7.14)

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.debug.maxToStringFields", 100) \
    .enableHiveSupport() \
    .getOrCreate()


spark.sql('select `message.country` from datalake.leads_notification where `message.country` is not null').show(10)

This returns no data, 0 rows found. Every value for each row in above table is returned Null. Data is stored in PARQUET.

When I ran same SQL query on AWS Athena/Presto or on AWs Redshift Spectrum then I get all column data returned correctly (most column values are not null).

This is the Athena SQL and Redshift SQL query that returns correct data:

select "message.country" from datalake.leads_notification where "message.country" is not null limit 10;

I use AWS Glue catalog in all cases. The column above is NOT partitioned but the table is partitioned on other columns. I tried to use repair table, it did not help. i.e. MSCK REPAIR TABLE datalake.leads_notification

i tried Schema Merge = True like so:

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("spark.sql.parquet.mergeSchema", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.debug.maxToStringFields", 200) \
    .enableHiveSupport() \
    .getOrCreate()

No difference, still every value of one column is nulls even though some are not null.

This column was added as the last column to the table so most data is indeed null but some rows are not null. The column is listed at last on the column list in catalog, sitting just above the partitioned columns.

Nevertheless Athena/Presto retrieves all non-null values OK and so does Redshift Spectrum too but alas EMR Spark 2.3.1 PySpark shows all values for this column as "null". All other columns in Spark are retrieved correctly.

Can anyone help me to debug this problem please?

Hive Schema is hard to cut and paste here due to output format.

***CREATE TABLE datalake.leads_notification(
  message.environment.siteorigin string, 
  dcpheader.dcploaddateutc string, 
  message.id int, 
  message.country string, 
  message.financepackage.id string, 
  message.financepackage.version string)
PARTITIONED BY ( 
  partition_year_utc string, 
  partition_month_utc string, 
  partition_day_utc string, 
  job_run_guid string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://blahblah/leads_notification/leads_notification/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='weekly_datalake_crawler', 
  'averageRecordSize'='3136', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='2', 
  'recordCount'='897025', 
  'sizeKey'='1573529662', 
  'spark.sql.create.version'='2.2 or prior', 
  'spark.sql.sources.schema.numPartCols'='4', 
  'spark.sql.sources.schema.numParts'='3', 
  'spark.sql.sources.schema.partCol.0'='partition_year_utc', 
  'spark.sql.sources.schema.partCol.1'='partition_month_utc', 
  'spark.sql.sources.schema.partCol.2'='partition_day_utc', 
  'spark.sql.sources.schema.partCol.3'='job_run_guid', 
  'typeOfData'='file')***

Last 3 columns all have the same problems in Spark:

message.country string, 
message.financepackage.id string, 
message.financepackage.version string

All return OK in Athena/Presto and Redshift Spectrum using same catalog.

I apologize for my editing.

thank you


Solution

  • do step 5 schema inspection: http://www.openkb.info/2015/02/how-to-build-and-use-parquet-tools-to.html

    my bet is these new column names in parquet definition are either upper case (while other column names are lower case) or new column names in parquet definition are either lower case (while other column names are upper case)

    see Spark issues reading parquet files https://medium.com/@an_chee/why-using-mixed-case-field-names-in-hive-spark-sql-is-a-bad-idea-95da8b6ec1e0