This is a case of calculating the average holding cost per unit. We only consider trades that increase the account balance and ignore trades that decrease it.
# data example: ((1, '000001'), ('A', 0, 5000, 5000)),
# (1, '000001') is the group-by key, 'A' is the order-by key (serialno), 0 is the account
# balance before the trade, 5000 is the trade balance, and 5000 is the account balance after
# the trade. We aim to calculate the average cost per unit after the trades in each group
# with a Spark RDD.
confirm = [
((1, '000001'), ('A', 0, 5000, 5000)),
((1, '000001'), ('C', 9000, 1000, 10000)),
((1, '000001'), ('B', 5000, 5000, 9000)),
((2, '000001'), ('D', 0, 3300, 3000)),
((2, '000001'), ('F', 4000, 5000, 10000)),
((2, '000001'), ('E', 3000, 4200, 6000)),
((3, '000001'), ('G', 0, 3300, 3000)),
((3, '000001'), ('H', 3000, 3300, 6300))
]
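The recurrence the reduction implements is avg_new = (avg_old * pre_balance + trade_amount) / post_balance, with the first trade of a group seeding avg = trade_amount / post_balance. A plain-Python sketch of the per-trade values for one group (the helper name `running_avg_cost` is mine, not part of the Spark job):

```python
def running_avg_cost(trades):
    """Per-trade average cost for one group of
    (serialno, pre_balance, trade_amount, post_balance) tuples,
    already sorted by serialno."""
    avg = None
    out = []
    for serialno, pre, amount, post in trades:
        if avg is None:
            avg = amount / post                # first trade of the group
        else:
            avg = (avg * pre + amount) / post  # pre may differ from the last post
        out.append((serialno, avg))
    return out

group3 = [('G', 0, 3300, 3000), ('H', 3000, 3300, 6300)]
```

Applied to group 3 (trades 'G' and 'H'), the last value reproduces the 1.0476190476190477 shown in the collect() output below.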
def my_partition(x):
    # partition on the first element of the key, e.g. (1, '000001') -> 1 % 3
    return x[0] % 3

def partSort(x):
    # sort the records within a partition by serialno ('A', 'B', ...)
    xlist = list(x)
    a = sorted(xlist, key=lambda x: x[1][0])
    return iter(a)
import pandas as pd

def udf_func(x, y):
    if y is None:
        result = x[2] / x[3]
        df = pd.DataFrame([{'serialno': x[0], 'result': result}])
    else:
        # on later calls x is the running average (a float), not a trade tuple
        result = (
            (x if isinstance(x, float) else (x[2] / x[3])) * y[1] + y[2]
        ) / y[3]
        df = pd.DataFrame([{'serialno': y[0], 'result': result}])
    # this is where I want to store the intermediate result, but it does not work, e.g.:
    df.to_csv("/home/zo_om/result.csv", 'a')
    return result

rdd = sc.parallelize(confirm).partitionBy(3, my_partition) \
    .mapPartitions(partSort).reduceByKey(udf_func)
rdd.collect()
After I run the code, the result is:
[
((3, '000001'), 1.0476190476190477),
((2, '000001'), 1.0),
((1, '000001'), 1.1)
]
which is the last result of each group.
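That part is expected: reduceByKey folds the values of each key pairwise and keeps only the final value, so the intermediate averages never survive the reduction. The per-group fold can be reproduced locally with functools.reduce (`fold_avg` below mirrors the else branch of udf_func):

```python
from functools import reduce

def fold_avg(x, y):
    # x is the first trade tuple on the first call; afterwards it is the
    # running average carried forward as a float
    prev = x if isinstance(x, float) else x[2] / x[3]
    return (prev * y[1] + y[2]) / y[3]

group1 = [('A', 0, 5000, 5000), ('B', 5000, 5000, 9000), ('C', 9000, 1000, 10000)]
final = reduce(fold_avg, group1)  # one value per group; intermediates are discarded
```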
I can see only 1 row in "/home/zo_om/result.csv" (and only on one worker node of the Spark cluster; zo_om is the Kerberos user). What I expect to see is 8 rows, one for each serialno ('A'~'H').
I guess you only see one line because pd.DataFrame.to_csv overwrites the existing file each time: the second positional argument of to_csv is sep, not mode, so the 'a' in your call is taken as the field separator and the write mode stays at its default 'w', which truncates the file on every call.
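Assuming the goal really is to accumulate rows in a single local CSV, a minimal sketch of the fix is to pass the mode by keyword so the write actually appends (the helper name `append_result` is mine):

```python
import pandas as pd

def append_result(path, serialno, result):
    # mode='a' appends instead of truncating; header=False avoids
    # repeating the column names on every row
    df = pd.DataFrame([{'serialno': serialno, 'result': result}])
    df.to_csv(path, mode='a', header=False, index=False)
```

Even then, on a cluster each executor appends to a file on its own worker node, so to see all 8 rows in one place you would need to collect the intermediate pairs on the driver (for example by yielding them from a mapPartitions step) or write to a shared filesystem such as HDFS.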