I'm looking for Hadoop examples that are more complex than the WordCount example.
What I want to do is read the files in a Hadoop directory and produce a zip archive of them, so I thought I would collect all the files in the map class and create the zip file in the reduce class.
Can anyone point me to a tutorial or example that can help me build it?
I don't want anyone to do this for me; I'm asking for a link to better examples than WordCount.
This is what I have so far; maybe it's useful to someone:
public class Testing {
private static class MapClass extends MapReduceBase implements
Mapper<LongWritable, Text, Text, BytesWritable> {
// reuse objects to save overhead of object creation
Logger log = Logger.getLogger("log_file");
public void map(LongWritable key, Text value,
OutputCollector<Text, BytesWritable> output, Reporter reporter)
throws IOException {
String line = ((Text) value).toString();
log.info("Doing something ... " + line);
BytesWritable b = new BytesWritable();
b.set(value.toString().getBytes() , 0, value.toString().getBytes() .length);
output.collect(value, b);
}
}
private static class ReduceClass extends MapReduceBase implements
Reducer<Text, BytesWritable, Text, BytesWritable> {
Logger log = Logger.getLogger("log_file");
ByteArrayOutputStream bout;
ZipOutputStream out;
@Override
public void configure(JobConf job) {
super.configure(job);
log.setLevel(Level.INFO);
bout = new ByteArrayOutputStream();
out = new ZipOutputStream(bout);
}
public void reduce(Text key, Iterator<BytesWritable> values,
OutputCollector<Text, BytesWritable> output, Reporter reporter)
throws IOException {
while (values.hasNext()) {
byte[] data = values.next().getBytes();
ZipEntry entry = new ZipEntry("entry");
out.putNextEntry(entry);
out.write(data);
out.closeEntry();
}
BytesWritable b = new BytesWritable();
b.set(bout.toByteArray(), 0, bout.size());
output.collect(key, b);
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
super.close();
out.close();
}
}
/**
* Runs the demo.
*/
public static void main(String[] args) throws IOException {
int mapTasks = 20;
int reduceTasks = 1;
JobConf conf = new JobConf(Prue.class);
conf.setJobName("testing");
conf.setNumMapTasks(mapTasks);
conf.setNumReduceTasks(reduceTasks);
MultipleInputs.addInputPath(conf, new Path("/messages"), TextInputFormat.class, MapClass.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(BytesWritable.class);
FileOutputFormat.setOutputPath(conf, new Path("/czip"));
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(ReduceClass.class);
conf.setReducerClass(ReduceClass.class);
// Delete the output directory if it exists already
JobClient.runJob(conf);
}
}