December 23, 2023

Combiner in Hadoop MapReduce

This post shows what a combiner is in Hadoop MapReduce and how the combiner function can be used to reduce the memory, I/O and network requirements of the overall MapReduce execution.

Why is a combiner needed in MapReduce

When a MapReduce job is executed and the mappers start producing output, a lot of processing happens within the Hadoop framework; this is known as the shuffle and sort phase.

Map output is partitioned based on the number of reducers; those partitions are sorted and then written to local disk.

Then the data is transferred from the nodes where the maps ran to the nodes where the reducers run. Since a single reducer gets its input from several mappers, all that data from several maps is transferred to the reducer and merged again to form the complete input for the reduce task.

As you can see, all this processing requires memory, network bandwidth and I/O. That is where a combiner in Hadoop can help, by minimizing the data that is sent to the reducer.

Combiner function in MapReduce

A combiner in Hadoop is an optimization that can aggregate data on the map side itself. The combiner function runs on the map output and aggregates the data (so the data size becomes smaller), and the output of the combiner function becomes the input for the reduce task. Note that using a combiner is optional.

Most of the time you will use your Reducer class as the combiner class too. If you don't, your combiner class implementation must still extend Reducer and implement the reduce method.

Since a combiner has the same semantics as a reducer, its input and output types follow the same requirement. In a MapReduce job the reduce input types must match the map output types; in the same way the combiner input types must match the map output types. Since the output of the combiner becomes the input to the reducer, the output types of the combiner must also match the reduce input types.
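
To make these type constraints concrete, here is a minimal stand-alone sketch of a combiner for the max-price job used later in this post (the class name is illustrative, not from the post): its input types (Text, IntWritable) match the map output types and its output types match the reduce input types.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input types (Text, IntWritable) match the map output types;
// output types (Text, IntWritable) match the reduce input types
public class MaxStockPriceCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int max = Integer.MIN_VALUE;
    for (IntWritable val : values) {
      max = Math.max(max, val.get());
    }
    // Emit the per-map maximum; the reducer then takes the max of these partial maxima
    context.write(key, new IntWritable(max));
  }
}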


For example, suppose you are trying to get the maximum price for a stock. There are two input splits which are processed by two different maps.

Split 1
AAA		23
AAA		26
AAA		21
AAA		19
Split 2
AAA		27
AAA		28
AAA		25
AAA		24
Output of Map-1
(AAA, 23)
(AAA, 26)
(AAA, 21)
(AAA, 19)
Output of Map-2
(AAA, 27)
(AAA, 28)
(AAA, 25)
(AAA, 24)

After the shuffle and sort phase the reduce task will get its input as follows-

[AAA, (23, 26, 21, 19, 27, 28, 25, 24)] 

And the reduce output: (AAA, 28)

Here if you specify the combiner class to be the same as the reducer, the combiner will aggregate the respective map outputs.

Combiner for Map-1 output
(AAA, 26)
Combiner for Map-2 output
(AAA, 28)
Now the input to the reduce task is as follows-
[AAA, (26, 28)]

So you can see how the data that is transferred to the reducer is minimized.

How to specify a combiner in MapReduce job

You can specify a combiner using the setCombinerClass() method of the Job class in your MapReduce driver. For example, if your Reducer class is MaxStockPriceReducer and you want to set it as the combiner class too, it can be done as follows.

job.setCombinerClass(MaxStockPriceReducer.class);

One thing you will have to ensure when using a combiner is that however the inputs are combined, the end result must be identical. Since Hadoop may apply the combiner zero, one or many times to the map output, the function must be associative and commutative for the result to stay the same.

As an example, suppose you are calculating an average where map-1 outputs (3, 4, 5) and map-2 outputs (6, 8).

Then the reduce function will calculate the average as: average of (3, 4, 5, 6, 8) = 5.2

With combiner-

Average of (3,4,5) = 4

Average of (6, 8) = 7

Then in the reduce function: average of (4, 7) = 5.5

In this example you can see that with the combiner the result is different, so you'll have to write your logic in such a way that the result is identical even when a combiner is used.
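
A common way to make an average safe for a combiner is to pass partial sums and counts around rather than averages. Here is a minimal sketch of that idea, not part of the original example: the class names are illustrative, the mapper is assumed to emit each value as a "value,1" text pair, and the classes would sit inside a driver class like the full example below (org.apache.hadoop.io.DoubleWritable is the extra import needed).

  // Combiner merges partial "sum,count" pairs; it never divides
  public static class AvgCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0;
      long count = 0;
      for (Text val : values) {
        String[] parts = val.toString().split(",");
        sum += Long.parseLong(parts[0]);
        count += Long.parseLong(parts[1]);
      }
      // Emit merged partials, not an average, so the end result stays identical
      context.write(key, new Text(sum + "," + count));
    }
  }

  // Reducer accumulates the partials the same way and only then divides
  public static class AvgReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0;
      long count = 0;
      for (Text val : values) {
        String[] parts = val.toString().split(",");
        sum += Long.parseLong(parts[0]);
        count += Long.parseLong(parts[1]);
      }
      context.write(key, new DoubleWritable((double) sum / count));
    }
  }

With this scheme map-1's values combine to (12,3) and map-2's to (14,2), so the reducer computes 26/5 = 5.2, the same result as without the combiner.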

MapReduce Example using combiner

Here is a MapReduce example where the maximum price per stock symbol is calculated. The input file has tab-separated data comprising a stock symbol and a price.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockPrice extends Configured implements Tool{
  // Map function
  public static class MaxStockPriceMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
      
    private Text symbol = new Text();
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      symbol.set(stringArr[0]);
      int price = Integer.parseInt(stringArr[1]);
      // Emit (symbol, price) for each line
      context.write(symbol, new IntWritable(price));
    }
  }
	
  // Reduce function
  public static class MaxStockPriceReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {

      int maxValue = Integer.MIN_VALUE;
      for (IntWritable val : values) {
        maxValue = Math.max(maxValue, val.get());
      }      
      context.write(key, new IntWritable(maxValue));
    }
  }
	
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockPrice(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    // getConf() returns the Configuration set up by ToolRunner
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "Stock price");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxStockPriceMapper.class);    
    job.setReducerClass(MaxStockPriceReducer.class);		
    //job.setCombinerClass(MaxStockPriceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

As you can see, initially the line where the combiner class is set is commented out. If you run this MapReduce job without specifying any combiner, you will see counters like these in the console.

Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=106
Reduce input records=10
Reduce output records=2
Spilled Records=20

Now, if the line setting the combiner is uncommented and the MapReduce job is run again, the same counters are as follows.

Combine input records=10
Combine output records=2
Reduce input groups=2
Reduce shuffle bytes=26
Reduce input records=2
Reduce output records=2
Spilled Records=4

So you can see the combiner itself minimized the data sent to the reducer, and the shuffle bytes are also reduced in the process.
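
If you want to verify these numbers programmatically instead of reading the console, the built-in TaskCounter enum exposes the same counters. A minimal sketch, assuming the last line of the run() method above is replaced with the following (the extra imports are org.apache.hadoop.mapreduce.Counters and org.apache.hadoop.mapreduce.TaskCounter):

    boolean success = job.waitForCompletion(true);
    // Read the job counters after completion
    Counters counters = job.getCounters();
    System.out.println("Combine input records = "
        + counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue());
    System.out.println("Combine output records = "
        + counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue());
    System.out.println("Reduce shuffle bytes = "
        + counters.findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES).getValue());
    return success ? 0 : 1;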

That's all for the topic Combiner in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.

