Tags: python, pyspark, apache-spark-sql, multiple-columns, database-schema

Null Column Values in PySpark DataFrame after changing Schema


I have a JSON string that I am currently reading into a PySpark DataFrame:

rdd = sc.parallelize([json_str])
nested_df = hc.read.json(rdd)
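
For context, sc and hc are the usual SparkContext and SQL-context handles; a minimal setup sketch, assuming a SparkSession-based environment (SparkSession.read.json covers what the older HiveContext provided):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext   # used as sc above
hc = spark                # SparkSession replaces the older HiveContext for read.json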

Upon doing nested_df.show(20,False) I get:

+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|C_0_0                                                                                          |C_0_1                                                                                            |
+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|[{19.765432, 3.13}, {19.765432, 3.13}, {19.765432, 3.13}, {19.765432, 3.13}, {19.765432, 3.13}]|{{2000-12-12 23:30:30.1234567, 2000-12-12 23:30:30.1234567}, {2000-12-12 23:30:30.1234567, 3.13}}|
+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+

My current schema after reading json_str is:

    # root
    # |-- C_0_0: array (nullable = true)
    # |    |-- element: struct (containsNull = true)
    # |    |    |-- C_2_0: double (nullable = true)
    # |    |    |-- C_2_1: double (nullable = true)
    # |-- C_0_1: struct (nullable = true)
    # |    |-- C_1_0: struct (nullable = true)
    # |    |    |-- C_2_0: string (nullable = true)
    # |    |    |-- C_2_1: string (nullable = true)
    # |    |-- C_1_1: struct (nullable = true)
    # |    |    |-- C_2_0: string (nullable = true)
    # |    |    |-- C_2_1: double (nullable = true)

But I want the following schema, so that there is no data loss:

    # root
    # |-- C_0_0: array (nullable = true)
    # |    |-- element: struct (containsNull = true)
    # |    |    |-- C_2_0: decimal(6,6) (nullable = true)
    # |    |    |-- C_2_1: double (nullable = true)
    # |-- C_0_1: struct (nullable = true)
    # |    |-- C_1_0: struct (nullable = true)
    # |    |    |-- C_2_0: timestamp (nullable = true)
    # |    |    |-- C_2_1: timestamp (nullable = true)
    # |    |-- C_1_1: struct (nullable = true)
    # |    |    |-- C_2_0: timestamp (nullable = true)
    # |    |    |-- C_2_1: double (nullable = true)

This is my CREATE TABLE statement:

CREATE TABLE CompoundDataTypesSchema.TABLE_NAME (
    C_0_0 ARRAY OF ROW ( C_2_0 DECIMAL(6,6), C_2_1 DOUBLE ),
    C_0_1 ROW (
        C_1_0 ROW ( C_2_0 TIMESTAMP, C_2_1 TIMESTAMP ),
        C_1_1 ROW ( C_2_0 TIMESTAMP, C_2_1 DOUBLE )
    )
) IN FILES_SERVICE

I am expecting the table above with the newly defined schema and no data loss, but my current table is:

+-----+----------------------------------------------------------------------------------------------+
|C_0_0|C_0_1                                                                                         |
+-----+----------------------------------------------------------------------------------------------+
|null |{{2000-12-12 23:30:30.123456, 2000-12-12 23:30:30.123456}, {2000-12-12 23:30:30.123456, 3.13}}|
+-----+----------------------------------------------------------------------------------------------+

The C_0_0 column has a NULL value.

To change the schema I tried the following:

def transform_schema(self, schema, parent=""):

    if schema is None:
        return StructType()

    new_schema = []
    for f in schema.fields:
        if parent:
            field_name = parent + '.' + f.name
        else:
            field_name = f.name
        if isinstance(f.dataType, ArrayType):
            new_schema.append(StructField(f.name, ArrayType(self.transform_schema(f.dataType.elementType))))
        elif isinstance(f.dataType, StructType):
            new_schema.append(StructField(f.name, self.transform_schema(f.dataType)))
        else:
            new_datatype = self.changeDatatypeforNestedField()
            new_schema.append(StructField(f.name, new_datatype, f.nullable))

    return StructType(new_schema)


nested_df_schema = nested_df.schema
for f in nested_df_schema.fields:
    print("Name: ", f.name)
    col_name = f.name
    if isinstance(f.dataType, ArrayType):
        new_schema = ArrayType(self.transform_schema(f.dataType.elementType, parent=f.name))
        nested_df = nested_df.withColumn("col_name_json", to_json(col_name)).drop(col_name)
        nested_df = nested_df.withColumn(col_name, from_json("col_name_json", new_schema)).drop("col_name_json")
    elif isinstance(f.dataType, StructType):
        new_schema = self.transform_schema(f.dataType, parent=f.name)
        nested_df = nested_df.withColumn("col_name_json", to_json(col_name)).drop(col_name)
        nested_df = nested_df.withColumn(col_name, from_json("col_name_json", new_schema)).drop("col_name_json")
    else:
        new_datatype = self.changeDatatypeforNestedField()
        nested_df = nested_df.withColumn(col_name, nested_df[col_name].cast(new_datatype))

Can someone point out what the issue might be?


Solution

  • Read the data first with an explicit schema, then transform the column you want:

    from pyspark.sql import functions as f
    from pyspark.sql.types import ArrayType, DecimalType, DoubleType, StructField, StructType, TimestampType

    rdd = sc.parallelize([{'C_0_0': [{'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}], 'C_0_1': {'C_1_0': {'C_2_0': "2000-12-12 23:30:30.1234567", 'C_2_1': "2000-12-12 23:30:30.1234567"}, 'C_1_1': {'C_2_0': "2000-12-12 23:30:30.1234567", 'C_2_1': 3.13}}}])

    # Read with DecimalType(8, 6) so the integer part of 19.765432 survives the parse.
    schema = StructType([StructField('C_0_0', ArrayType(StructType([StructField('C_2_0', DecimalType(8, 6), True), StructField('C_2_1', DoubleType(), True)]), True), True), StructField('C_0_1', StructType([StructField('C_1_0', StructType([StructField('C_2_0', TimestampType(), True), StructField('C_2_1', TimestampType(), True)]), True), StructField('C_1_1', StructType([StructField('C_2_0', TimestampType(), True), StructField('C_2_1', DoubleType(), True)]), True)]), True)])
    df = spark.read.json(rdd, schema=schema)

    # Keep only the fractional part so the value fits into decimal(6,6).
    df2 = df.withColumn('C_0_0', f.transform('C_0_0', lambda e: f.struct((e['C_2_0'] % 1).cast("decimal(6, 6)"), e['C_2_1'])))
    df2.show(truncate=False)
    df2.printSchema()
    
    +------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
    |C_0_0                                                                                     |C_0_1                                                                                         |
    +------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
    |[{0.765432, 3.13}, {0.765432, 3.13}, {0.765432, 3.13}, {0.765432, 3.13}, {0.765432, 3.13}]|{{2000-12-12 23:30:30.123456, 2000-12-12 23:30:30.123456}, {2000-12-12 23:30:30.123456, 3.13}}|
    +------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
    
    root
     |-- C_0_0: array (nullable = true)
     |    |-- element: struct (containsNull = false)
     |    |    |-- col1: decimal(6,6) (nullable = true)
     |    |    |-- col2: double (nullable = true)
     |-- C_0_1: struct (nullable = true)
     |    |-- C_1_0: struct (nullable = true)
     |    |    |-- C_2_0: timestamp (nullable = true)
     |    |    |-- C_2_1: timestamp (nullable = true)
     |    |-- C_1_1: struct (nullable = true)
     |    |    |-- C_2_0: timestamp (nullable = true)
     |    |    |-- C_2_1: double (nullable = true)
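
  • f.struct without aliases produces the generic field names col1/col2 seen in the printSchema output above; if the original names matter, alias the expressions inside the lambda (e.g. (e['C_2_0'] % 1).cast("decimal(6, 6)").alias('C_2_0')).

  • The NULL in C_0_0 in the original attempt is most likely a decimal overflow rather than a schema-mapping problem: DECIMAL(6,6) has precision 6 and scale 6, so it can only hold values below 1, and Spark returns null when a decimal cast overflows (unless ANSI mode is enabled). That is why the snippet above keeps only the fractional part with % 1 before casting. A minimal repro sketch, using the same spark session (the column names here are made up for illustration):

    from pyspark.sql import functions as f

    probe = spark.createDataFrame([(19.765432,)], ['x'])
    probe.select(
        f.col('x').cast('decimal(6,6)').alias('direct_cast'),      # null: 19.765432 overflows decimal(6,6)
        (f.col('x') % 1).cast('decimal(6,6)').alias('frac_only'),  # 0.765432
        f.col('x').cast('decimal(8,6)').alias('wider_decimal'),    # 19.765432 fits once 2 integer digits are allowed
    ).show()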