I am using Hadoop 3.3.4 and trying to execute a MapReduce program in Python that uses Google's PageRank algorithm to rank pages. I'm running this on my own Hadoop cluster, and I submitted the job with the following command:
mapred streaming -files mapper.py,reducer.py -input /user/hadoop/input/input.txt -output /user/hadoop/output -mapper ./mapper.py -reducer ./reducer.py
But I am getting the following error:
2023-07-14 17:19:01,076 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at Master/192.168.1.10:8032
2023-07-14 17:19:01,343 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/hadoop/.staging/job_1689368595696_0003
2023-07-14 17:19:01,790 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-14 17:19:01,934 INFO mapreduce.JobSubmitter: number of splits:20
2023-07-14 17:19:02,149 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1689368595696_0003
2023-07-14 17:19:02,149 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-14 17:19:02,293 INFO conf.Configuration: resource-types.xml not found
2023-07-14 17:19:02,294 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-07-14 17:19:02,351 INFO impl.YarnClientImpl: Submitted application application_1689368595696_0003
2023-07-14 17:19:02,408 INFO mapreduce.Job: The url to track the job: http://Master:8088/proxy/application_1689368595696_0003/
2023-07-14 17:19:02,410 INFO mapreduce.Job: Running job: job_1689368595696_0003
2023-07-14 17:19:09,539 INFO mapreduce.Job: Job job_1689368595696_0003 running in uber mode : false
2023-07-14 17:19:09,540 INFO mapreduce.Job: map 0% reduce 0%
2023-07-14 17:19:33,742 INFO mapreduce.Job: Task Id : attempt_1689368595696_0003_m_000002_0, Status : FAILED
[2023-07-14 17:19:29.868]Container killed on request. Exit code is 137
[2023-07-14 17:19:30.046]Container exited with a non-zero exit code 137.
[2023-07-14 17:19:30.080]Killed by external signal
2023-07-14 17:19:33,830 INFO mapreduce.Job: Task Id : attempt_1689368595696_0003_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)
I have also added #!/usr/bin/env python3 at the beginning of both files, mapper.py and reducer.py.
This is my mapper.py:
#!/usr/bin/env python3
import sys
import networkx as nx
# Create a directed graph
G = nx.DiGraph()
# Dictionary to store the mapping of page_id to page_link
page_link_dict = {}
# Read the input and store lines in a list
lines = sys.stdin.readlines()
# Process each line
for line in lines:
    page_infos = line.strip().split('\t')
    page_id = int(page_infos[0])
    page_link = page_infos[2]
    # Add the node to the graph
    G.add_node(page_id)
    # Store the mapping of page_id to page_link in the dictionary
    page_link_dict[page_id] = page_link

for line in lines:
    page_id, page_title, page_link, links = line.strip().split('\t')
    # Split the links
    pages_links = links.split(',')
    for page_link in pages_links:
        # Search for the id of the linked page and add an edge to it
        for linked_page_id, link in page_link_dict.items():
            if page_link == link:
                G.add_edge(int(page_id), linked_page_id)

# Output the graph as an adjacency list
for node in G.nodes():
    neighbors = ','.join(map(str, G.neighbors(node)))
    sys.stdout.write(f'{node}\t{neighbors}\n')
And this is my reducer.py:
#!/usr/bin/env python3
import sys
import networkx as nx
# Create a directed graph
G = nx.DiGraph()
# Read the adjacency list from mapper output and add edges to the graph
for line in sys.stdin:
    page_infos = line.strip().split('\t')
    node_id = int(page_infos[0])
    G.add_node(node_id)
    if len(page_infos) == 2:
        neighbors = page_infos[1].split(',')
        for neighbor_id in neighbors:
            G.add_edge(node_id, int(neighbor_id))

# Run the PageRank algorithm
pagerank_scores = nx.pagerank(G)

# Write the output to stdout
for page_id, rank in pagerank_scores.items():
    sys.stdout.write(f'{page_id}\t{rank}\n')
I also tested the code locally on my computer by running these commands:
cat input.txt | ./mapper.py > file.txt
cat file.txt | ./reducer.py
It worked fine; here are some of the results, showing every page id and its corresponding rank score:
10 8.883661079121364e-05
9 6.371139303094697e-05
97 7.724393979460297e-05
152 0.0002934058532326906
145 0.00011016393468028126
11 8.886938479913977e-05
13 8.887866372994127e-05
12 6.371139303094697e-05
more results after that ...
Finally, I also ran a small wordcount MapReduce program in Python to check my Hadoop configuration, and that worked as well. I installed all the dependency packages on my master to run the program, and I installed them on my 2 slaves too; I don't know if that is necessary.
UPDATE!!: I ended up using the Apache Spark framework with GraphFrames for this project, since it is better suited to graphs, and got the desired results. Thank you!
My problem was that I installed the dependencies using pip install networkx, which did not install the module system-wide. Instead, I added sudo to make it work: sudo pip install networkx. After that, the job executed successfully.
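Looking back, a small guard at the top of mapper.py and reducer.py would have surfaced the missing module in the YARN task logs instead of just an exit code 1. Something like this (an illustrative sketch, not my original code):

#!/usr/bin/env python3
import sys

# A streaming task's stderr ends up in the YARN task logs, so failing
# loudly here is much easier to debug than "subprocess failed with code 1".
try:
    import networkx as nx
except ImportError as exc:
    sys.stderr.write(f'mapper.py: missing dependency: {exc}\n')
    sys.exit(1)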
But the results I got are still not similar to the ones I got locally by running the scripts, so my guess is that I missed something in the MapReduce logic (the log shows the input was split into 20 map tasks, so each mapper only saw part of the graph). If anyone can help me with that, I will make another post for it.
UPDATE: The MapReduce paradigm in Hadoop is primarily designed for batch processing and does not naturally support iterative algorithms like PageRank that require multiple iterations and global communication between nodes.
In the case of the PageRank algorithm, where each iteration depends on the results of the previous one and needs the whole graph (all the page ids in my input.txt), frameworks like Apache Spark are better suited. Spark provides an in-memory distributed computing model that allows iterative processing and efficient data sharing across iterations.
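For anyone curious, here is a minimal sketch of the GraphFrames approach (the column names, input path, and parameters are illustrative, not my exact project code; the graphframes package has to be available on the cluster, e.g. via spark-submit --packages):

#!/usr/bin/env python3
from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName('pagerank').getOrCreate()

# input.txt: page_id \t page_title \t page_link \t comma-separated links
parts = spark.read.text('hdfs:///user/hadoop/input/input.txt') \
    .selectExpr("split(value, '\\t') AS f")

# Vertices: one row per page, keyed by its id.
vertices = parts.selectExpr('f[0] AS id', 'f[2] AS link')

# Edges: explode the outgoing links and join them back to the page ids.
outgoing = parts.selectExpr('f[0] AS src', "explode(split(f[3], ',')) AS link")
edges = outgoing.join(vertices, 'link').selectExpr('src', 'id AS dst')

g = GraphFrame(vertices, edges)
ranks = g.pageRank(resetProbability=0.15, maxIter=10)
ranks.vertices.select('id', 'pagerank').show()

Spark runs the PageRank iterations internally over the whole graph, which is exactly what a single pass of streaming map and reduce tasks cannot do.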