- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.apache.hadoop.io.NullWritable;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.util.Arrays;
- import java.util.List;
- import java.util.Random;
- import java.util.StringTokenizer;
- // >>> Don't Change
- public class WHV1 extends Configured implements Tool {
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new WHV1(), args);
- System.exit(res);
- }
- @Override
- public int run(String[] args) throws Exception {
- Job job = Job.getInstance(this.getConf(), "WH1");
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
- job.setMapOutputKeyClass(NullWritable.class);
- job.setMapOutputValueClass(Text.class);
- job.setMapperClass(TitleCountMap.class);
- job.setReducerClass(TitleCountReduce.class);
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setJarByClass(WHV1.class);
- return job.waitForCompletion(true) ? 0 : 1;
- }
- public static class TitleCountMap extends Mapper<Object, Text, NullWritable, Text> {
- List<String> stopWords;
- String delimiters;
- @Override
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String[] line = value.toString().split(",");
- if((!line[6].equals("")&(!line[11].equals("")))){
- context.write(NullWritable.get(),value);
- }
- }
- }
- public static class TitleCountReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
- private double percentage;
- @Override
- protected void setup(Context context) throws IOException,InterruptedException {
- Configuration conf = context.getConfiguration();
- this.percentage = conf.getDouble("percentage", 1);
- }
- @Override
- public void reduce(NullWritable key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- Random temp = new Random();
- for (Text val : values) {
- if(percentage>temp.nextDouble()){
- context.write(key, new Text(val));
- }
- }
- }
- }
- }