- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.ArrayWritable;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import java.io.IOException;
- import java.lang.Integer;
- import java.util.StringTokenizer;
- import java.util.TreeSet;
- // >>> Don't Change
- public class WHV2 extends Configured implements Tool {
- public static final Log LOG = LogFactory.getLog(WHV2.class);
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new WHV2(), args);
- System.exit(res);
- }
- public static class TextArrayWritable extends ArrayWritable {
- public TextArrayWritable() {
- super(Text.class);
- }
- public TextArrayWritable(String[] numbers) {
- super(Text.class);
- Text[] texts = new Text[numbers.length];
- for (int i = 0; i < numbers.length; i++) {
- texts[i] = new Text(numbers[i]);
- }
- set(texts);
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- // TODO
- Configuration conf = this.getConf();
- FileSystem fs = FileSystem.get(conf);
- Path tmpPath = new Path("/user/lada38/mp2/tmp");
- fs.delete(tmpPath, true);
- Job jobA = Job.getInstance(conf, "WHV2_A");
- jobA.setOutputKeyClass(Text.class);
- jobA.setOutputValueClass(IntWritable.class);
- jobA.setMapperClass(LinkCountMap.class);
- jobA.setReducerClass(LinkCountReduce.class);
- jobA.setMapOutputKeyClass(Text.class);
- jobA.setMapOutputValueClass(IntWritable.class);
- FileInputFormat.setInputPaths(jobA, new Path(args[0]));
- FileOutputFormat.setOutputPath(jobA, tmpPath);
- jobA.setJarByClass(WHV2.class);
- jobA.waitForCompletion(true);
- Job jobB = Job.getInstance(conf, "WHV2_B");
- jobB.setOutputKeyClass(NullWritable.class);
- jobB.setOutputValueClass(TextArrayWritable.class);
- jobB.setMapOutputKeyClass( NullWritable.class);
- jobB.setMapOutputValueClass(TextArrayWritable.class);
- jobB.setMapperClass(TopLinksMap.class);
- jobB.setReducerClass(TopLinksReduce.class);
- jobB.setNumReduceTasks(1);
- FileInputFormat.setInputPaths(jobB, tmpPath);
- FileOutputFormat.setOutputPath(jobB, new Path(args[1]));
- jobB.setInputFormatClass(KeyValueTextInputFormat.class);
- jobB.setOutputFormatClass(TextOutputFormat.class);
- jobB.setJarByClass(WHV2.class);
- return jobB.waitForCompletion(true) ? 0 : 1;
- }
- public static class LinkCountMap extends Mapper<Object, Text, Text, IntWritable> {
- @Override
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String[] line = value.toString().split(",");
- String keyA=line[0]+" "+line[2]+" "+line[1]+" "+line[19]+" "+line[20];
- context.write(new Text(keyA),new IntWritable(1));
- }
- }
- public static class LinkCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
- // TODO
- @Override
- public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- Integer sum = new Integer(0);
- for (IntWritable val: values) {
- sum += val.get();
- }
- context.write(key, new IntWritable(sum));
- }
- }
- public static class TopLinksMap extends Mapper<Text, Text, NullWritable, TextArrayWritable> {
- Integer N;
- private TreeSet<Pair<Integer, String>> coutToPageIdMap = new TreeSet<Pair<Integer, String>>();
- @Override
- protected void setup(Context context) throws IOException,InterruptedException {
- Configuration conf = context.getConfiguration();
- this.N = conf.getInt("N", 10);
- }
- // TODO
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- Integer count = Integer.parseInt(value.toString());
- String word = key.toString();
- coutToPageIdMap.add(new Pair <Integer , String> (count,word));
- if (coutToPageIdMap.size() > N) {
- coutToPageIdMap.remove(coutToPageIdMap.first());
- }
- }
- @Override
- protected void cleanup(Context context) throws IOException,InterruptedException {
- for (Pair<Integer , String>item : coutToPageIdMap){
- String[] strings = {item.second , item.first.toString()};
- context.write(NullWritable.get(),new TextArrayWritable(strings));
- }
- }
- }
- public static class TopLinksReduce extends Reducer<NullWritable, TextArrayWritable, Text, IntWritable> {
- Integer N;
- private TreeSet<Pair<Integer, String>> coutToPageIdMap = new TreeSet<Pair<Integer, String>>();
- @Override
- protected void setup(Context context) throws IOException,InterruptedException {
- Configuration conf = context.getConfiguration();
- this.N = conf.getInt("N", 10);
- }
- // TODO
- @Override
- public void reduce(NullWritable key, Iterable<TextArrayWritable> values, Context context) throws IOException, InterruptedException {
- for(TextArrayWritable val: values){
- Text[] pair = (Text[]) val.toArray();
- String word = pair[0].toString();
- Integer count = Integer.parseInt(pair[1].toString());
- coutToPageIdMap.add(new Pair<Integer , String> (count , word));
- if (coutToPageIdMap.size() > this.N){
- coutToPageIdMap.remove(coutToPageIdMap.first());
- }
- }
- for (Pair<Integer , String> item : coutToPageIdMap){
- Text word = new Text(item.second);
- IntWritable value = new IntWritable(item.first);
- context.write(word, value);
- }
- }
- }
- }
- // >>> Don't Change
/**
 * Pair of two {@link Comparable} values with {@code final} fields, ordered first by
 * {@code first} and then, on ties, by {@code second}.
 *
 * @param <A> type of the first component
 * @param <B> type of the second component
 */
class Pair<A extends Comparable<? super A>,
        B extends Comparable<? super B>>
        implements Comparable<Pair<A, B>> {
    /** First component; the primary sort key. */
    public final A first;
    /** Second component; compared only when the first components compare equal. */
    public final B second;

    /**
     * Creates a pair holding the given components.
     *
     * @param first  first component
     * @param second second component
     */
    public Pair(A first, B second) {
        this.first = first;
        this.second = second;
    }

    /**
     * Static factory equivalent to {@code new Pair<A, B>(first, second)}.
     *
     * @param first  first component
     * @param second second component
     * @return a new pair
     */
    public static <A extends Comparable<? super A>,
            B extends Comparable<? super B>>
    Pair<A, B> of(A first, B second) {
        return new Pair<A, B>(first, second);
    }

    /**
     * Compares by {@code first}, then by {@code second} when the first components are equal.
     * A null argument makes this pair compare as greater (returns 1).
     * NOTE(review): the {@link Comparable} contract recommends throwing NullPointerException
     * for {@code compareTo(null)}; this returns 1 instead. Null components are not handled here:
     * {@code this.first} (and {@code this.second} on ties) are dereferenced directly.
     */
    @Override
    public int compareTo(Pair<A, B> o) {
        int cmp = o == null ? 1 : (this.first).compareTo(o.first);
        return cmp == 0 ? (this.second).compareTo(o.second) : cmp;
    }

    /** Combines the component hashes as {@code 31 * hash(first) + hash(second)}; null hashes to 0. */
    @Override
    public int hashCode() {
        return 31 * hashcode(first) + hashcode(second);
    }

    /** Null-safe hash: 0 for null, otherwise {@code o.hashCode()}. */
    private static int hashcode(Object o) {
        return o == null ? 0 : o.hashCode();
    }

    /**
     * Two pairs are equal when both components are equal per {@link Object#equals}
     * (null-safe). Any {@link Pair} is accepted, regardless of its type arguments.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Pair))
            return false;
        if (this == obj)
            return true;
        return equal(first, ((Pair<?, ?>) obj).first)
                && equal(second, ((Pair<?, ?>) obj).second);
    }

    /** Null-safe equality: true if both are the same reference or {@code o1.equals(o2)}. */
    private boolean equal(Object o1, Object o2) {
        return o1 == o2 || (o1 != null && o1.equals(o2));
    }

    /** Renders the pair as {@code (first, second)}. */
    @Override
    public String toString() {
        return "(" + first + ", " + second + ')';
    }
}
- // <<< Don't Change