I have a json string. I am currently reading it into a pyspark dataframe.
rdd = sc.parallelize([json_str])
nested_df = hc.read.json(rdd)
Upon doing nested_df.show(20,False) I get:
\+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|C_0_0 |C_0_1 |
\+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|\[{19.765432, 3.13}, {19.765432, 3.13}, {19.765432, 3.13}, {19.765432, 3.13}, {19.765432, 3.13}\]|{{2000-12-12 23:30:30.1234567, 2000-12-12 23:30:30.1234567}, {2000-12-12 23:30:30.1234567, 3.13}}|
\+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
My Current Schema is after reading the json_str is :
# root
# |-- C_0_0: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- C_2_0: double (nullable = true)
# | | |-- C_2_1: double (nullable = true)
# |-- C_0_1: struct (nullable = true)
# | |-- C_1_0: struct (nullable = true)
# | | |-- C_2_0: string (nullable = true)
# | | |-- C_2_1: string (nullable = true)
# | |-- C_1_1: struct (nullable = true)
# | | |-- C_2_0: string (nullable = true)
# | | |-- C_2_1: double (nullable = true)
But I want my schema to be without data loss:
# root
# |-- C_0_0: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- C_2_0: decimal(6,6) (nullable = true)
# | | |-- C_2_1: double (nullable = true)
# |-- C_0_1: struct (nullable = true)
# | |-- C_1_0: struct (nullable = true)
# | | |-- C_2_0: timestamp (nullable = true)
# | | |-- C_2_1: timestamp (nullable = true)
# | |-- C_1_1: struct (nullable = true)
# | | |-- C_2_0: timestamp (nullable = true)
# | | |-- C_2_1: double (nullable = true)
This is my create table statement:
CREATE TABLE CompoundDataTypesSchema.TABLE_NAME ( C_0_0 ARRAY OF ROW ( C_2_0 DECIMAL(6,6) ,C_2_1 DOUBLE ) ,C_0_1 ROW ( C_1_0 ROW ( C_2_0 TIMESTAMP ,C_2_1 TIMESTAMP ),C_1_1 ROW ( C_2_0 TIMESTAMP ,C_2_1 DOUBLE ) ) ) in
FILES_SERVICE
I am expecting the above mentioned table with newly defined schema without data loss. But my current table is:
+-----+----------------------------------------------------------------------------------------------+
|C_0_0|C_0_1 |
+-----+----------------------------------------------------------------------------------------------+
|null |{{2000-12-12 23:30:30.123456, 2000-12-12 23:30:30.123456}, {2000-12-12 23:30:30.123456, 3.13}}|
+-----+----------------------------------------------------------------------------------------------+
The C_0_0 column has NULL
value.
To change the schema I tried the following:
def transform_schema(self, schema, parent=""):
if schema == None:
return StructType()
new_schema = []
for f in schema.fields:
if parent:
field_name = parent + '.' + f.name
else:
field_name = f.name
if isinstance(f.dataType, ArrayType):
new_schema.append(StructField(f.name, ArrayType(self.transform_schema(f.dataType.elementType))))
elif isinstance(f.dataType, StructType):
new_schema.append(StructField(f.name, self.transform_schema(f.dataType)))
else:
new_datatype = self.changeDatatypeforNestedField()
new_schema.append(StructField(f.name, new_datatype, f.nullable))
return StructType(new_schema)
nested_df_schema = nested_df.schema
for f in nested_df_schema.fields:
print("Name: ", f.name)
col_name = f.name
if isinstance(f.dataType, ArrayType):
new_schema = ArrayType(self.transform_schema(f.dataType.elementType, parent = f.name))
nested_df = nested_df.withColumn("col_name_json", to_json(col_name)).drop(col_name)
nested_df = nested_df.withColumn(col_name, from_json("col_name_json", new_schema)).drop("col_name_json")
elif isinstance(f.dataType, StructType):
new_schema = self.transform_schema(f.dataType, parent = f.name)
nested_df = nested_df.withColumn("col_name_json", to_json(col_name)).drop(col_name)
nested_df = nested_df.withColumn(col_name, from_json("col_name_json", new_schema)).drop("col_name_json")
else:
new_datatype = self.changeDatatypeforNestedField()
nested_df = nested_df.withColumn(col_name, nested_df[col_name].cast(new_datatype))]
Can someone point me out what might be the issue?
Read the data first, and transform the column what you want.
rdd = sc.parallelize([{'C_0_0': [{'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}, {'C_2_0': 19.765432, 'C_2_1': 3.13}], 'C_0_1': {'C_1_0': {'C_2_0': "2000-12-12 23:30:30.1234567", 'C_2_1': "2000-12-12 23:30:30.1234567"}, 'C_1_1': {'C_2_0': "2000-12-12 23:30:30.1234567", 'C_2_1': 3.13}}}])
schema = StructType([StructField('C_0_0', ArrayType(StructType([StructField('C_2_0', DecimalType(8, 6), True), StructField('C_2_1', DoubleType(), True)]), True), True), StructField('C_0_1', StructType([StructField('C_1_0', StructType([StructField('C_2_0', TimestampType(), True), StructField('C_2_1', TimestampType(), True)]), True), StructField('C_1_1', StructType([StructField('C_2_0', TimestampType(), True), StructField('C_2_1', DoubleType(), True)]), True)]), True)])
df = spark.read.json(rdd, schema=schema)
df2 = df.withColumn('C_0_0', f.transform('C_0_0', lambda e: f.struct((e['C_2_0'] % 1).cast("decimal(6, 6)"), e['C_2_1'])))
df2.show(truncate=False)
df2.printSchema()
+------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|C_0_0 |C_0_1 |
+------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|[{0.765432, 3.13}, {0.765432, 3.13}, {0.765432, 3.13}, {0.765432, 3.13}, {0.765432, 3.13}]|{{2000-12-12 23:30:30.123456, 2000-12-12 23:30:30.123456}, {2000-12-12 23:30:30.123456, 3.13}}|
+------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
root
|-- C_0_0: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- col1: decimal(6,6) (nullable = true)
| | |-- col2: double (nullable = true)
|-- C_0_1: struct (nullable = true)
| |-- C_1_0: struct (nullable = true)
| | |-- C_2_0: timestamp (nullable = true)
| | |-- C_2_1: timestamp (nullable = true)
| |-- C_1_1: struct (nullable = true)
| | |-- C_2_0: timestamp (nullable = true)
| | |-- C_2_1: double (nullable = true)