import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCount {

    // Mapper: tokenizes each input line and emits a (word, 1) pair per token.
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Reducer: sums the counts for each word and emits (word, total).
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        // Key/value types of the job output.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Input and output paths come from the command line.
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        try {
            JobClient.runJob(conf);
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }
    }
}
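
/*
 * A sketch of how this class is typically compiled and run, assuming a classic
 * Hadoop 1.x installation; the jar name and the HDFS paths below are
 * illustrative, not prescribed by the code above:
 *
 *   javac -classpath hadoop-core-1.2.1.jar -d classes WordCount.java
 *   jar -cvf wordcount.jar -C classes/ .
 *   hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output
 *
 * Note that the output directory must not already exist when the job starts,
 * and each reducer writes one "word<TAB>count" line per distinct word.
 */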