Sunday, June 23, 2019

How MapReduce Works in Hadoop

In the post Hadoop MapReduce Word Count Program we saw how to write a MapReduce program in Java, create a jar and run it. There is a lot that you do to create a MapReduce job, and the Hadoop framework also does a lot of processing internally. In this post we’ll see in detail how MapReduce works in Hadoop internally, using the word count MapReduce program as an example.

What is MapReduce

Hadoop MapReduce is a framework for writing applications that can process huge amounts of data in parallel by working on small chunks of that data on a cluster of nodes. The framework ensures that this distributed processing happens in a reliable, fault-tolerant manner.

Map and Reduce

A MapReduce job in Hadoop consists of two phases-

  • Map phase– It has a Mapper class with a map function specified by the developer. The input and output of the Map phase are (key, value) pairs. When you copy the file that has to be processed to HDFS, it is split into independent chunks. The Hadoop framework creates one map task for each chunk and these map tasks run in parallel.
  • Reduce phase- It has a Reducer class with a reduce function specified by the developer. The input and output of the Reduce phase are also (key, value) pairs. The output of the Map phase, after some further processing by the Hadoop framework (known as shuffling and sorting), becomes the input for the Reduce phase. So the output of the Map phase is the intermediate output, and it is processed by the Reduce phase to generate the final output.

Since the input and output of both the map and reduce functions are (key, value) pairs, if we say the input for map is (K1, V1) and its output is (K2, V2), then the map function's input and output have the following form-

(K1, V1) -> list(K2, V2)

The intermediate output of the map function goes through some further processing within the framework, known as the shuffle and sort phase, before being passed to the reduce function. The general form of the reduce function can be depicted as follows-

(K2, list(V2)) -> list(K3, V3)

Note here that the types of the reduce input match the types of the map output.

MapReduce explanation with example

Let’s take the word count MapReduce code as an example and see what happens in both the Map and Reduce phases, and how MapReduce works in Hadoop.

When we put the input text file into HDFS it is split into chunks of data. For simplicity’s sake, let’s say we have two lines in the file and it is split into two parts, each part having one line.

If the text file has following two lines-

This is a test file
This is a Hadoop MapReduce program file

Then there will be two splits and two map tasks will get those two splits as input.

Mapper class

// Map function
public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
    // Splitting the line on spaces
    String[] stringArr = value.toString().split("\\s+");
    for (String str : stringArr) {
      word.set(str);
      context.write(word, one);
    }
  }
}

In the Mapper class you can see that it has four type parameters; the first two specify the input to the map function and the last two specify the output of the map function.

In this word count program the input (key, value) pair is as follows-

Key- the byte offset in the file at which the line starts.

Value- the content of the line.

As we assumed, there are two splits (each having one line of the file) and two map tasks, let’s say Map-1 and Map-2, so the input to Map-1 and Map-2 will be as follows.

Map-1– (0, This is a test file)

Map-2– (0, This is a Hadoop MapReduce program file)

The logic in the map function is to split the line on spaces and then write each word to the context with 1 as its value.

So output from Map-1 will be as follows-

(This, 1)
(is, 1)
(a, 1)
(test, 1)
(file, 1)

And output from Map-2 will be as follows-

(This, 1)
(is, 1)
(a, 1)
(Hadoop, 1)
(MapReduce, 1)
(program, 1)
(file, 1)
Reducer class

// Reduce function
public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{	   
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

In the Reducer class again there are four type parameters; two for the input types and two for the output types of the reduce function.

Note that the input types of the reduce function must match the output types of the map function.

This intermediate output from the maps is further processed by the Hadoop framework in the shuffle phase, where it is sorted and grouped by key. After this internal processing the input to reduce will look like this-

[Hadoop, (1)]
[MapReduce, (1)]
[This, (1, 1)]
[a, (1, 1)]
[file, (1, 1)]
[is, (1, 1)]
[program, (1)]
[test, (1)]

You can see that the input to the reduce function is in the form (key, list(values)). In the logic of the reduce function, the list of values for each key is iterated and the values are added. That gives the final output.

Hadoop 1
MapReduce 1
This 2
a 2
file 2
is 2
program 1
test 1

That's all for the topic How MapReduce Works in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

Hadoop MapReduce Word Count Program

Once you have installed Hadoop on your system and the initial verification is done, you would be looking to write your first MapReduce program. Before digging deeper into the intricacies of MapReduce programming, the first step is the word count MapReduce program in Hadoop, which is also known as the "Hello World" of the Hadoop framework.

So here is a simple Hadoop MapReduce word count program written in Java to get you started with MapReduce programming.

What you need

  1. It will be good if you have an IDE like Eclipse to write the Java code.
  2. A text file which is your input file. It should be copied to HDFS. This is the file which the Map task will process, producing output as (key, value) pairs. This Map task output becomes the input for the Reduce task.

Process

These are the steps you need for executing your Word count MapReduce program in Hadoop.

  1. Start daemons by executing the start-dfs and start-yarn scripts.
  2. Create an input directory in HDFS where you will keep your text file.
    bin/hdfs dfs -mkdir /user
    
    bin/hdfs dfs -mkdir /user/input
    
  3. Copy the text file you created to the /user/input directory.
    bin/hdfs dfs -put /home/knpcode/Documents/knpcode/Hadoop/count /user/input
    

    I have created a text file called count with the following content

    This is a test file.
    This is a test file.
    

    If you want to verify that the file is copied or not, you can run the following command-

    bin/hdfs dfs -ls /user/input
    
    Found 1 items
    -rw-r--r--   1 knpcode supergroup         42 2017-12-22 18:12 /user/input/count
    

Word count MapReduce Java code

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  // Map function
  public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        context.write(word, one);
      }       
    }
  }
	
  // Reduce function
  public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{		   
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
	
  public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordMapper.class);    
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

You will need at least the given jars to compile your MapReduce code; you will find them in the share directory of your Hadoop installation.

Word count MapReduce program jars
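If you prefer to compile and package from the command line rather than an IDE, a minimal sketch is shown below. It assumes the commands are run from the Hadoop installation directory and that WordCount.java sits in the current directory; the hadoop classpath command prints the classpath containing the required jars.

javac -classpath "$(bin/hadoop classpath)" -d wordcount_classes WordCount.java
jar cf wordcount.jar -C wordcount_classes/ .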

Running the word count MapReduce program

Once your code is successfully compiled, create a jar. If you are using eclipse IDE you can use it to create the jar by Right clicking on project – export – Java (Jar File)

Once jar is created you need to run the following command to execute your MapReduce code.

bin/hadoop jar /home/knpcode/Documents/knpcode/Hadoop/wordcount.jar org.knpcode.WordCount /user/input /user/output

In the above command

/home/knpcode/Documents/knpcode/Hadoop/wordcount.jar is the path to your jar.

org.knpcode.WordCount is the fully qualified name of Java class that you need to run.

/user/input is the path to the input directory.

/user/output is the path to the output directory.

In the Java program, in the main method, there are these two lines-

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

That’s where input and output directories will be set.

To see an explanation of word count MapReduce program working in detail, check this post- How MapReduce Works in Hadoop

After execution you can check the output directory for the output.

bin/hdfs dfs -ls /user/output

Found 2 items
-rw-r--r--   1 knpcode supergroup          0 2017-12-22 18:15 /user/output/_SUCCESS
-rw-r--r--   1 knpcode supergroup         31 2017-12-22 18:15 /user/output/part-r-00000

The output can be verified by listing the content of the created output file.

bin/hdfs dfs -cat /user/output/part-r-00000
This	2
a	2
file.	2
is	2
test	2

That's all for the topic Hadoop MapReduce Word Count Program. If something is missing or you have something to share about the topic please write a comment.


You may also like

Saturday, June 15, 2019

How to Improve Map-Reduce Performance

In this post we’ll see some of the ways to improve performance of the Map-Reduce job in Hadoop.

The tips given here for improving the performance of a MapReduce job are more from the MapReduce code and configuration perspective than from the cluster and hardware perspective.

1- Enabling uber mode– Unlike Hadoop 1 there is no JVM reuse feature in YARN, but you can enable a job to run in uber mode; by default uber mode is not enabled. If uber mode is enabled, the ApplicationMaster can calculate that the overhead of negotiating resources with the ResourceManager, communicating with NodeManagers on different nodes to launch the containers and running the tasks on those containers is much more than running the MapReduce job sequentially in the same JVM, in which case it can run the job as an uber task.
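A minimal driver-side sketch of enabling uber mode is shown below; the property names are from mapred-default.xml and the threshold values shown are only illustrative.

Configuration conf = new Configuration();
// Allow the ApplicationMaster to run a small job as a single uber task in its own JVM
conf.setBoolean("mapreduce.job.ubertask.enable", true);
// The job qualifies as uber only if it is small enough; the thresholds here are illustrative
conf.setInt("mapreduce.job.ubertask.maxmaps", 9);
conf.setInt("mapreduce.job.ubertask.maxreduces", 1);
Job job = Job.getInstance(conf, "small job");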

2- For compression try to use a native library– When using compression and decompression in Hadoop it is better to use a native library, as the native library will outperform a codec written in a programming language like Java.

3- Increasing the block size– In case the input file is of very large size you can consider increasing the HDFS block size to, say, 512 MB. That can be done by setting the parameter dfs.blocksize. If you set dfs.blocksize to a higher value the input split size will increase to the same size because the input split size is calculated using the following formula-

Math.max(mapreduce.input.fileinputformat.split.minsize, Math.min(mapreduce.input.fileinputformat.split.maxsize, dfs.blocksize))

thus making it the same size as the HDFS block size. By increasing the block size you will have less overhead in terms of metadata as there will be fewer blocks.

If the input split is larger, map tasks get more data to process. In Hadoop as many map tasks are started as there are input splits, so having fewer input splits means the overhead of initializing map tasks is reduced.
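As a sketch, the minimum split size can also be raised per job from the driver using FileInputFormat; the 512 MB value below is just an example.

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "big input job");
// Ensure splits are at least 512 MB even if the HDFS block size is smaller
FileInputFormat.setMinInputSplitSize(job, 512L * 1024 * 1024);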

4- Time taken by map tasks– A map task should run for at least a minute (1-3 minutes); if it is finishing in less than a minute that means the input data to the map task is too small. If there are many small files in your MapReduce job then try to use a container file format like Sequence file or Avro to pack those small files together.

You can also use CombineFileInputFormat, which puts many files into an input split so that there is more data for the mapper to process.
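For plain text input, CombineTextInputFormat (a concrete subclass of CombineFileInputFormat) can be used in the driver; a sketch follows, with an illustrative 128 MB cap per split.

job.setInputFormatClass(CombineTextInputFormat.class);
// Pack small files together until a split reaches roughly 128 MB (illustrative value)
CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);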

5- Whether input data compression is splittable or not– If the input data is compressed, then whether the compression format used is splittable or not is also one of the things to consider. If the input data is not splittable there will only be a single split processed by a single map task, making the processing very slow with no parallelism at all.

For compressing the input data, use bzip2, which is splittable, or use LZO with indexing to make it splittable.

6- Setting the number of reduce tasks– The number of maps is usually driven by the number of input splits but the number of reducers can be controlled. As per the documentation, the right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).
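A sketch of applying that guideline in the driver is shown below; the node and container counts are assumptions about your cluster.

int nodes = 10;               // assumed number of worker nodes
int containersPerNode = 8;    // assumed maximum containers per node
job.setNumReduceTasks((int) (0.95 * nodes * containersPerNode));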

7- Data skew at the reducer side– If data is skewed in such a way that more values are grouped with a single key rather than having an even distribution of values, then the reduce tasks which process keys with more values will take more time to finish, whereas other reducers will get less data because of the uneven distribution and finish early.

In this type of scenario try to analyze the partitioning of the data and look at the possibility of writing a custom partitioner so that the data is evenly distributed among the reducers (a sketch of such a partitioner is shown below).
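A minimal sketch of such a custom partitioner follows; the hot key name and the routing logic are purely illustrative, and the partitioner would be registered with job.setPartitionerClass().

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    if (numPartitions <= 1) {
      return 0;
    }
    // Route the known hot key to a dedicated reducer (illustrative key name)
    if ("HOTKEY".equals(key.toString())) {
      return numPartitions - 1;
    }
    // Hash-partition the remaining keys across the other reducers
    return (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
  }
}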

8- Shuffle phase performance improvements– The shuffle phase in the Hadoop framework is very network intensive as files are transferred from mappers to reducers. There is a lot of I/O involved as the map output is written to local disk, and there is a lot of processing too, in the form of partitioning the data as per reducers, sorting data by keys and merging.

Optimizing the shuffle phase to reduce its time helps in reducing the overall job time. Some of the performance improvement tips are as follows-

  • Compressing the map output- Since Map output is written to disk and also transferred to the reducer, compressing map output saves storage space, makes it faster to write to disk and reduces data that has to be transferred to reducer node.
  • Filtering data- See how you can cut down on data emitted by Map tasks. Filter the records to remove unwanted records entirely. Also, reduce the record size by taking only the relevant record fields.
  • Using Combiner- Using combiner in MapReduce is a good way to improve performance of the overall MapReduce job. By using combiner you can aggregate data in the map phase itself and reduce the number of records sent to the reducer.
  • Raw Comparator- During sorting and merging the Hadoop framework uses a comparator to compare keys. If you are using a custom comparator then try to write it as a raw comparator so that the comparison can be done at the byte level itself. Otherwise the keys in the map tasks have to be deserialized into objects before comparing, which makes the process time consuming.
  • Setting parameters with optimum values- Another action you can take to improve performance of the MapReduce job is to change values of some of the configuration parameters.

    Your goal is to reduce the number of records spilled to disk on the map as well as the reduce side. On the map side you can change the settings for the following parameters to try to reduce the number of spills to disk (a driver-side sketch setting a few of these parameters follows this list).

    • mapreduce.task.io.sort.mb- The total amount of buffer memory to use while sorting files, in megabytes.
    • mapreduce.map.sort.spill.percent- The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background.

    On the reduce side you can change the settings for the following parameters to try to keep data in memory itself.
    • mapreduce.reduce.shuffle.input.buffer.percent- The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.
    • mapreduce.reduce.input.buffer.percent- The percentage of memory- relative to the maximum heap size- to retain map outputs during the reduce.
    • mapreduce.reduce.shuffle.memory.limit.percent- Maximum percentage of the in-memory limit that a single shuffle can consume.
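A driver-side sketch setting a few of these parameters is shown below; the values are only illustrative starting points, not recommendations.

Configuration conf = new Configuration();
// Map side: a bigger sort buffer and a higher spill threshold mean fewer spills to disk
conf.setInt("mapreduce.task.io.sort.mb", 256);              // illustrative value
conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);   // illustrative value
// Reduce side: keep more of the shuffled map output in memory
conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.80f);
conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.70f);
// Compress the map output to cut disk and network traffic during the shuffle
conf.setBoolean("mapreduce.map.output.compress", true);
Job job = Job.getInstance(conf, "tuned job");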

9- Improvements in MapReduce coding- You should also optimize your MapReduce code so that it runs efficiently.

  • Reusing objects- Since the map method is called once for every record, being judicious about object creation helps reduce the overhead associated with it. Try to reuse objects as much as you can. One very frequent mistake is writing code as follows.
    String[] stringArr = value.toString().split("\\s+");
    // A new Text object is created for every record
    Text outValue = new Text(stringArr[0]);
    context.write(key, outValue);
    

    You should write it as follows, creating the Text object once as a field and reusing it-

    private Text outValue = new Text();

    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      String[] stringArr = value.toString().split("\\s+");
      outValue.set(stringArr[0]); // reusing the same Text object for every record
      context.write(key, outValue);
    }
    
  • String concatenation- Since String in Java is immutable, String concatenation results in new String object creation. For appending, prefer StringBuffer or StringBuilder instead (a small sketch follows).
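As a hedged sketch, building an output value with StringBuilder inside a map or reduce method (the field names here are illustrative):

    StringBuilder sb = new StringBuilder();
    // Append fields once instead of concatenating Strings repeatedly
    sb.append(symbol).append('\t').append(price);
    context.write(key, new Text(sb.toString()));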

That's all for the topic How to Improve Map-Reduce Performance. If something is missing or you have something to share about the topic please write a comment.


You may also like

Friday, June 14, 2019

OutputCommitter in Hadoop MapReduce

In the Hadoop framework distributed processing happens where map and reduce tasks are spawned on different nodes and process part of the data. In this type of distributed processing it is important to ensure that the framework knows when a particular task finishes, or when there is a need to abort a task, and when the overall job finishes. For that purpose, like many other distributed systems, Hadoop uses a commit protocol. The class that implements it in Hadoop is OutputCommitter.

The OutputCommitter class in Hadoop describes the commit of task output for a Map-Reduce job. In Hadoop 2 the OutputCommitter implementation can be set using the getOutputCommitter() method of the OutputFormat class. FileOutputCommitter is the default OutputCommitter. Note that OutputCommitter is an abstract class in the Hadoop framework which can be extended to provide an OutputCommitter implementation (a minimal sketch of extending the default committer is shown after the task list below).

Tasks performed by OutputCommitter in Hadoop Map-Reduce

  1. Setup the job during initialization. For example, create the temporary output directory for the job during the initialization of the job. For that setupJob() method is used. This is called from the application master process for the entire job. This will be called multiple times, once per job attempt.
  2. Cleanup the job after the job completion. For example, remove the temporary output directory after the job completion.

    Cleaning up is done when either commitJob() or abortJob() method is called.

    • commitJob() method is invoked for jobs with final runstate as SUCCESSFUL. This is called from the application master process for the entire job. This method is guaranteed to only be called once to maintain atomicity.
    • abortJob() method is invoked for jobs with final runstate as JobStatus.State.FAILED or JobStatus.State.KILLED. This is called from the application master process for the entire job. This may be called multiple times.
  3. Setup the task temporary output. This is done by invoking setupTask() method. This method is called from each individual task's process that will output to HDFS, and it is called just for that task. This may be called multiple times for the same task, but for different task attempts.
  4. Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need commit.
    Checking if the task needs commit is done using needsTaskCommit() method. This method returning false means commit phase is disabled for the tasks.
  5. Commit of the task output. During this step task's temporary output is promoted to final output location.
    The method used is commitTask(). There may be multiple task attempts for the same task; the Hadoop framework ensures that the failed task attempts are aborted and only one task attempt is committed.
  6. Discard the task commit. If a task doesn't finish abortTask() method is called. This method may be called multiple times for the same task, but for different task attempts.
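As mentioned above, OutputCommitter can be extended. A minimal sketch follows that extends the default FileOutputCommitter and only adds logging around two of the lifecycle methods described in the list; the class name and the logging are illustrative, and you would typically return such a committer from getOutputCommitter() in a custom OutputFormat.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class LoggingOutputCommitter extends FileOutputCommitter {
  public LoggingOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    super(outputPath, context);
  }

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    System.out.println("Setting up job " + jobContext.getJobID());
    super.setupJob(jobContext); // creates the temporary output directory for the job
  }

  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    System.out.println("Committing task " + taskContext.getTaskAttemptID());
    super.commitTask(taskContext); // promotes the task's temporary output to the final location
  }
}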

That's all for the topic OutputCommitter in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

Wednesday, June 12, 2019

How to Chain MapReduce Job in Hadoop

In many scenarios you would like to create a sequence of MapReduce jobs to completely transform and process the data. This is better than putting everything in a single MapReduce job and making it very complex.

In fact you can get your data through various sources and use a sequence of various applications too. That can be done by creating a workflow using Oozie, but that is a topic for another post. In this post we’ll see how to chain MapReduce jobs in Hadoop using ChainMapper and ChainReducer.

ChainMapper in Hadoop

ChainMapper is one of the predefined MapReduce classes in Hadoop. The ChainMapper class allows you to use multiple Mapper classes within a single map task. The Mapper classes are invoked in a chained fashion where the output of the first mapper becomes the input of the second, and so on until the last Mapper; the output of the last Mapper will be written to the task's output.

You can add mappers to a ChainMapper using addMapper() method.

ChainReducer in Hadoop

The ChainReducer class allows you to chain multiple Mapper classes after a Reducer within the reduce task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper, the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper will be written to the task's output.

To add a Mapper class to the chain reducer you can use addMapper() method.

To set the Reducer class to the chain job you can use setReducer() method.

Chaining MapReduce job

Using the ChainMapper and the ChainReducer classes it is possible to compose MapReduce jobs that look like [MAP+ / REDUCE MAP*].

When you are using chained MapReduce you can have a combination as follows-

  1. One or more mappers
  2. Single Reducer
  3. Zero or more mappers (optional and to be used only if chained reducer is used)

When you are using a chained MapReduce job the data from the mappers or the reducer is passed (and used) in memory rather than on disk, which reduces the disk I/O to a large extent.

MapReduce chaining example

There is data for stocks with the stock symbol, price and number of transactions in a day, in the following format.

AAA		23	5677
BBB		23	12800
aaa		26	23785
.....
.....

In the data the symbols are not always in uppercase. So there are two mappers; in the first, the relevant fields are extracted (symbol and transactions). In the second mapper the symbols are converted to upper case.

Then there is a reducer that adds the transactions per symbol. Then, within the reduce task, there is an InverseMapper that inverses the (key, value) pair. Note that InverseMapper is a predefined Mapper class within the Hadoop framework, which is why there is no implementation of it in the example code.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockTrans extends Configured implements Tool{
  // Mapper 1
  public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text symbol = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      //Setting symbol and transaction values
      symbol.set(stringArr[0]);
      Integer trans = Integer.parseInt(stringArr[2]);
      context.write(symbol, new IntWritable(trans));
    }
  }
	
  // Mapper 2
  public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable>{
    public void map(Text key, IntWritable value, Context context) 
        throws IOException, InterruptedException {
    
      String symbol = key.toString().toUpperCase();       
      context.write(new Text(symbol), value);
    }
  }
	
  // Reduce function
  public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }	

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockTrans(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock transactio");
    job.setJarByClass(getClass());
    // MapReduce chaining
    Configuration map1Conf = new Configuration(false);
    ChainMapper.addMapper(job, StockFieldMapper.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class,  map1Conf);
    
    Configuration map2Conf = new Configuration(false);
    ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class,
           Text.class, IntWritable.class, map2Conf);
    
    Configuration reduceConf = new Configuration(false);		
    ChainReducer.setReducer(job, TotalTransReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
     
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

After creating the jar, run this code using the following command.

hadoop jar /home/knpcode/Documents/knpcode/knpcodehadoop.jar org.knpcode.StockTrans /user/input/StockTrans.txt /user/output/stock

Output

hdfs dfs -cat /user/output/stock/part-r-00000

50483	AAA
180809	BBB

That's all for the topic How to Chain MapReduce Job in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

Tuesday, June 11, 2019

Distributed Cache in Hadoop

In this post we’ll see what Distributed cache in Hadoop is.

What is a distributed cache

As the name suggests distributed cache in Hadoop is a cache where you can store a file (text, archives, jars etc.) which is distributed across the nodes where mappers and reducers for the MapReduce job are running. That way the cached files are localized for the running map and reduce tasks.

Methods for adding the files in Distributed Cache

There is a DistributedCache class with the relevant methods but that whole class is deprecated in Hadoop 2. You should use the methods of the Job class instead.

  • public void addCacheFile(URI uri)- Add a file to be localized.
  • public void addCacheArchive(URI uri)- Add archives to be localized.
  • public void addFileToClassPath(Path file)- Adds file path to the current set of classpath entries. It adds the file to cache as well. Files added with this method will not be unpacked while being added to the classpath.
  • public void addArchiveToClassPath(Path archive)- Adds an archive path to the current set of classpath entries. It adds the archive to cache as well. Archive files will be unpacked and added to the classpath when being distributed.

How to use distributed cache

In order to make a file available through the distributed cache in Hadoop-

  1. Copy the file you want to make available through distributed cache to HDFS if it is not there already.
  2. Based on the file type use the relevant method to add it to distributed cache.

As an example, if you want to add a text file to the distributed cache then you can use the following statement in your driver class.

job.addCacheFile(new URI("/user/input/test.txt#test"));

If you want to add a jar to the class path then you can do it as follows-

job.addFileToClassPath(new Path("/myapp/mylib.jar"));
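Once a file is cached with a fragment like #test as shown above, a map or reduce task can open it by that symlink name from its working directory. A minimal sketch of doing that in a Mapper's setup() method follows (the lookup set is illustrative, and java.io/java.util imports are assumed):

  private Set<String> lookup = new HashSet<String>();

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // "test" is the symlink name given after the # in the cached file URI
    try (BufferedReader reader = new BufferedReader(new FileReader("test"))) {
      String line;
      while ((line = reader.readLine()) != null) {
        lookup.add(line.trim()); // load lookup data into memory once per task
      }
    }
  }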

Distributed cache example MapReduce code

Here is an Avro MapReduce word count example program. The output file is an Avro data file which uses an Avro schema. This Avro schema is added to the distributed cache using the addCacheFile() method and used by the mappers and reducers.

import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroWordCount extends Configured implements Tool{
	
  // Map function
  public static class AvroWordMapper extends Mapper<LongWritable, Text, AvroKey, AvroValue>{
    private Text word = new Text();
    private GenericRecord record;
     
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // That's where file stored in distributed cache is used
      Schema AVRO_SCHEMA = new Schema.Parser().parse(new File("./wcschema"));
      record = new GenericData.Record(AVRO_SCHEMA);
    }
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        // creating Avro record
        record.put("word", str);
        record.put("count", 1);
        context.write(new AvroKey(word), new AvroValue(record));
      }
    }
  }
	
  // Reduce function
  public static class AvroWordReducer extends Reducer<AvroKey, AvroValue,
      AvroKey, NullWritable>{	  
    Schema AVRO_SCHEMA;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // That's where file stored in distributed cache is used
      AVRO_SCHEMA = new Schema.Parser().parse(new File("./wcschema"));
    }
    public void reduce(AvroKey key, Iterable<AvroValue> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue value : values) {
        GenericRecord	record = value.datum();
        sum += (Integer)record.get("count");
      }
      GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
      record.put("word", key.datum());
      record.put("count", sum);
      context.write(new AvroKey(record), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroWordCount(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "AvroWC");
    job.setJarByClass(getClass());
    job.setMapperClass(AvroWordMapper.class);    
    job.setReducerClass(AvroWordReducer.class);
    // Name after the # sign in the file location
    // will be used as the file name in Mapper/Reducer
    job.addCacheFile(new URI("/user/input/wcschema.avsc#wcschema"));
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    FileSystem fs = FileSystem.get(conf);
    // Need schema file stored in HDFS here also
    Path path = new Path("/user/input/wcschema.avsc".toString());
    Schema sc = new Schema.Parser().parse((fs.open(path)));
    AvroJob.setMapOutputValueSchema(job, sc);
    AvroJob.setOutputKeySchema(job,	sc);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for the topic Distributed Cache in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

Monday, June 10, 2019

Combiner in Hadoop MapReduce

This post shows what a combiner in Hadoop MapReduce is and how the combiner function can be used to reduce the overall memory, I/O and network requirements of a MapReduce execution.

Why is combiner needed in MapReduce

When a MapReduce job is executed and the mappers start producing output, a lot of processing happens within the Hadoop framework, known as the shuffling and sorting phase.

Map output is partitioned based on the number of reducers, those partitions are also sorted and then written to local disk.

Then the data, from the nodes where the maps are running, is transferred to the nodes where the reducers are running. Since a single reducer gets its input from several mappers, all that data from several maps is transferred to the reducer and merged again to form the complete input for the reduce task.

As you can see all this processing requires memory, network bandwidth and I/O. That is where combiner in Hadoop can help by minimizing the data that is sent to the reducer.

Combiner function in MapReduce

A combiner in Hadoop is an optimization that can aggregate data on the map side itself. The combiner function runs on the map output and aggregates the data (so the data size becomes smaller), and the output of the combiner function becomes the input for the reduce task. Note that using a combiner is optional.

Most of the time you will use your Reducer class as the combiner class too. If you don’t, your Combiner class implementation must still extend Reducer and implement the reduce method.

Since the combiner has the same semantics as the reducer, the input and output types follow the same requirement. In a MapReduce job the reduce input types must match the map output types; in the same way the combiner input types must match the map output types. Since the output of the combiner becomes the input to the reducer, the output types of the combiner must match the reduce input types.

combiner in hadoop

For example– Suppose you are trying to get maximum price for a stock. There are two input splits which are processed by two different maps.

Split 1-

AAA		23
AAA		26
AAA		21
AAA		19

Split 2-

AAA		27
AAA		28
AAA		25
AAA		24

Output of Map-1

(AAA, 23)
(AAA, 26)
(AAA, 21)
(AAA, 19)

Output of Map-2

(AAA, 27)
(AAA, 28)
(AAA, 25)
(AAA, 24)

After the shuffle and sort phase reduce task will get its input as follows-

[AAA, (23, 26, 21, 19, 27, 28, 25, 24)] 

And the reduce output– (AAA, 28)

Here if you specify the combiner class to be the same as the reducer then the combiner will aggregate the respective map outputs.

Combiner for Map-1 output

(AAA, 26)

Combiner for Map-2 output

(AAA, 28)

Now the input to the reduce is as follows-

[AAA, (26, 28)]

So you can see how the data that is transferred to the reducer is minimized.

How to specify a combiner in MapReduce job

You can specify a combiner using the setCombinerClass() method of the Job class in your MapReduce driver. For example if your Reducer class is MaxStockPriceReducer and you want to set the Reducer class as the Combiner class too then it can be done as follows.

job.setCombinerClass(MaxStockPriceReducer.class);

One thing you will have to ensure when using a combiner is that, no matter how the inputs are combined, the end result stays identical.

As an example, suppose you are calculating an average where map-1 processes (3, 4, 5) and map-2 processes (6, 8).

Then reduce function will calculate average as- (3, 4, 5, 6, 8) = 5.2

With combiner-

Average of (3,4,5) = 4

Average of (6, 8) = 7

Then in reduce function– Average of (4, 7) = 5.5

In this example you can see that with the combiner the result is different, so you’ll have to write your logic in such a way that the result is identical even if a combiner is used (a sketch of one way to do that for averages follows).
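One common way to keep an average correct with a combiner is to emit partial sums and counts from the map side and compute the average only in the reducer. A hedged sketch follows, encoding each partial result as a "sum,count" Text value (the mappers would emit "value,1" and the class name is illustrative):

public static class AvgCombiner extends Reducer<Text, Text, Text, Text> {
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    long count = 0;
    // Each incoming value is a "sum,count" pair emitted by a mapper or an earlier combine
    for (Text val : values) {
      String[] parts = val.toString().split(",");
      sum += Long.parseLong(parts[0]);
      count += Long.parseLong(parts[1]);
    }
    // Emit the partial "sum,count" pair; the reducer divides sum by count at the end
    context.write(key, new Text(sum + "," + count));
  }
}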

MapReduce Example using combiner

Here is a MapReduce example where the max price per stock symbol is calculated using MapReduce. The input file has tab separated data comprising the stock symbol and price.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockPrice extends Configured implements Tool{
  // Map function
  public static class MaxStockPriceMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
      
    private final static IntWritable one = new IntWritable(1);
    private Text symbol = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      symbol.set(stringArr[0]);
      Integer price = Integer.parseInt(stringArr[1]);
      context.write(symbol, new IntWritable(price));
    }
  }
	
  // Reduce function
  public static class MaxStockPriceReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {

      int maxValue = Integer.MIN_VALUE;
      for (IntWritable val : values) {
        maxValue = Math.max(maxValue, val.get());
      }      
      context.write(key, new IntWritable(maxValue));
    }
  }
	
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockPrice(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock price");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxStockPriceMapper.class);    
    job.setReducerClass(MaxStockPriceReducer.class);		
    //job.setCombinerClass(MaxStockPriceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

As you can see, initially the line where the combiner class is set is commented out. If you run this MapReduce job without specifying any combiner, these are the counters you will see in the console.

Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=106
Reduce input records=10
Reduce output records=2
Spilled Records=20

Now the line setting the combiner is uncommented and the MapReduce job is run again; this time the same counters are as follows.

Combine input records=10
Combine output records=2
Reduce input groups=2
Reduce shuffle bytes=26
Reduce input records=2
Reduce output records=2
Spilled Records=4

So you can see the combiner itself minimized the data sent to the reducer, and the shuffled bytes are also reduced in the process.

That's all for the topic Combiner in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

Mapper Only Job in Hadoop MapReduce

Generally when we think of MapReduce job in Hadoop we think of both mappers and reducers doing their share of processing. That is true for most of the cases but you can have scenarios where you want to have a mapper only job in Hadoop.

When do you need map only job

You may opt for a map only job in Hadoop when you need to process the data and get the output as (key, value) pairs but don’t want to aggregate those (key, value) pairs.

For example– if you are converting a text file to a sequence file using MapReduce, you just want to read a line from the text file and write it to the sequence file, so you can opt for a MapReduce job with only a map method.

In the same way, if you are converting a text file to a Parquet file using MapReduce you can opt for a mapper only job in Hadoop.

What you need to do for mapper only job

For a mapper only job you need to write only map method in the code, which will do the processing. Number of reducers is set to zero.

In order to set the number of reducers to zero you can use the setNumReduceTasks() method of the Job class. So you need to add the following in your job configuration in your MapReduce code driver.

job.setNumReduceTasks(0);

Benefits of Mapper only job

As already stated, if you just want to process the data without any aggregation then go for a mapper only job, as you can save on some of the processing done internally by the Hadoop framework.

Since there is no reducer there is no need for the shuffle and sort phase, and transferring data to the nodes where reducers are running is not required either.

Also note that in a MapReduce job the output of the map phase is written to the local disk on the node rather than to HDFS. Whereas, in the case of a mapper only job, the map output is written to HDFS.

Mapper only job in Hadoop example

If you have to convert a text file to a sequence file, that can be done using only a map function, so you set the number of reducers to zero.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileWriter extends	Configured implements Tool{
  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
      context.write(key, value);
    }
  }
  public static void main(String[] args)  throws Exception{
    int exitFlag = ToolRunner.run(new SequenceFileWriter(), args);
    System.exit(exitFlag);	   
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "sfwrite");
    job.setJarByClass(SequenceFileWriter.class);
    job.setMapperClass(SFMapper.class);
    // Setting reducer to zero
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    // Compression related settings
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}

You can run the MapReduce job using the following command.

$ hadoop jar /pathto/jar/knpcodehadoop.jar org.knpcode.SequenceFileWriter /user/input/count /user/output/seq

By listing the output directory you can see that a sequence file is created.

hdfs dfs -ls /user/output/seq

Found 2 items
-rw-r--r--   1 knpcode supergroup          0 2018-06-14 12:26 /user/output/seq/_SUCCESS
-rw-r--r--   1 knpcode supergroup        287 2018-06-14 12:26 /user/output/seq/part-m-00000

That's all for the topic Mapper Only Job in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

Sunday, June 9, 2019

How to See Logs And Sysouts in Hadoop MapReduce

While writing a program, in order to debug it we put in some logging or System.out statements to display messages. In your MapReduce program too you can use a logger or sysouts for debugging purposes. In this post we’ll see how you can access those logs or System.out.print messages in Hadoop MR2.

How to see log messages in MapReduce2

The first thing of course is to put logs in your code. Then, at the time of running your MapReduce job, you can note the application_id of the job from the console. Once you run your MapReduce job you will get a line like the following displayed on the console showing the application id.

18/06/13 15:20:59 INFO impl.YarnClientImpl: Submitted application application_1528883210739_0001

With the same application_id a folder will be created in the location HADOOP_INSTALLATION_DIR/logs/userlogs/; there you will find folders having the logs for your mappers and reducers. In those folders you can check the stdout file for any System.out.print output and the syslog file for log messages.
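If log aggregation is enabled on the cluster (yarn.log-aggregation-enable set to true), you can also fetch the same logs with the yarn logs command instead of browsing the userlogs directory-

bin/yarn logs -applicationId application_1528883210739_0001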

Example MapReduce showing how to put logs

You can use Apache Commons Logging, which comes with the Hadoop bundle, for logging purposes. Here is a simple word count MapReduce program with some log.info and sysout messages put in.

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount{
  public static final Log log = LogFactory.getLog(WordCount.class);
  // Map function
  public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
				
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      log.info("in map method");
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        System.out.println("word -- " + word.toString());
        context.write(word, one);
      }	 
    }
  }
	
  // Reduce function
  public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{		   
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      log.info("in reducer ");
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      log.info(key + " -- Sum is --- " + sum);
      result.set(sum);
      context.write(key, result);
    }
  }
	
  public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordMapper.class);  
    //job.setNumReduceTasks(0);
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Once you run it and you know the application id just go to the location and check the stdout and syslog files.

As an example, after running this code I can access the stdout at the path HADOOP_INSTALLATION_DIR/logs/userlogs/application_1528883210739_0001/container_1528883210739_0001_01_000002/stdout and see my sysouts there-

word -- This
word -- is
word -- a
word -- test
word -- file.
word -- This
word -- is
word -- a
word -- Hadoop
word -- MapReduce
word -- program
word -- file.

Or I can access the syslog at the path HADOOP_INSTALLATION_DIR/logs/userlogs/application_1528883210739_0001/container_1528883210739_0001_01_000003/syslog and see the log messages for the reducer.

2018-06-13 15:21:15,321 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,321 INFO [main] org.knpcode.WordCount$WordMapper: Hadoop -- Sum is --- 1
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: MapReduce -- Sum is --- 1
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: This -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: a -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: file. -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: is -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: program -- Sum is --- 1
2018-06-13 15:21:15,323 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,323 INFO [main] org.knpcode.WordCount$WordMapper: test -- Sum is --- 1

That's all for the topic How to See Logs And Sysouts in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

Thursday, June 6, 2019

Input Split in Hadoop MapReduce

When a MapReduce job is started to process a file stored in HDFS, one of the things Hadoop does is divide the input into logical splits; these splits are known as input splits in Hadoop.

An InputSplit represents the data to be processed by an individual map task, which means the number of mappers started equals the number of input splits calculated for the job. For example, if the input data is logically divided into 8 input splits, then 8 mappers will be started to process those input splits in parallel.

Input split is a logical division of data

An input split is just a logical division of the data; it doesn’t contain the physical data. What an input split refers to in this logical division are the records in the data. When a mapper processes an input split it actually works on the records ((key, value) pairs) within that input split in Hadoop.

Within the Hadoop framework it is the InputFormat class that splits up the input files into logical InputSplits.

It is the RecordReader class that breaks the data into key/value pairs, which are then passed as input to the Mapper.

InputFormat class in Hadoop Framework

public abstract class InputFormat<K, V> {
  public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

  public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
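For file based input formats the split size itself is derived from the block size and the configured minimum and maximum split sizes (mapreduce.input.fileinputformat.split.minsize and .maxsize). A simplified sketch of that calculation, essentially the computeSplitSize() logic in FileInputFormat, is shown below.

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  // minSize comes from split.minsize, maxSize from split.maxsize, blockSize from dfs.blocksize
  return Math.max(minSize, Math.min(maxSize, blockSize));
}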

Input Split Vs HDFS blocks

Many people get confused between HDFS blocks and input splits, since an HDFS block is also a division of the data into smaller chunks which are then stored across the cluster. Moreover, it is ultimately the data stored in the nodes that is processed by the MapReduce job, so what actually is the task of the input split in Hadoop?

An HDFS block is the physical representation of the data; the actual data is stored within the Hadoop Distributed File System. An input split, on the other hand, is just a logical representation of the data. When data is split into blocks for storing in HDFS, it is simply divided into chunks of 128 MB (the default block size) with no consideration for record boundaries.

For example, if each record is 50 MB then two records will fit within a block but the third record won’t fit completely; 28 MB of the third record will be in the same block and the remaining 22 MB will be stored in the next block. If a mapper processes only a block then it won’t be able to process the third record as it won’t get the full record.

An input split, which is a logical representation of the data, honors logical record boundaries. Using the starting record in the block and the byte offset it can get the complete record even if it spans block boundaries. Thus the mapper working on the input split will be able to process all 3 records even though part of the third record is stored in another block.

Input Split in Hadoop

That's all for the topic Input Split in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

Wednesday, June 5, 2019

Speculative Execution in Hadoop Framework

Speculative execution in Hadoop framework is an optimization technique to ensure that the submitted job finishes in a time-bound manner.

Need for speculative execution in Hadoop

When a MapReduce job is submitted, several map tasks run in parallel, each working on a portion of the data (an input split). In the same way several reduce tasks are spawned to work in parallel and produce the final output.

These map and reduce tasks are started on different nodes across the cluster. You may have a scenario where a few map or reduce tasks run slower than the others in the cluster. That may happen because of a hardware or network problem on the node where those tasks are running.

These slower tasks may affect the overall job execution since reduce tasks can start only when all the map tasks finish, so a slow map task can be a bottleneck here. In the same way a slower reduce task increases the overall time to finish the job. To mitigate these bottlenecks the Hadoop framework uses speculative execution.

How speculative execution in Hadoop works

After starting the map and reduce tasks and monitoring their progress for some time, the Hadoop framework knows which map or reduce tasks are taking more time than usual. For those slow-running tasks Hadoop starts the same task on another node. Here the Hadoop framework is speculating that the same task, operating on the same data but started on another node, will finish faster, hence the name speculative execution.

Note that both the original task and the speculative task run; the output of whichever finishes first is used and the other one is killed. If the original task finishes before the speculative task then the speculative task is killed, and vice versa.

For example, if the Hadoop framework detects that a map task for a given job is executing slower than the other map tasks for the same job, another instance of the same map task operating on the same data is started on another node. Whichever map task finishes first, its output is used and the other is killed.

Configuration for Speculative execution

In Hadoop speculative execution is set to true by default for both map and reduce tasks. Properties for it are set in mapred-site.xml.

  • mapreduce.map.speculative- If true, then multiple instances of some map tasks may be executed in parallel. Default is true.
  • mapreduce.reduce.speculative- If true, then multiple instances of some reduce tasks may be executed in parallel. Default is true.
  • mapreduce.job.speculative.speculative-cap-running-tasks- The max percent of running tasks that can be speculatively re-executed at any time. Default value is 0.1.

The Speculator implementation used by the Hadoop framework for speculative execution calculations is specified by the yarn.app.mapreduce.am.job.speculator.class property. The speculator class is instantiated in MRAppMaster.
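If you want to control speculative execution per job rather than cluster-wide, the same switches can be flipped from the driver; a minimal sketch (the class name SpeculationConfigSketch is just illustrative).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpeculationConfigSketch {
  public static Job createJob() throws IOException {
    Configuration conf = new Configuration();
    // Equivalent property based settings (normally placed in mapred-site.xml):
    // conf.setBoolean("mapreduce.map.speculative", false);
    // conf.setBoolean("mapreduce.reduce.speculative", false);

    Job job = Job.getInstance(conf, "wordcount");
    // Turn speculative execution off for this job's map and reduce tasks.
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);
    return job;
  }
}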

Speculative execution in Hadoop - Drawbacks

Though the idea of speculative execution is to reduce the execution time of tasks, it involves running duplicate tasks. This duplicate execution increases the load on the cluster. In the case of a very busy cluster or a cluster with limited resources the administrator may consider turning off speculative execution.

This problem of running duplicate tasks is more pronounced in the case of reduce tasks. A reduce task gets its input from more than one map task running on different nodes, so there is data transfer in the case of reduce tasks. Running the same reduce task as part of speculative execution means the same data transfer happens more than once, thus increasing the network load.

That's all for the topic Speculative Execution in Hadoop Framework. If something is missing or you have something to share about the topic please write a comment.



Monday, June 3, 2019

What is Data Locality in Hadoop

In this Hadoop tutorial we’ll talk about data locality in Hadoop and how it helps run the job faster and save cluster bandwidth.

Data locality in Hadoop

When a file is stored in HDFS it is divided into blocks of 128 MB (the default block size) and these blocks are stored on different nodes across the cluster. These HDFS blocks are also replicated as per the replication factor (default is 3). Even at the time of creating replicas Hadoop takes the cluster topology into consideration and tries to honor data locality.

When a MapReduce job is started to process a file in Hadoop, the job calculates the input splits; by default the input split size is the same as the HDFS block size, i.e. 128 MB. The Hadoop framework creates as many map tasks as there are input splits for the job.

For example– there is a 1 GB file which is stored as 8 HDFS blocks of 128 MB each. A MapReduce job processing this file calculates that there are 8 input splits, so the Hadoop framework will start 8 map tasks to process those 8 input splits. Now what makes more sense for processing these splits:

Sending the map tasks, which are a few KBs in most cases, to the nodes where the data resides (the 128 MB block each map task has to process)

or

Transferring the data across the network to the nodes where the map tasks are started?

Don’t forget that there are 8 map tasks and all of them need their split’s data, which means a lot of pressure on bandwidth if all of that data is transferred across nodes to the respective map tasks.

To avoid this the Hadoop framework does the smart thing known as "data locality optimization": rather than bringing the data to the computation it sends the computation to the data. Hadoop tries to run the map tasks on the same nodes where the split data resides in HDFS, thus making the task data local.

Task execution in YARN

When the ApplicationMaster requests containers for map tasks from the ResourceManager, data locality is also considered. The scheduler tries to allocate a container on the node where the data resides so that the task is data local. But that is not always possible, as there may not be enough resources available on the node where the data resides to run a map task, which brings us to the levels of proximity between a map task and its data.

Map task and data proximity categories

Data locality in Hadoop falls into 3 categories based on the proximity between the map task and the data (a sketch for checking these counts from the job counters follows the list).

  1. Data local– If the map task runs on the same node where the data resides, that is the optimal case, known as data local.
  2. Rack local– If the map task runs on the same rack though not on the same node where the split data resides, that is known as rack local.
  3. Different rack– If the map task can’t run on the same node, nor even on the same rack, then it has to fetch the data it processes from a different rack. This is the least preferred scenario.
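One way to see which category your map tasks actually fell into is to read the job’s built-in counters after it completes; a minimal sketch (the method name printLocalityCounters is illustrative, and the job argument is assumed to be the Job built in your driver).

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;

public class LocalityCountersSketch {
  // Prints how many map tasks ran data local, rack local or neither.
  static void printLocalityCounters(Job job) throws Exception {
    Counters counters = job.getCounters();
    System.out.println("Data local maps : "
        + counters.findCounter(JobCounter.DATA_LOCAL_MAPS).getValue());
    System.out.println("Rack local maps : "
        + counters.findCounter(JobCounter.RACK_LOCAL_MAPS).getValue());
    System.out.println("Other local maps: "
        + counters.findCounter(JobCounter.OTHER_LOCAL_MAPS).getValue());
  }
}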
Figure: Data locality in Hadoop

That's all for the topic What is Data Locality in Hadoop. If something is missing or you have something to share about the topic please write a comment.



Thursday, April 11, 2019

Shuffle Phase in Hadoop MapReduce

In a MapReduce job, when map tasks start producing output, the output is sorted by keys and the map outputs are also transferred to the nodes where the reducers are running. This whole process is known as the shuffle phase in Hadoop MapReduce.

Though the shuffle phase is internal to the Hadoop framework, there are several configuration parameters to control it. This tuning helps in running your MapReduce job efficiently. In this post we’ll see what happens during sorting and shuffling at both the mapper and the reducer end.

Shuffling and sorting at Map end

When the map task starts producing output it is first written to a memory buffer, which is 100 MB by default. The buffer size is configured using the mapreduce.task.io.sort.mb parameter in mapred-site.xml.

The map output is spilled to disk only when the memory buffer reaches a certain threshold. The configuration parameter for it is mapreduce.map.sort.spill.percent, which is by default 80% of the allotted memory buffer size. Once this threshold is reached, a thread begins to spill the contents to disk in the background.
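Both of these map-side settings can be overridden per job on the Configuration; a minimal sketch (the values shown are only examples, not recommendations, and the class name MapSpillConfigSketch is illustrative).

import org.apache.hadoop.conf.Configuration;

public class MapSpillConfigSketch {
  static Configuration tunedConf() {
    Configuration conf = new Configuration();
    // Size in MB of the in-memory buffer used while collecting map output (default 100).
    conf.setInt("mapreduce.task.io.sort.mb", 200);
    // Fraction of the buffer at which the background spill to disk starts (default 0.80).
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);
    // Number of spill files merged at once when the final map output file is created (default 10).
    conf.setInt("mapreduce.task.io.sort.factor", 20);
    return conf;
  }
}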

Before the map output is written to disk the following actions are taken-

  1. The output is divided into partitions as per the number of reducers. For example, if there are 4 reducers then each map output is divided into 4 partitions. A partition can have data for more than one key, but the data for any specific key resides in a single partition. If there are 10 mappers running then the output of each mapper is divided into 4 partitions, and the partitions with the same partition number from all the mappers are transferred to the same reducer (see the partitioner sketch after this list).
  2. Within each partition the data is also sorted by key.
  3. If a combiner is defined, it is also executed.
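Which partition a key lands in is decided by the job’s Partitioner. The default HashPartitioner boils down to a hash modulo the number of reducers, essentially as in this sketch (the class name HashPartitionSketch is just illustrative).

public class HashPartitionSketch {
  // Same logic as Hadoop's default HashPartitioner: mask the sign bit so the
  // result is non-negative, then take the hash modulo the number of reducers.
  static int getPartition(Object key, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    // With 4 reducers every occurrence of the same key lands in the same partition.
    System.out.println("Hadoop    -> partition " + getPartition("Hadoop", 4));
    System.out.println("MapReduce -> partition " + getPartition("MapReduce", 4));
  }
}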

Every time the memory buffer reaches the threshold a new spill file is created and the actions stated above are executed. At the end, before the map task finishes, all the files spilled to disk are merged to create a single file, while still honoring the partition boundaries and the sorting of keys within each partition.

Figure: Shuffle and sort in MapReduce

Shuffle phase at Reduce end

Once the map output is written to the local disk of the node where the map task is running, the partitions have to be transferred to the reducers. Each reducer gets the data of its particular partition from all the mappers.

For example if there are 4 map tasks and 2 reducers then output of all these 4 maps will be divided into 2 partitions, one for each reducer.

Copy phase in reduce

As soon as a map task finishes and notifies the ApplicationMaster, the reduce tasks start copying the data of that particular map. They don’t wait for all the running map tasks to finish. The reducer uses threads to copy map outputs in parallel. How many threads to run is configurable and the parameter for it is mapreduce.reduce.shuffle.parallelcopies. The default number of parallel transfers run by reduce during the copy (shuffle) phase is 5.

On the reduce side too the data is kept in a memory buffer; if it fits in memory it helps the reduce task execute faster. The size of the memory buffer is configured using the mapreduce.reduce.shuffle.input.buffer.percent parameter. It denotes the percentage of memory, relative to the maximum heap size, to be allocated to storing map outputs during the shuffle. Default is 70%.

If the data doesn’t fit in memory then it is spilled to disk. The threshold for that is set using the following 2 configuration parameters (a tuning sketch follows the list)-

  • mapreduce.reduce.merge.inmem.threshold- The threshold, in terms of the number of files for the in-memory merge process. When we accumulate threshold number of files we initiate the in-memory merge and spill to disk. Default number of files is 1000.
  • mapreduce.reduce.shuffle.merge.percent- The usage threshold at which an in-memory merge will be initiated, expressed as a percentage of the total memory allocated to storing in-memory map outputs, as defined by mapreduce.reduce.shuffle.input.buffer.percent.
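A minimal sketch showing how these reduce-side shuffle parameters might be overridden on a job’s Configuration (the values are illustrative and the class name ReduceShuffleConfigSketch is hypothetical).

import org.apache.hadoop.conf.Configuration;

public class ReduceShuffleConfigSketch {
  static Configuration tunedConf() {
    Configuration conf = new Configuration();
    // Number of parallel copier threads fetching map outputs (default 5).
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
    // Fraction of the reducer heap used to buffer copied map outputs (default 0.70).
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
    // In-memory merge starts after this many map outputs accumulate (default 1000)...
    conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);
    // ...or when the in-memory outputs use this fraction of the shuffle buffer (default 0.66).
    conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
    return conf;
  }
}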

Once the data from all the mappers is copied and merged to create a single sorted file (partitions from all the mappers, sorted by keys), that file becomes the input for the reduce task.

Figure: Shuffle phase at the reduce end

That's all for the topic Shuffle Phase in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.

