
Corrupt decimal value when querying a Hive-on-Parquet table from Spark


Spark returns garbage values for decimal fields when I query an external Hive table backed by Parquet files through Spark SQL.

In my application flow, one Spark process writes the Parquet files directly to HDFS, and an external Hive table is defined over that location. A second Spark process then reads from the Hive table through Spark SQL and gets incorrect data.

Steps to reproduce (a simple demo of the issue):

  1. Write to Parquet: I write the data to a Parquet file in HDFS. Spark itself infers the type of the decimal field as Decimal(28,26).

    scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
    df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    scala> df.schema
    res0: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,false), StructField(value,DecimalType(28,26),false))
    scala> df.show
    +-----+--------------------+
    | name|               value|
    +-----+--------------------+
    |dummy|10.70000000000000...|
    +-----+--------------------+
    scala> df.write.mode("overwrite").parquet("/my/hdfs/location/test")
    
  2. Read the Parquet file: check that the value was written correctly.

    scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("/my/hdfs/location/test")
    df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    scala> df_parq.show
    +-----+--------------------+
    | name|               value|
    +-----+--------------------+
    |dummy|10.70000000000000...|
    +-----+--------------------+
    
  3. Create an external Hive table: defined on top of the Parquet location, with the decimal field declared as Decimal(18,6).

    hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
    
  4. Run a Hive query in beeline: verify that the correct data is returned.

    hive> select * from db1.test_precision;
    +----------------------+-----------------------+--+
    | test_precision.name  | test_precision.value  |
    +----------------------+-----------------------+--+
    | dummy                | 10.7                  |
    +----------------------+-----------------------+--+
    
  5. Run the same query using Spark SQL: incorrect decimal values are returned.

    scala> val df_hive = spark.sql("select * from db1.test_precision")
    df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(18,6)]
    scala> df_hive.show
    +-----+-----------+
    | name|      value|
    +-----+-----------+
    |dummy|-301.989888|
    +-----+-----------+
    

Note: I am aware that writing the value to Parquet with an explicit cast(value as Decimal(18,6)) in the first step fixes the issue, but I already have historical data that I can't reload right away.

Is there a way I can fix this while reading the value at step 5?


Solution

  • I reproduced your example completely except for step 3. When you create the table, the Decimal column must keep the precision and scale that were written to the Parquet files.

    In your case, the data was written as Decimal(28,26):

    df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    

    so the table should declare the same precision and scale for the decimal column:

    hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
    -- and NOT:
    hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
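
As a rough illustration of why the Decimal(18,6) declaration produces garbage rather than a rounded value, the number from step 5 can be reproduced with plain arithmetic. The sketch below takes the unscaled integer that represents 10.7 at scale 26, keeps only its low 32 bits, and reinterprets them at scale 6. That this truncation is what happens inside Spark's Parquet reader is an assumption, not something confirmed from its source; the object and value names are hypothetical.

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger}

object DecimalMismatchDemo {
  // Unscaled integer for 10.7 stored at scale 26, i.e. 107 * 10^25.
  val unscaled: BigInteger =
    new JBigDecimal("10.7").setScale(26).unscaledValue()

  // Assumption: only the low 32 bits of the stored integer survive
  // when the column is read with the mismatched (18,6) type.
  val low32: Int = unscaled.intValue()

  // Reinterpret the truncated integer with the table's declared scale of 6.
  val misread: JBigDecimal = JBigDecimal.valueOf(low32.toLong, 6)

  def main(args: Array[String]): Unit = {
    println(misread) // -301.989888
  }
}
```

The result matches the value shown by `df_hive.show` in the question, which suggests the stored bytes are intact and only their interpretation under the declared type is wrong.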
    
    scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
    df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    
    scala> df.show()
    +-----+--------------------+
    | name|               value|
    +-----+--------------------+
    |dummy|10.70000000000000...|
    +-----+--------------------+
    
    
    scala> df.printSchema()
    root
     |-- name: string (nullable = false)
     |-- value: decimal(28,26) (nullable = false)
    
    scala> df.write.mode("overwrite").parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
    
    scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
    df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    
    scala> df_parq.printSchema
    root
     |-- name: string (nullable = true)
     |-- value: decimal(28,26) (nullable = true)
    
    
    scala> df_parq.show
    +-----+--------------------+
    | name|               value|
    +-----+--------------------+
    |dummy|10.70000000000000...|
    +-----+--------------------+
    
    hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
    
    
    hive> select * from test_precision;
    
    +----------------------+-----------------------+--+
    | test_precision.name  | test_precision.value  |
    +----------------------+-----------------------+--+
    | dummy                | 10.7                  |
    +----------------------+-----------------------+--+
    
    scala> val df_hive = spark.sql("select * from test.test_precision")
    df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    
    scala> df_hive.show
    +-----+--------------------+
    | name|               value|
    +-----+--------------------+
    |dummy|10.70000000000000...|
    +-----+--------------------+
    
    
    scala> df_hive.printSchema
    root
     |-- name: string (nullable = true)
     |-- value: decimal(28,26) (nullable = true)
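
If the existing table definition cannot be changed and the historical files cannot be rewritten, one possible read-side workaround is to bypass the metastore schema: read the Parquet files directly, so that Spark uses the decimal(28,26) type recorded in the files, and then cast to the type the consumer expects. This is a minimal sketch under that assumption; `DecimalReadWorkaround` and `readValueAsDecimal` are hypothetical names, and the column name `value` is taken from the example above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

object DecimalReadWorkaround {
  // Hypothetical helper: reads the files with their own schema
  // (decimal(28,26) in this example) and casts the column afterwards,
  // instead of relying on the type declared in the Hive metastore.
  def readValueAsDecimal(
      spark: SparkSession,
      path: String,
      precision: Int = 18,
      scale: Int = 6): DataFrame =
    spark.read
      .parquet(path)
      .withColumn("value", col("value").cast(DecimalType(precision, scale)))
}
```

In the spark-shell session above this would be called as `DecimalReadWorkaround.readValueAsDecimal(spark, "/my/hdfs/location/test").show`. The cast reduces the value to six fractional digits, so any precision beyond that is lost. Another option sometimes suggested is setting `spark.sql.hive.convertMetastoreParquet` to `false`, so that Spark reads the table through the Hive SerDe as beeline does in step 4; whether that returns the expected value for this table is untested here.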