import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.StringTokenizer; import java.util.TreeSet; import java.text.SimpleDateFormat; import java.text.ParseException; import java.util.Date; import java.time.temporal.ChronoUnit; import java.time.Duration; import java.util.concurrent.TimeUnit; import java.text.DecimalFormat; import java.util.Calendar; public class Taxi extends Configured implements Tool { public static final Log LOG = LogFactory.getLog(Taxi.class); public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Taxi(), args); System.exit(res); } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); FileSystem fs = FileSystem.get(conf); Path tmpPath = new Path(args[1]+"Tmp1"); fs.delete(tmpPath, true); Job jobA = Job.getInstance(conf, "Job A"); jobA.setOutputKeyClass(Text.class); jobA.setOutputValueClass(Text.class); jobA.setMapperClass(MapFilter.class); jobA.setReducerClass(MyReduce.class); jobA.setNumReduceTasks(1); FileInputFormat.setInputPaths(jobA, new Path(args[0])); FileOutputFormat.setOutputPath(jobA, tmpPath); jobA.setJarByClass(Taxi.class); jobA.waitForCompletion(true); Job jobB = Job.getInstance(conf, "Job B"); jobB.setOutputKeyClass(IntWritable.class); jobB.setOutputValueClass(Text.class); jobB.setMapOutputKeyClass(IntWritable.class); jobB.setMapOutputValueClass(Text.class); jobB.setMapperClass(MapB.class); jobB.setReducerClass(ReduceB.class); jobB.setNumReduceTasks(1); FileInputFormat.setInputPaths(jobB, tmpPath); FileOutputFormat.setOutputPath(jobB, new Path(args[1])); jobB.setInputFormatClass(KeyValueTextInputFormat.class); jobB.setOutputFormatClass(TextOutputFormat.class); jobB.setJarByClass(Taxi.class); return jobB.waitForCompletion(true) ? 0 : 1; } public static class MapFilter extends Mapper { private Text outKey = new Text(); private Text outValue = new Text(); public enum Days { Nie, Pon, Wt, Śr, Cz, Pt, Sob } @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] lineArray = line.split(","); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try { Date d = df.parse(lineArray[2]); Calendar c = Calendar.getInstance(); c.setTime(d); int dayOfWeek = c.get(Calendar.DAY_OF_WEEK) -1; int hourOfDay = c.get(Calendar.HOUR_OF_DAY); outKey.set(new Text(Days.values()[dayOfWeek]+" "+hourOfDay)); String paymentType =lineArray[9]; String fareAmount = lineArray[10]; String tip = lineArray[13]; outValue.set(new Text(tip)); context.write(outKey, outValue); }catch(ParseException e){ e.printStackTrace(); } } } public static class MyReduce extends Reducer { @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long counter=0; float sum=0.0f; float average=0.0f; for (Text val: values) { float value=Float.parseFloat(val.toString()); sum+=value; counter+=1; } if(counter!=0){ average=sum/counter; } String valueOut=Float.toString(average); context.write(key, new Text(valueOut)); } } public static class MapB extends Mapper { private IntWritable outKey = new IntWritable(); private Text outValue = new Text(); public enum Days { Nie, Pon, Wt, Śr, Cz, Pt, Sob } @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String oldkey = key.toString(); String[] lineArray = oldkey.split(" "); String day=lineArray[0]; String hour = lineArray[1]; outValue.set(new Text(day)+" "+value); IntWritable key2 = new IntWritable(Integer.parseInt(hour)); context.write(key2, outValue); } } public static class ReduceB extends Reducer { @Override public void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { float temp=0.0f; String max=""; for (Text val: values) { String avgS = val.toString().split(" ")[1]; float avg=Float.parseFloat(avgS); if(avg>temp){ max=val.toString(); temp=avg; } } IntWritable key2 = new IntWritable(Integer.parseInt(key.toString())); context.write(key2, new Text(max)); } } }