apache-spark, hive, pyspark, apache-spark-sql, rdd

PySpark: Map a SchemaRDD into a SchemaRDD


I am loading a file of JSON objects as a PySpark SchemaRDD. I want to change the "shape" of the objects (basically, I'm flattening them) and then insert them into a Hive table.

The problem is that the following returns a PipelinedRDD, not a SchemaRDD:

log_json.map(flatten_function)

(Where log_json is a SchemaRDD).
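For illustration, a minimal sketch of what such a flatten_function might look like (the function body and field names here are hypothetical, purely to make the example concrete):

# Hypothetical flattener: lifts nested JSON fields to the top level.
# The field names are illustrative; adapt them to the real JSON layout.
def flatten_function(record):
    return (record.user.id, record.event)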

Is there a way to preserve the type, cast back to the desired type, or insert efficiently from the new type?


Solution

  • The solution is applySchema:

    mapped = log_json.map(flatten_function)
    hive_context.applySchema(mapped, flat_schema).insertInto(name)
    

    Here flat_schema is a StructType representing the schema, of the same form you would get from log_json.schema(), but flattened.
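
    For completeness, a minimal end-to-end sketch under the pre-1.3 SchemaRDD API (the file path, app name, table name, and schema fields are hypothetical, and the target Hive table is assumed to already exist with a matching layout):

    from pyspark import SparkContext
    from pyspark.sql import HiveContext, StructType, StructField, StringType

    sc = SparkContext(appName="flatten-json")
    hive_context = HiveContext(sc)

    # Load the JSON objects as a SchemaRDD.
    log_json = hive_context.jsonFile("logs.json")

    # Hypothetical flattener: each output tuple must line up with flat_schema.
    def flatten_function(record):
        return (record.user.id, record.event)

    # Flattened schema: build it by hand, or derive it from log_json.schema().
    flat_schema = StructType([
        StructField("user_id", StringType(), True),
        StructField("event", StringType(), True),
    ])

    # map() drops the schema (yielding a PipelinedRDD); applySchema() restores it.
    mapped = log_json.map(flatten_function)
    hive_context.applySchema(mapped, flat_schema).insertInto("flat_logs")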