Tags: python, json, pyspark, avro

How to programmatically generate a create table statement based on a DataFrame


I'm trying to write a function that creates a Hive table based on the data part of some Avro data. The schema of the source data is shown below. The target table needs to be partitioned by the partition field in the source data and have two columns, name and description. I can get the data part with df.select('data.*') and its schema with df.select('data.*').schema, but the partition column is not in there. My goal is to end up with a create table clause like

create table mytable (name string, description string) partitioned by (partition integer) stored as parquet

How do I do this? Do I need to append df.select('partition.*') to df.select('data.*') first? Many thanks for your help.

Edited: The goal is that you wouldn't need to specify the nesting level of the columns (like data.name or partition), but only pass in "columns" and a "partition column" (which can be at any nested level) and then generate a create table statement.

root
 |-- metadata: struct
 |    |-- id: string
 |    |-- time: string
 |-- data: struct
 |    |-- name: string
 |    |-- description: string
 |-- partition: integer
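
For context, a minimal sketch of what this looks like so far (the input path is a placeholder, and reading Avro assumes the spark-avro package is available):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Placeholder source path.
    df = spark.read.format("avro").load("/path/to/source.avro")

    # The flattened data part and its schema; the partition column is missing here.
    df_data = df.select("data.*")
    df_data.printSchema()  # name: string, description: string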

Solution

  • The following self-contained example shows how to build and write the table you describe. You need to provide your own path_for_saving.

    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    from pyspark.sql import SparkSession

    # Create a session so the example also runs outside an interactive shell.
    spark = SparkSession.builder.getOrCreate()

    schema = T.StructType([
        T.StructField("metadata", T.StructType([
            T.StructField("id", T.StringType()),
            T.StructField("time", T.StringType()),
        ])),
        T.StructField("data", T.StructType([
            T.StructField("name", T.StringType()),
            T.StructField("description", T.StringType()),
        ])),
        T.StructField("partition", T.IntegerType()),
        T.StructField("Level1", T.StructType([
            T.StructField("Level2", T.StructType([
                T.StructField("Level3", T.StructType([
                    T.StructField("partition_alt", T.IntegerType()),
                ])),
            ])),
        ])),
    ])
    df_sample_data = spark.createDataFrame([
        (("id1", "time1"), ("name1", "desc1"), 1, (((3,),),)),
        (("id2", "time2"), ("name2", "desc2"), 2, (((4,),),)),
    ], schema)
    df_sample_data.printSchema()
    df_sample_data.show()
    
    def parse_fields(schema, path=""):
      collect = []
      for struct_field in schema:
        this_field_name = struct_field.name
        if type(struct_field.dataType) == T.StructType:
          collect = collect + parse_fields(struct_field.dataType, path + this_field_name + ".")
        else: 
          collect = collect + [path + this_field_name]
      return collect
    
    parsed_fields = parse_fields(schema) # Find all leaf fields in the schema and return as '.' separated paths
    print("Parsed fields:" + str(parsed_fields))
    def get_column(col_name):
      # Return the first leaf field whose path contains col_name, aliased back to col_name.
      for field in parsed_fields:
        if col_name in field:
          return F.col(field).alias(col_name)
        
    name_col = "name"
    description_col = "description"
    partition_col = "partition_alt"
    
    df_mytable = df_sample_data.select(get_column(name_col), get_column(description_col), get_column(partition_col))
    df_mytable.show()
    
    df_mytable.write.partitionBy(partition_col).format("parquet").save(path_for_saving)
    

    Output:

    root
     |-- metadata: struct (nullable = true)
     |    |-- id: string (nullable = true)
     |    |-- time: string (nullable = true)
     |-- data: struct (nullable = true)
     |    |-- name: string (nullable = true)
     |    |-- description: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- Level1: struct (nullable = true)
     |    |-- Level2: struct (nullable = true)
     |    |    |-- Level3: struct (nullable = true)
     |    |    |    |-- partition_alt: integer (nullable = true)
    
    +------------+--------------+---------+-------+
    |    metadata|          data|partition| Level1|
    +------------+--------------+---------+-------+
    |{id1, time1}|{name1, desc1}|        1|{{{3}}}|
    |{id2, time2}|{name2, desc2}|        2|{{{4}}}|
    +------------+--------------+---------+-------+
    
    Parsed fields:['metadata.id', 'metadata.time', 'data.name', 'data.description', 'partition', 'Level1.Level2.Level3.partition_alt']
    +-----+-----------+-------------+
    | name|description|partition_alt|
    +-----+-----------+-------------+
    |name1|      desc1|            3|
    |name2|      desc2|            4|
    +-----+-----------+-------------+
    

    The example demonstrates how to find a deeply nested field. You need to rewrite get_column with your own criteria for matching a field name to a complete column path; here, get_column simply returns the first parsed field whose path contains col_name as a substring.
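
    If you also need the CREATE TABLE statement itself, as asked in the question, one way is to derive it from the schema of df_mytable. The following is a minimal sketch (not part of the original answer): it assumes the selected leaf types map directly to Hive column types via Spark's simpleString() and that you supply the table name yourself.

    def build_create_table(df, table_name, partition_col):
      # Render each non-partition field as "name type" using Spark's SQL type names.
      data_cols = [
        f.name + " " + f.dataType.simpleString()
        for f in df.schema.fields if f.name != partition_col
      ]
      partition_field = next(f for f in df.schema.fields if f.name == partition_col)
      return ("create table " + table_name + " (" + ", ".join(data_cols) + ")"
              + " partitioned by (" + partition_field.name + " "
              + partition_field.dataType.simpleString() + ")"
              + " stored as parquet")

    print(build_create_table(df_mytable, "mytable", partition_col))
    # create table mytable (name string, description string) partitioned by (partition_alt int) stored as parquet

    The generated statement could then be executed with spark.sql(...) before inserting data, instead of writing the files directly with save().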