Reducer getting fewer records than expected
Posted
by
sathishs
on Stack Overflow
See other posts from Stack Overflow
or by sathishs
Published on 2013-06-28T15:33:08Z
Indexed on
2013/06/28
16:21 UTC
Read the original article
Hit count: 281
We have a scenario of generating unique key for every single row in a file. we have a timestamp column but the are multiple rows available for a same timestamp in few scenarios.
We decided unique values to be timestamp appended with their respective count as mentioned in the below program.
Mapper will just emit the timestamp as key and the entire row as its value, and in reducer the key is generated.
Problem is Map outputs about 236 rows, of which only 230 records are fed as an input for reducer which outputs the same 230 records.
public class UniqueKeyGenerator extends Configured implements Tool {
private static final String SEPERATOR = "\t";
private static final int TIME_INDEX = 10;
private static final String COUNT_FORMAT_DIGITS = "%010d";
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text row, Context context)
throws IOException, InterruptedException {
String input = row.toString();
String[] vals = input.split(SEPERATOR);
if (vals != null && vals.length >= TIME_INDEX) {
context.write(new Text(vals[TIME_INDEX - 1]), row);
}
}
}
public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text eventTimeKey,
Iterable<Text> timeGroupedRows, Context context)
throws IOException, InterruptedException {
int cnt = 1;
final String eventTime = eventTimeKey.toString();
for (Text val : timeGroupedRows) {
final String res = SEPERATOR.concat(getDate(
Long.valueOf(eventTime)).concat(
String.format(COUNT_FORMAT_DIGITS, cnt)));
val.append(res.getBytes(), 0, res.length());
cnt++;
context.write(NullWritable.get(), val);
}
}
}
public static String getDate(long time) {
SimpleDateFormat utcSdf = new SimpleDateFormat("yyyyMMddhhmmss");
utcSdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
return utcSdf.format(new Date(time));
}
public int run(String[] args) throws Exception {
conf(args);
return 0;
}
public static void main(String[] args) throws Exception {
conf(args);
}
private static void conf(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "uniquekeygen");
job.setJarByClass(UniqueKeyGenerator.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// job.setNumReduceTasks(400);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
It is consistent for higher no of lines and the difference is as huge as 208969 records for an input of 20855982 lines. what might be the reason for reduced inputs to reducer?
© Stack Overflow or respective owner