import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.NullWritable; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.StringTokenizer; // >>> Don't Change public class WHV1 extends Configured implements Tool { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WHV1(), args); System.exit(res); } @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(this.getConf(), "WH1"); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setMapperClass(TitleCountMap.class); job.setReducerClass(TitleCountReduce.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setJarByClass(WHV1.class); return job.waitForCompletion(true) ? 0 : 1; } public static class TitleCountMap extends Mapper { List stopWords; String delimiters; @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] line = value.toString().split(","); if((!line[6].equals("")&(!line[11].equals("")))){ context.write(NullWritable.get(),value); } } } public static class TitleCountReduce extends Reducer { private double percentage; @Override protected void setup(Context context) throws IOException,InterruptedException { Configuration conf = context.getConfiguration(); this.percentage = conf.getDouble("percentage", 1); } @Override public void reduce(NullWritable key, Iterable values, Context context) throws IOException, InterruptedException { Random temp = new Random(); for (Text val : values) { if(percentage>temp.nextDouble()){ context.write(key, new Text(val)); } } } } }