How to correctly groupByKey for non pairwiseRDDs using pyspark


I'm new to Python and to PySpark. I'm trying to run code that takes nested tuples of the form (id, (span, mention)) and applies .map(lambda (id, (span, text)): (id, text)) to them.

The code I'm working with is:

 # Python 2 syntax: tuple-unpacking lambdas are not valid in Python 3
 m = (text
      .map(lambda (id, (span, text)): (id, text))
      .mapValues(lambda v: ngrams(v, self.max_ngram))  # error triggered here
      .flatMap(lambda (target, tokens): (((target, t), 1) for t in tokens)))

This is how the original data is formatted (_id, source, span, text):

 [{'_id': u'en.wikipedia.org/wiki/Cerambycidae',
   'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
   'span': (61, 73),
   'text': u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.'},
  {'_id': u'en.wikipedia.org/wiki/Dru_Drury',
   'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
   'span': (20, 29),
   'text': u'It was described by Dru Drury in 1770.'}]

I get this error:

 for k, v in iterator:
 TypeError: tuple indices must be integers, not str

I know groupByKey works on pair RDDs, so I would like to know how to apply groupByKey correctly to resolve this issue.

Any help or guidance will be truly appreciated.

I'm using Python 2.7 and PySpark 2.3.0.

Thank you in advance.


Solution

  • First map the data into key-value form, then call groupByKey.

    A key-value pair is always a two-element tuple (a, b), where a is the key and b is the value. Either a or b may itself be a tuple. Because each record here is a dict, build the pair by looking up the fields by name.

    rdd = sc.parallelize([{'_id': u'en.wikipedia.org/wiki/Cerambycidae',
      'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
      'span': (61, 73),
      'text': u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.'},
     {'_id': u'en.wikipedia.org/wiki/Dru_Drury',
      'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
      'span': (20, 29),
      'text': u'It was described by Dru Drury in 1770.'},
     {'_id': u'en.wikipedia.org/wiki/Dru_Drury',
      'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens2',
      'span': (20, 29, 2),
      'text': u'It was described by Dru Drury in 1770.2'}])
    
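    # Build (_id, (span, text)) pairs, group them by _id,
    # then turn each group's iterable into a list so it prints readably.
    print rdd.map(lambda x: (x["_id"], (x["span"], x["text"]))).groupByKey()\
        .map(lambda x: (x[0], list(x[1]))).collect()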

    [(u'en.wikipedia.org/wiki/Dru_Drury',
      [((20, 29), u'It was described by Dru Drury in 1770.'),
       ((20, 29, 2), u'It was described by Dru Drury in 1770.2')]),
     (u'en.wikipedia.org/wiki/Cerambycidae',
      [((61, 73), u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.')])]
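
The reshaping step can also be checked without a Spark cluster. The following is a minimal plain-Python sketch, not Spark code: `to_pair` and `group_by_key_local` are hypothetical helper names introduced here for illustration, and `group_by_key_local` only imitates the grouping that groupByKey performs so that the shape of the pairs can be inspected locally. It avoids tuple-unpacking lambdas and the print statement, so it should run under both Python 2.7 and Python 3.

```python
from collections import defaultdict

# Two records in the same shape as the data in the question.
records = [
    {'_id': u'en.wikipedia.org/wiki/Cerambycidae',
     'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
     'span': (61, 73),
     'text': u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.'},
    {'_id': u'en.wikipedia.org/wiki/Dru_Drury',
     'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
     'span': (20, 29),
     'text': u'It was described by Dru Drury in 1770.'},
]


def to_pair(record):
    """Turn one dict record into a (key, value) tuple: (_id, (span, text))."""
    # Hypothetical helper: fields are looked up by name because each record is a dict.
    return (record['_id'], (record['span'], record['text']))


def group_by_key_local(pairs):
    """Plain-Python stand-in for groupByKey, for local illustration only."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


pairs = [to_pair(r) for r in records]
grouped = group_by_key_local(pairs)

# With a real RDD holding the same records, the equivalent would be roughly:
#   rdd.map(to_pair).groupByKey().mapValues(list)
```

Passing a named function such as `to_pair` to `rdd.map` instead of a lambda keeps the dict-to-tuple conversion in one place and makes it easy to test before running the job.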