apache-spark, pyspark, rdd

Alternate or better approach to aggregateByKey in pyspark RDD


I have a weather data CSV file in which each entry has a station ID and the minimum or maximum value recorded for that day. The second element is a keyword indicating what the value represents. Sample input is below.

  stationID    feature value
  ITE00100554    TMAX  -75  
  ITE00100554    TMIN -148         
  GM000010962    PRCP    0         
  EZE00100082    TMAX  -86         
  EZE00100082    TMIN -135         
  ITE00100554    TMAX  -60         
  ITE00100554    TMIN -125         
  GM000010962    PRCP    0         
  EZE00100082    TMAX  -44         
  EZE00100082    TMIN -130         
  ITE00100554    TMAX  -23 

I have filtered the entries down to those with TMIN or TMAX. Each entry is recorded for a given date; I have stripped the date while building my RDD, as it's not of interest. My goal is to find the min and max value of each station amongst all of its records, i.e.,

ITE00100554, 'TMIN', <global_min_value recorded by that station>
ITE00100554, 'TMAX', <global_max_value>
EZE00100082, 'TMIN', <global_min_value>
EZE00100082, 'TMAX', <global_max_value>
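
For reference, here is a rough sketch of how the entries RDD might be built (the 1800.csv file name and the stationID, date, feature, value column order are assumptions):

# sc is an existing SparkContext
def parse_line(line):
    fields = line.split(',')
    # keep stationID and feature, convert the value to int, drop the date in fields[1]
    return (fields[0], fields[2], int(fields[3]))

entries = sc.textFile('1800.csv').map(parse_line)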

I was able to accomplish this using aggregateByKey, but according to this link https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/ I don't have to use aggregateByKey, since the input and output value formats are the same. So I would like to know if there is an alternate or better way to code this without defining so many functions.

import csv

stationtemps = entries.filter(lambda x: x[1] in ['TMIN', 'TMAX']).map(lambda x: (x[0], (x[1], x[2])))  # (stationID, (tempkey, value))

# global extremes across all stations, used as the zero values for aggregateByKey below
max_temp = stationtemps.values().values().max()
min_temp = stationtemps.values().values().min()


def max_seqOp(accumulator, element):
    return (accumulator if accumulator[1] > element[1] else element)


def max_combOp(accu1, accu2):
    return (accu1 if accu1[1] > accu2[1] else accu2)


def min_seqOp(accumulator, element):
    return (accumulator if accumulator[1] < element[1] else element)


def min_combOp(accu1, accu2):
    return (accu1 if accu1[1] < accu2[1] else accu2)


station_max_temps = stationtemps.aggregateByKey(('', min_temp), max_seqOp, max_combOp).sortByKey()
station_min_temps = stationtemps.aggregateByKey(('', max_temp), min_seqOp, min_combOp).sortByKey()

min_max_temps = station_max_temps.zip(station_min_temps).collect()

with open('1800_min_max.csv', 'w') as fd:
    writer = csv.writer(fd)
    writer.writerows(map(lambda x: list(list(x)), min_max_temps))

I am learning PySpark and haven't mastered all the different transformation functions.


Solution

  • Here is simulated input, and if the min and max are filled in correctly, then why the need for the TMIN / TMAX indicator at all? Indeed, there is no need for an accumulator.

    rdd = sc.parallelize([  ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmax', 7), ('s0','tmax',14), ('s0','tmin', 3)  ])
    rddcollect = rdd.collect()
    #print(rddcollect)
    
    rdd2 = rdd.map(lambda x:  (x[0], x[2]))
    #rdd2collect = rdd2.collect()
    #print(rdd2collect)
    
    rdd3 = rdd2.groupByKey().sortByKey()
    rdd4 = rdd3.map(lambda k_v: ( k_v[0], (sorted(k_v[1])))  )
    rdd4.collect()
    

    returns:

    Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]
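
    The same result can also be reached without collecting each station's full value list: a single reduceByKey carrying a running (min, max) pair will do (a sketch against the same simulated rdd; avoiding groupByKey matters once the data gets large):

    rdd_minmax = rdd.map(lambda x: (x[0], (x[2], x[2]))) \
                    .reduceByKey(lambda a, b: (min(a[0], b[0]), max(a[1], b[1]))) \
                    .sortByKey()
    rdd_minmax.collect()
    # [('s0', (3, 14)), ('s1', (-3, 5)), ('s2', (0, 7))]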
    

    ALTERNATE ANSWER

    • after clarification
    • assuming that min and max values make sense
    • with my own data
    • there are other solutions BTW (one is sketched after the output below)

    Here goes:

    include = ['tmin','tmax']
    
    rdd0 = sc.parallelize([  ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmin',-12), ('s2','tmax', 7), ('s2','tmax', 17), ('s2','tother', 17), ('s0','tmax',14), ('s0','tmin', 3)  ])
    rdd1 = rdd0.filter(lambda x: any(e in x for e in include) )
    rdd2 = rdd1.map(lambda x:  ( (x[0],x[1]), x[2]))
    rdd3 = rdd2.groupByKey().sortByKey()
    rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: ( k_v[0][0], min( k_v[1]  ) ))
    rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: ( k_v[0][0], max( k_v[1]  ) ))
    rdd5=rdd4Min.union(rdd4Max)
    rdd6 = rdd5.groupByKey().sortByKey()
    res = rdd6.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))))
    rescollect = res.collect()
    print(rescollect)
    

    returns:

    [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
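
    And one of those other solutions, closer to the original goal of avoiding the seqOp/combOp functions altogether: key by (stationID, feature) and reduce with the plain min and max builtins (a sketch on the same rdd0 as above):

    include = ['tmin', 'tmax']
    keyed = rdd0.filter(lambda x: x[1] in include).map(lambda x: ((x[0], x[1]), x[2]))
    minPerStation = keyed.filter(lambda kv: kv[0][1] == 'tmin').reduceByKey(min).map(lambda kv: (kv[0][0], kv[1]))
    maxPerStation = keyed.filter(lambda kv: kv[0][1] == 'tmax').reduceByKey(max).map(lambda kv: (kv[0][0], kv[1]))
    res2 = minPerStation.union(maxPerStation).groupByKey().sortByKey().mapValues(sorted)
    print(res2.collect())
    # [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]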