Shuffle Phase in Hadoop MapReduce

In a MapReduce job, when map tasks start producing output, that output is sorted by key and transferred to the nodes where the reducers are running. This whole process is known as the shuffle phase in Hadoop MapReduce.

Though the shuffle phase is internal to the Hadoop framework, there are several configuration parameters to control it, and tuning them helps in running your MapReduce job efficiently. In this post we’ll see what happens during sorting and shuffling at both the mapper and the reducer end.

Shuffling and sorting at Map end

When the map task starts producing output, it is first written to a memory buffer, which is 100 MB by default. The buffer size is configured using the mapreduce.task.io.sort.mb parameter in mapred-site.xml.

The map output is spilled to disk only when the memory buffer reaches a certain threshold. The configuration parameter for it is mapreduce.map.sort.spill.percent, which is by default 80% of the allotted memory buffer size. Once this threshold is reached, a thread begins to spill the contents to disk in the background.
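
The two map-side settings above can be set in mapred-site.xml. The values shown here are the defaults; you would raise them on a cluster with more memory per task.

```xml
<!-- mapred-site.xml: map-side sort buffer tuning (defaults shown) -->
<configuration>
  <property>
    <name>mapreduce.task.io.sort.mb</name>
    <value>100</value> <!-- in-memory sort buffer, in MB -->
  </property>
  <property>
    <name>mapreduce.map.sort.spill.percent</name>
    <value>0.80</value> <!-- start spilling once the buffer is 80% full -->
  </property>
</configuration>
```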

Before the map output is written to disk, the following actions are taken-

  1. Output is divided into partitions as per the number of reducers. For example, if there are 4 reducers then each map output is divided into 4 partitions. A partition can have data for more than one key, but the data for any specific key resides in a single partition. If there are 10 mappers running, the output of each mapper is divided into 4 partitions, and the partition holding the same set of keys is transferred to the same reducer.
  2. Within each partition, data is also sorted by key.
  3. If a combiner is defined, it is also executed.

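Step 1 above can be sketched in a few lines. This is not Hadoop code, just an illustrative Python sketch of what the default hash partitioner does: every record for a given key is routed to the same partition, so a single reducer sees all the values for that key.

```python
# Illustrative sketch (not Hadoop code) of hash partitioning.
# Hadoop's default HashPartitioner computes
# (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
NUM_REDUCERS = 4

def get_partition(key, num_reducers=NUM_REDUCERS):
    # Mask to a non-negative value, then take the modulo,
    # mirroring the HashPartitioner formula above.
    return (hash(key) & 0x7FFFFFFF) % num_reducers

records = [("apple", 1), ("banana", 1), ("apple", 1), ("cherry", 1)]
partitions = {p: [] for p in range(NUM_REDUCERS)}
for key, value in records:
    partitions[get_partition(key)].append((key, value))

# Within each partition, records are then sorted by key (step 2 above).
for p in partitions:
    partitions[p].sort(key=lambda kv: kv[0])

# Both "apple" records always land in the same partition.
```

Note that which partition a key lands in doesn't matter; what matters is that it is always the same one for a given key.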
Every time the memory buffer reaches the threshold a new spill file is created and the actions stated above are executed. At the end, before the map task finishes, all these spill files are merged into a single file, while still honoring the partition boundaries and the sorting of keys within each partition.
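
Because every spill file is already sorted within each partition, the final step is a streaming k-way merge rather than a full re-sort. A minimal sketch, using in-memory lists to stand in for the spill files of one partition:

```python
# Illustrative sketch: merging sorted spill "files" (in-memory lists
# here) into one key-sorted stream, as the map task does at the end.
import heapq

# Three spills; within each, this partition's records are key-sorted.
spill_1 = [("apple", 1), ("cherry", 1)]
spill_2 = [("apple", 1), ("banana", 1)]
spill_3 = [("banana", 1), ("date", 1)]

# heapq.merge streams the inputs, so nothing is re-sorted in full.
merged = list(heapq.merge(spill_1, spill_2, spill_3, key=lambda kv: kv[0]))
print(merged)
```

The merged output keeps the keys in sorted order across all spills, which is exactly the property the reducer relies on later.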

(Figure: Shuffle and sort in MapReduce)

Shuffle phase at Reduce end

Once the map output is written to the local disk of the node where the map task is running, the partitions have to be transferred to the reducers. Each reducer gets the data of its particular partition from all the mappers.

For example, if there are 4 map tasks and 2 reducers, the output of each of these 4 maps is divided into 2 partitions, one for each reducer.

Copy phase in reduce

As soon as a map task finishes and notifies the ApplicationMaster, the reduce tasks start copying the data of that particular map; they don’t wait for all the running map tasks to finish. The reducer uses threads to copy map outputs in parallel. The number of threads is configurable via the mapreduce.reduce.shuffle.parallelcopies parameter; the default number of parallel transfers run by reduce during the copy (shuffle) phase is 5.

On the reduce side too, data is first kept in a memory buffer; if it fits in memory, that helps the reduce task execute faster. The size of the memory buffer is configured using the mapreduce.reduce.shuffle.input.buffer.percent parameter. It denotes the percentage of the maximum heap size to be allocated to storing map outputs during the shuffle. Default is 70%.

If the data doesn’t fit in memory it is spilled to disk. The threshold for that is set using the following 2 configuration parameters-

  • mapreduce.reduce.merge.inmem.threshold- The threshold, in terms of the number of files, for the in-memory merge process. When the threshold number of files is accumulated, the in-memory merge is initiated and spilled to disk. The default number of files is 1000.
  • mapreduce.reduce.shuffle.merge.percent- The usage threshold at which an in-memory merge will be initiated, expressed as a percentage of the total memory allocated to storing in-memory map outputs, as defined by mapreduce.reduce.shuffle.input.buffer.percent.
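
Put together, the reduce-side shuffle parameters discussed in this section look like this in mapred-site.xml (the values shown are the defaults):

```xml
<!-- mapred-site.xml: reduce-side shuffle tuning (defaults shown) -->
<configuration>
  <property>
    <name>mapreduce.reduce.shuffle.parallelcopies</name>
    <value>5</value> <!-- parallel fetch threads in the copy phase -->
  </property>
  <property>
    <name>mapreduce.reduce.shuffle.input.buffer.percent</name>
    <value>0.70</value> <!-- share of heap used to hold map outputs -->
  </property>
  <property>
    <name>mapreduce.reduce.merge.inmem.threshold</name>
    <value>1000</value> <!-- file count that triggers in-memory merge -->
  </property>
  <property>
    <name>mapreduce.reduce.shuffle.merge.percent</name>
    <value>0.66</value> <!-- buffer usage that triggers in-memory merge -->
  </property>
</configuration>
```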

Once the data from all the mappers is copied and merged to create a single sorted file (partitions from all the mappers, sorted by key), that file becomes the input for the reduce task.

(Figure: Shuffle phase at reduce side)

That's all for the topic Shuffle Phase in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


What is Hadoop

Apache Hadoop is an open source framework for storing and processing big data sets in parallel on a cluster of nodes (commodity hardware).

The Hadoop framework is designed to scale up from a single server to thousands of machines, with each machine offering both storage and computation. It is also reliable and fault tolerant; the framework itself is designed to detect and handle failures at the application layer, so Hadoop provides a highly available service on top of a cluster of nodes.

Modules of Hadoop

The Hadoop framework is written in Java and includes these modules-

  1. Hadoop Common– This module contains libraries and utilities used by other modules.
  2. Hadoop Distributed File System (HDFS)– This is the storage part of the Hadoop framework. It is a distributed file system that works on the concept of breaking the huge file into blocks and storing those blocks in different nodes. That way HDFS provides high-throughput access to application data.
  3. Hadoop Yarn (Yet Another Resource Negotiator)– This module is responsible for scheduling jobs and managing cluster resources. Refer YARN in Hadoop to read more about YARN.
  4. Hadoop MapReduce– This is the implementation of the MapReduce programming model to process the data in parallel.
(Figure: Modules in Hadoop)

Brief history of Hadoop

Hadoop was created by Doug Cutting, and it has its origins in Nutch, an open source web crawler. When Doug Cutting and Mike Cafarella were working on Nutch and trying to scale it, they came across two Google white papers about GFS (Google’s distributed file system) and MapReduce. Using the architecture described in those papers, Nutch’s developers came up with an open source implementation of a distributed file system, NDFS (Nutch Distributed File System), and MapReduce.

It was realized that NDFS and MapReduce could be spun off as a separate project, and that is how Hadoop initially became a sub-project. Yahoo also helped by providing resources and a team to develop the framework, improving scalability, performance, and reliability and adding many new features. In 2008 Hadoop became a top-level Apache project rather than a sub-project, and it is now a widely used framework with its own ecosystem.

How Hadoop works

Here I’ll try to explain how Hadoop works in very simple terms, without going into the complexities of what all the daemons, like the NameNode or ResourceManager, do.

Once you copy a huge file into HDFS, the framework splits the file into blocks and distributes those blocks across the nodes in a cluster.
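
The splitting itself is simple to picture: the file's bytes are cut into fixed-size chunks, and only the last block may be smaller. A toy Python sketch (HDFS's default block size is 128 MB; a tiny size is used here so the numbers are easy to follow):

```python
# Illustrative sketch: splitting a file's bytes into fixed-size
# blocks, the way HDFS does before distributing them across nodes.
BLOCK_SIZE = 10  # bytes; toy value (HDFS defaults to 128 MB)

def split_into_blocks(data: bytes, block_size: int = BLOCK_SIZE):
    # Slice the data at every block boundary; the last slice
    # may be shorter than block_size.
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

blocks = split_into_blocks(b"a" * 25)
print([len(b) for b in blocks])  # [10, 10, 5] - last block is smaller
```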

Then you write a MapReduce program having some logic to process that data. You package your code as a jar, and that packaged code is transferred to the DataNodes where the data blocks are stored. That way your MapReduce code works on a part of the file (the HDFS block that resides on the node where the code is running) and the data is processed in parallel.
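
The programming model behind this can be shown with the classic word count, here simulated on one machine in Python. Each "map" call stands for a map task working on its own block; the grouping step stands for the shuffle described in the first post; and each "reduce" call stands for a reduce task:

```python
# Illustrative sketch: the MapReduce model simulated on one machine.
from collections import defaultdict

def map_phase(block):
    # Emit (word, 1) for every word in this block of text.
    return [(word, 1) for word in block.split()]

def reduce_phase(key, values):
    # Sum all the counts emitted for one word.
    return (key, sum(values))

blocks = ["big data big", "data cluster"]      # two HDFS-like blocks
mapped = [kv for block in blocks for kv in map_phase(block)]

grouped = defaultdict(list)                    # "shuffle": group by key
for key, value in mapped:
    grouped[key].append(value)

result = dict(reduce_phase(k, v) for k, v in grouped.items())
print(result)  # {'big': 2, 'data': 2, 'cluster': 1}
```

In real Hadoop the map calls run on different nodes against local blocks, and the grouped values for each key are delivered to a reducer by the shuffle; the logic, though, is the same.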

Other advantage is that rather than sending data to code (like traditional programming where data is fetched from DB server) you send the code to data. Obviously data is much larger in size so that way Hadoop uses network bandwidth more proficiently.

Here is a high-level diagram which shows in a simple way how the Hadoop framework works.

(Figure: How Hadoop works)

That's all for the topic What is Hadoop. If something is missing or you have something to share about the topic please write a comment.
