I have a table in Hive, and I want to query it on a condition in a loop and store the results in multiple PySpark dataframes dynamically.
Base Query
g1 = """
select * from db.hive_table where group = 1
"""
group_1 = spk.sql(g1)
group_1.show(3)
group_1.printSchema()
print((group_1.count(), len(group_1.columns)))
group_1 = group_1.toPandas()
There are 80 groups in total. Currently I am running the above code individually for group = 2, group = 3, and so on.
My useless iteration code
# changes the geometry type to obj
df_list=[group_1,group_2,group_3,group_4,group_5,group_6,group_7,group_8,group_9,group_10,
group_11,group_12,group_13,group_14,group_15,group_16,group_17,group_18,group_19,group_20,
group_21,group_22,group_23,group_24,group_25,group_26,group_27,group_28,group_29,group_30,
group_31,group_32,group_33,group_34,group_35,group_36,group_37,group_38,group_39,group_40,
group_41,group_42,group_43,group_44,group_45,group_46,group_47,group_48,group_49,group_50,
group_51,group_52,group_53,group_54,group_55,group_56,group_57,group_58,group_59,group_60,
group_61,group_62,group_63,group_64,group_65,group_66,group_67,group_68,group_69,group_70,
group_71,group_72,group_73,group_74,group_75,group_76,group_77,group_78,group_79,group_80]
# num_list=[1,2,3,4,5,5,6,6]
for d in df_list:
    for i in range(1,80):
        gi = """
        select * from db.hive_table where group = $i
        """
        group_i = spk.sql(gi)
        print(group_i.show(3))
        print(group_i.printSchema())
        print((group_i.count(), len(group_i.columns)))
        return group_i = group_i.toPandas()
Requesting guidance to solve this problem and to help me improve my coding knowledge.
Thanks in advance.
Python/PySpark won't allow you to create variable names dynamically. However, you can create a list of dataframes that can be used like sdf_list[0].show() or sdf_list[1].toPandas().
sdf_list = []
for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_list.append((i, filtered_sdf))  # (<filter/group identifier>, <spark dataframe>)
    del filtered_sdf
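As an aside, the same list can be built with the DataFrame API instead of a formatted SQL string, which also sidesteps the $i string-interpolation mistake from the question's loop. A minimal sketch, assuming the same hive_db.hive_tbl table:

from pyspark.sql import functions as F

sdf_list = []
for i in range(1, 81):
    # filter() expresses "where group = i" without any string formatting
    filtered_sdf = spark.table('hive_db.hive_tbl').filter(F.col('group') == i)
    sdf_list.append((i, filtered_sdf))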
Now sdf_list is a list of spark dataframes that can be accessed using list indices, e.g., the first one can be accessed using [0], and a print will verify that it is a dataframe.
print(sdf_list[0])
# (1, DataFrame[col1: bigint, dt: date, col3: bigint])
# (<filter/group identifier>, <spark dataframe>)
The list can be iterated over and all dataframes within it can be used individually. e.g.,
for (i, sdf) in sdf_list[:2]:
    print("dataframe {0}'s count:".format(i), sdf.count())
# dataframe 1's count: 20
# dataframe 2's count: 30
Feel free to use it as you like.
sdf_list[0][1].count() # [0] returns the tuple - (<sdf identifier>, <sdf>)
# 20
sdf_list[0][1].show(2)
# etc...
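To reproduce the per-group checks from the question (show(3), printSchema(), and the (rows, columns) shape) in a single pass, you can loop over the whole list. A minimal sketch, reusing the sdf_list built above:

for (i, sdf) in sdf_list:
    print('group', i)
    sdf.show(3)                              # first 3 rows
    sdf.printSchema()                        # column names and types
    print((sdf.count(), len(sdf.columns)))   # (rows, columns)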
Let's say you also want all the spark dataframes as pandas dataframes. You'll again need to create a list of dataframes if you want it dynamic. Or just access the spark dataframes using indices.
# using indices
group1_pdf = sdf_list[0][1].toPandas()
# creating list of pandas dataframes
pdf_list = []
for (i, sdf) in sdf_list:
    pdf_list.append((i, sdf.toPandas()))  # (<filter/group identifier>, <pandas dataframe>)
type(pdf_list)
# list
type(pdf_list[0])
# tuple
type(pdf_list[0][1])
# pandas.core.frame.DataFrame
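Because the list stores (identifier, dataframe) tuples, you can also build a quick lookup from it when you need a specific group by its id. A small sketch based on the pdf_list above:

pdf_by_group = dict(pdf_list)   # {1: <pandas df>, 2: <pandas df>, ...}
pdf_by_group[1].head()          # pandas dataframe for group 1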
We could also use dictionaries to store the dataframes and keep track of them using the keys. So the keys can act as dataframe names.
sdf_dict = {}
for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_dict['group'+str(i)] = filtered_sdf
    del filtered_sdf
The dictionary will have the dataframes that can be accessed using the keys. Let's simply print the first 2 keys and check what values we have.
list(sdf_dict.keys())[:2]
# ['group1', 'group2']
sdf_dict['group1']
# DataFrame[col1: bigint, dt: date, col3: bigint]
sdf_dict['group1'].count()
# 20
You can choose to iterate over the dict keys and use the spark dataframes.
for sdf_key in list(sdf_dict.keys())[:2]:
    print(sdf_key+"'s record count:", sdf_dict[sdf_key].count())
# group1's record count: 20
# group2's record count: 30
You can check the type() for a better understanding.
type(sdf_dict)
# dict
type(sdf_dict['group1'])
# pyspark.sql.dataframe.DataFrame
Conversion to a pandas dataframe is simple:
# single df manually
group1_pdf = sdf_dict['group1'].toPandas()
# with iteration
pdf_dict = {}
for sdf_key in sdf_dict.keys():
    pdf_dict[sdf_key] = sdf_dict[sdf_key].toPandas()
type(pdf_dict)
# dict
type(pdf_dict['group1'])
# pandas.core.frame.DataFrame
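The loop above can also be written as a dict comprehension, which is the more idiomatic Python for this kind of conversion. A small sketch, equivalent to the pdf_dict loop:

pdf_dict = {key: sdf.toPandas() for key, sdf in sdf_dict.items()}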