I'm trying to write a function that creates a Hive table based on the data part of some Avro data. The schema of the source data is shown below. The target table needs to be partitioned by the partition field in the source data, with two columns, name and description. I can get the data part with df.select('data.*') and the table schema with df.select('data.*').schema, but the partition column is not included there. My goal is to end up with a create-table clause like create table mytable (name string, description string) partitioned by (partition int) stored as parquet
How do I do this? Do I need to append df.select('partition.*') to df.select('data.*') first? Many thanks for your help.
Edit: The goal is that you wouldn't need to specify the nesting level of the columns (like data.name and partition), but only pass in the column names and the partition column name (which can be at any nested level), and then generate a create-table statement.
root
 |-- metadata: struct
 |    |-- id: string
 |    |-- time: string
 |-- data: struct
 |    |-- name: string
 |    |-- description: string
 |-- partition: integer
The following self-contained example shows how you can create and write the table you specified. You need to provide your own path_for_saving.
import pyspark.sql.functions as F
import pyspark.sql.types as T

schema = T.StructType([
    T.StructField('metadata', T.StructType([
        T.StructField("id", T.StringType()),
        T.StructField("time", T.StringType())])),
    T.StructField('data', T.StructType([
        T.StructField("name", T.StringType()),
        T.StructField("description", T.StringType()),
    ])),
    T.StructField("partition", T.IntegerType()),
    T.StructField("Level1", T.StructType([
        T.StructField("Level2", T.StructType([
            T.StructField("Level3", T.StructType([
                T.StructField("partition_alt", T.IntegerType())]))]))]))
])

df_sample_data = spark.createDataFrame([
    (("id1", "time1"), ("name1", "desc1"), 1, (((3,),),)),
    (("id2", "time2"), ("name2", "desc2"), 2, (((4,),),))
], schema)
df_sample_data.printSchema()
df_sample_data.show()

def parse_fields(schema, path=""):
    # Walk the schema recursively, collecting the '.'-separated path of every leaf field
    collect = []
    for struct_field in schema:
        this_field_name = struct_field.name
        if type(struct_field.dataType) == T.StructType:
            collect = collect + parse_fields(struct_field.dataType, path + this_field_name + ".")
        else:
            collect = collect + [path + this_field_name]
    return collect

parsed_fields = parse_fields(schema)  # All leaf fields in the schema as '.'-separated paths
print("Parsed fields:" + str(parsed_fields))

def get_column(col_name):
    # Return the first leaf field whose path contains col_name, aliased to col_name
    for field in parsed_fields:
        if col_name in field:
            return F.col(field).alias(col_name)

name_col = "name"
description_col = "description"
partition_col = "partition_alt"
df_mytable = df_sample_data.select(get_column(name_col), get_column(description_col), get_column(partition_col))
df_mytable.show()
df_mytable.write.partitionBy(partition_col).format("parquet").save(path_for_saving)
Output:
root
|-- metadata: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- time: string (nullable = true)
|-- data: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- description: string (nullable = true)
|-- partition: integer (nullable = true)
|-- Level1: struct (nullable = true)
| |-- Level2: struct (nullable = true)
| | |-- Level3: struct (nullable = true)
| | | |-- partition_alt: integer (nullable = true)
+------------+--------------+---------+-------+
| metadata| data|partition| Level1|
+------------+--------------+---------+-------+
|{id1, time1}|{name1, desc1}| 1|{{{3}}}|
|{id2, time2}|{name2, desc2}| 2|{{{4}}}|
+------------+--------------+---------+-------+
Parsed fields:['metadata.id', 'metadata.time', 'data.name', 'data.description', 'partition', 'Level1.Level2.Level3.partition_alt']
+-----+-----------+-------------+
| name|description|partition_alt|
+-----+-----------+-------------+
|name1| desc1| 3|
|name2| desc2| 4|
+-----+-----------+-------------+
The example demonstrates how to find a deeply nested field. You will need to rewrite get_column with your own criteria for matching a field name to a complete column path; here, get_column simply returns the first leaf field whose path contains col_name.
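If you also want the literal create-table clause from the question rather than (or in addition to) writing the files directly, you can assemble it from the selected column names and their Hive type strings. The following is a minimal pure-Python sketch; the function name and the (name, hive_type) pair representation are illustrative assumptions, not part of the code above.

```python
def create_table_statement(table_name, columns, partition_col):
    # columns: list of (name, hive_type) pairs for the data columns
    # partition_col: a single (name, hive_type) pair for the partition column
    cols = ", ".join(f"{name} {hive_type}" for name, hive_type in columns)
    part = f"{partition_col[0]} {partition_col[1]}"
    return (f"create table {table_name} ({cols}) "
            f"partitioned by ({part}) stored as parquet")

stmt = create_table_statement(
    "mytable",
    [("name", "string"), ("description", "string")],
    ("partition", "int"),
)
print(stmt)
# create table mytable (name string, description string) partitioned by (partition int) stored as parquet
```

You could feed this the output of df_mytable.schema (each StructField has a name and a dataType.simpleString() that maps closely to Hive type names) and then run it with spark.sql.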