Apache Hadoop RecordReader Example
In this example, we will have a look at and understand the concept of the RecordReader component of Apache Hadoop. But before digging into the example code, we would like to look at the theory behind the InputFormat and RecordReader to better understand the concept.
1. Introduction
To better understand RecordReader, we have to understand the InputFormat first.
InputFormat defines how the data is read from the input file and passed into the Mapper instance for processing.
Hadoop performs the following things with the input format:
- Validate the input for the job to make sure the data is present for processing.
- Split the input blocks into chunks of the format InputSplit.
- Assign each of the InputSplits to a map task for processing.
- Create the RecordReader implementation to be used to create key-value pairs from the raw InputSplit and pass these key-value pairs to mappers one at a time.
Apache Hadoop provides several implementations of InputFormat by default. For example, TextInputFormat reads lines of text files one at a time and SequenceFileInputFormat is used to read binary file formats. We can always build our own InputFormat implementation with a separate RecordReader based on the input data being used in Apache Hadoop. So in this article we will concentrate on the RecordReader part and see how we can implement a custom RecordReader.
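The contract such an implementation has to fulfil is small. The following is a simplified sketch of the two methods declared by org.apache.hadoop.mapreduce.InputFormat, shown here as a condensed view for illustration, not the complete class:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Simplified view of the org.apache.hadoop.mapreduce.InputFormat contract
public abstract class InputFormat<K, V> {

    // Splits the input into logical InputSplits, one per map task.
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Creates the RecordReader that turns a split into key-value pairs for the mapper.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException;
}

FileInputFormat, which we will extend later in this article, already provides the split logic for file-based input, so a custom format usually only needs to supply the createRecordReader() part.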
2. Record Reader
RecordReader uses the data in the InputSplit and creates key-value pairs for the mapper. Now when we customize this, we can implement any kind of record reader; we can send JSON objects, XML objects or any other format to the mapper for processing.
A RecordReader usually stays within the boundaries created by the input split to generate key-value pairs, but this is not a hard restriction. A custom implementation can even read data outside of the input split; this is not encouraged, but if it needs to be done for a specific case then it is also fine.
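Before we write the full implementation in the next section, the skeleton below shows the bare contract a RecordReader has to fulfil in the new MapReduce API. The class name SkeletonRecordReader and the empty method bodies are placeholders for illustration only:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Placeholder skeleton showing the methods a RecordReader must implement.
public class SkeletonRecordReader extends RecordReader<LongWritable, Text> {

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Seek to the start of the split and prepare the underlying reader.
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Read one record; return true if a key-value pair was produced.
        return false;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return null; // the key of the record read by nextKeyValue()
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return null; // the value of the record read by nextKeyValue()
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0.0f; // fraction of the split consumed so far, between 0 and 1
    }

    @Override
    public void close() throws IOException {
        // Release the underlying stream.
    }
}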
Since we can implement a custom reader that reads lines of arbitrary length, there is an option to set a limit on the size a single record is allowed to have; any record exceeding this limit will not be processed and will simply be ignored. This parameter can be set using the following code:
configuration.setInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
In the above code we have set the maximum length to the largest value an integer can have, which is 2,147,483,647. Any record with a size greater than this will be ignored.
3. Custom RecordReader
In this section, we will see how we can write our own custom RecordReader. The code contains a lot of comments to make it self-explanatory, but we will still go through the important parts after looking at it below:
package com.javacodegeeks.examples.CustomRecordReder;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class CustomLineRecordReader extends RecordReader<LongWritable, Text> {

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private static final Log LOG = LogFactory.getLog(CustomLineRecordReader.class);

    /**
     * This method takes as arguments the map task's assigned InputSplit and
     * TaskAttemptContext, and prepares the record reader. For file-based input
     * formats, this is a good place to seek to the byte position in the file
     * to begin reading.
     */
    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {

        // This InputSplit is a FileInputSplit
        FileSplit split = (FileSplit) genericSplit;

        // Retrieve configuration, and Max allowed
        // bytes for a single record
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

        // Split "S" is responsible for all records
        // starting from "start" and "end" positions
        start = split.getStart();
        end = start + split.getLength();

        // Retrieve file containing Split "S"
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        // If Split "S" starts at byte 0, first line will be processed
        // If Split "S" does not start at byte 0, first line has been already
        // processed by "S-1" and therefore needs to be silently ignored
        boolean skipFirstLine = false;
        if (start != 0) {
            skipFirstLine = true;
            // Set the file pointer at "start - 1" position.
            // This is to make sure we won't miss any line
            // It could happen if "start" is located on a EOL
            --start;
            fileIn.seek(start);
        }

        in = new LineReader(fileIn, job);

        // If first line needs to be skipped, read first line
        // and store its content to a dummy Text
        if (skipFirstLine) {
            Text dummy = new Text();
            // Reset "start" to "start + line offset"
            start += in.readLine(dummy, 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }

        // Position is the actual start
        this.pos = start;
    }

    /**
     * Like the corresponding method of the InputFormat class, this reads a
     * single key/value pair and returns true until the data is consumed.
     */
    @Override
    public boolean nextKeyValue() throws IOException {

        // Current offset is the key
        key.set(pos);

        int newSize = 0;

        // Make sure we get at least one record that starts in this Split
        while (pos < end) {

            // Read first line and store its content to "value"
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));

            // No byte read, seems that we reached end of Split
            // Break and return false (no key / value)
            if (newSize == 0) {
                break;
            }

            // Line is read, new position is set
            pos += newSize;

            // Line is lower than Maximum record line size
            // break and return true (found key / value)
            if (newSize < maxLineLength) {
                break;
            }

            // Line is too long
            // Try again with position = position + line offset,
            // i.e. ignore line and go to next one
            // TODO: Shouldn't it be LOG.error instead ??
            LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }

        if (newSize == 0) {
            // We've reached end of Split
            key = null;
            value = null;
            return false;
        } else {
            // Tell Hadoop a new line has been found
            // key / value will be retrieved by
            // getCurrentKey getCurrentValue methods
            return true;
        }
    }

    /**
     * These methods are used by the framework to give generated key/value pairs
     * to an implementation of Mapper. Be sure to reuse the objects returned by
     * these methods if at all possible!
     */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * These methods are used by the framework to give generated key/value pairs
     * to an implementation of Mapper. Be sure to reuse the objects returned by
     * these methods if at all possible!
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Like the corresponding method of the InputFormat class, this is an
     * optional method used by the framework for metrics gathering.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    /**
     * This method is used by the framework for cleanup after there are no more
     * key/value pairs to process.
     */
    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
Following are the parts of the above class which we will highlight:
- In the initialize() method we fetch the start and the end of the input split we have been assigned.
- Still in initialize(), the code then decides where the RecordReader should actually start reading, skipping the first line of the split when it has already been handled by the previous split.
- nextKeyValue() is the overridden method that implements the functionality to check whether the next key-value pair exists or not and, if so, to read it.
Besides these, all other methods and the code snippets in the class are self-explanatory.
4. Custom File Input Format
Once our custom line record reader is finished, we then need to extend the FileInputFormat class and override its createRecordReader() method so that it uses our CustomLineRecordReader class.
package com.javacodegeeks.examples.CustomRecordReder;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CustomFileInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new CustomLineRecordReader();
    }
}
Code in the CustomFileInputFormat is quite straightforward. It uses the CustomLineRecordReader and returns an instance of it when needed.
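As a side note that goes beyond the original example: FileInputFormat also lets a custom format control whether its files may be split at all by overriding isSplitable(). The hypothetical variant below (not part of the downloadable project) keeps each file in a single split, which can be useful for inputs such as compressed files where a record reader cannot start reading in the middle of the stream, while still reusing CustomLineRecordReader:

package com.javacodegeeks.examples.CustomRecordReder;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical variant: same record reader, but each file becomes a single split.
public class WholeFileCustomInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Returning false keeps the whole file in one split.
        return false;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new CustomLineRecordReader();
    }
}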
5. Word Count Driver Class
Now it is time to use the CustomFileInputFormat in our Hadoop application. We will use the same old WordCount example, but instead of the default FileInputFormat we will use our CustomFileInputFormat, which in turn uses CustomLineRecordReader for reading the lines of input.
package com.javacodegeeks.examples.CustomRecordReder;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * The entry point for the WordCount example,
 * which sets up the Hadoop job with the Map and Reduce classes
 *
 * @author Raman
 */
public class WordCount extends Configured implements Tool {

    /**
     * Main function which calls the run method and passes the args using ToolRunner
     * @param args Two arguments, input and output file paths
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }

    /**
     * Run method which schedules the Hadoop Job
     * @param args Arguments passed in main function
     */
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s needs two arguments, input and output files\n",
                    getClass().getSimpleName());
            return -1;
        }

        // Initialize the Hadoop job and set the jar as well as the name of the Job
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("WordCounter");

        // Add input and output file paths to job based on the arguments passed
        CustomFileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(CustomFileInputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Set the MapClass and ReduceClass in the job
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        // Wait for the job to complete and print if the job was successful or not
        int returnValue = job.waitForCompletion(true) ? 0 : 1;

        if (job.isSuccessful()) {
            System.out.println("Job was successful");
        } else if (!job.isSuccessful()) {
            System.out.println("Job was not successful");
        }

        return returnValue;
    }
}
This is the driver class for our MapReduce job. The most important snippet of code for this example is where we add the input path through CustomFileInputFormat.addInputPath() and set the job's input format class to CustomFileInputFormat.
Note: For this example we will skip the Map and Reduce classes used in the MapReduce driver class above. The Map and Reduce classes used in this example are the same as those present in the article Apache Hadoop Wordcount Example, as sketched below, and are also available in the code at the bottom of the article.
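For completeness, the sketch below shows what the MapClass and ReduceClass referenced by the driver typically look like in the standard WordCount example; treat it as an approximation and refer to the linked article and the downloadable project for the exact versions used here.

package com.javacodegeeks.examples.CustomRecordReder;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Typical WordCount mapper: emits (word, 1) for every token in the line.
class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

// Typical WordCount reducer: sums the counts for each word.
class ReduceClass extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}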
6. Conclusion
This brings us to the end of the article, so let us summarize what we covered. We started by understanding what exactly the RecordReader and the InputSplit are and how they are used, and then dug into the code to understand how to write a custom RecordReader and InputFormat. You can find the complete example in the download section below.
7. Download the code for writing Custom RecordReader
This code includes the complete example of how to write a custom RecordReader and a custom InputFormat as we discussed in the article, along with the Map and Reduce classes.
Download the Eclipse project
You can download the full source code of this example here: CustomRecordReader