Custom Input Format in MapReduce

Custom Input Format:

Before implementing a custom InputFormat, let us first answer the question: what is an InputFormat?

InputFormat describes the input-specification for a Map-Reduce job. (wiki)

The Map-Reduce framework relies on the InputFormat of the job to:

  1. Validate the input-specification of the job.
  2. Split-up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
  3. Provide the RecordReader implementation to be used to glean input records from the logical InputSplit for processing by the Mapper.
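
In code, these responsibilities correspond to the two abstract methods of the new-API InputFormat class, roughly as follows (a simplified paraphrase of the Hadoop 2.x API; consult the Javadoc of your Hadoop version for the exact declarations):

// Simplified paraphrase of org.apache.hadoop.mapreduce.InputFormat<K, V>
// (InputSplit, JobContext, RecordReader and TaskAttemptContext also live
// in org.apache.hadoop.mapreduce).
public abstract class InputFormat<K, V> {

    // 2. Split the job's input into logical InputSplits, one per Mapper.
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // 3. Create the RecordReader that presents an InputSplit to the Mapper
    //    as key-value records.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException;
}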

The default behavior of file-based InputFormats, typically sub-classes of FileInputFormat, is to split the input into logical InputSplits based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapreduce.input.fileinputformat.split.minsize.
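
For example, assuming the Hadoop 2.x (new mapreduce) API, the lower bound can be set either through that configuration property or through a FileInputFormat helper (a sketch; the 128 MB value is arbitrary):

// Assumes org.apache.hadoop.conf.Configuration, org.apache.hadoop.mapreduce.Job
// and org.apache.hadoop.mapreduce.lib.input.FileInputFormat are imported.
Configuration conf = new Configuration();
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 128L * 1024 * 1024);

// Equivalent helper on the Job:
Job job = Job.getInstance(conf, "split-size example");
FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);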

Clearly, logical splits based purely on input size are insufficient for many applications, since record boundaries must be respected. In such cases, the application also has to implement a RecordReader, which is responsible for respecting record boundaries and presenting a record-oriented view of the logical InputSplit to the individual task.

Hadoop supports processing of many different formats and types of data through InputFormat. The InputFormat of a Hadoop MapReduce computation generates the key-value pair inputs for the mappers by parsing the input data. InputFormat also performs the splitting of the input data into logical partitions, essentially determining the number of Map tasks of a MapReduce computation and indirectly deciding the execution location of the Map tasks. Hadoop generates a map task for each logical data partition and invokes the respective mappers with the key-value pairs of the logical splits as the input.

Why do we need a custom InputFormat?

We implement a custom InputFormat to gain more control over the input data, and to support proprietary or application-specific input file formats as inputs to Hadoop MapReduce computations.

 

This is an advanced concept in MapReduce, but in this blog I am not going to use a complex problem to explain it. We usually implement a custom InputFormat for complex input data, but complex data and problems are hard for beginners to follow. So I will go with something like a "Hello World" of input formats. Experts, this blog is not for you.

I am assuming that the people reading this blog know how to write a simple MapReduce program and what a mapper and a reducer are.

 

Here is the problem statement. I have comma-separated university student data like the record below.

RegisterNo,Name,Year

96906104029,Muthu Kumar,03

Register number structure:

Characters 1-3 of the register number are the college code: 969

Characters 4-5 are the year of joining: 06

Characters 6-8 are the department code: 104

The last three characters are a unique serial number.

So I want the output to be:

Key: RegisterNo

Value: Name,Year,2006,104

Simple. This is easy enough to do with the existing input formats, but I am going to use it to demonstrate a custom InputFormat.
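
To make the substring positions concrete, here is a small illustrative snippet (plain Java, not part of the job code) showing how the sample register number decomposes:

// Illustration only: 0-based indices, end index exclusive.
String regNo = "96906104029";
String college = regNo.substring(0, 3); // "969" -> college code
String year    = regNo.substring(3, 5); // "06"  -> year of joining (2000 + 6 = 2006)
String dept    = regNo.substring(5, 8); // "104" -> department code
String serial  = regNo.substring(8);    // "029" -> unique number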

FileInputFormat is the base class for all file-based InputFormats.

  1. An InputFormat implementation should extend the org.apache.hadoop.mapreduce.InputFormat<K,V> abstract class, overriding the createRecordReader() and getSplits() methods.
package UNIV;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Custom InputFormat: inherits the splitting behaviour of FileInputFormat
// and plugs in our own RecordReader implementation.
public class UnivFormat extends FileInputFormat<RegNo, Text> {

    @Override
    public RecordReader<RegNo, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }
}


UnivFormat extends FileInputFormat, which provides a generic splitting mechanism for HDFS-file-based InputFormats. We override the createRecordReader() method in UnivFormat to return an instance of our custom RecordReader implementation, MyRecordReader. Optionally, we can also override the isSplitable() method of FileInputFormat to control whether the input files are split into logical partitions or used as whole files.
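
For example, a minimal sketch of forcing whole-file processing (note that the method name really is spelled isSplitable in Hadoop):

// Inside UnivFormat; requires org.apache.hadoop.fs.Path and
// org.apache.hadoop.mapreduce.JobContext imports.
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    // Returning false means each input file becomes a single split,
    // processed by one mapper.
    return false;
}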

 

  2. Implement the MyRecordReader class.
package UNIV;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Custom RecordReader: wraps LineRecordReader and turns each CSV line
// into a (RegNo, Text) pair.
public class MyRecordReader extends RecordReader<RegNo, Text> {

    private RegNo key;
    private Text value;
    private final LineRecordReader reader = new LineRecordReader();
    private static final String SEP = ",";
    private static final int YEAR = 2000;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        reader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean gotNextKeyValue = reader.nextKeyValue();
        if (gotNextKeyValue) {
            if (key == null) {
                key = new RegNo();
            }
            if (value == null) {
                value = new Text();
            }
            // Input line: RegisterNo,Name,Year e.g. 96906104029,Muthu Kumar,03
            Text line = reader.getCurrentValue();
            String[] tokens = line.toString().split(SEP);
            key.setRegNo(tokens[0]);
            // Characters 4-5 of the register number (0-based index 3..4)
            // are the year of joining.
            int year = YEAR + Integer.parseInt(tokens[0].substring(3, 5));
            // Characters 6-8 (0-based index 5..7) are the department code.
            int dept = Integer.parseInt(tokens[0].substring(5, 8));
            // Output value: Name,Year,<year of joining>,<dept code>
            // e.g. Muthu Kumar,03,2006,104
            value.set(tokens[1] + SEP + tokens[2] + SEP + year + SEP + dept);
        } else {
            key = null;
            value = null;
        }
        return gotNextKeyValue;
    }

    @Override
    public RegNo getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}

 

 


The MyRecordReader class extends the org.apache.hadoop.mapreduce.RecordReader<K,V> abstract class and uses a LineRecordReader internally to perform the basic parsing of the input data. LineRecordReader reads lines of text from the input data.

reader = new LineRecordReader();

reader.initialize(split, context);

In the nextKeyValue() method, we implement the logic to extract the key and value from each record.

 

  3. Specify UnivFormat as the InputFormat for the MapReduce computation using the Job object, as follows, and specify the input paths for the computation using the underlying FileInputFormat.

 

Configuration conf = new Configuration();

Job job = new Job(conf, "University Parser");

……

job.setInputFormatClass(UnivFormat.class);
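
The input and output paths can then be added, for example, like this (a sketch assuming args[0] and args[1] hold the paths, and that Path, FileInputFormat and FileOutputFormat from org.apache.hadoop.fs and org.apache.hadoop.mapreduce.lib are imported):

// Input directory read through UnivFormat, and output directory for the job.
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));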

We have also implemented a custom key here: RegNo is the custom key type.

  4. The instances of Hadoop MapReduce key types should have the ability to compare against each other for sorting purposes. In order to be used as a key type in a MapReduce computation, a Hadoop Writable data type should implement the org.apache.hadoop.io.WritableComparable<T> interface. The WritableComparable interface extends the org.apache.hadoop.io.Writable interface and adds the compareTo() method to perform the comparisons.
package UNIV;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Custom key type: wraps the register number string.
public class RegNo implements WritableComparable<RegNo> {

    private String regno;

    public RegNo() {
    }

    public RegNo(String regno) {
        this.regno = regno;
    }

    public String getRegNo() {
        return regno;
    }

    public void setRegNo(String regno) {
        this.regno = regno;
    }

    // Serialize the key for the shuffle.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(regno);
    }

    // Deserialize the key.
    public void readFields(DataInput in) throws IOException {
        regno = in.readUTF();
    }

    @Override
    public int compareTo(RegNo o) {
        // TODO: left as a stub here; a full implementation is shown in step 6 below.
        return 0;
    }

    @Override
    public boolean equals(Object o1) {
        if (!(o1 instanceof RegNo)) {
            return false;
        }
        RegNo other = (RegNo) o1;
        return this.regno.equals(other.regno);
    }

    @Override
    public int hashCode() {
        return regno.hashCode();
    }

    @Override
    public String toString() {
        return regno;
    }
}

In the MapReduce program we can then use this custom key:

public static class UniversityMapper extends Mapper<RegNo, Text, RegNo, Text> {

    @Override
    public void map(RegNo key, Text value, Context context)
            throws IOException, InterruptedException {
        // Your map logic goes here, e.g. context.write(key, value);
    }
}

 

  5. We should also register this key class (and the corresponding value class) in the job configuration:

job.setOutputKeyClass(RegNo.class);

job.setOutputValueClass(Text.class);

job.setMapOutputKeyClass(RegNo.class);

job.setMapOutputValueClass(Text.class);

 

  6. The WritableComparable interface introduces the compareTo() method in addition to the readFields() and write() methods of the Writable interface. The compareTo() method should return a negative integer, zero, or a positive integer if this object is less than, equal to, or greater than the object being compared to, respectively.

In my code above, I left compareTo() as a stub that simply returns 0. A proper implementation would look like this:

public int compareTo(RegNo o) {
    // Delegates to String.compareTo(), normalized to -1, 0, or 1.
    if (regno.compareTo(o.regno) == 0) {
        return 0;
    } else if (regno.compareTo(o.regno) > 0) {
        return 1;
    } else {
        return -1;
    }
}

Hadoop uses HashPartitioner as the default Partitioner implementation to calculate the distribution of the intermediate data to the reducers. HashPartitioner requires the hashCode() method of the key objects to satisfy the following two properties:

 

  1. Provide the same hash value across different JVM instances
  2. Provide a uniform distribution of hash values

Hence, you must implement a stable hashCode() method for your custom Hadoop key types satisfying the above mentioned two requirements.

public int hashCode() {
    return regno.hashCode();
}

Write your own reducer with your own logic; a minimal skeleton is sketched below as a starting point.
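
The class name UniversityReducer and the pass-through body below are just placeholders, not part of the original example; the sketch assumes the RegNo keys and Text values produced by the mapper above.

// Minimal reducer sketch: emits every value unchanged for each RegNo key.
public static class UniversityReducer extends Reducer<RegNo, Text, RegNo, Text> {

    public void reduce(RegNo key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Replace this pass-through with your own aggregation logic.
            context.write(key, value);
        }
    }
}

Please let me know your thoughts.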

 


2 thoughts on "Custom Input Format in MapReduce"

  1. I am working in IBM Mainframe technology.
    But now I am planning to take a Big Data course. Do I need to know Java for Big Data technology, and can you please explain how it works?

    1. In Hadoop there are two important things: HDFS and MapReduce. HDFS is the Hadoop file system, which handles storage.
      MapReduce is a technique for parallel processing.
      We can implement MapReduce in three ways:
      1. Java programming
      2. Hive (an SQL-like language)
      3. Pig (a scripting, data-flow language)
      Hive and Pig generate MapReduce jobs by themselves, but not every problem can be solved with Hive or Pig alone; for those we have to implement MapReduce in Java. This example, "Custom Input Format", cannot be implemented using Hive or Pig. So core Java is a must in industry. I hope this answer helps you. Let me know if you need more information.
