Tags: json, apache-spark, pyspark, apache-spark-sql

Getting schema from JSON column using schema_of_json function


The documentation of schema_of_json says:

Parameters: 
    json: Column or str
           a JSON string or a foldable string column containing a JSON string.

But executing the following code where I provide a column raises an error:

import pyspark.sql.functions as f
from pyspark.shell import spark

df = spark.createDataFrame([
    ["""{"foo":"hello","bar":"world"}"""],
    ["""{"foo":"hello","bar":"world"}"""],
    ["""{"foo":"hello","bar":"world"}"""]
], ['json_column'])

df.printSchema()
# root
#  |-- json_column: string (nullable = true)

df_schema = df.select('json_column', f.schema_of_json(f.col('json_column')).alias('schema'))
df_schema.show(truncate=False)

# pyspark.sql.utils.AnalysisException: cannot resolve 'schema_of_json(`json_column`)' due to data type mismatch: 
# The input json should be a foldable string expression and not null; however, got `json_column`.;

The only way I know to use this function is to hard-code a JSON string, but that is useless in a production scenario because I can't dynamically parse the column's contents.

df_schema = df.select('json_column', f.schema_of_json('{"foo":"hello","bar":"world"}').alias('schema'))
df_schema.show(truncate=False)
# +-----------------------------+------------------------------------+
# |json_column                  |schema                              |
# +-----------------------------+------------------------------------+
# |{"foo":"hello","bar":"world"}|STRUCT<`bar`: STRING, `foo`: STRING>|
# |{"foo":"hello","bar":"world"}|STRUCT<`bar`: STRING, `foo`: STRING>|
# |{"foo":"hello","bar":"world"}|STRUCT<`bar`: STRING, `foo`: STRING>|
# +-----------------------------+------------------------------------+

Am I missing a step, or is the documentation unclear about how this function should be used?


Solution

  • Convert the column to an RDD of JSON strings, let spark.read.json infer the schema, then use that schema with from_json:

    from pyspark.sql.functions import col, from_json

    # convert the JSON column into an RDD of plain strings
    s = df.select(col('json_column').alias('j')).rdd.map(lambda x: x.j)

    s.collect()
    # ['{"foo":"hello","bar":"world"}',
    #  '{"foo":"hello","bar":"world"}',
    #  '{"foo":"hello","bar":"world"}']

    # let spark.read.json infer the schema from the RDD
    sc = spark.read.json(s).schema

    # parse the string column into a struct using the inferred schema
    df1 = df.select('*', from_json('json_column', schema=sc).alias('jsonread'))

    df1.printSchema()