apache-spark, pyspark, apache-spark-sql

How to overwrite pyspark DataFrame schema without data scan?


This question is related to https://stackoverflow.com/a/37090151/1661491. Let's assume I have a pyspark DataFrame with a certain schema, and I would like to overwrite that schema with a new schema that I know is compatible. I could do:

df: DataFrame
new_schema = ...

df.rdd.toDF(schema=new_schema)

Unfortunately this triggers computation as described in the link above. Is there a way to do this at the metadata level (or lazily), without eagerly triggering computation or conversions?
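
For context, I'm aware that for top-level fields this can be done lazily through Column.alias with its metadata keyword, e.g. (the column name and metadata here are made up):

from pyspark.sql import functions as F

# Attach metadata to a single top-level column via a plain projection,
# which Spark plans lazily (no RDD round-trip). "some_col" is hypothetical.
df2 = df.select(*[
    F.col(c).alias(c, metadata={"description": "some desc"}) if c == "some_col" else F.col(c)
    for c in df.columns
])
df2.schema["some_col"].metadata
# -> {'description': 'some desc'}

But as far as I can tell this doesn't reach nested fields, which is the case I care about (see the notes below).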

Edit, note:

  • the schema can be arbitrarily complicated (nested, etc.)
  • the new schema includes updates to description, nullability and additional metadata (bonus points for updates to the type); see the sketch after this list
  • I would like to avoid writing a custom query expression generator, unless there's one already built into Spark that can generate a query based on the schema/StructType
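
To make the second note concrete, here is roughly how such a new_schema could be derived from the existing one (illustrative only, and only handling top-level fields):

from pyspark.sql.types import StructField, StructType

# Illustrative only: copy every top-level field, relax nullability and
# attach an extra description. A real version would recurse into nested
# struct/array/map types.
new_schema = StructType([
    StructField(
        f.name,
        f.dataType,
        nullable=True,
        metadata={**f.metadata, "description": f"{f.name} description"},
    )
    for f in df.schema.fields
])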

Solution

  • I've ended up diving into this a bit myself, and I'm curious about your opinion on my workaround/POC. See https://github.com/ravwojdyla/spark-schema-utils. It transforms expressions and updates attributes.

    Let's say I have two schemas, the first one without any metadata; let's call it schema_wo_metadata:

    {
      "fields": [
        {
          "metadata": {},
          "name": "oa",
          "nullable": false,
          "type": {
            "containsNull": true,
            "elementType": {
              "fields": [
                {
                  "metadata": {},
                  "name": "ia",
                  "nullable": false,
                  "type": "long"
                },
                {
                  "metadata": {},
                  "name": "ib",
                  "nullable": false,
                  "type": "string"
                }
              ],
              "type": "struct"
            },
            "type": "array"
          }
        },
        {
          "metadata": {},
          "name": "ob",
          "nullable": false,
          "type": "double"
        }
      ],
      "type": "struct"
    }
    

    The second one has extra metadata on the inner field (ia) and the outer field (ob); let's call it schema_wi_metadata:

    {
      "fields": [
        {
          "metadata": {},
          "name": "oa",
          "nullable": false,
          "type": {
            "containsNull": true,
            "elementType": {
              "fields": [
                {
                  "metadata": {
                    "description": "this is ia desc"
                  },
                  "name": "ia",
                  "nullable": false,
                  "type": "long"
                },
                {
                  "metadata": {},
                  "name": "ib",
                  "nullable": false,
                  "type": "string"
                }
              ],
              "type": "struct"
            },
            "type": "array"
          }
        },
        {
          "metadata": {
            "description": "this is ob desc"
          },
          "name": "ob",
          "nullable": false,
          "type": "double"
        }
      ],
      "type": "struct"
    }
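
    As an aside, if these schemas only exist as the JSON documents above, one way to turn them into StructType objects is StructType.fromJson. This is just a sketch: schema_wo_metadata_json and schema_wi_metadata_json stand for the JSON strings printed above.

    import json
    from pyspark.sql.types import StructType

    # schema_wo_metadata_json / schema_wi_metadata_json are assumed to hold
    # the JSON documents printed above (the same format StructType.json() produces).
    schema_wo_metadata = StructType.fromJson(json.loads(schema_wo_metadata_json))
    schema_wi_metadata = StructType.fromJson(json.loads(schema_wi_metadata_json))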
    

    And now let's say I have a dataset with the schema_wo_metadata schema, and want to swap the schema with schema_wi_metadata:

    from pyspark.sql import SparkSession
    from pyspark.sql import Row, DataFrame
    from pyspark.sql.types import StructType
    
    
    # I assume these get generated/specified somewhere
    schema_wo_metadata: StructType = ...
    schema_wi_metadata: StructType = ...
    
    # You need my extra package
    spark = SparkSession.builder \
        .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \
        .getOrCreate()
    
    # Dummy data with `schema_wo_metadata` schema:
    df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib=1)], ob=3.14),
                                     Row(oa=[Row(ia=2, ib=3)], ob=42.0)],
                               schema=schema_wo_metadata)
    
    _jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema_wi_metadata.json())
    new_df = DataFrame(_jdf, df.sql_ctx)
    

    Now new_df has the schema_wi_metadata schema, e.g.:

    new_df.schema["oa"].dataType.elementType["ia"].metadata
    # -> {'description': 'this is ia desc'}
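
    As a rough sanity check that the swap stays at the plan/metadata level, comparing the query plans before and after should show essentially the same plan, since only the output attributes are updated:

    # The plans should be essentially identical; only the output attributes'
    # metadata changed, nothing is recomputed or converted.
    df.explain(True)
    new_df.explain(True)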
    

    Any opinions?