import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.lang.Integer; import java.util.StringTokenizer; import java.util.TreeSet; // >>> Don't Change public class WHV2 extends Configured implements Tool { public static final Log LOG = LogFactory.getLog(WHV2.class); public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WHV2(), args); System.exit(res); } public static class TextArrayWritable extends ArrayWritable { public TextArrayWritable() { super(Text.class); } public TextArrayWritable(String[] numbers) { super(Text.class); Text[] texts = new Text[numbers.length]; for (int i = 0; i < numbers.length; i++) { texts[i] = new Text(numbers[i]); } set(texts); } } @Override public int run(String[] args) throws Exception { // TODO Configuration conf = this.getConf(); FileSystem fs = FileSystem.get(conf); Path tmpPath = new Path("/user/lada38/mp2/tmp"); fs.delete(tmpPath, true); Job jobA = Job.getInstance(conf, "WHV2_A"); jobA.setOutputKeyClass(Text.class); jobA.setOutputValueClass(IntWritable.class); jobA.setMapperClass(LinkCountMap.class); jobA.setReducerClass(LinkCountReduce.class); jobA.setMapOutputKeyClass(Text.class); jobA.setMapOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(jobA, new Path(args[0])); FileOutputFormat.setOutputPath(jobA, tmpPath); jobA.setJarByClass(WHV2.class); jobA.waitForCompletion(true); Job jobB = Job.getInstance(conf, "WHV2_B"); jobB.setOutputKeyClass(NullWritable.class); jobB.setOutputValueClass(TextArrayWritable.class); jobB.setMapOutputKeyClass( NullWritable.class); jobB.setMapOutputValueClass(TextArrayWritable.class); jobB.setMapperClass(TopLinksMap.class); jobB.setReducerClass(TopLinksReduce.class); jobB.setNumReduceTasks(1); FileInputFormat.setInputPaths(jobB, tmpPath); FileOutputFormat.setOutputPath(jobB, new Path(args[1])); jobB.setInputFormatClass(KeyValueTextInputFormat.class); jobB.setOutputFormatClass(TextOutputFormat.class); jobB.setJarByClass(WHV2.class); return jobB.waitForCompletion(true) ? 0 : 1; } public static class LinkCountMap extends Mapper { @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] line = value.toString().split(","); String keyA=line[0]+" "+line[2]+" "+line[1]+" "+line[19]+" "+line[20]; context.write(new Text(keyA),new IntWritable(1)); } } public static class LinkCountReduce extends Reducer { // TODO @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { Integer sum = new Integer(0); for (IntWritable val: values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static class TopLinksMap extends Mapper { Integer N; private TreeSet> coutToPageIdMap = new TreeSet>(); @Override protected void setup(Context context) throws IOException,InterruptedException { Configuration conf = context.getConfiguration(); this.N = conf.getInt("N", 10); } // TODO @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException { Integer count = Integer.parseInt(value.toString()); String word = key.toString(); coutToPageIdMap.add(new Pair (count,word)); if (coutToPageIdMap.size() > N) { coutToPageIdMap.remove(coutToPageIdMap.first()); } } @Override protected void cleanup(Context context) throws IOException,InterruptedException { for (Pairitem : coutToPageIdMap){ String[] strings = {item.second , item.first.toString()}; context.write(NullWritable.get(),new TextArrayWritable(strings)); } } } public static class TopLinksReduce extends Reducer { Integer N; private TreeSet> coutToPageIdMap = new TreeSet>(); @Override protected void setup(Context context) throws IOException,InterruptedException { Configuration conf = context.getConfiguration(); this.N = conf.getInt("N", 10); } // TODO @Override public void reduce(NullWritable key, Iterable values, Context context) throws IOException, InterruptedException { for(TextArrayWritable val: values){ Text[] pair = (Text[]) val.toArray(); String word = pair[0].toString(); Integer count = Integer.parseInt(pair[1].toString()); coutToPageIdMap.add(new Pair (count , word)); if (coutToPageIdMap.size() > this.N){ coutToPageIdMap.remove(coutToPageIdMap.first()); } } for (Pair item : coutToPageIdMap){ Text word = new Text(item.second); IntWritable value = new IntWritable(item.first); context.write(word, value); } } } } // >>> Don't Change class Pair, B extends Comparable> implements Comparable> { public final A first; public final B second; public Pair(A first, B second) { this.first = first; this.second = second; } public static , B extends Comparable> Pair of(A first, B second) { return new Pair(first, second); } @Override public int compareTo(Pair o) { int cmp = o == null ? 1 : (this.first).compareTo(o.first); return cmp == 0 ? (this.second).compareTo(o.second) : cmp; } @Override public int hashCode() { return 31 * hashcode(first) + hashcode(second); } private static int hashcode(Object o) { return o == null ? 0 : o.hashCode(); } @Override public boolean equals(Object obj) { if (!(obj instanceof Pair)) return false; if (this == obj) return true; return equal(first, ((Pair) obj).first) && equal(second, ((Pair) obj).second); } private boolean equal(Object o1, Object o2) { return o1 == o2 || (o1 != null && o1.equals(o2)); } @Override public String toString() { return "(" + first + ", " + second + ')'; } } // <<< Don't Change