import java.io.IOException;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.Calendar;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
- public class Taxi extends Configured implements Tool {
- public static final Log LOG = LogFactory.getLog(Taxi.class);
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new Taxi(), args);
- System.exit(res);
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = this.getConf();
- FileSystem fs = FileSystem.get(conf);
- Path tmpPath = new Path(args[1]+"Tmp1");
- fs.delete(tmpPath, true);
- Job jobA = Job.getInstance(conf, "Job A");
- jobA.setOutputKeyClass(Text.class);
- jobA.setOutputValueClass(Text.class);
- jobA.setMapperClass(MapFilter.class);
- jobA.setReducerClass(MyReduce.class);
- jobA.setNumReduceTasks(1);
- FileInputFormat.setInputPaths(jobA, new Path(args[0]));
- FileOutputFormat.setOutputPath(jobA, tmpPath);
- jobA.setJarByClass(Taxi.class);
- jobA.waitForCompletion(true);
- Job jobB = Job.getInstance(conf, "Job B");
- jobB.setOutputKeyClass(IntWritable.class);
- jobB.setOutputValueClass(Text.class);
- jobB.setMapOutputKeyClass(IntWritable.class);
- jobB.setMapOutputValueClass(Text.class);
- jobB.setMapperClass(MapB.class);
- jobB.setReducerClass(ReduceB.class);
- jobB.setNumReduceTasks(1);
- FileInputFormat.setInputPaths(jobB, tmpPath);
- FileOutputFormat.setOutputPath(jobB, new Path(args[1]));
- jobB.setInputFormatClass(KeyValueTextInputFormat.class);
- jobB.setOutputFormatClass(TextOutputFormat.class);
- jobB.setJarByClass(Taxi.class);
- return jobB.waitForCompletion(true) ? 0 : 1;
- }
- public static class MapFilter extends Mapper<Object, Text, Text, Text> {
- private Text outKey = new Text();
- private Text outValue = new Text();
- public enum Days {
- Nie, Pon, Wt, Śr, Cz, Pt, Sob
- }
- @Override
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String line = value.toString();
- String[] lineArray = line.split(",");
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- try {
- Date d = df.parse(lineArray[2]);
- Calendar c = Calendar.getInstance();
- c.setTime(d);
- int dayOfWeek = c.get(Calendar.DAY_OF_WEEK) -1;
- int hourOfDay = c.get(Calendar.HOUR_OF_DAY);
- outKey.set(new Text(Days.values()[dayOfWeek]+" "+hourOfDay));
- String paymentType =lineArray[9];
- String fareAmount = lineArray[10];
- String tip = lineArray[13];
- outValue.set(new Text(tip));
- context.write(outKey, outValue);
- }catch(ParseException e){
- e.printStackTrace();
- }
- }
- }
- public static class MyReduce extends Reducer<Text, Text, Text, Text> {
- @Override
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- long counter=0;
- float sum=0.0f;
- float average=0.0f;
- for (Text val: values) {
- float value=Float.parseFloat(val.toString());
- sum+=value;
- counter+=1;
- }
- if(counter!=0){
- average=sum/counter;
- }
- String valueOut=Float.toString(average);
- context.write(key, new Text(valueOut));
- }
- }
- public static class MapB extends Mapper<Object, Text, IntWritable, Text> {
- private IntWritable outKey = new IntWritable();
- private Text outValue = new Text();
- public enum Days {
- Nie, Pon, Wt, Śr, Cz, Pt, Sob
- }
- @Override
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String oldkey = key.toString();
- String[] lineArray = oldkey.split(" ");
- String day=lineArray[0];
- String hour = lineArray[1];
- outValue.set(new Text(day)+" "+value);
- IntWritable key2 = new IntWritable(Integer.parseInt(hour));
- context.write(key2, outValue);
- }
- }
- public static class ReduceB extends Reducer<IntWritable, Text, IntWritable, Text> {
- @Override
- public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- float temp=0.0f;
- String max="";
- for (Text val: values) {
- String avgS = val.toString().split(" ")[1];
- float avg=Float.parseFloat(avgS);
- if(avg>temp){
- max=val.toString();
- temp=avg;
- }
- }
- IntWritable key2 = new IntWritable(Integer.parseInt(key.toString()));
- context.write(key2, new Text(max));
- }
- }
- }