December 26, 2023

How to Improve Map-Reduce Performance

In this post we'll see some of the ways to improve the performance of a MapReduce job in Hadoop.

The tips given here for improving MapReduce job performance are more from the MapReduce code and configuration perspective than from the cluster and hardware perspective.

1- Enabling uber mode- Unlike Hadoop 1, there is no JVM reuse feature in YARN, but you can enable tasks to run in uber mode; by default uber mode is not enabled. If uber mode is enabled, the ApplicationMaster can calculate whether the overhead of negotiating resources with the ResourceManager, communicating with NodeManagers on different nodes to launch the containers and running the tasks on those containers is much more than running the MapReduce job sequentially in the same JVM; if so, it can run the job as an uber task.
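
If you want to try uber mode, it can be switched on from the job configuration. Below is a minimal sketch for the driver; the property names are the standard MapReduce ones, while the threshold values shown are purely illustrative.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  Configuration conf = new Configuration();
  // Allow the ApplicationMaster to run small jobs inside its own JVM.
  conf.setBoolean("mapreduce.job.ubertask.enable", true);
  // A job also has to be "small enough" to qualify; these limits define small.
  conf.setInt("mapreduce.job.ubertask.maxmaps", 9);      // illustrative value
  conf.setInt("mapreduce.job.ubertask.maxreduces", 1);   // illustrative value

  Job job = Job.getInstance(conf, "uber-mode-example");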

2- For compression try to use native library- When using compression and decompression in Hadoop it is better to use the native library, as the native library will outperform a codec written in a programming language like Java.
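
As a quick sanity check you can verify from the driver that the native library actually loaded; a minimal sketch (the hadoop checknative -a command gives a more detailed per-codec report):

  import org.apache.hadoop.util.NativeCodeLoader;

  // Warn if Hadoop fell back to the pure-Java codec implementations.
  if (!NativeCodeLoader.isNativeCodeLoaded()) {
    System.err.println("Native Hadoop library not loaded; using built-in Java codecs");
  }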

3- Increasing the block size- In case the input file is very large you can consider increasing the HDFS block size to, say, 512 MB. That can be done by setting the parameter dfs.blocksize. If you set dfs.blocksize to a higher value the input split size will increase to the same size, because the input split size is calculated using the formula

Math.max(mapreduce.input.fileinputformat.split.minsize, Math.min(mapreduce.input.fileinputformat.split.maxsize, dfs.blocksize))

thus making it the same size as the HDFS block size. By increasing the block size you will have less overhead in terms of metadata, as there will be fewer blocks.

If the input split is larger, each map task gets more data to process. In Hadoop as many map tasks are started as there are input splits, so having fewer input splits means the overhead of initializing map tasks is reduced.
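
The split size can also be influenced per job from the driver. A minimal sketch using the standard FileInputFormat helpers; the 512 MB figure and the job variable from the driver are assumptions for illustration.

  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  // Sets mapreduce.input.fileinputformat.split.minsize / split.maxsize for this
  // job only; dfs.blocksize itself is fixed when the file is written to HDFS.
  long targetSplitSize = 512L * 1024 * 1024; // 512 MB, illustrative
  FileInputFormat.setMinInputSplitSize(job, targetSplitSize);
  FileInputFormat.setMaxInputSplitSize(job, targetSplitSize);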

4- Time taken by map tasks- A map task should run for at least a minute (1-3 minutes); if it is finishing in less than a minute that means the input data for a map task is too small. If there are many small files in your MapReduce job then try to use a container file format like SequenceFile or Avro to hold those small files.

You can also use CombineFileInputFormat, which puts many files into one input split so that there is more data for each mapper to process, as in the sketch below.
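
A minimal sketch using CombineTextInputFormat, the text-oriented subclass of CombineFileInputFormat; the 256 MB limit is illustrative and the job variable is assumed to come from the driver.

  import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

  // Pack many small files into combined splits of up to ~256 MB each.
  job.setInputFormatClass(CombineTextInputFormat.class);
  CombineTextInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);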

5- Input data compression is splittable or not- If the input data is compressed, then whether the compression format used is splittable or not is also one of the things to consider. If the input data is not splittable there would only be a single split, processed by a single map task, making the processing very slow with no parallelism at all.

For compressing input data use bzip2, which is splittable, or use LZO with indexing to make it splittable.
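
If your job produces data that is consumed by further MapReduce jobs, one option is to write that output with a splittable codec so the downstream job can parallelize over it. A minimal sketch, assuming the job variable from the driver:

  import org.apache.hadoop.io.compress.BZip2Codec;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  // Compress job output with bzip2 so downstream jobs get splittable input.
  FileOutputFormat.setCompressOutput(job, true);
  FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);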

6- Setting number of reduce tasks- The number of maps is usually driven by the number of input splits, but the number of reducers can be controlled. As per the documentation, the right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).
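
For example, applying the 0.95 heuristic in the driver; the node and container counts are hypothetical placeholders, since the framework does not provide them to the job.

  // Hypothetical cluster numbers used only to illustrate the 0.95 heuristic.
  int nodes = 10;
  int maxContainersPerNode = 8;
  int numReducers = (int) (0.95 * nodes * maxContainersPerNode);
  job.setNumReduceTasks(numReducers); // assumes the Job instance from the driver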

7- Data skew at reducer side- If data is skewed in such a way that far more values are grouped under some keys rather than being evenly distributed, then the reduce tasks that process those keys will take more time to finish, whereas the other reducers will get less data because of the uneven distribution and finish early.

In this type of scenario, analyze the partitioning of the data and look at the possibility of writing a custom partitioner so that keys are distributed more evenly among the reducers.
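
A minimal sketch of a custom partitioner; the "hotkey" value and the idea of giving it a dedicated reducer are hypothetical examples, not a general recipe.

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Partitioner;

  // Hypothetical example: one key ("hotkey") is known to dominate the data, so
  // it gets a reducer to itself and all other keys are hashed over the rest.
  public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      if (numPartitions > 1 && "hotkey".equals(key.toString())) {
        return numPartitions - 1; // dedicated reducer for the heavy key
      }
      int others = Math.max(1, numPartitions - 1);
      return (key.toString().hashCode() & Integer.MAX_VALUE) % others;
    }
  }

  // In the driver: job.setPartitionerClass(SkewAwarePartitioner.class);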

8- Shuffle phase performance improvements- The shuffle phase in the Hadoop framework is very network intensive as files are transferred from mappers to reducers. There is a lot of IO involved, as map output is written to local disk, and there is a lot of processing too, in the form of partitioning the data per reducer, sorting data by keys and merging.

Optimizing to reduce the shuffle phase time helps reduce the overall job time. Some of the performance improvement tips are as follows-

  • Compressing the map output- Since map output is written to disk and also transferred to the reducer, compressing map output saves storage space, makes it faster to write to disk and reduces the data that has to be transferred to the reducer node.
  • Filtering data- See how you can cut down on the data emitted by map tasks. Filter the records to remove unwanted records entirely. Also, reduce the record size by keeping only the relevant fields.
  • Using Combiner- Using a combiner in MapReduce is a good way to improve the performance of the overall MapReduce job. With a combiner you can aggregate data in the map phase itself and reduce the number of records sent to the reducer.
  • Raw Comparator- During sorting and merging the Hadoop framework uses a comparator to compare keys. If you are using a custom comparator then try to write it as a raw comparator so that the comparison can be done at the byte level itself. Otherwise keys have to be deserialized into objects before they can be compared, which makes the process time consuming.
  • Setting parameters with optimum values- Another action you can take to improve the performance of the MapReduce job is to change the values of some of the configuration parameters. A configuration sketch covering several of these shuffle-related settings follows this list.

    Your goal is to reduce the number of records spilled to disk on the map side as well as on the reduce side. On the map side you can change the settings for the following parameters to try to reduce the number of spills to disk.

    • mapreduce.task.io.sort.mb- The total amount of buffer memory to use while sorting files, in megabytes.
    • mapreduce.map.sort.spill.percent- The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background.

    On the reduce side you can change the settings for the following parameters to try to keep data in memory itself.

    • mapreduce.reduce.shuffle.input.buffer.percent- The percentage of memory to be allocated from the maximum heap size for storing map outputs during the shuffle.
    • mapreduce.reduce.input.buffer.percent- The percentage of memory, relative to the maximum heap size, to retain map outputs during the reduce.
    • mapreduce.reduce.shuffle.memory.limit.percent- Maximum percentage of the in-memory limit that a single shuffle can consume.
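
A minimal driver-side sketch pulling several of the settings above together; the numeric values are illustrative rather than tuned recommendations, and MyCombiner is a hypothetical combiner class.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.compress.CompressionCodec;
  import org.apache.hadoop.io.compress.SnappyCodec;
  import org.apache.hadoop.mapreduce.Job;

  Configuration conf = new Configuration();
  // Compress intermediate map output (Snappy here; any available codec works).
  conf.setBoolean("mapreduce.map.output.compress", true);
  conf.setClass("mapreduce.map.output.compress.codec",
      SnappyCodec.class, CompressionCodec.class);
  // Map-side sort buffer and spill threshold (illustrative values).
  conf.setInt("mapreduce.task.io.sort.mb", 256);
  conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);
  // Reduce-side buffers (illustrative values).
  conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
  conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.50f);

  Job job = Job.getInstance(conf, "shuffle-tuning-example");
  // Aggregate on the map side to cut down the records sent to reducers.
  job.setCombinerClass(MyCombiner.class); // hypothetical combiner class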

9- Improvements in MapReduce coding- You should also optimize your MapReduce code so that it runs efficiently.

  • Reusing objects- Since the map method is called once for every input record, creating new objects in it adds needless overhead. Try to reuse objects as much as you can. One mistake that is very frequent is writing code as follows.
    String[] stringArr = value.toString().split("\\s+");
    Text outValue = new Text(stringArr[0]); // new Text object created for every record
    context.write(key, outValue);
    

    You should write it as follows-

    private Text outValue = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      String[] stringArr = value.toString().split("\\s+");
      outValue.set(stringArr[0]); // reusing the same Text object
      context.write(key, outValue);
    }
    
  • String concatenation- Since String in Java is immutable, each String concatenation creates a new String object. For repeated appending prefer StringBuilder (or StringBuffer if thread safety is needed), as in the sketch below.
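
A small sketch of the difference, assuming values is the Iterable<Text> passed to a reducer:

  // Creates a new String object on every iteration.
  String csv = "";
  for (Text val : values) {
    csv = csv + "," + val.toString();
  }

  // Reuses one internal buffer instead.
  StringBuilder sb = new StringBuilder();
  for (Text val : values) {
    sb.append(',').append(val.toString());
  }
  String csvResult = sb.toString();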

That's all for the topic How to Improve Map-Reduce Performance. If something is missing or you have something to share about the topic please write a comment.

