I have a weather data CSV file in which each entry has a station ID and the minimum or maximum value recorded for that day. The second element is a keyword indicating what the value represents. A sample of the input is below.
stationID feature value
ITE00100554 TMAX -75
ITE00100554 TMIN -148
GM000010962 PRCP 0
EZE00100082 TMAX -86
EZE00100082 TMIN -135
ITE00100554 TMAX -60
ITE00100554 TMIN -125
GM000010962 PRCP 0
EZE00100082 TMAX -44
EZE00100082 TMIN -130
ITE00100554 TMAX -23
I have filtered the entries down to those with TMIN or TMAX. Each entry is recorded for a given date; I stripped the date while building my RDD since it is not of interest. My goal is to find the min and max value for each station across all of its records, i.e.,
ITE00100554, 'TMIN', <global_min_value recorded by that station>
ITE00100554, 'TMAX', <global_max_value>
EZE00100082, 'TMIN', <global_min_value>
EZE00100082, 'TMAX', <global_max_value>
I was able to accomplish this using aggregateByKey, but according to this link https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/ I don't need aggregateByKey, since the input and output value formats are the same. So I would like to know whether there is an alternate or better way to code this without defining so many functions.
import csv

stationtemps = entries.filter(lambda x: x[1] in ['TMIN', 'TMAX']).map(lambda x: (x[0], (x[1], x[2])))  # (stationID, (tempkey, value))
max_temp = stationtemps.values().values().max()  # global max over all readings, used as the "zero" for the min aggregation
min_temp = stationtemps.values().values().min()  # global min over all readings, used as the "zero" for the max aggregation

def max_seqOp(accumulator, element):
    return (accumulator if accumulator[1] > element[1] else element)

def max_combOp(accu1, accu2):
    return (accu1 if accu1[1] > accu2[1] else accu2)

def min_seqOp(accumulator, element):
    return (accumulator if accumulator[1] < element[1] else element)

def min_combOp(accu1, accu2):
    return (accu1 if accu1[1] < accu2[1] else accu2)

station_max_temps = stationtemps.aggregateByKey(('', min_temp), max_seqOp, max_combOp).sortByKey()
station_min_temps = stationtemps.aggregateByKey(('', max_temp), min_seqOp, min_combOp).sortByKey()
min_max_temps = station_max_temps.zip(station_min_temps).collect()

with open('1800_min_max.csv', 'w') as fd:
    writer = csv.writer(fd)
    writer.writerows(map(list, min_max_temps))
I am learning PySpark and haven't mastered all the different transformation functions.
Here is a simulated input; if the min and max come out correctly anyway, why the need for the TMIN / TMAX indicator at all? Indeed, there is no need for an accumulator.
rdd = sc.parallelize([('s1', 'tmin', -3), ('s1', 'tmax', 5), ('s2', 'tmin', 0), ('s2', 'tmax', 7), ('s0', 'tmax', 14), ('s0', 'tmin', 3)])
rddcollect = rdd.collect()
#print(rddcollect)
rdd2 = rdd.map(lambda x: (x[0], x[2]))  # drop the indicator, keep (station, value)
#rdd2collect = rdd2.collect()
#print(rdd2collect)
rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: (k_v[0], sorted(k_v[1])))  # sorted values per station; with one tmin and one tmax each, this is [min, max]
rdd4.collect()
returns:
Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]
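If you still want the labelled rows from the question (station, 'TMIN', value and station, 'TMAX', value), they can be flattened back out of rdd4 above. A minimal sketch, not part of the output above; it assumes the smallest reading per station really is a TMIN and the largest a TMAX, and labelled is just an illustrative name:
# Reattach the labels by position: sorted(values)[0] is the station minimum,
# sorted(values)[-1] the maximum. Assumes every station has both kinds of reading.
labelled = rdd4.flatMap(lambda k_v: [(k_v[0], 'TMIN', k_v[1][0]),
                                     (k_v[0], 'TMAX', k_v[1][-1])])
labelled.collect()
# e.g. ('s0', 'TMIN', 3) and ('s0', 'TMAX', 14) for station s0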
ALTERNATE ANSWER
Here goes:
include = ['tmin', 'tmax']

rdd0 = sc.parallelize([('s1', 'tmin', -3), ('s1', 'tmax', 5), ('s2', 'tmin', 0), ('s2', 'tmin', -12), ('s2', 'tmax', 7), ('s2', 'tmax', 17), ('s2', 'tother', 17), ('s0', 'tmax', 14), ('s0', 'tmin', 3)])
rdd1 = rdd0.filter(lambda x: any(e in x for e in include))  # keep only tmin/tmax records
rdd2 = rdd1.map(lambda x: ((x[0], x[1]), x[2]))             # key by (station, indicator)
rdd3 = rdd2.groupByKey().sortByKey()
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: (k_v[0][0], min(k_v[1])))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: (k_v[0][0], max(k_v[1])))
rdd5 = rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: (k_v[0], sorted(k_v[1])))        # sorted, so [min, max] per station
rescollect = res.collect()
print(rescollect)
returns:
[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
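Since the value type going in and coming out is the same (the point behind the linked aggregateByKey article), the per-(station, indicator) extremes can also be computed with reduceByKey and the built-in min/max, with no seqOp/combOp functions at all. A minimal sketch, reusing rdd2 from the code above; minsPerKey and maxsPerKey are just illustrative names:
# reduceByKey keeps the value type unchanged, so no zero value or accumulator pair is needed
minsPerKey = rdd2.filter(lambda k_v: k_v[0][1] == 'tmin').reduceByKey(min)
maxsPerKey = rdd2.filter(lambda k_v: k_v[0][1] == 'tmax').reduceByKey(max)
sorted(minsPerKey.union(maxsPerKey).collect())
# e.g. (('s2', 'tmin'), -12) and (('s2', 'tmax'), 17) for station s2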