Answered step by step
Verified Expert Solution
Link Copied!

Question

1 Approved Answer

Page Rank algorithm Hadoop implementation. Add the final MapReduce stage so the whole PageRank calculation job can be submitted and run correctly. In HadoopPageRank.java, look for the string "place holder:".

Page Rank algorithm Hadoop implementation

Add the final MapReduce stage so the whole PageRank calculation job can be submitted and run correctly. In HadoopPageRank.java, look for the string place holder:. This is where you should be adding code to start the final MR stage. For the final MR handling, you will need to add two new classes that have names HadoopPageRankResultMapper.java and HadoopPageRankResultReducer.java.

--------------------------------------------------------------

HadoopPageRank.java

import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

public class HadoopPageRank extends Configured implements Tool { public int run(String[] args) throws Exception { // house keeping int iteration = 0; Path inputPath = new Path(args[0]); Path basePath = new Path(args[1]); FileSystem fs = FileSystem.get(getConf()); fs.delete(basePath, true); fs.mkdirs(basePath); // configure initial job Job initJob = Job.getInstance(getConf(),"pageRank"); initJob.setJarByClass(HadoopPageRank.class); initJob.setMapperClass(HadoopPageRankInitMapper.class); initJob.setReducerClass(HadoopPageRankInitReducer.class); initJob.setOutputKeyClass(Text.class); initJob.setOutputValueClass(Text.class); initJob.setInputFormatClass(TextInputFormat.class); Path outputPath = new Path(basePath, "iteration_" + iteration); FileInputFormat.addInputPath(initJob, inputPath); FileOutputFormat.setOutputPath(initJob, outputPath); // let initJob run and wait for finish if ( !initJob.waitForCompletion(true) ) { return -1; } // calculate the page ranks int totalIterations = Integer.parseInt(args[2]); while ( iteration iteration ++; inputPath = outputPath; // new input is the old output outputPath = new Path(basePath, "iteration_" + iteration); Job mainJob = Job.getInstance(getConf(),"Iteration " + iteration); mainJob.setJarByClass(HadoopPageRank.class); mainJob.setMapperClass(HadoopPageRankMainJobMapper.class); mainJob.setReducerClass(HadoopPageRankMainJobReducer.class); mainJob.setOutputKeyClass(Text.class); mainJob.setOutputValueClass(Text.class); mainJob.setInputFormatClass(TextInputFormat.class); FileInputFormat.setInputPaths(mainJob, inputPath); FileOutputFormat.setOutputPath(mainJob, outputPath); if ( !mainJob.waitForCompletion(true) ) { return -1; } } // collect the result, highest rank first - you will need to finish this up Job resultJob = Job.getInstance(getConf(),"final result"); resultJob.setJarByClass(HadoopPageRank.class);

/* * place holder: * here is the place you will need to add a final Map/Reduce code */ FileInputFormat.setInputPaths(resultJob, outputPath); FileOutputFormat.setOutputPath(resultJob,new Path(basePath, "result")); if ( !resultJob.waitForCompletion(true) ) { return -1; } return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HadoopPageRank(), args); System.exit(exitCode); } }

-----------------------------------------------------------------------------------------------

HadoopPageRankInitMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

public class HadoopPageRankInitMapper extends Mapper {,>

@Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if ( value == null || value.charAt(0) == '#' ) { return; } int tabIndex = value.find("\t"); String nodeA = Text.decode(value.getBytes(), 0, tabIndex); String nodeB = Text.decode(value.getBytes(), tabIndex + 1, value.getLength() - (tabIndex + 1)); context.write(new Text(nodeA), new Text(nodeB)); } }

______________________________________________________________________________

HadoopPageRankInitReducer.java

import java.io.IOException;

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

public class HadoopPageRankInitReducer extends Reducer {,>

public static final long TOTAL_WEB_PAGES = 4; @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { boolean first = true; String links = (1.0 / TOTAL_WEB_PAGES) + "\t";

for (Text value : values) { if (!first) links += ","; links += value.toString(); first = false; } context.write(key, new Text(links)); } }

_______________________________________

HadoopPageRankMainJobMapper.java

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class HadoopPageRankMainJobMapper extends Mapper {,>

@Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if ( value == null || value.getLength() == 0 ) { return; } int tabIdx1 = value.find("\t"); int tabIdx2 = value.find("\t", tabIdx1 + 1); // extract tokens from the current line String page = Text.decode(value.getBytes(), 0, tabIdx1); String pageRank = Text.decode(value.getBytes(), tabIdx1 + 1, tabIdx2 - (tabIdx1 + 1)); String outlinks = Text.decode(value.getBytes(), tabIdx2 + 1, value.getLength() - (tabIdx2 + 1)); // calculate contribution to each target page String[] allNextPages = outlinks.split(","); if ( allNextPages == null || allNextPages.length == 0 ) { return; } double currentPR = Double.parseDouble(pageRank.toString()); int totalNumOfNextPages = allNextPages.length; for (String nextPage : allNextPages) { Text rankContribution = new Text(currentPR/totalNumOfNextPages + ""); context.write(new Text(nextPage), rankContribution); } // put the original links so the reducer is able to produce the correct output context.write(new Text(page), new Text("|" + outlinks)); } }

______________________________________________________________

HadoopPageRankMainJobReducer.java

import java.io.IOException;

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

public class HadoopPageRankMainJobReducer extends Reducer { @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { if ( values == null ) { return; } String links = ""; double receivedContribution = 0.0; for (Text value : values) { String content = value.toString(); if (content.startsWith("|")) { links += content.substring("|".length()); } else { receivedContribution += Double.parseDouble(content); },>

} double newPageRank = receivedContribution; context.write(key, new Text(newPageRank + "\t" + links)); }

}

Step by Step Solution

There are 3 Steps involved in it

Step: 1

blur-text-image

Get Instant Access to Expert-Tailored Solutions

See step-by-step solutions with expert insights and AI powered tools for academic success

Step: 2

blur-text-image

Step: 3

blur-text-image

Ace Your Homework with AI

Get the answers you need in no time with our AI-driven, step-by-step assistance

Get Started

Recommended Textbook for

App Inventor

Authors: David Wolber, Hal Abelson

1st Edition

1449397484, 9781449397487

More Books

Students also viewed these Programming questions

Question

In Exercises find the indefinite integral. 12 1 + 9x dx

Answered: 1 week ago