python | pyspark | rdd

Python Spark Average of Tuple Values By Key


I'm trying to find the average length of words by paragraph. Data is pulled from a text file in the format of 1|For more than five years... where each line features a paragraph number.

So far this is my code:

from pyspark import SparkContext, SparkConf
sc = SparkContext('local', 'longest')
text = sc.textFile("walden.txt")
lines = text.map(lambda line: (line.split("|")[0],line))
lines = lines.filter(lambda kv: len(kv[1]) > 0)
words = lines.mapValues(lambda x: x.replace("1|","").replace("2|","").replace("3|",""))
words = words.mapValues(lambda x: x.split())
words = words.mapValues(lambda x: [(len(i),1) for i in x])
words = words.reduceByKey(lambda a,b: a+b)
words.saveAsTextFile("results")

And the current output follows this format:

('1', [(2,1),(6,1),(1,1), ...]), ('2', [(2,1),(6,1),(1,1), ...]), ('3', [(2,1),(6,1),(1,1), ...])

Where '1'/'2'/'3' are the paragraph IDs, and each tuple follows the (word length, 1) format.

What I need to do is sum the tuple values by key (paragraph ID) so that (2,1),(6,1),(1,1) becomes (9,3), and then divide those values (9/3) to find the average word length for each paragraph.

I've tried a bunch of different things but just can't get this to work. Your help is greatly appreciated!
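For reference, the pairwise sum described in the question can be sketched in plain Python (no Spark needed); the same combining lambda is exactly what `reduceByKey` would apply if each word were emitted as a separate (key, (length, 1)) record:

```python
from functools import reduce

# (word length, 1) pairs for one paragraph, as described above
pairs = [(2, 1), (6, 1), (1, 1)]

# Element-wise pairwise sum: (2,1) + (6,1) + (1,1) -> (9, 3)
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)

avg = total / count  # 9 / 3 -> 3.0
print((total, count), avg)
```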


Solution

  • For your RDD case, try this:

    text = sc.textFile("test.txt")
    # (paragraph id, full line)
    lines = text.map(lambda line: (line.split("|")[0], line))
    lines = lines.filter(lambda kv: len(kv[1]) > 0)
    # Drop the "n|" prefix in one step instead of replace()-ing each id
    words = lines.mapValues(lambda x: x.split("|", 1)[1])
    words = words.mapValues(lambda x: x.split())
    # One length per word, then the per-paragraph mean
    words = words.mapValues(lambda x: [len(i) for i in x])
    words = words.mapValues(lambda x: sum(x) / len(x))
    words.collect()
    
    [('1', 4.0), ('2', 5.4), ('3', 7.0)]
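One caveat: the `mapValues` average is computed per input line. If a paragraph ever spanned several lines with the same ID, you would instead emit one (key, (length, 1)) pair per word and combine them with `reduceByKey`. A plain-Python mirror of that combine step (the sample records here are hypothetical):

```python
# Plain-Python stand-in for reduceByKey over (key, (length, 1)) pairs.
# The records are hypothetical; keys repeat to mimic a paragraph
# split across several input lines.
records = [
    ("1", (3, 1)), ("1", (4, 1)),   # paragraph 1: "For" + "more"
    ("2", (4, 1)), ("2", (5, 1)),   # paragraph 2
]

acc = {}
for key, (length, count) in records:
    s, c = acc.get(key, (0, 0))
    acc[key] = (s + length, c + count)   # same lambda reduceByKey would use

averages = {k: s / c for k, (s, c) in acc.items()}
print(averages)   # {'1': 3.5, '2': 4.5}
```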
    

Using the DataFrame API instead, I got this:

    import pyspark.sql.functions as f
    
    df = spark.read.option("inferSchema","true").option("sep","|").csv("test.txt").toDF("col1", "col2")
    df.show(10, False)
    
    +----+---------------------------------------+
    |col1|col2                                   |
    +----+---------------------------------------+
    |1   |For more than five years               |
    |2   |For moasdre than five asdfyears        |
    |3   |Fasdfor more thasdfan fidafve yearasdfs|
    +----+---------------------------------------+
    
    df.withColumn('array', f.split('col2', r'[ ][ ]*')) \
      .withColumn('count_arr', f.expr('transform(array, x -> LENGTH(x))')) \
      .withColumn('sum', f.expr('aggregate(array, 0, (sum, x) -> sum + LENGTH(x))')) \
      .withColumn('size', f.size('array')) \
      .withColumn('avg', f.col('sum') / f.col('size')) \
      .show(10, False)
    
    +----+---------------------------------------+---------------------------------------------+---------------+---+----+---+
    |col1|col2                                   |array                                        |count_arr      |sum|size|avg|
    +----+---------------------------------------+---------------------------------------------+---------------+---+----+---+
    |1   |For more than five years               |[For, more, than, five, years]               |[3, 4, 4, 4, 5]|20 |5   |4.0|
    |2   |For moasdre than five asdfyears        |[For, moasdre, than, five, asdfyears]        |[3, 7, 4, 4, 9]|27 |5   |5.4|
    |3   |Fasdfor more thasdfan fidafve yearasdfs|[Fasdfor, more, thasdfan, fidafve, yearasdfs]|[7, 4, 8, 7, 9]|35 |5   |7.0|
    +----+---------------------------------------+---------------------------------------------+---------------+---+----+---+
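The `aggregate(...)` expression used above is a left fold over the array; its semantics can be mirrored in plain Python with `functools.reduce` (word list taken from row 1 of the sample data):

```python
from functools import reduce

words = ["For", "more", "than", "five", "years"]

# aggregate(array, 0, (sum, x) -> sum + LENGTH(x)) folds left over the array:
total = reduce(lambda acc, w: acc + len(w), words, 0)

avg = total / len(words)
print(total, avg)  # 20 4.0
```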
    
    

    I know this is a quite different approach, but I hope it helps.