Facebook
From Sweet Goose, 3 Years ago, written in Plain Text.
Embed
Download Paste or View Raw
Hits: 160
  1. import org.apache.commons.logging.Log;
  2. import org.apache.commons.logging.LogFactory;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.ArrayWritable;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.FloatWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.Reducer;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  19. import org.apache.hadoop.util.Tool;
  20. import org.apache.hadoop.util.ToolRunner;
  21.  
  22. import java.io.IOException;
  23. import java.util.StringTokenizer;
  24. import java.util.TreeSet;
  25. import java.text.SimpleDateFormat;
  26. import java.text.ParseException;
  27. import java.util.Date;
  28. import java.time.temporal.ChronoUnit;
  29. import java.time.Duration;
  30. import java.util.concurrent.TimeUnit;
  31. import java.text.DecimalFormat;
  32. import java.util.Calendar;
  33.  
  34. public class Taxi extends Configured implements Tool {
  35.     public static final Log LOG = LogFactory.getLog(Taxi.class);
  36.  
  37.     public static void main(String[] args) throws Exception {
  38.         int res = ToolRunner.run(new Configuration(), new Taxi(), args);
  39.         System.exit(res);
  40.     }
  41.  
  42.  
  43.     @Override
  44.     public int run(String[] args) throws Exception {
  45.         Configuration conf = this.getConf();
  46.         FileSystem fs = FileSystem.get(conf);
  47.         Path tmpPath = new Path(args[1]+"Tmp1");
  48.         fs.delete(tmpPath, true);
  49.  
  50.         Job jobA = Job.getInstance(conf, "Job A");
  51.         jobA.setOutputKeyClass(Text.class);
  52.         jobA.setOutputValueClass(Text.class);
  53.  
  54.         jobA.setMapperClass(MapFilter.class);
  55.         jobA.setReducerClass(MyReduce.class);
  56.             jobA.setNumReduceTasks(1);
  57.  
  58.         FileInputFormat.setInputPaths(jobA, new Path(args[0]));
  59.         FileOutputFormat.setOutputPath(jobA, tmpPath);
  60.  
  61.         jobA.setJarByClass(Taxi.class);
  62.         jobA.waitForCompletion(true);
  63.  
  64.         Job jobB = Job.getInstance(conf, "Job B");
  65.         jobB.setOutputKeyClass(IntWritable.class);
  66.         jobB.setOutputValueClass(Text.class);
  67.  
  68.         jobB.setMapOutputKeyClass(IntWritable.class);
  69.         jobB.setMapOutputValueClass(Text.class);
  70.  
  71.         jobB.setMapperClass(MapB.class);
  72.         jobB.setReducerClass(ReduceB.class);
  73.         jobB.setNumReduceTasks(1);
  74.  
  75.         FileInputFormat.setInputPaths(jobB, tmpPath);
  76.         FileOutputFormat.setOutputPath(jobB, new Path(args[1]));
  77.  
  78.         jobB.setInputFormatClass(KeyValueTextInputFormat.class);
  79.         jobB.setOutputFormatClass(TextOutputFormat.class);
  80.  
  81.         jobB.setJarByClass(Taxi.class);
  82.         return jobB.waitForCompletion(true) ? 0 : 1;
  83.     }
  84.  
  85.  
  86.     public static class MapFilter extends Mapper<Object, Text, Text, Text> {
  87.         private Text outKey = new Text();
  88.         private Text outValue = new Text();
  89.  
  90.         public enum Days {
  91.             Nie, Pon, Wt, Śr, Cz, Pt, Sob
  92.         }
  93.  
  94.         @Override
  95.         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  96.             String line = value.toString();
  97.             String[] lineArray = line.split(",");
  98.             SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  99.             try {
  100.                 Date d = df.parse(lineArray[2]);
  101.                 Calendar c = Calendar.getInstance();
  102.                 c.setTime(d);
  103.                 int dayOfWeek = c.get(Calendar.DAY_OF_WEEK) -1;
  104.                 int hourOfDay = c.get(Calendar.HOUR_OF_DAY);
  105.  
  106.                 outKey.set(new Text(Days.values()[dayOfWeek]+" "+hourOfDay));
  107.  
  108.                 String paymentType =lineArray[9];
  109.                 String fareAmount = lineArray[10];
  110.                 String tip = lineArray[13];
  111.  
  112.                 outValue.set(new Text(tip));
  113.                 context.write(outKey, outValue);
  114.             }catch(ParseException e){
  115.                 e.printStackTrace();
  116.             }
  117.         }
  118.     }
  119.  
  120.     public static class MyReduce extends Reducer<Text, Text, Text, Text> {
  121.  
  122.         @Override
  123.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  124.            
  125.             long counter=0;
  126.             float sum=0.0f;
  127.             float average=0.0f;
  128.            
  129.  
  130.             for (Text val: values) {
  131.                 float value=Float.parseFloat(val.toString());
  132.                 sum+=value;
  133.                 counter+=1;
  134.             }
  135.             if(counter!=0){
  136.                
  137.                 average=sum/counter;
  138.             }
  139.            
  140.            String valueOut=Float.toString(average);
  141.             context.write(key, new Text(valueOut));
  142.         }
  143.     }
  144.  
  145.         public static class MapB extends Mapper<Object, Text, IntWritable, Text> {
  146.         private IntWritable outKey = new IntWritable();
  147.         private Text outValue = new Text();
  148.  
  149.         public enum Days {
  150.             Nie, Pon, Wt, Śr, Cz, Pt, Sob
  151.         }
  152.  
  153.         @Override
  154.         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  155.             String oldkey = key.toString();
  156.             String[] lineArray = oldkey.split(" ");
  157.             String day=lineArray[0];
  158.             String hour = lineArray[1];
  159.  
  160.             outValue.set(new Text(day)+" "+value);
  161.             IntWritable key2 = new IntWritable(Integer.parseInt(hour));
  162.             context.write(key2, outValue);
  163.  
  164.         }
  165.     }
  166.     public static class ReduceB extends Reducer<IntWritable, Text, IntWritable, Text> {
  167.  
  168.         @Override
  169.         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  170.            
  171.            
  172.             float temp=0.0f;
  173.             String max="";
  174.  
  175.             for (Text val: values) {
  176.                 String avgS = val.toString().split(" ")[1];
  177.                 float avg=Float.parseFloat(avgS);
  178.                 if(avg>temp){
  179.                     max=val.toString();
  180.                     temp=avg;
  181.                 }
  182.             }
  183.             IntWritable key2 = new IntWritable(Integer.parseInt(key.toString()));
  184.            
  185.             context.write(key2, new Text(max));
  186.         }
  187.     }
  188. }