Apache Hadoop

Apache Hadoop RecordReader Example

In this example, we will have a look at and understand the concept of the RecordReader component of Apache Hadoop. But before digging into the example code, let us first look at the theory behind InputFormat and RecordReader to better understand the concept.

1. Introduction

To better understand RecordReader, we have to understand the InputFormat first.

InputFormat defines how the data is read from the input file and passed into the Mapper instance for processing.

Hadoop performs the following things with the input format (the abstract methods behind this contract are sketched right after the list):

  • Validate the input for the job to make sure the data is present for processing.
  • Split the input files into logical chunks, each represented by an InputSplit.
  • Assign each of the InputSplits to a map task for processing.
  • Create the RecordReader implementation to be used to create key-value pairs from the raw InputSplit and pass these key-value pairs to mappers one at a time.
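
These responsibilities map directly onto the two abstract methods of org.apache.hadoop.mapreduce.InputFormat. The following is a simplified excerpt of that contract, shown here only for orientation (the real class carries additional documentation):

public abstract class InputFormat<K, V> {

	// Computes the logical InputSplits for the job's input.
	public abstract List<InputSplit> getSplits(JobContext context)
			throws IOException, InterruptedException;

	// Creates the RecordReader that turns one InputSplit into key-value pairs.
	public abstract RecordReader<K, V> createRecordReader(InputSplit split,
			TaskAttemptContext context)
			throws IOException, InterruptedException;
}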

Apache Hadoop provides several implementations of InputFormat out of the box. For example, TextInputFormat reads text files one line at a time and SequenceFileInputFormat is used to read binary file formats. We can always build our own InputFormat implementation with a separate RecordReader, based on the input data being used in Apache Hadoop.
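
For example, choosing one of the built-in formats is a single call in the driver. A minimal sketch, assuming a Job instance named job as in the driver shown later in this article (both classes live in the org.apache.hadoop.mapreduce.lib.input package):

// Use the default line-oriented text input format ...
job.setInputFormatClass(TextInputFormat.class);

// ... or read binary sequence files instead.
// job.setInputFormatClass(SequenceFileInputFormat.class);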

So in this article we will concentrate on the RecordReader part and see how we can implement a custom RecordReader.

2. RecordReader

RecordReader uses the data in the InputSplit and creates key-value pairs for the mapper. When we customize it, we can implement any kind of record reader and send JSON objects, XML objects, or any other format to the mapper for processing.

A RecordReader usually stays within the boundaries created by the input split when generating key-value pairs, but this is not a hard restriction. A custom implementation can even read data beyond the input split; this is not encouraged, but if it needs to be done for a specific case it is fine as well.

Since a custom reader may read lines of arbitrary length, there is an option to set a limit on the size a single record is allowed to have; any record exceeding this limit is not processed and is simply ignored. This parameter can be set using the following code:

configuration.setInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

So in the above code we have set the maximum length to the largest value an integer can hold, which is 2,147,483,647. Any record with a size greater than this will be ignored.
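
In practice this property would be set on the job's Configuration in the driver before the job is submitted. A minimal sketch, again assuming a Job instance named job as in the driver shown later in this article; the 1 MB limit is just an illustrative value, and the property key matches the one read by the custom reader below:

Configuration conf = job.getConfiguration();
// Silently skip any single record (line) larger than roughly 1 MB.
conf.setInt("mapred.linerecordreader.maxlength", 1024 * 1024);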

3. Custom RecordReader

In this section, we will see how we can write our own custom RecordReader. The code contains plenty of comments to make it self-explanatory, but we will still go through its important parts after looking at the code below:

package com.javacodegeeks.examples.CustomRecordReder;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class CustomLineRecordReader extends RecordReader<LongWritable, Text> {

private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = new LongWritable();
private Text value = new Text();

private static final Log LOG = LogFactory.getLog(CustomLineRecordReader.class);

/**
 * This method takes as arguments the map task’s assigned InputSplit and
 * TaskAttemptContext, and prepares the record reader. For file-based input
 * formats, this is a good place to seek to the byte position in the file to
 * begin reading.
 */
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {

	// This InputSplit is a FileInputSplit
	FileSplit split = (FileSplit) genericSplit;

	// Retrieve configuration, and Max allowed
	// bytes for a single record
	Configuration job = context.getConfiguration();
	this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

	// Split "S" is responsible for all records
	// starting from "start" and "end" positions
	start = split.getStart();
	end = start + split.getLength();

	// Retrieve file containing Split "S"
	final Path file = split.getPath();
	FileSystem fs = file.getFileSystem(job);
	FSDataInputStream fileIn = fs.open(split.getPath());

	// If Split "S" starts at byte 0, first line will be processed
	// If Split "S" does not start at byte 0, first line has been already
	// processed by "S-1" and therefore needs to be silently ignored
	boolean skipFirstLine = false;
	if (start != 0) {
		skipFirstLine = true;
		// Set the file pointer at "start - 1" position.
		// This is to make sure we won't miss any line
		// It could happen if "start" is located on a EOL
		--start;
		fileIn.seek(start);
	}

	in = new LineReader(fileIn, job);

	// If first line needs to be skipped, read first line
	// and stores its content to a dummy Text
	if (skipFirstLine) {
		Text dummy = new Text();
		// Reset "start" to "start + line offset"
		start += in.readLine(dummy, 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
	}

	// Position is the actual start
	this.pos = start;
}

/**
 * Like the corresponding method of the InputFormat class, this reads a
 * single key/value pair and returns true until the data is consumed.
 */
@Override
public boolean nextKeyValue() throws IOException {
	// Current offset is the key
	key.set(pos);

	int newSize = 0;

	// Make sure we get at least one record that starts in this Split
	while (pos < end) {

		// Read first line and store its content to "value"
		newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));

		// No byte read, seems that we reached end of Split
		// Break and return false (no key / value)
		if (newSize == 0) {
			break;
		}

		// Line is read, new position is set
		pos += newSize;

		// Line is lower than Maximum record line size
		// break and return true (found key / value)
		if (newSize < maxLineLength) {
			break;
		}

		// Line is too long
		// Try again with position = position + line offset,
		// i.e. ignore line and go to next one
		// TODO: Shouldn't it be LOG.error instead ??
		LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
	}

	if (newSize == 0) {
		// We've reached end of Split
		key = null;
		value = null;
		return false;
	} else {
		// Tell Hadoop a new line has been found
		// key / value will be retrieved by
		// getCurrentKey getCurrentValue methods
		return true;
	}
}

/**
 * These methods are used by the framework to give generated key/value pairs
 * to an implementation of Mapper. Be sure to reuse the objects returned by
 * these methods if at all possible!
 */
@Override
public LongWritable getCurrentKey() throws IOException,
		InterruptedException {
	return key;
}

/**
 * These methods are used by the framework to give generated key/value pairs
 * to an implementation of Mapper. Be sure to reuse the objects returned by
 * these methods if at all possible!
 */
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
	return value;
}

/**
 * Like the corresponding method of the InputFormat class, this is an
 * optional method used by the framework for metrics gathering.
 */
@Override
public float getProgress() throws IOException, InterruptedException {
	if (start == end) {
		return 0.0f;
	} else {
		return Math.min(1.0f, (pos - start) / (float) (end - start));
	}
}

/**
 * This method is used by the framework for cleanup after there are no more
 * key/value pairs to process.
 */
@Override
public void close() throws IOException {
	if (in != null) {
		in.close();
	}
}
}

The following parts of the above class are worth highlighting:

  • In initialize() we fetch the start and end byte offsets of the input split we have been given.
  • initialize() also contains the code where we check whether the RecordReader should skip the first line of the split, because that line has already been processed by the reader of the previous split.
  • nextKeyValue() is the method we override to implement the functionality that checks whether the next key-value pair exists and, if so, reads it.

Besides these, all other methods and the code snippets in the class are self-explanatory.
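
To make the behaviour of the reader concrete, consider a hypothetical input file whose first two lines are "hello world" and "foo bar". The reader would hand the following key-value pairs to the mapper, where the key is the byte offset at which the line starts and the value is the line itself:

(0,  "hello world")   // the first line starts at offset 0
(12, "foo bar")       // 12 = 11 characters of "hello world" plus the newline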

4. Custom File Input Format

Once we have our custom line record reader finished, we need to extend the FileInputFormat class and override the createRecordReader() method so that it uses our CustomLineRecordReader class.

package com.javacodegeeks.examples.CustomRecordReder;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CustomFileInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		
		return new CustomLineRecordReader();
	}
}

The code in CustomFileInputFormat is quite straightforward: it simply returns a new instance of CustomLineRecordReader whenever the framework asks for a record reader.

5. Word Count Driver Class

Now it is time to use the CustomFileInputFormat in our Hadoop application. We will use the same old WordCount example, but instead of the default FileInputFormat we will use our CustomFileInputFormat, which in turn uses CustomLineRecordReader for reading the lines of the input file.

package com.javacodegeeks.examples.CustomRecordReder;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * The entry point for the WordCount example,
 * which sets up the Hadoop job with the Map and Reduce classes
 * 
 * @author Raman
 */
public class WordCount extends Configured implements Tool{
	
	/**
	 * Main function which calls the run method and passes the args using ToolRunner
	 * @param args Two arguments input and output file paths
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception{
		int exitCode = ToolRunner.run(new WordCount(), args);
		System.exit(exitCode);
	}
 
	/**
	 * Run method which schedules the Hadoop Job
	 * @param args Arguments passed in main function
	 */
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.printf("Usage: %s needs two arguments, input and output files\n",
					getClass().getSimpleName());
			return -1;
		}
	
		//Initialize the Hadoop job and set the jar as well as the name of the Job
		Job job = new Job();
		job.setJarByClass(WordCount.class);
		job.setJobName("WordCounter");
		
		//Add input and output file paths to job based on the arguments passed
		CustomFileInputFormat.addInputPath(job, new Path(args[0]));
		job.setInputFormatClass(CustomFileInputFormat.class);
		
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
	
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//Set the MapClass and ReduceClass in the job
		job.setMapperClass(MapClass.class);
		job.setReducerClass(ReduceClass.class);
	
		//Wait for the job to complete and print if the job was successful or not
		int returnValue = job.waitForCompletion(true) ? 0:1;
		
		if(job.isSuccessful()) {
			System.out.println("Job was successful");
		} else {
			System.out.println("Job was not successful");			
		}
		
		return returnValue;
	}
}

This is the driver class for our MapReduce job. The most important snippet for this example is where we add the input path through our CustomFileInputFormat and set the job's input format class to CustomFileInputFormat.

Note: For this example we skip the Map and Reduce classes used in the MapReduce driver class above. The Map and Reduce classes used in this example are the same as those in the article Apache Hadoop Wordcount Example and are also available in the code at the bottom of the article.
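
For completeness, the following is a minimal sketch of what the MapClass and ReduceClass referenced in the driver typically look like in the standard WordCount example; the exact classes used here are part of the download at the bottom of the article, so treat this only as an illustration:

package com.javacodegeeks.examples.CustomRecordReder;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// MapClass.java: emits (word, 1) for every token in the line delivered by the record reader.
public class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {

	private final static IntWritable one = new IntWritable(1);
	private final Text word = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		StringTokenizer tokenizer = new StringTokenizer(value.toString());
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.write(word, one);
		}
	}
}

The reducer simply sums the counts emitted for each word:

package com.javacodegeeks.examples.CustomRecordReder;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// ReduceClass.java: sums up the counts emitted for each word.
public class ReduceClass extends Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		context.write(key, new IntWritable(sum));
	}
}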

6. Conclusion

This brings us to the end of the article. Let us conclude what we covered: we started by understanding what exactly RecordReader and InputSplit are and how they are used, and then dug into the code to see how to write a custom RecordReader and a custom InputFormat. You can find the complete example in the download section below.

7. Download the code for writing Custom RecordReader

This download includes the complete example of how to write the CustomLineRecordReader and CustomFileInputFormat discussed in the article, along with the Map and Reduce classes.

Download the Eclipse project

You can download the full source code of this example here: CustomRecordReader

Raman Jhajj

Ramaninder graduated from the Department of Computer Science and Mathematics of Georg-August University, Germany, and currently works with a Big Data Research Center in Austria. He holds an M.Sc. in Applied Computer Science with a specialization in Applied Systems Engineering and a minor in Business Informatics. He is also a Microsoft Certified Professional with more than 5 years of experience in Java, C#, web development and related technologies. Currently, his main interests are in the Big Data ecosystem, including batch and stream processing systems, machine learning and web applications.