import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.StringTokenizer;

// >>> Don't Change
public class WHV1 extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WHV1(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), "WH1");

        // Both the map output and the final output are (NullWritable, Text) pairs:
        // the entire CSV record is passed through as the value.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setMapperClass(TitleCountMap.class);
        job.setReducerClass(TitleCountReduce.class);

        // args[0] is the input path, args[1] the output directory.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(WHV1.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class TitleCountMap extends Mapper<Object, Text, NullWritable, Text> {
        // Leftover skeleton fields; not used by this mapper.
        List<String> stopWords;
        String delimiters;

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Keep only records whose 7th and 12th comma-separated fields are both non-empty;
            // the length check guards against short/malformed lines.
            String[] line = value.toString().split(",");
            if (line.length > 11 && !line[6].equals("") && !line[11].equals("")) {
                context.write(NullWritable.get(), value);
            }
        }
    }

    public static class TitleCountReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        private double percentage;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Sampling fraction, read from the "percentage" property (defaults to 1.0, i.e. keep every record).
            Configuration conf = context.getConfiguration();
            this.percentage = conf.getDouble("percentage", 1.0);
        }

        @Override
        public void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit each record with probability equal to the configured fraction.
            Random random = new Random();
            for (Text val : values) {
                if (percentage > random.nextDouble()) {
                    context.write(key, new Text(val));
                }
            }
        }
    }
}
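
Usage sketch (not part of the original paste; jar name and HDFS paths are placeholders): because the driver runs through ToolRunner, Hadoop's GenericOptionsParser can supply the "percentage" property read in the reducer's setup() via the generic -D option, e.g.

    hadoop jar WHV1.jar WHV1 -D percentage=0.1 /user/hadoop/input.csv /user/hadoop/sample-out

With percentage=0.1, the reducer keeps roughly 10% of the records that survived the mapper's non-empty-field filter.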