This question is related to https://stackoverflow.com/a/37090151/1661491. Let's assume I have a pyspark DataFrame with a certain schema, and I would like to overwrite that schema with a new schema that I know is compatible. I could do:
df: DataFrame
new_schema = ...
df.rdd.toDF(schema=new_schema)
Unfortunately this triggers computation, as described in the link above. Is there a way to do that at the metadata level (or lazily), without eagerly triggering computation or conversions?
Edit, note: the schemas here are `StructType` instances.
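For context: I know that for top-level columns metadata can be attached lazily with a plain projection, e.g. via `Column.alias(..., metadata=...)`. A minimal sketch, assuming `new_schema` differs from `df`'s schema only in top-level metadata; but this doesn't reach nested fields, which is what I'm after:

from pyspark.sql import functions as F

# lazy, projection-only metadata update, but top-level columns only
df2 = df.select(*[
    F.col(f.name).alias(f.name, metadata=new_schema[f.name].metadata)
    for f in df.schema.fields
])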
I've ended up diving into this a bit myself, and I'm curious about your opinion on my workaround/POC. See https://github.com/ravwojdyla/spark-schema-utils. It transforms expressions and updates attributes.
Let's say I have two schemas, the first one without any metadata; let's call it `schema_wo_metadata`:
{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {},
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {},
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}
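(For reference, this is just the JSON form of a nested `StructType`; a sketch of the equivalent programmatic definition:)

from pyspark.sql.types import (ArrayType, DoubleType, LongType,
                               StringType, StructField, StructType)

schema_wo_metadata = StructType([
    StructField("oa", ArrayType(StructType([
        StructField("ia", LongType(), nullable=False),
        StructField("ib", StringType(), nullable=False),
    ]), containsNull=True), nullable=False),
    StructField("ob", DoubleType(), nullable=False),
])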
The second one has extra metadata on the inner (`ia`) and outer (`ob`) fields; let's call it `schema_wi_metadata`:
{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {
                "description": "this is ia desc"
              },
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {
        "description": "this is ob desc"
      },
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}
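(Side note: you don't have to write the second JSON by hand. Since `StructField.metadata` is a plain attribute in pyspark, one way to derive it, sketched here, is to deep-copy the first schema and set the metadata on the two fields:)

import copy

schema_wi_metadata = copy.deepcopy(schema_wo_metadata)
# annotate the inner field (inside the array's element struct) and the outer field
schema_wi_metadata["oa"].dataType.elementType["ia"].metadata = {"description": "this is ia desc"}
schema_wi_metadata["ob"].metadata = {"description": "this is ob desc"}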
And now let's say I have a dataset with the `schema_wo_metadata` schema, and I want to swap that schema for `schema_wi_metadata`:
from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType

# I assume these get generated/specified somewhere:
schema_wo_metadata: StructType = ...
schema_wi_metadata: StructType = ...

# You need my extra package:
spark = SparkSession.builder \
    .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \
    .getOrCreate()

# Dummy data with the `schema_wo_metadata` schema (note `ib` is a string field):
df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib="1")], ob=3.14),
                                 Row(oa=[Row(ia=2, ib="3")], ob=42.0)],
                           schema=schema_wo_metadata)

# Swap the schema on the JVM side, then wrap the result back into a Python DataFrame:
_jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema_wi_metadata.json())
new_df = DataFrame(_jdf, df.sql_ctx)
Now `new_df` has the `schema_wi_metadata` schema, e.g.:
new_df.schema["oa"].dataType.elementType["ia"].metadata
# -> {'description': 'this is ia desc'}
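Same for the outer field, plus a couple of sanity checks (assuming the update really is metadata-only):

new_df.schema["ob"].metadata
# -> {'description': 'this is ob desc'}

# sanity checks: the full schema now matches, and the data is untouched
assert new_df.schema == schema_wi_metadata
assert new_df.collect() == df.collect()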
Any opinions?