Facebook
From jakurwa, 5 Years ago, written in Java.
Embed
Download Paste or View Raw
Hits: 200
  1.  
  2.  
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.FSDataInputStream;
  6. import org.apache.hadoop.fs.FileSystem;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.ArrayWritable;
  9. import org.apache.hadoop.io.IntWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.Reducer;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  19. import org.apache.hadoop.util.Tool;
  20. import org.apache.hadoop.util.ToolRunner;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23.  
  24. import java.io.BufferedReader;
  25. import java.io.IOException;
  26. import java.io.InputStreamReader;
  27. import java.util.*;
  28. import org.apache.hadoop.io.LongWritable;
  29.  
  30. public class PopularityLeague extends Configured implements Tool {
  31.     public static final Log LOG = LogFactory.getLog(PopularityLeague.class);
  32.  
  33.     public static void main(String[] args) throws Exception {
  34.         int res = ToolRunner.run(new Configuration(), new PopularityLeague(), args);
  35.         System.exit(res);
  36.     }
  37.  
  38.     @Override
  39.     public int run(String[] args) throws Exception {        
  40.         Configuration conf = this.getConf();
  41.         FileSystem fs = FileSystem.get(conf);
  42.         Path path = new Path("/user/lada14/lab1/tmp/");
  43.         fs.delete(path, true);
  44.        
  45.         Job jobA = Job.getInstance(conf, "Popularity league");
  46.        
  47.         jobA.setMapOutputKeyClass(IntWritable.class);
  48.         jobA.setMapOutputValueClass(IntWritable.class);
  49.        
  50.         jobA.setOutputKeyClass(IntWritable.class);
  51.         jobA.setOutputValueClass(IntWritable.class);
  52.        
  53.         jobA.setMapperClass(LinkCountMap.class);
  54.         jobA.setReducerClass(LinkCountReduce.class);
  55.  
  56.         FileInputFormat.setInputPaths(jobA, new Path(args[0]));
  57.         FileOutputFormat.setOutputPath(jobA, path);
  58.        
  59.         jobA.setJarByClass(PopularityLeague.class);
  60.         jobA.waitForCompletion(true);
  61.        
  62.         Job jobB = Job.getInstance(conf, "Pop Leag 2");
  63.        
  64.         jobB.setMapOutputKeyClass(IntWritable.class);
  65.         jobB.setMapOutputValueClass(IntWritable.class);
  66.        
  67.         jobB.setOutputKeyClass(IntWritable.class);
  68.         jobB.setOutputValueClass(IntWritable.class);
  69.        
  70.         jobB.setMapperClass(LeagueMap.class);
  71.         jobB.setReducerClass(LeagueReduce.class);
  72.        
  73.         FileInputFormat.setInputPaths(jobB, path);
  74.         FileOutputFormat.setOutputPath(jobB, new Path(args[1]));
  75.        
  76.         jobB.setJarByClass(PopularityLeague.class);
  77.         return jobB.waitForCompletion(true) ? 0 : 1;
  78.     }
  79.    
  80.     public static class LinkCountMap extends Mapper<Object, Text, IntWritable, IntWritable> {
  81.        
  82.         List<Integer> leagues = new ArrayList<>();
  83.        
  84.         @Override
  85.         protected void setup(Context context) throws IOException,InterruptedException {
  86.  
  87.             Configuration conf = context.getConfiguration();
  88.  
  89.             String leaguePath = conf.get("league");
  90.  
  91.             List<String> leaguesString = Arrays.asList(readHDFSFile(leaguePath, conf).split("\n"));
  92.             for (String l : leaguesString) {
  93.                 leagues.add(new Integer(l));
  94.             }
  95.         }
  96.        
  97.         @Override
  98.         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  99.             String line = value.toString();
  100.             StringTokenizer tokenizer = new StringTokenizer(line, ": ");
  101.             if (tokenizer.hasMoreTokens()) {
  102.                 Integer from = new Integer(tokenizer.nextToken());
  103.                 while (tokenizer.hasMoreTokens()) {
  104.                     Integer to = new Integer(tokenizer.nextToken());
  105.                     if (leagues.contains(to)) {
  106.                         context.write(new IntWritable(to), new IntWritable(1));
  107.                     }
  108.                 }
  109.             }
  110.         }        
  111.     }
  112.    
  113.     public static class LinkCountReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  114.        
  115.         @Override
  116.            public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  117.            int sum = 0;
  118.            for (IntWritable val : values) {
  119.                sum += val.get();
  120.            
  121.             }
  122.            context.write(key, new IntWritable(sum));
  123.         }
  124.     }
  125.    
  126.     public static class LeagueMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
  127.         List<Integer> leagues = new ArrayList<>();
  128.         List<Pair<Integer, Integer>> popularity = new ArrayList<>();
  129.        
  130.         @Override
  131.         protected void setup(Mapper.Context context) throws IOException,InterruptedException {
  132.  
  133.             Configuration conf = context.getConfiguration();
  134.  
  135.             String leaguePath = conf.get("league");
  136.  
  137.             List<String> leaguesString = Arrays.asList(readHDFSFile(leaguePath, conf).split("\n"));
  138.             for (String l : leaguesString) {
  139.                 leagues.add(new Integer(l));
  140.             }
  141.         }
  142.        
  143.        
  144.         @Override
  145.         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  146.                         String s = value.toString();
  147.             String[] array = s.split("[^0-9]");
  148.            
  149.             popularity.add(new Pair<>(new Integer(array[0]), new Integer(array[1])));
  150.            
  151.         }  
  152.        
  153.         @Override
  154.         protected void cleanup(Context context) throws IOException, InterruptedException {
  155.             for (int i = 0; i < popularity.size(); i++) {
  156.                 Pair<Integer, Integer> pair1 = popularity.get(i);
  157.                 for (int j = i + 1; j < popularity.size(); j++) {
  158.                     Pair<Integer, Integer> pair2 = popularity.get(j);
  159.                     if (pair1.second > pair2.second) {
  160.                             context.write(new IntWritable(pair1.first), new IntWritable(1));
  161.                             context.write(new IntWritable(pair2.first), new IntWritable(0));
  162.                         } else if (pair1.second < pair2.second) {
  163.                             context.write(new IntWritable(pair2.first), new IntWritable(1));
  164.                             context.write(new IntWritable(pair1.first), new IntWritable(0));
  165.                         } else {
  166.                             context.write(new IntWritable(pair1.first), new IntWritable(0));
  167.                             context.write(new IntWritable(pair2.first), new IntWritable(0));
  168.                         }
  169.                 }
  170.             }
  171.         }
  172.     }
  173.    
  174.     public static class LeagueReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  175.         @Override
  176.            public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  177.            int sum = 0;
  178.            for (IntWritable val : values) {
  179.                sum += val.get();
  180.            }
  181.            context.write(key, new IntWritable(sum));
  182.         }
  183.     }
  184.    
  185.     public static String readHDFSFile(String path, Configuration conf) throws IOException{
  186.         Path pt=new Path(path);
  187.         FileSystem fs = FileSystem.get(pt.toUri(), conf);
  188.         FSDataInputStream file = fs.open(pt);
  189.         BufferedReader buffIn=new BufferedReader(new InputStreamReader(file));
  190.  
  191.         StringBuilder everything = new StringBuilder();
  192.         String line;
  193.         while( (line = buffIn.readLine()) != null) {
  194.             everything.append(line);
  195.             everything.append("\n");
  196.         }
  197.         return everything.toString();
  198.     }
  199.    
  200.      public static class IntArrayWritable extends ArrayWritable {
  201.         public IntArrayWritable() {
  202.             super(IntWritable.class);
  203.         }
  204.  
  205.         public IntArrayWritable(Integer[] numbers) {
  206.             super(IntWritable.class);
  207.             IntWritable[] ints = new IntWritable[numbers.length];
  208.             for (int i = 0; i < numbers.length; i++) {
  209.                 ints[i] = new IntWritable(numbers[i]);
  210.             }
  211.             set(ints);
  212.         }
  213.     }    
  214. }
  215.  
  216.  
  217. class Pair<A extends Comparable<? super A>,
  218.         B extends Comparable<? super B>>
  219.         implements Comparable<Pair<A, B>> {
  220.  
  221.     public final A first;
  222.     public final B second;
  223.  
  224.     public Pair(A first, B second) {
  225.         this.first = first;
  226.         this.second = second;
  227.     }
  228.  
  229.     public static <A extends Comparable<? super A>,
  230.             B extends Comparable<? super B>>
  231.     Pair<A, B> of(A first, B second) {
  232.         return new Pair<A, B>(first, second);
  233.     }
  234.  
  235.     @Override
  236.     public int compareTo(Pair<A, B> o) {
  237.         int cmp = o == null ? 1 : (this.first).compareTo(o.first);
  238.         return cmp == 0 ? (this.second).compareTo(o.second) : cmp;
  239.     }
  240.  
  241.     @Override
  242.     public int hashCode() {
  243.         return 31 * hashcode(first) + hashcode(second);
  244.     }
  245.  
  246.     private static int hashcode(Object o) {
  247.         return o == null ? 0 : o.hashCode();
  248.     }
  249.  
  250.     @Override
  251.     public boolean equals(Object obj) {
  252.         if (!(obj instanceof Pair))
  253.             return false;
  254.         if (this == obj)
  255.             return true;
  256.         return equal(first, ((Pair<?, ?>) obj).first)
  257.                 && equal(second, ((Pair<?, ?>) obj).second);
  258.     }
  259.  
  260.     private boolean equal(Object o1, Object o2) {
  261.         return o1 == o2 || (o1 != null && o1.equals(o2));
  262.     }
  263.  
  264.     @Override
  265.     public String toString() {
  266.         return "(" + first + ", " + second + ')';
  267.     }
  268. }