Facebook
From Capacious Butterfly, 3 Years ago, written in Plain Text.
Embed
Download Paste or View Raw
Hits: 133
  1. import org.apache.commons.logging.Log;
  2. import org.apache.commons.logging.LogFactory;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.ArrayWritable;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.NullWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import org.apache.hadoop.mapreduce.Reducer;
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  18. import org.apache.hadoop.util.Tool;
  19. import org.apache.hadoop.util.ToolRunner;
  20.  
  21. import java.io.IOException;
  22. import java.lang.Integer;
  23. import java.util.StringTokenizer;
  24. import java.util.TreeSet;
  25.  
  26. // >>> Don't Change
  27. public class WHV2 extends Configured implements Tool {
  28.     public static final Log LOG = LogFactory.getLog(WHV2.class);
  29.  
  30.     public static void main(String[] args) throws Exception {
  31.         int res = ToolRunner.run(new Configuration(), new WHV2(), args);
  32.         System.exit(res);
  33.     }
  34.  
  35.     public static class TextArrayWritable extends ArrayWritable {
  36.         public TextArrayWritable() {
  37.             super(Text.class);
  38.         }
  39.  
  40.         public TextArrayWritable(String[] numbers) {
  41.             super(Text.class);
  42.             Text[] texts = new Text[numbers.length];
  43.             for (int i = 0; i < numbers.length; i++) {
  44.                 texts[i] = new Text(numbers[i]);
  45.             }
  46.             set(texts);
  47.         }
  48.     }
  49.  
  50.     @Override
  51.     public int run(String[] args) throws Exception {
  52.         // TODO
  53.         Configuration conf = this.getConf();
  54.         FileSystem fs = FileSystem.get(conf);
  55.         Path tmpPath = new Path("/user/lada38/mp2/tmp");
  56.         fs.delete(tmpPath, true);
  57.  
  58.         Job jobA = Job.getInstance(conf, "WHV2_A");
  59.         jobA.setOutputKeyClass(Text.class);
  60.         jobA.setOutputValueClass(IntWritable.class);
  61.  
  62.         jobA.setMapperClass(LinkCountMap.class);
  63.         jobA.setReducerClass(LinkCountReduce.class);
  64.  
  65.         jobA.setMapOutputKeyClass(Text.class);
  66.         jobA.setMapOutputValueClass(IntWritable.class);
  67.  
  68.         FileInputFormat.setInputPaths(jobA, new Path(args[0]));
  69.         FileOutputFormat.setOutputPath(jobA, tmpPath);
  70.  
  71.         jobA.setJarByClass(WHV2.class);
  72.         jobA.waitForCompletion(true);
  73.  
  74.         Job jobB = Job.getInstance(conf, "WHV2_B");
  75.         jobB.setOutputKeyClass(NullWritable.class);
  76.         jobB.setOutputValueClass(TextArrayWritable.class);
  77.  
  78.         jobB.setMapOutputKeyClass(  NullWritable.class);
  79.         jobB.setMapOutputValueClass(TextArrayWritable.class);
  80.  
  81.         jobB.setMapperClass(TopLinksMap.class);
  82.         jobB.setReducerClass(TopLinksReduce.class);
  83.         jobB.setNumReduceTasks(1);
  84.  
  85.             FileInputFormat.setInputPaths(jobB, tmpPath);
  86.         FileOutputFormat.setOutputPath(jobB, new Path(args[1]));
  87.  
  88.         jobB.setInputFormatClass(KeyValueTextInputFormat.class);
  89.         jobB.setOutputFormatClass(TextOutputFormat.class);
  90.  
  91.         jobB.setJarByClass(WHV2.class);
  92.         return jobB.waitForCompletion(true) ? 0 : 1;
  93.     }
  94.  
  95.     public static class LinkCountMap extends Mapper<Object, Text, Text, IntWritable> {
  96.         @Override
  97.         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  98.             String[] line = value.toString().split(",");
  99.             String keyA=line[0]+" "+line[2]+" "+line[1]+" "+line[19]+" "+line[20];
  100.             context.write(new Text(keyA),new IntWritable(1));
  101.         }
  102.     }
  103.  
  104.     public static class LinkCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  105.         // TODO
  106.         @Override
  107.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  108.             Integer sum = new Integer(0);
  109.             for (IntWritable val: values) {
  110.                 sum += val.get();
  111.             }
  112.  
  113.             context.write(key, new IntWritable(sum));
  114.         }
  115.     }
  116.  
  117.     public static class TopLinksMap extends Mapper<Text, Text, NullWritable, TextArrayWritable> {
  118.         Integer N;
  119.         private TreeSet<Pair<Integer, String>> coutToPageIdMap = new TreeSet<Pair<Integer, String>>();
  120.        
  121.         @Override
  122.         protected void setup(Context context) throws IOException,InterruptedException {
  123.             Configuration conf = context.getConfiguration();
  124.             this.N = conf.getInt("N", 10);
  125.         }
  126.         // TODO
  127.        
  128.         @Override
  129.         public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
  130.             Integer count = Integer.parseInt(value.toString());
  131.             String word = key.toString();
  132.  
  133.             coutToPageIdMap.add(new Pair <Integer , String> (count,word));
  134.            
  135.             if (coutToPageIdMap.size() > N) {
  136.                 coutToPageIdMap.remove(coutToPageIdMap.first());
  137.             }
  138.         }
  139.  
  140.         @Override
  141.         protected void cleanup(Context context) throws IOException,InterruptedException {
  142.             for (Pair<Integer , String>item : coutToPageIdMap){
  143.             String[] strings = {item.second , item.first.toString()};
  144.             context.write(NullWritable.get(),new TextArrayWritable(strings));
  145.  
  146.             }
  147.         }
  148.     }
  149.  
  150.     public static class TopLinksReduce extends Reducer<NullWritable, TextArrayWritable, Text, IntWritable> {
  151.         Integer N;
  152.         private TreeSet<Pair<Integer, String>> coutToPageIdMap = new TreeSet<Pair<Integer, String>>();
  153.        
  154.         @Override
  155.         protected void setup(Context context) throws IOException,InterruptedException {
  156.             Configuration conf = context.getConfiguration();
  157.             this.N = conf.getInt("N", 10);
  158.         }
  159.         // TODO
  160.         @Override
  161.         public void reduce(NullWritable key, Iterable<TextArrayWritable> values, Context context) throws IOException, InterruptedException {
  162.             for(TextArrayWritable val: values){
  163.                 Text[] pair = (Text[]) val.toArray();
  164.                 String word =  pair[0].toString();
  165.                 Integer count = Integer.parseInt(pair[1].toString());
  166.  
  167.                 coutToPageIdMap.add(new Pair<Integer , String> (count , word));
  168.  
  169.                 if (coutToPageIdMap.size() > this.N){
  170.                     coutToPageIdMap.remove(coutToPageIdMap.first());
  171.                 }
  172.             }
  173.             for (Pair<Integer , String> item : coutToPageIdMap){
  174.                 Text word = new Text(item.second);
  175.                 IntWritable value = new IntWritable(item.first);
  176.                 context.write(word, value);
  177.             }
  178.         }
  179.     }
  180. }
  181.  
  182. // >>> Don't Change
  183. class Pair<A extends Comparable<? super A>,
  184.         B extends Comparable<? super B>>
  185.         implements Comparable<Pair<A, B>> {
  186.  
  187.     public final A first;
  188.     public final B second;
  189.  
  190.     public Pair(A first, B second) {
  191.         this.first = first;
  192.         this.second = second;
  193.     }
  194.  
  195.     public static <A extends Comparable<? super A>,
  196.             B extends Comparable<? super B>>
  197.     Pair<A, B> of(A first, B second) {
  198.         return new Pair<A, B>(first, second);
  199.     }
  200.  
  201.     @Override
  202.     public int compareTo(Pair<A, B> o) {
  203.         int cmp = o == null ? 1 : (this.first).compareTo(o.first);
  204.         return cmp == 0 ? (this.second).compareTo(o.second) : cmp;
  205.     }
  206.  
  207.     @Override
  208.     public int hashCode() {
  209.         return 31 * hashcode(first) + hashcode(second);
  210.     }
  211.  
  212.     private static int hashcode(Object o) {
  213.         return o == null ? 0 : o.hashCode();
  214.     }
  215.  
  216.     @Override
  217.     public boolean equals(Object obj) {
  218.         if (!(obj instanceof Pair))
  219.             return false;
  220.         if (this == obj)
  221.             return true;
  222.         return equal(first, ((Pair<?, ?>) obj).first)
  223.                 && equal(second, ((Pair<?, ?>) obj).second);
  224.     }
  225.  
  226.     private boolean equal(Object o1, Object o2) {
  227.         return o1 == o2 || (o1 != null && o1.equals(o2));
  228.     }
  229.  
  230.     @Override
  231.     public String toString() {
  232.         return "(" + first + ", " + second + ')';
  233.     }
  234. }
  235. // <<< Don't Change