December 18, 2023

Counters in Hadoop MapReduce

Counters in Hadoop MapReduce provide statistics about a MapReduce job. With counters you can get general information about the executed job, like the number of launched map and reduce tasks or the number of map input records. You can also use that information to diagnose problems with the data, and for performance tuning; for example, counters tell you about spilled records and memory used, and with that information you can try to fine tune your job.

Types of counters in Hadoop

Within Hadoop there are many built-in counters for a MapReduce job; they are displayed on the console after the job runs, or you can use the web UI to analyze them.

You can also have user defined counters. So there are two types of counters in Hadoop.

  1. Built-in counters
  2. User defined counters

Built-in counters in Hadoop

Built-in counters in Hadoop can be divided into the following groups. These counter groups are defined as enums in the Hadoop framework, so they can also be read programmatically, as shown in the sketch after this list.

  1. File System Counters- org.apache.hadoop.mapreduce.FileSystemCounter
  2. Map-Reduce Framework Counters- org.apache.hadoop.mapreduce.TaskCounter
  3. File Input Format Counters- org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
  4. File Output Format Counters- org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
  5. Job Counters- org.apache.hadoop.mapreduce.JobCounter
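
Since these counter groups are enums, you can read any built-in counter in the driver once the job completes. A minimal sketch, assuming a finished Job instance named job and the imports org.apache.hadoop.mapreduce.Counters and org.apache.hadoop.mapreduce.TaskCounter:

// Get all the counters for the completed job
Counters counters = job.getCounters();
// TaskCounter is the enum backing the Map-Reduce Framework counters
long mapInputRecords = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
System.out.println("Map input records: " + mapInputRecords);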

File System Counters in Hadoop

  • Number of bytes read (BYTES_READ)- Shows the number of bytes read by Map and Reduce tasks. There is a separate entry for each file system; for example, if bytes are read from both the local file system and HDFS there will be two entries, prefixed with FILE: and HDFS: respectively.
  • Number of bytes written (BYTES_WRITTEN)- Shows the number of bytes written by Map and Reduce tasks.
  • Number of read operations (READ_OPS)- Shows the number of read operations (like opening a file) by both Map and Reduce tasks.
  • Number of large read operations (LARGE_READ_OPS)- Shows the number of large read operations (like going through a large directory structure) by both Map and Reduce tasks.
  • Number of write operations (WRITE_OPS)- Shows the number of write operations (like creating a file, appending to it) by both Map and Reduce tasks.
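
Since there is one entry per file system, a File System Counter is looked up with the file system scheme together with the counter name. A minimal sketch, assuming a finished Job instance named job and the import org.apache.hadoop.mapreduce.FileSystemCounter:

// Bytes read from HDFS by the job's tasks; passing "FILE" instead
// would return the local file system entry
long hdfsBytesRead = job.getCounters()
    .findCounter("HDFS", FileSystemCounter.BYTES_READ).getValue();
System.out.println("HDFS: Number of bytes read=" + hdfsBytesRead);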

Map-Reduce Framework Counters

  • Map input records (MAP_INPUT_RECORDS)- The number of records processed by all the maps.
  • Map output records (MAP_OUTPUT_RECORDS)- The number of output records emitted by all the maps.
  • Map skipped records (MAP_SKIPPED_RECORDS)- The number of records skipped by all the maps.
  • Map output bytes (MAP_OUTPUT_BYTES)- Output of all the maps in bytes.
  • Map output materialized bytes (MAP_OUTPUT_MATERIALIZED_BYTES)- Output bytes written to the disk.
  • Input split bytes (SPLIT_RAW_BYTES)- Metadata about the input splits in bytes.
  • Combine input records (COMBINE_INPUT_RECORDS)- The number of input records processed by the combiner.
  • Combine output records (COMBINE_OUTPUT_RECORDS)- The number of output records emitted by the combiner.
  • Reduce input groups (REDUCE_INPUT_GROUPS)- The number of key groups processed by all the Reducers.
  • Reduce shuffle bytes (REDUCE_SHUFFLE_BYTES)- Map output copied to Reducers in bytes.
  • Reduce input records (REDUCE_INPUT_RECORDS)- The number of input records processed by all the Reducers.
  • Reduce output records (REDUCE_OUTPUT_RECORDS)- The number of output records emitted by all the Reducers.
  • Reduce skipped records (REDUCE_SKIPPED_RECORDS)- The number of records skipped by all the Reducers.
  • Spilled Records (SPILLED_RECORDS)- The number of records spilled to the disk.
  • Shuffled Maps (SHUFFLED_MAPS)- The number of map output files copied to nodes where reducers are running.
  • Failed Shuffles (FAILED_SHUFFLE)- The number of map output files failed during shuffle.
  • Merged Map outputs (MERGED_MAP_OUTPUTS)- The number of map outputs merged to create input for the Reducers.
  • GC time elapsed (GC_TIME_MILLIS)- Time spent in garbage collection.
  • CPU time spent (CPU_MILLISECONDS)- CPU time spent for task processing.
  • Physical memory snapshot (PHYSICAL_MEMORY_BYTES)- Total physical memory used.
  • Virtual memory snapshot (VIRTUAL_MEMORY_BYTES)- Total virtual memory used.
  • Total committed heap usage (COMMITTED_HEAP_BYTES)- Total amount of heap memory available.
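
Counters like spilled records, GC time and the memory snapshots are the ones typically looked at for performance tuning. To inspect the whole group at once you can iterate over it; a minimal sketch, again assuming a finished Job instance named job and the imports org.apache.hadoop.mapreduce.Counter and org.apache.hadoop.mapreduce.TaskCounter:

// The Map-Reduce Framework counters are grouped under the TaskCounter class name
for (Counter counter : job.getCounters().getGroup(TaskCounter.class.getName())) {
  System.out.println(counter.getDisplayName() + "=" + counter.getValue());
}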

File Input Format Counters in Hadoop

  • Bytes Read (BYTES_READ)- The number of bytes read by Map tasks using the input format of the job.

File Output Format Counters in Hadoop

  • Bytes Written (BYTES_WRITTEN)- The number of bytes written by Map and Reduce tasks using the output format of the job.

Job Counters in Hadoop

  • Launched map tasks (TOTAL_LAUNCHED_MAPS)- Total number of launched map tasks.
  • Launched reduce tasks (TOTAL_LAUNCHED_REDUCES)- Total number of launched reduce tasks.
  • Failed map tasks (NUM_FAILED_MAPS)- The number of failed map tasks.
  • Failed reduce tasks (NUM_FAILED_REDUCES)- The number of failed reduce tasks.
  • Killed map tasks (NUM_KILLED_MAPS)- The number of killed map tasks.
  • Killed reduce tasks (NUM_KILLED_REDUCES)- The number of killed reduce tasks.
  • Data-local map tasks (DATA_LOCAL_MAPS)- The number of map tasks running on the same node where the data they process resides.
  • Rack-local map tasks (RACK_LOCAL_MAPS)- The number of map tasks running on a node in the same rack as the data they process.
  • Launched uber tasks (TOTAL_LAUNCHED_UBERTASKS)- Total number of launched uber tasks.
  • Map in uber tasks (NUM_UBER_SUBMAPS)- The number of maps run as part of uber tasks.
  • Reduce in uber tasks (NUM_UBER_SUBREDUCES)- The number of reduces run as part of uber tasks.
  • Failed uber tasks (NUM_FAILED_UBERTASKS)- The number of failed uber tasks.
  • Total time spent by all map tasks (ms) (MILLIS_MAPS)- Time spent in running all the map tasks.
  • Total time spent by all reduce tasks (ms) (MILLIS_REDUCES)- Time spent in running all the reduce tasks.
  • Total vcore-milliseconds taken by all map tasks (VCORES_MILLIS_MAPS)- Total vcore time taken by all map tasks.
  • Total vcore-milliseconds taken by all reduce tasks (VCORES_MILLIS_REDUCES)- Total vcore time taken by all reduce tasks.

As you can see from the descriptions, the File System Counters, Map-Reduce Framework Counters, File Input Format Counters and File Output Format Counters provide statistics about the tasks in a MapReduce job, whereas the Job Counters provide statistics about the overall job.

User defined counters in Hadoop

You can also create user defined counters in Hadoop MapReduce. Counters help with debugging too; you can create a counter and increment it when some condition is met, and then check the counter output, which gives you an idea whether there is anything wrong with the data.

For creating a counter you can use a Java enum. Each field in the enum is a counter name, whereas the enum itself is the group those counters belong to.

User defined counter Hadoop MapReduce example

For example, if you have tab-separated data with stock symbol, price and number of transactions, and you want to find the records where the transaction field is missing, you can create a counter in MapReduce to do that.
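
The input is assumed to look something like the following (hypothetical sample lines, fields separated by tabs; the transaction field is missing in the second record):

AAA	23.48	1000
BBB	12.55
CCC	17.02	500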

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockData extends Configured implements Tool{
  enum Stock {
    TRANSACTION_MISSING
  }
  // Mapper
  public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text symbol = new Text();
    private int trans = 0;

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      // Setting symbol and transaction values
      symbol.set(stringArr[0]);
      // Guarding against a missing or empty transaction field
      if(stringArr.length > 2 && !stringArr[2].trim().isEmpty()) {
        trans = Integer.parseInt(stringArr[2]);
      } else {
        // Incrementing the counter for records with missing transactions
        context.getCounter(Stock.TRANSACTION_MISSING).increment(1);
        trans = 0;
      }
      context.write(symbol, new IntWritable(trans));
    }
  }
	
  // Reduce function
  public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Summing the transaction counts for each stock symbol
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockData(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "Stock data");
    job.setJarByClass(getClass());
    job.setMapperClass(StockFieldMapper.class);    
    job.setReducerClass(TotalTransReducer.class);	 
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

Then, among the counters displayed for the job, you would see something similar to the following-

org.knpcode.StockData$Stock
	TRANSACTION_MISSING=3
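
You can read the user defined counter in the driver as well, the same way as a built-in counter. A minimal sketch of how the end of the run() method above could be adapted (an assumption, not part of the original example):

boolean success = job.waitForCompletion(true);
// Reading back the user defined counter after job completion
long missing = job.getCounters().findCounter(Stock.TRANSACTION_MISSING).getValue();
System.out.println("Records with missing transactions: " + missing);
return success ? 0 : 1;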

That's all for the topic Counters in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.

