December 23, 2023

Mapper Only Job in Hadoop MapReduce

Generally, when we think of a MapReduce job in Hadoop, we think of both mappers and reducers doing their share of the processing. That is true for most cases, but there are scenarios where you want a mapper only job in Hadoop.

When do you need a map only job

You may opt for a map only job in Hadoop when you need to process the data and get the output as (key, value) pairs but don't want to aggregate those (key, value) pairs.

For example, suppose you are converting a text file to a sequence file using MapReduce. In this case you just want to read a line from the text file and write it to the sequence file, so you can opt for a MapReduce job with only a map method.

In the same way, if you are converting a text file to a Parquet file using MapReduce, you can opt for a mapper only job in Hadoop.
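
As a rough illustration, here is a minimal sketch of such a text-to-Parquet mapper only job. It assumes the parquet-avro library (which provides AvroParquetOutputFormat) is on the classpath; the TextToParquet class name and the single-field Avro schema are illustrative and not part of the original post.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.avro.AvroParquetOutputFormat;

public class TextToParquet extends Configured implements Tool {
  // Single-field Avro schema - each text line becomes one Parquet record
  private static final String SCHEMA_JSON =
      "{\"type\": \"record\", \"name\": \"TextLine\","
      + " \"fields\": [{\"name\": \"line\", \"type\": \"string\"}]}";

  public static class PFMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {
    private Schema schema;
    @Override
    protected void setup(Context context) {
      schema = new Schema.Parser().parse(SCHEMA_JSON);
    }
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      GenericRecord record = new GenericData.Record(schema);
      record.put("line", value.toString());
      // The key is ignored by the Parquet output format
      context.write(null, record);
    }
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new TextToParquet(), args));
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), "texttoparquet");
    job.setJarByClass(TextToParquet.class);
    job.setMapperClass(PFMapper.class);
    // Mapper only job - no reducers
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SCHEMA_JSON));
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}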

What you need to do for a mapper only job

For a mapper only job you need to write only the map method in the code, which will do the processing, and set the number of reducers to zero.

In order to set the number of reducers to zero you can use the setNumReduceTasks() method of the Job class. So you need to add the following to the job configuration in the driver of your MapReduce code.

job.setNumReduceTasks(0);
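
Since the example driver shown later in this post runs through ToolRunner, the number of reducers can also be set at submit time, without changing the code, by passing the mapreduce.job.reduces property as a generic option. Reusing the jar and paths from the example run later in this post:

$ hadoop jar /pathto/jar/knpcodehadoop.jar org.knpcode.SequenceFileWriter -D mapreduce.job.reduces=0 /user/input/count /user/output/seq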

Benefits of a mapper only job

As already stated, if you just want to process the data without any aggregation then it is better to go for a mapper only job, as you save on some of the processing done internally by the Hadoop framework.

Since there is no reducer, there is no need for the shuffle and sort phase, and transferring data to the nodes where the reducers would run is not required either.

Also note that in a regular MapReduce job the output of the map phase is written to the local disk of the node rather than to HDFS, whereas in the case of a mapper only job the map output is written directly to HDFS.

Mapper only job in Hadoop example

Converting a text file to a sequence file can be done using only a map function, so in the following code the number of reducers is set to zero.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileWriter extends Configured implements Tool {
  // Map function - writes each (byte offset, line) pair out as is
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SequenceFileWriter(), args);
    System.exit(exitFlag);
  }
  @Override
  public int run(String[] args) throws Exception {
    // Use the Configuration set up by ToolRunner so that generic options
    // passed on the command line (like -D) are honored
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "sfwrite");
    job.setJarByClass(SequenceFileWriter.class);
    job.setMapperClass(SFMapper.class);
    // Setting reducer to zero
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    // Compression related settings
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}

You can run the MapReduce job using the following command.

$ hadoop jar /pathto/jar/knpcodehadoop.jar org.knpcode.SequenceFileWriter /user/input/count /user/output/seq

By listing the output directory you can see that a sequence file is created. Note that the output file of a mapper only job is named part-m-xxxxx rather than part-r-xxxxx, since it is written by a map task.

$ hdfs dfs -ls /user/output/seq

Found 2 items
-rw-r--r--   1 knpcode supergroup          0 2018-06-14 12:26 /user/output/seq/_SUCCESS
-rw-r--r--   1 knpcode supergroup        287 2018-06-14 12:26 /user/output/seq/part-m-00000
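
To verify the contents you can read the sequence file back. Compression is transparent to the reader since the codec used is recorded in the file header. Here is a minimal sketch of such a reader; the SequenceFileRead class name is illustrative and not part of the original example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);
    // Key and value classes are read from the sequence file header
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(path))) {
      LongWritable key = (LongWritable) ReflectionUtils.newInstance(
          reader.getKeyClass(), conf);
      Text value = (Text) ReflectionUtils.newInstance(
          reader.getValueClass(), conf);
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    }
  }
}

Running it against /user/output/seq/part-m-00000 should print each original line keyed by its byte offset.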

That's all for the topic Mapper Only Job in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.

