We need to generate a unique key for every row in a file. There is a timestamp column, but in some cases multiple rows share the same timestamp.
We decided that the unique value should be the timestamp with a per-timestamp count appended, as in the program below.
The mapper just emits the timestamp as the key and the entire row as the value; the unique key is generated in the reducer.
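To make the intended key format concrete, here is a minimal standalone sketch (no Hadoop involved) of what the reducer is meant to append for rows sharing one timestamp. The class name `KeySchemeSketch`, the helper `keysFor`, and the sample timestamp are made up for illustration; `getDate` and the counter format are copied from the job.

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

public class KeySchemeSketch {
    // Same zero-padded counter format as COUNT_FORMAT_DIGITS in the job.
    private static final String COUNT_FORMAT_DIGITS = "%010d";

    // Mirrors getDate() from the job, including its pattern and time zone.
    public static String getDate(long time) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddhhmmss");
        sdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
        return sdf.format(new Date(time));
    }

    // Hypothetical helper: builds the keys one reduce() call would append
    // for rowCount rows that all carry the same event time (epoch millis).
    public static List<String> keysFor(long eventTime, int rowCount) {
        List<String> keys = new ArrayList<>();
        for (int cnt = 1; cnt <= rowCount; cnt++) {
            keys.add(getDate(eventTime) + String.format(COUNT_FORMAT_DIGITS, cnt));
        }
        return keys;
    }

    public static void main(String[] args) {
        // Sample value only: three rows sharing one made-up timestamp.
        for (String key : keysFor(28800000L, 3)) {
            System.out.println(key);
        }
    }
}
```

Each generated key is the 14-digit formatted time followed by a 10-digit counter, so rows within one timestamp group differ only in the counter part.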
The problem is that the map phase outputs about 236 rows, but only 230 records are fed to the reducer as input, and the reducer outputs those same 230 records.
    public class UniqueKeyGenerator extends Configured implements Tool {

        private static final String SEPERATOR = "\t";
        private static final int TIME_INDEX = 10;
        private static final String COUNT_FORMAT_DIGITS = "%010d";

        public static class Map extends Mapper<LongWritable, Text, Text, Text> {

            @Override
            protected void map(LongWritable key, Text row, Context context)
                    throws IOException, InterruptedException {
                String input = row.toString();
                String[] vals = input.split(SEPERATOR);
                if (vals != null && vals.length >= TIME_INDEX) {
                    context.write(new Text(vals[TIME_INDEX - 1]), row);
                }
            }
        }

        public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

            @Override
            protected void reduce(Text eventTimeKey,
                    Iterable<Text> timeGroupedRows, Context context)
                    throws IOException, InterruptedException {
                int cnt = 1;
                final String eventTime = eventTimeKey.toString();
                for (Text val : timeGroupedRows) {
                    final String res = SEPERATOR.concat(getDate(
                            Long.valueOf(eventTime)).concat(
                            String.format(COUNT_FORMAT_DIGITS, cnt)));
                    val.append(res.getBytes(), 0, res.length());
                    cnt++;
                    context.write(NullWritable.get(), val);
                }
            }
        }

        public static String getDate(long time) {
            SimpleDateFormat utcSdf = new SimpleDateFormat("yyyyMMddhhmmss");
            utcSdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
            return utcSdf.format(new Date(time));
        }

        public int run(String[] args) throws Exception {
            conf(args);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            conf(args);
        }

        private static void conf(String[] args) throws IOException,
                InterruptedException, ClassNotFoundException {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "uniquekeygen");
            job.setJarByClass(UniqueKeyGenerator.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            // job.setNumReduceTasks(400);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
        }
    }
The behaviour is consistent for larger inputs, and the difference is as large as 208969 records for an input of 20855982 lines. What might be the reason for fewer records reaching the reducer than the mapper emits?
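Separately from the counter mismatch, one thing that can be checked locally is the date pattern in `getDate`. In `SimpleDateFormat`, `hh` is the 12-hour clock hour (1-12) while `HH` is the 24-hour hour (0-23), so with no AM/PM marker in the pattern, two timestamps 12 hours apart on the same day format to the same string. I don't know whether this is related to the missing reducer input records; it only affects whether the generated keys are really unique. A minimal sketch, with a hypothetical class name and made-up sample timestamps:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class HourPatternCheck {
    // Formats an epoch-millis value in the same time zone the job uses.
    public static String format(String pattern, long time) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        sdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
        return sdf.format(new Date(time));
    }

    public static void main(String[] args) {
        // Sample values only: midnight and noon of the same Los Angeles day.
        long midnight = 28800000L;                    // 1970-01-01 00:00:00 at UTC-8
        long noon = midnight + 12L * 60 * 60 * 1000;  // 12 hours later

        // Pattern used in the job (12-hour clock, no AM/PM marker).
        String a = format("yyyyMMddhhmmss", midnight);
        String b = format("yyyyMMddhhmmss", noon);
        System.out.println("hh collides: " + a.equals(b));

        // 24-hour variant for comparison.
        String c = format("yyyyMMddHHmmss", midnight);
        String d = format("yyyyMMddHHmmss", noon);
        System.out.println("HH collides: " + c.equals(d));
    }
}
```

If the first comparison reports a collision, rows from the morning and the afternoon could end up with identical generated keys even though their original timestamps differ, because the counter restarts at 1 for every reducer key.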