Question
Page Rank algorithm — Hadoop implementation

Add the final MapReduce stage so the whole PageRank calculation job can be submitted and run correctly. In HadoopPageRank.java, look for the string `place holder:` — this is where you should add the code that starts the final MR stage. For the final MR handling, you will need to add two new classes named HadoopPageRankResultMapper.java and HadoopPageRankResultReducer.java.
--------------------------------------------------------------
HadoopPageRank.java
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
public class HadoopPageRank extends Configured implements Tool { public int run(String[] args) throws Exception { // house keeping int iteration = 0; Path inputPath = new Path(args[0]); Path basePath = new Path(args[1]); FileSystem fs = FileSystem.get(getConf()); fs.delete(basePath, true); fs.mkdirs(basePath); // configure initial job Job initJob = Job.getInstance(getConf(),"pageRank"); initJob.setJarByClass(HadoopPageRank.class); initJob.setMapperClass(HadoopPageRankInitMapper.class); initJob.setReducerClass(HadoopPageRankInitReducer.class); initJob.setOutputKeyClass(Text.class); initJob.setOutputValueClass(Text.class); initJob.setInputFormatClass(TextInputFormat.class); Path outputPath = new Path(basePath, "iteration_" + iteration); FileInputFormat.addInputPath(initJob, inputPath); FileOutputFormat.setOutputPath(initJob, outputPath); // let initJob run and wait for finish if ( !initJob.waitForCompletion(true) ) { return -1; } // calculate the page ranks int totalIterations = Integer.parseInt(args[2]); while ( iteration iteration ++; inputPath = outputPath; // new input is the old output outputPath = new Path(basePath, "iteration_" + iteration); Job mainJob = Job.getInstance(getConf(),"Iteration " + iteration); mainJob.setJarByClass(HadoopPageRank.class); mainJob.setMapperClass(HadoopPageRankMainJobMapper.class); mainJob.setReducerClass(HadoopPageRankMainJobReducer.class); mainJob.setOutputKeyClass(Text.class); mainJob.setOutputValueClass(Text.class); mainJob.setInputFormatClass(TextInputFormat.class); FileInputFormat.setInputPaths(mainJob, inputPath); FileOutputFormat.setOutputPath(mainJob, outputPath); if ( !mainJob.waitForCompletion(true) ) { return -1; } } // collect the result, highest rank first - you will need to finish this up Job resultJob = Job.getInstance(getConf(),"final result"); resultJob.setJarByClass(HadoopPageRank.class);
/* * place holder: * here is the place you will need to add a final Map/Reduce code */ FileInputFormat.setInputPaths(resultJob, outputPath); FileOutputFormat.setOutputPath(resultJob,new Path(basePath, "result")); if ( !resultJob.waitForCompletion(true) ) { return -1; } return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HadoopPageRank(), args); System.exit(exitCode); } }
-----------------------------------------------------------------------------------------------
HadoopPageRankInitMapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class HadoopPageRankInitMapper extends Mapper {,>
@Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if ( value == null || value.charAt(0) == '#' ) { return; } int tabIndex = value.find("\t"); String nodeA = Text.decode(value.getBytes(), 0, tabIndex); String nodeB = Text.decode(value.getBytes(), tabIndex + 1, value.getLength() - (tabIndex + 1)); context.write(new Text(nodeA), new Text(nodeB)); } }
______________________________________________________________________________
HadoopPageRankInitReducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class HadoopPageRankInitReducer extends Reducer {,>
public static final long TOTAL_WEB_PAGES = 4; @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { boolean first = true; String links = (1.0 / TOTAL_WEB_PAGES) + "\t";
for (Text value : values) { if (!first) links += ","; links += value.toString(); first = false; } context.write(key, new Text(links)); } }
_______________________________________
HadoopPageRankMainJobMapper.java
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class HadoopPageRankMainJobMapper extends Mapper {,>
@Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if ( value == null || value.getLength() == 0 ) { return; } int tabIdx1 = value.find("\t"); int tabIdx2 = value.find("\t", tabIdx1 + 1); // extract tokens from the current line String page = Text.decode(value.getBytes(), 0, tabIdx1); String pageRank = Text.decode(value.getBytes(), tabIdx1 + 1, tabIdx2 - (tabIdx1 + 1)); String outlinks = Text.decode(value.getBytes(), tabIdx2 + 1, value.getLength() - (tabIdx2 + 1)); // calculate contribution to each target page String[] allNextPages = outlinks.split(","); if ( allNextPages == null || allNextPages.length == 0 ) { return; } double currentPR = Double.parseDouble(pageRank.toString()); int totalNumOfNextPages = allNextPages.length; for (String nextPage : allNextPages) { Text rankContribution = new Text(currentPR/totalNumOfNextPages + ""); context.write(new Text(nextPage), rankContribution); } // put the original links so the reducer is able to produce the correct output context.write(new Text(page), new Text("|" + outlinks)); } }
______________________________________________________________
HadoopPageRankMainJobReducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class HadoopPageRankMainJobReducer extends Reducer { @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { if ( values == null ) { return; } String links = ""; double receivedContribution = 0.0; for (Text value : values) { String content = value.toString(); if (content.startsWith("|")) { links += content.substring("|".length()); } else { receivedContribution += Double.parseDouble(content); },>
} double newPageRank = receivedContribution; context.write(key, new Text(newPageRank + "\t" + links)); }
}
Step by Step Solution
There are 3 Steps involved in it
Step: 1
Get Instant Access to Expert-Tailored Solutions
See step-by-step solutions with expert insights and AI powered tools for academic success
Step: 2
Step: 3
Ace Your Homework with AI
Get the answers you need in no time with our AI-driven, step-by-step assistance
Get Started