
Counting genres in Pig


I am working with the movies.dat dataset provided by MovieLens. The first 5 rows of the data are:

1:Toy Story (1995):Adventure|Animation|Children|Comedy|Fantasy
2:Jumanji (1995):Adventure|Children|Fantasy
3:Grumpier Old Men (1995):Comedy|Romance
4:Waiting to Exhale (1995):Comedy|Drama|Romance
5:Father of the Bride Part II (1995):Comedy

I want to count the exact number of occurrences of each genre. The following MapReduce (Python) code is sufficient for this.

#!/usr/bin/env python

import sys

#mapper

for line in sys.stdin:
    for genre in line.strip().split(":")[-1].split("|"):
        print("{x}\t1".format(x=genre))

#!/usr/bin/env python
#reducer
import sys

# Tally how many times each genre key appears in the mapper output.
genre_dict = {}
for line in sys.stdin:
    data = line.strip().split("\t")
    # Skip malformed lines that are not "genre<TAB>count".
    if len(data) != 2:
        continue
    genre_dict[data[0]] = genre_dict.get(data[0], 0) + 1

# Sort genres by count, most frequent first.
a = list(genre_dict.items())
a.sort(key=lambda x: x[1], reverse=True)

for genre, count in a:
    print("{x}\t{y}".format(x=genre, y=count))

Any suggestions for a Pig query that does the same task? Thanks in advance.


Solution

  • TOKENIZE and FLATTEN can help you out here. Pig's TOKENIZE function takes a string and an optional delimiter, splits the string on that delimiter, and puts the resulting parts into a bag. The FLATTEN operator then un-nests that bag, turning each element into its own record. The code will look as follows:

    --Load your initial data and split it into columns on ':'
    data = LOAD 'path_to_data' USING PigStorage(':') AS (index:long, name:chararray, genres:chararray);
    
    --Split & Explode each individual genre into a separate record
    dataExploded = FOREACH data GENERATE FLATTEN(TOKENIZE(genres, '|')) AS genre;
    
    --GROUP and get counts for each genre
    dataWithCounts = FOREACH (GROUP dataExploded BY genre) GENERATE
                  group AS genre,
                  COUNT(dataExploded) AS genreCount;
    
    DUMP dataWithCounts;