Search code examples
pythonpandaspysparkhive

Iterate over multiple queries and store it in pyspark dataframe


I have a table in hive, i want to query it on a condition in a loop and store the result in multiple pyspark dataframes dynamically.

Base Query

g1 = """
    select * from db.hive_table where group =  1
"""

group_1 = spk.sql(g1)
group_1.show(3)
group_1.printSchema()
print((group_1.count(), len(group_1.columns)))
group_1 = group_1.toPandas()

There are 80 groups in total, Currently running the above code individually for Group = 2, Group = 3 and so on.

My useless iteration code

    # changes the geometry type to obj

df_list=[group_1,group_2,group_3,group_4,group_5,group_6,group_7,group_8,group_9,group_10,
         group_11,group_12,group_13,group_14,group_15,group_16,group_17,group_18,group_19,group_20,
         group_21,group_22,group_23,group_24,group_25,group_26,group_27,group_28,group_29,group_30,
         group_31,group_32,group_33,group_34,group_35,group_36,group_37,group_38,group_39,group_40,
         group_41,group_42,group_43,group_44,group_45,group_46,group_47,group_48,group_49,group_50,
         group_51,group_52,group_53,group_54,group_55,group_56,group_57,group_58,group_59,group_60,
         group_61,group_62,group_63,group_64,group_65,group_66,group_67,group_68,group_69,group_70,
         group_71,group_72,group_73,group_74,group_75,group_76,group_77,group_78,group_79,group_80,
         
# num_list=[1,2,3,4,5,5,6,6]

for d in df_list:
    for i in range(1,80):
         gi = """
        select * from db.hive_table where group =  $i
        """
    
        group_i = spk.sql(gi)
        print(group_i.show(3))
        print(group_i.printSchema())
        print((group_i.count(), len(group_i.columns)))
        return group_i = group_i.toPandas()

Requesting help to guide me to solve this problem and help me increase my coding knowledge.

Thanks in advance.


Solution

  • Using lists

    python/pyspark won't allow you to create variable names dynamically. However, you can create a list of dataframes that can be used like sdf_list[0].show(), sdf_list[1].toPandas().

    sdf_list = []
    
    for i in range(1, 81):
        filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
        sdf_list.append((i, filtered_sdf))  # (<filter/group identifier>, <spark dataframe>)
        del filtered_sdf
    

    Now, the sdf_list has a list of spark dataframes that can be accessed using list indices. e.g., the first dataframe can be accessed using [0] and a print will verify that it is a dataframe.

    print(sdf_list[0])
    # (1, DataFrame[col1: bigint, dt: date, col3: bigint])
    # (<filter/group identifier>, <spark dataframe>)
    

    The list can be iterated over and all dataframes within it can be used individually. e.g.,

    for (i, sdf) in sdf_list[:2]:
        print("dataframe {0}'s count:".format(i), sdf.count())
    
    # dataframe 1's count: 20
    # dataframe 2's count: 30
    

    Feel free to use it as you like.

    sdf_list[0][1].count()  # [0] returns the tuple - (<sdf identifier>, <sdf>)
    # 20
    
    sdf_list[0][1].show(2)
    # etc...
    

    Let's say you also want all the spark dataframes as pandas dataframes. You'll again need to create a list of dataframes if you want it dynamic. Or just access the spark dataframes using indices.

    # using indices
    group1_pdf = sdf_list[0][1].toPandas()
    
    # creating list of pandas dataframes
    pdf_list = []
    
    for (i, sdf) in sdf_list:
        pdf_list.append((i, sdf.toPandas()))  # (<filter/group identifier>, <pandas dataframe>)
    
    type(pdf_list)
    # list
    
    type(pdf_list[0])
    # tuple
    
    type(pdf_list[0][1])
    # pandas.core.frame.DataFrame
    

    Using dictionaries

    We could also use dictionaries to store the dataframes and keep track of it using the keys. So, the keys can act as a dataframe name.

    sdf_dict = {}
    
    for i in range(1, 81):
        filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
        sdf_dict['group'+str(i)] = filtered_sdf
        del filtered_sdf
    

    The dictionary will have the dataframes that can be accessed using the keys. Let's simply print the first 2 keys and check what values we have.

    list(sdf_dict.keys())[:2]
    # ['group1', 'group2']
    
    sdf_dict['group1']
    # DataFrame[col1: bigint, dt: date, col3: bigint]
    
    sdf_dict['group1'].count()
    # 20
    

    You can choose to iterate over the dict keys and use the spark dataframes.

    for sdf_key in list(sdf_dict.keys())[:2]:
        print(sdf_key+"'s record count:", sdf_dict[sdf_key].count())
    
    # group1's record count: 20
    # group2's record count: 30
    

    You can check the type() for a better understanding.

    type(sdf_dict)
    # dict
    
    type(sdf_dict['group1'])
    # pyspark.sql.dataframe.DataFrame
    

    Conversion to a pandas dataframe would be simple

    # single df manually
    group1_pdf = sdf_dict['group1'].toPandas()
    
    # with iteration
    pdf_dict = {}
    
    for sdf_key in sdf_dict.keys():
        pdf_dict[sdf_key] = sdf_dict[sdf_key].toPandas()
    
    type(pdf_dict)
    # dict
    
    type(pdf_dict['group1'])
    # pandas.core.frame.DataFrame