Hadoop MapReduce Combiner Example
In this example, we will learn about Hadoop Combiners. Combiners are highly useful functions offered by Hadoop, especially when we are processing large amounts of data. We will understand combiners using a simple example.
1. Introduction
The Hadoop Combiner class is an optional class in the MapReduce framework that sits between the Map class and the Reduce class. It reduces the amount of data received by the Reduce class by combining the output of the Map class.
The main function of a Combiner is to summarize the output of the Map class so that the processing load on the reducer is smaller and network congestion during the shuffle is reduced.
Because of this functionality, Combiners are also called a “Mini-Reducer” or a “Semi-Reducer”.
2. Workflow
Unlike the mapper and the reducer, a combiner has no predefined interface of its own. It implements the Reducer interface and overrides the reduce() method. Technically speaking, a Combiner and a Reducer can share the same code.
Let’s assume we have a map class which takes input from the Record Reader and processes it to produce key-value pairs as output. These key-value pairs contain each word as the key and 1 as the value, where 1 represents the number of instances of that key. For example, something like <This,1> <is,1> <an,1> <example,1>.
Now the Combiner takes each of these key-value pairs from the map output and processes them to combine the common words by key and transform the values into a collection. For example, <This,1,1,1> <is,1,1,1,1> <an,1> <example,1,1> etc., where “This” represents the key and “1,1,1” represents the collection of values; here it means that the word “this” appeared 3 times and the value was 1 for all 3 occurrences.
After this, the Reducer method takes these “key-value collection” pairs from the combiner and processes them to output the final result, transforming <This,1,1,1> into <This,3>.
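To make this flow concrete before we move to the real Hadoop code, here is a small stand-alone Java sketch (no Hadoop required; the class name and sample words are chosen only for illustration) that mimics the three stages: the map step emits a 1 for every word, the grouping/combine step collects the 1s per key, and the reduce step sums each collection.
CombinerFlowSketch.java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerFlowSketch {

    public static void main(String[] args) {
        //Sample map input: one token per entry, as the Record Reader would feed them
        String[] words = { "This", "is", "an", "example", "This", "is", "This" };

        //Map + combine/group step: every word contributes a 1, collected per key,
        //e.g. <This, [1, 1, 1]>
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (String word : words) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }

        //Reduce step: sum each collection, e.g. <This, [1, 1, 1]> becomes <This, 3>
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum = sum + one;
            }
            System.out.println("<" + entry.getKey() + ", " + sum + ">");
        }
    }
}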
3. MapReduce Wordcount example with Combiner
The word count program is the basic code used to understand the working of the MapReduce programming paradigm. We will use this word count program to understand the Map, Reduce and Combiner classes. The program consists of the map method, the combine method and the reduce method, which together count the number of occurrences of each word in a file.
3.1 Setup
We shall use Maven to set up a new project for the Hadoop word count example. Set up a Maven project in Eclipse and add the following Hadoop dependency to the pom.xml. This will make sure we have the required access to the Hadoop core library.
pom.xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
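Note that hadoop-core 1.2.1 belongs to the old Hadoop 1.x line. If your cluster runs a Hadoop 2.x release, a dependency along the following lines should work instead (the version shown is only an example; pick the one matching your cluster), since the word count code uses only the stable org.apache.hadoop.mapreduce API:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>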
3.2 Mapper Class
The mapper task is responsible for tokenizing the input text based on spaces to create a list of words, then traversing over all the tokens and emitting a key-value pair of each word with a count of one. Following is MapClass:
MapClass.java
package com.javacodegeeks.examples.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapClass extends Mapper<LongWritable, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value,
            Context context)
            throws IOException, InterruptedException {

        //Get the text and tokenize it using space as the separator.
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line," ");

        //For each token aka word, write a key-value pair with
        //the word and 1 as the value to the context
        while(st.hasMoreTokens()){
            word.set(st.nextToken());
            context.write(word,one);
        }
    }
}
Following is what exactly the map task does:
- Line 13-14, defines the static variable one with integer value 1 and word for storing the words.
- Line 22-23, in the map method the input Text variable is converted to String and tokenized based on spaces to get all the words in the input text.
- Line 27-30, for each word in the text, set the word variable and pass a key-value pair of word and integer value one to the context.
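One detail worth noticing: the mapper splits on spaces only, so punctuation stays attached to words and upper/lower case is preserved. The following stand-alone snippet (the class name is just for illustration, no Hadoop needed) prints the pairs that would be emitted for one line of our sample input, which is why the output later contains an entry for dog. and separate counts for The and the.
TokenizeCheck.java
import java.util.StringTokenizer;

public class TokenizeCheck {

    public static void main(String[] args) {
        //Same tokenization as in MapClass: split on spaces only
        StringTokenizer st = new StringTokenizer("The quick brown fox jumps over the lazy dog.", " ");
        while (st.hasMoreTokens()) {
            //In MapClass this is the point where context.write(word, one) is called
            System.out.println("<" + st.nextToken() + ", 1>");
        }
    }
}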
3.3 Combiner/Reducer Code
The following code snippet contains ReduceClass, which is the same code we will use for the Combiner, so we do not need to write a completely separate class; we will use the same reducer class and assign it as the combiner in the driver class (the entry point for MapReduce). This class extends the MapReduce Reducer class and overrides the reduce() function. The method iterates over the values, adds them up and reduces them to a single value, producing one key-value pair per word. Data moves from the mapper class to the combiner and then on to the reducer class.
ReduceClass.java
package com.javacodegeeks.examples.wordcount;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceClass extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        Iterator<IntWritable> valuesIt = values.iterator();

        //For each key-value pair, get the value and add it to the sum
        //to get the total occurrences of a word
        while(valuesIt.hasNext()){
            sum = sum + valuesIt.next().get();
        }

        //Write the word and total occurrences as a key-value pair to the context
        context.write(key, new IntWritable(sum));
    }
}
Following is the workflow of the reduce function:
- Lines 17-18, define a variable sum as an integer with value 0 and an Iterator over the values received by the reducer.
- Lines 22-24, iterate over all the values and add the occurrences of the word to sum.
- Line 27, write the word and the sum as a key-value pair to the context.
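It is worth pausing on why the same class can safely serve as both combiner and reducer here: addition is associative and commutative, so summing pre-combined partial counts gives exactly the same result as summing all the raw 1s at once. The small stand-alone sketch below (names and numbers are illustrative only) demonstrates this for a word seen on two different mappers.
CombineSafetyCheck.java
public class CombineSafetyCheck {

    public static void main(String[] args) {
        int[] mapper1 = { 1, 1, 1 }; //word seen 3 times by the first mapper
        int[] mapper2 = { 1, 1, 1 }; //word seen 3 times by the second mapper

        //Without a combiner the reducer receives and sums all six 1s
        int withoutCombiner = sum(mapper1) + sum(mapper2);

        //With the combiner each mapper side is pre-summed to 3 and the reducer sums 3 + 3
        int withCombiner = sum(new int[] { sum(mapper1), sum(mapper2) });

        System.out.println(withoutCombiner + " == " + withCombiner); //prints 6 == 6
    }

    private static int sum(int[] values) {
        int total = 0;
        for (int value : values) {
            total = total + value;
        }
        return total;
    }
}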
3.4 The Driver Class
Now that we have our map, combiner and reduce classes ready, it is time to put it all together as a single job, which is done in a class called the driver class. This class contains the main() method to set up and run the job.
WordCount.java
package com.javacodegeeks.examples.wordcount;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{

    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s needs two arguments, input and output files\n",
                    getClass().getSimpleName());
            return -1;
        }

        //Create a new Job and set the driver class (this class) as the main class of the jar
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("WordCounter");

        //Set the input and the output path from the arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //Set the map, combine and reduce classes in the job
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(ReduceClass.class);
        job.setReducerClass(ReduceClass.class);

        //Run the job and wait for its completion
        int returnValue = job.waitForCompletion(true) ? 0:1;

        if(job.isSuccessful()) {
            System.out.println("Job was successful");
        } else if(!job.isSuccessful()) {
            System.out.println("Job was not successful");
        }

        return returnValue;
    }
}
Following is the workflow of the driver class:
- Line 22-26, check if the required number of arguments is provided.
- Line 29-31, create a new Job, set the name of the job and the main class.
- Line 34-35, set the input and the output paths from the arguments.
- Line 37-39, set the key and value type classes and the output format class. These classes need to be of the same types that we use in the map and reduce for the output.
- Line 42-44, set the Map, Combiner and Reduce classes in the job.
- Line 46, execute the job and wait for its completion.
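One note on the job setup: with the hadoop-core 1.2.1 dependency used in this example, new Job() works fine, but on Hadoop 2.x that constructor is deprecated. If you build against a 2.x dependency instead, the job-creation lines inside run() could be written roughly as follows (a sketch under that assumption; the rest of the driver stays unchanged):
//Hadoop 2.x style replacement for the "Job job = new Job(); ..." lines in run()
Job job = Job.getInstance(getConf(), "WordCounter");
job.setJarByClass(WordCount.class);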
4. Code Execution
There are two ways to execute the code we have written: the first is to execute it within the Eclipse IDE itself for testing purposes, and the second is to execute it on a Hadoop cluster. We will see both ways in this section.
4.1 In Eclipse IDE
To execute the wordcount code in Eclipse, first create an Input.txt file with dummy data. For testing purposes, we have created a file with the following text in the project root.
Input.txt
This is the example text file for word count example also knows as hello world example of the Hadoop ecosystem. This example is written for the examples article of java code geek The quick brown fox jumps over the lazy dog. The above line is one of the most famous lines which contains all the english language alphabets.
In Eclipse, pass the input file and output file name in the program arguments of the run configuration. In this case, the input file is in the root of the project, which is why just the filename is required; but if your input file is in some other location, you should provide the complete path.
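For example, with the Input.txt created above sitting in the project root and an output folder named Output (both names are simply the ones used in this example), the program arguments field of the run configuration would contain:
Input.txt Output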
Note: Make sure the output folder does not already exist. If it does, the program will throw an error.
After setting the arguments, simply run the application. Once the application has completed successfully, the console will show the output.
Below is the content of the output file:
Hadoop	1
The	2
This	2
above	1
all	1
alphabets.	1
also	1
article	1
as	1
brown	1
code	1
contains	1
count	1
dog.	1
ecosystem.	1
english	1
example	4
examples	1
famous	1
file	1
for	2
fox	1
geek	1
hello	1
is	3
java	1
jumps	1
knows	1
language	1
lazy	1
line	1
lines	1
most	1
of	3
one	1
over	1
quick	1
text	1
the	6
which	1
word	1
world	1
written	1
4.2 On Hadoop Cluster
For running the wordcount example on a Hadoop cluster, we assume:
- The Hadoop cluster is set up and running
- The input file is at the path /user/root/wordcount/Input.txt in HDFS
In case you need any help with setting up the Hadoop cluster or the Hadoop File System, please refer to our related articles on those topics.
Now, first of all, make sure the Input.txt file is present at the path /user/root/wordcount using the command:
hadoop fs -ls /user/root/wordcount
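If the file is not there yet, it can be copied from the local file system into HDFS first (assuming Input.txt is in your current local directory):
hadoop fs -mkdir -p /user/root/wordcount
hadoop fs -put Input.txt /user/root/wordcount/Input.txt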
Now it is time to submit the MapReduce job. Use the following command for execution:
hadoop jar Downloads/wordcount-0.0.1-SNAPSHOT.jar com.javacodegeeks.examples.wordcount.WordCount /user/root/wordcount/Input.txt /user/root/wordcount/Output
In the above command, the jar file is in the Downloads folder and the main class is com.javacodegeeks.examples.wordcount.WordCount.
Now we can read the output of the wordcount MapReduce job in the folder /user/root/wordcount/Output/. Use the following command to check the output in the console:
hadoop fs -cat /user/root/wordcount/Output/part-r-00000
The console will display the content of the Output folder, with one word and its count per line, similar to the output shown in the Eclipse run above.
5. Conclusion
This example explains the MapReduce and Combiner paradigm with respect to Apache Hadoop and shows how to write the word count example in MapReduce step by step. Next, we saw how to execute the example in Eclipse for testing purposes and also how to execute it on a Hadoop cluster using HDFS for the input files. The article also refers to other useful articles on setting up Hadoop on Ubuntu, setting up a Hadoop cluster, understanding HDFS and basic FS commands. We hope this article serves its purpose of explaining the basics of Hadoop MapReduce and provides you with a solid base for understanding Apache Hadoop and MapReduce.
6. Download the Eclipse Project
Click on the following link to download the complete Eclipse project of the wordcount example with Mapper, Reducer and Combiner.
You can download the full source code of this example here: WordcountWithCombiner