How MapReduce Works in Hadoop

In the post Hadoop MapReduce Word Count Program we saw how to write a MapReduce program in Java, create a jar and run it. Apart from the steps you perform to create a MapReduce job, the Hadoop framework also does a lot of processing internally. In this post we’ll see in detail how MapReduce works in Hadoop internally, using the word count MapReduce program as an example.

What is MapReduce

Hadoop MapReduce is a framework for writing applications that can process huge amounts of data by working on small chunks of it in parallel on a cluster of nodes. The framework ensures that this distributed processing happens in a reliable, fault-tolerant manner.

Map and Reduce

A MapReduce job in Hadoop consists of two phases-

  • Map phase- It has a Mapper class with a map function specified by the developer. The input and output of the Map phase are (key, value) pairs. When you copy the file that has to be processed to HDFS it is split into independent chunks. The Hadoop framework creates one map task for each chunk and these map tasks run in parallel.
  • Reduce phase- It has a Reducer class with a reduce function specified by the developer. The input and output of the Reduce phase are also (key, value) pairs. The output of the Map phase, after some further processing by the Hadoop framework (known as shuffling and sorting), becomes the input for the Reduce phase. So the output of the Map phase is the intermediate output, which is processed by the Reduce phase to generate the final output.

Since the input and output of both the map and reduce functions are (key, value) pairs, if we say the input to map is (K1, V1) and the output is (K2, V2), then the map function has the following form-

(K1, V1) -> list(K2, V2)

The intermediate output of the map function goes through some further processing within the framework, known as the shuffle and sort phase, before being passed to the reduce function. The general form of the reduce function can be depicted as follows-

(K2, list(V2)) -> list(K3, V3)

Note that the types of the reduce input must match the types of the map output.

MapReduce explanation with example

Let’s take the word count MapReduce code as an example and see what happens in both the Map and Reduce phases, and how MapReduce works in Hadoop.

When we put the input text file into HDFS it is split into chunks of data. For simplicity's sake, let’s say we have two lines in the file and it is split into two parts, with each part having one line.

If the text file has the following two lines-

This is a test file
This is a Hadoop MapReduce program file

Then there will be two splits and two map tasks will get those two splits as input.

Mapper class

// Map function
public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
    // Splitting the line on spaces
    String[] stringArr = value.toString().split("\\s+");
    for (String str : stringArr) {
      word.set(str);
      context.write(word, one);
    }
  }
}

In the Mapper class you can see that it has four type parameters; the first two specify the input types of the map function and the other two specify the output types of the map function.

In this word count program the input (key, value) pair is as follows-

Key- The byte offset into the file at which the line starts.

Value- The content of the line.

As assumed, there will be two splits (each having one line of the file) and two map tasks, let’s say Map-1 and Map-2, so the input to Map-1 and Map-2 will be as follows.

Map-1– (0, This is a test file)

Map-2– (0, This is a Hadoop MapReduce program file)

The logic in the map function is to split the line on spaces and then write each word to the context with the value 1.

So output from Map-1 will be as follows-

(This, 1)
(is, 1)
(a, 1)
(test, 1)
(file, 1)

And output from Map-2 will be as follows-

(This, 1)
(is, 1)
(a, 1)
(Hadoop, 1)
(MapReduce, 1)
(program, 1)
(file, 1)

Reducer class

// Reduce function
public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{	   
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

In the Reducer class again there are four type parameters, two for the input types and two for the output types of the reduce function.

Note that the input types of the reduce function must match the output types of the map function.

This intermediate output from the Map phase is further processed by the Hadoop framework in the shuffle phase, where it is sorted and grouped by key. After this internal processing the input to reduce will look like this-

[Hadoop, (1)]
[MapReduce, (1)]
[This, (1, 1)]
[a, (1, 1)]
[file, (1, 1)]
[is, (1, 1)]
[program, (1)]
[test, (1)]

You can see that the input to the reduce function is in the form (key, list(values)). In the reduce function, for each key the list of values is iterated over and the values are added up, which gives the final output.

Hadoop 1
MapReduce 1
This 2
a 2
file 2
is 2
program 1
test 1


That's all for the topic How MapReduce Works in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

Hadoop MapReduce Word Count Program

Once you have installed Hadoop on your system and the initial verification is done, you will want to write your first MapReduce program. Before digging deeper into the intricacies of MapReduce programming, the first step is the word count MapReduce program in Hadoop, which is also known as the "Hello World" of the Hadoop framework.

So here is a simple Hadoop MapReduce word count program written in Java to get you started with MapReduce programming.

What you need

  1. An IDE like Eclipse will be helpful for writing the Java code.
  2. A text file which is your input file. It should be copied to HDFS. This is the file the Map task will process, producing output as (key, value) pairs. This Map task output becomes the input for the Reduce task.

Process

These are the steps you need for executing your Word count MapReduce program in Hadoop.

  1. Start daemons by executing the start-dfs and start-yarn scripts.
  2. Create an input directory in HDFS where you will keep your text file.
    bin/hdfs dfs -mkdir /user
    
    bin/hdfs dfs -mkdir /user/input
    
  3. Copy the text file you created to the /user/input directory.
    bin/hdfs dfs -put /home/knpcode/Documents/knpcode/Hadoop/count /user/input
    

    I have created a text file called count with the following content

    This is a test file.
    This is a test file.
    

    If you want to verify that the file is copied or not, you can run the following command-

    bin/hdfs dfs -ls /user/input
    
    Found 1 items
    -rw-r--r--   1 knpcode supergroup         42 2017-12-22 18:12 /user/input/count
    

Word count MapReduce Java code

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  // Map function
  public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        context.write(word, one);
      }       
    }
  }
	
  // Reduce function
  public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{		   
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
	
  public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordMapper.class);    
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

You will need at least the following jars to compile your MapReduce code; you will find them in the share directory of your Hadoop installation.

Word count MapReduce program jars
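
If you prefer compiling from the command line, something like the following should work; this is only a sketch that assumes you run it from the Hadoop installation directory, that WordCount.java (declared in the org.knpcode package used in the run command later) sits in the current directory, and it uses the hadoop classpath command to pull in those jars.

mkdir wordcount_classes
javac -cp "$(bin/hadoop classpath)" -d wordcount_classes WordCount.java
jar cf wordcount.jar -C wordcount_classes .

Here wordcount_classes is just a scratch directory for the compiled classes.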

Running the word count MapReduce program

Once your code is successfully compiled, create a jar. If you are using the Eclipse IDE you can create the jar by right-clicking on the project – Export – Java (JAR file).

Once the jar is created you need to run the following command to execute your MapReduce code.

bin/hadoop jar /home/knpcode/Documents/knpcode/Hadoop/wordcount.jar org.knpcode.WordCount /user/input /user/output

In the above command

/home/knpcode/Documents/knpcode/Hadoop/wordcount.jar is the path to your jar.

org.knpcode.WordCount is the fully qualified name of Java class that you need to run.

/user/input is the path to the input directory.

/user/output is the path to the output directory.

In the Java program, in the main method, there are these two lines-

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

That’s where input and output directories will be set.

To see an explanation of word count MapReduce program working in detail, check this post- How MapReduce Works in Hadoop

After execution you can check the output directory for the output.

bin/hdfs dfs -ls /user/output

Found 2 items
-rw-r--r--   1 knpcode supergroup          0 2017-12-22 18:15 /user/output/_SUCCESS
-rw-r--r--   1 knpcode supergroup         31 2017-12-22 18:15 /user/output/part-r-00000

The output can be verified by listing the content of the created output file.

bin/hdfs dfs -cat /user/output/part-r-00000
This	2
a	2
file.	2
is	2
test	2


That's all for the topic Hadoop MapReduce Word Count Program. If something is missing or you have something to share about the topic please write a comment.


You may also like

What is Big Data

Big Data means a very large volume of data. The term big data is used to describe data so huge and ever growing that it has gone beyond the storage and processing capabilities of traditional data management and processing tools.

Some Examples

What to do with Big Data

Giving such examples of petabytes of data is fascinating, but the real question is what to do with that kind of data. Big Data is not just about generating a huge volume of data. One aspect of Big Data is to come up with technologies to store such huge data, but another, and more important, aspect is to be able to analyze that data and use it to make business decisions faster and more accurately, and to better understand consumer behavior.

Data in Big Data

Data in Big Data can be any type of data; structured, semi-structured, unstructured such as text, video, audio, sensor data, log files etc.

  1. Structured data– Any data that is organized in a fixed format can be termed as structured data, such as data stored in relational databases or in a spreadsheet. For creating structured data you have predefined rules on what type of data will be stored and how that data will be stored.
  2. Semi-structured data– Any data that doesn’t conform to the rigid structure associated with structured data, but still has some structure, like tags or other markers to separate and identify different elements and hierarchies of records and fields within the data, can be termed as semi-structured data. For example XML and JSON.
  3. Unstructured data– As the name suggests, unstructured data is the exact opposite of structured data, which means it doesn’t conform to any predefined rules in terms of the type of data and field positions within a file or record. Unstructured data usually includes multiple types of data, where you may have a combination of text, videos and images in no defined order. Examples of unstructured data are books, web pages, email messages etc. Because it doesn’t fit any defined format it becomes very difficult to analyze unstructured data.

3 Vs of Big Data

Big Data can be described by following characteristics-

  • Volume– This characteristic refers to the volume of data that is generated and stored. It’s the size of data that determines the potential insight that can be derived from it and even determines whether the data can actually be considered as big data or not.
  • Velocity– This characteristic refers to the speed at which data is generated and processed. For example, processing trade data created each day in a stock exchange to identify potential fraud, or analyzing the click stream data of a consumer in real time to provide the consumer with suitable alternatives or products.
  • Variety- This characteristic refers to the type and nature of the data. Data may be structured, unstructured or semi-structured. Analyzing all these types of data together provides better insights.

These 3 Vs have been expanded, and are now often termed the 5 Vs, to add new characteristics to Big Data.

  • Variability– This characteristic refers to the inconsistency of the data flow. There may be peak times when the data flow is quite huge, which may render the processes in place to handle and manage data ineffective.
  • Veracity- This characteristic refers to the quality of data collected from multiple sources.

Some Big Data technologies

Some of the Big data technologies for storing and analyzing big data are-

  • Apache Hadoop– Over the years Hadoop has grown into a whole ecosystem of related technologies like HDFS, Hive, Pig and even Apache Spark.
  • NoSQL Databases- For storing unstructured data and providing very fast performance. Some of the NoSQL databases are MongoDB, Cassandra and HBase.
  • Presto– Developed by Facebook, Presto is an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes.


That's all for the topic What is Big Data. If something is missing or you have something to share about the topic please write a comment.


You may also like

How to Improve Map-Reduce Performance

In this post we’ll see some of the ways to improve performance of the Map-Reduce job in Hadoop.

The tips given here for improving the performance of a MapReduce job are more from the MapReduce code and configuration perspective than from the cluster and hardware perspective.

1- Enabling uber mode– Unlike Hadoop 1 there is no JVM reuse feature in YARN Hadoop, but you can enable a job to run in uber mode; by default uber mode is not enabled. If uber mode is enabled, the ApplicationMaster can calculate that the overhead of negotiating resources with the ResourceManager, communicating with NodeManagers on different nodes to launch the containers and running the tasks on those containers is much more than running the MapReduce job sequentially in the same JVM, and in that case it runs the job as an uber task.
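
As a sketch only, uber mode can be enabled from the driver by setting the relevant properties on the Configuration; the threshold properties shown below control what counts as a job small enough to be ubered, and the values shown are the usual defaults rather than recommendations.

// In the job driver (imports as in the word count driver shown earlier)
Configuration conf = new Configuration();
// Allow small jobs to run as a single uber task inside the ApplicationMaster's JVM
conf.setBoolean("mapreduce.job.ubertask.enable", true);
// A job is considered small only if it stays within these limits
conf.setInt("mapreduce.job.ubertask.maxmaps", 9);
conf.setInt("mapreduce.job.ubertask.maxreduces", 1);
Job job = Job.getInstance(conf, "small job");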

2- For compression try to use native libraries- When using compression and decompression in Hadoop it is better to use a native library, as the native library will outperform a codec written in a programming language like Java.

3- Increasing the block size- In case the input file is very large you can consider increasing the HDFS block size, for example to 512 MB. That can be done by setting the parameter dfs.blocksize. If you set dfs.blocksize to a higher value the input split size will increase to the same size, because the split size is calculated using the formula

Math.max(mapreduce.input.fileinputformat.split.minsize, Math.min(mapreduce.input.fileinputformat.split.maxsize, dfs.blocksize))

thus making it the same size as the HDFS block size. By increasing the block size you will have less overhead in terms of metadata, as there will be fewer blocks.

If the input split is larger, map tasks will get more data to process. In Hadoop as many map tasks are started as there are input splits, so having fewer input splits means the overhead of initializing map tasks is reduced.
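
If changing dfs.blocksize cluster-wide is not an option, a hedged alternative sketch is to raise the minimum split size for the job itself from the driver; the 512 MB figure is just an example value.

// In the job driver (FileInputFormat from org.apache.hadoop.mapreduce.lib.input)
// Ask for input splits of at least 512 MB, which raises the value used in the formula above
FileInputFormat.setMinInputSplitSize(job, 512 * 1024 * 1024L);
// Equivalent to setting the property directly:
// job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 512 * 1024 * 1024L);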

4- Time taken by map tasks- A map task should run for at least a minute (1-3 minutes); if it finishes in less than a minute that means the input data to the map task is too small. If there are many small files in your MapReduce job then try to use a container file format like SequenceFile or Avro that packs those small files together.

You can also use CombineFileInputFormat, which puts many files into an input split so that there is more data for each mapper to process.
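
A minimal sketch of that, using the CombineTextInputFormat subclass (from org.apache.hadoop.mapreduce.lib.input) and an illustrative 128 MB cap on the combined split size, would look like this in the driver.

// Pack many small files into fewer splits so each mapper gets more data to process
job.setInputFormatClass(CombineTextInputFormat.class);
// Upper bound on a combined split, here 128 MB (example value)
CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024L);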

5- Whether input data compression is splittable or not- If the input data is compressed, then whether the compression format used is splittable or not is also one of the things to consider. If the input data is not splittable there would only be a single split processed by a single map task, making the processing very slow with no parallelism at all.

For compressing input data, use bzip2, which is splittable, or use LZO with indexing to make it splittable.

6- Setting the number of reduce tasks- The number of maps is usually driven by the number of input splits, but the number of reducers can be controlled. As per the documentation, the right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).
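
Applying that guideline in the driver might look like the following sketch, where the node and container counts are assumptions to be replaced with your cluster's actual figures.

// Hypothetical cluster figures - replace with your own values
int nodes = 10;
int maxContainersPerNode = 8;
// 0.95 factor: all reduces can launch immediately once the maps finish
int numReducers = (int) (0.95 * nodes * maxContainersPerNode); // 76 in this example
job.setNumReduceTasks(numReducers);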

7- Data skew at the reducer side- If data is skewed in such a way that many values are grouped with a single key rather than values being evenly distributed, then the reduce tasks which process keys with more values will take more time to finish, whereas other reducers will get less data because of the uneven distribution and finish early.

In this type of scenario, analyze the partitioning of the data and look at the possibility of writing a custom partitioner so that data is evenly distributed among reducers.
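
As a sketch only, a custom partitioner extends the org.apache.hadoop.mapreduce.Partitioner class; the class name and key layout below are hypothetical, the point is simply that getPartition() decides which reducer a key goes to.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner for keys of the form "SYMBOL:date"; partitioning on the
// symbol part only keeps related records together while spreading keys across reducers
public class SymbolPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String symbol = key.toString().split(":")[0];
    // Mask with Integer.MAX_VALUE to keep the partition number non-negative
    return (symbol.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

It is registered in the driver with job.setPartitionerClass(SymbolPartitioner.class), and it only has an effect when the job runs with more than one reducer.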

8- Shuffle phase performance improvements- The shuffle phase in the Hadoop framework is very network intensive, as files are transferred from mappers to reducers. There is a lot of I/O involved as map output is written to local disk, and there is a lot of processing too, in the form of partitioning the data per reducer, sorting data by keys and merging.

Optimization for reducing the shuffle phase time helps in reducing the overall job time. Some of the performance improvement tips are as follows-

  • Compressing the map output- Since Map output is written to disk and also transferred to the reducer, compressing map output saves storage space, makes it faster to write to disk and reduces data that has to be transferred to reducer node.
  • Filtering data- See how you can cut down on data emitted by Map tasks. Filter the records to remove unwanted records entirely. Also, reduce the record size by taking only the relevant record fields.
  • Using Combiner- Using combiner in MapReduce is a good way to improve performance of the overall MapReduce job. By using combiner you can aggregate data in the map phase itself and reduce the number of records sent to the reducer.
  • Raw Comparator- During sorting and merging the Hadoop framework uses a comparator to compare keys. If you are using a custom comparator then try to write it as a raw comparator so that the comparison can be done at the byte level itself. Otherwise keys have to be deserialized to create objects which are then compared, making the process time consuming.
  • Setting parameters with optimum values- Another action you can take to improve performance of the MapReduce job is to change values of some of the configuration parameters.

    Your goal is to reduce the number of records spilled to disk on the map side as well as the reduce side. On the map side you can change the settings for the following parameters to try to reduce the number of spills to disk (a sketch showing how to set these from the driver follows this list).

    • mapreduce.task.io.sort.mb- The total amount of buffer memory to use while sorting files, in megabytes.
    • mapreduce.map.sort.spill.percent- The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background.

    On the reduce side you can change the settings for the following parameters to try to keep data in memory.
    • mapreduce.reduce.shuffle.input.buffer.percent- The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.
    • mapreduce.reduce.input.buffer.percent- The percentage of memory- relative to the maximum heap size- to retain map outputs during the reduce.
    • mapreduce.reduce.shuffle.memory.limit.percent- Maximum percentage of the in-memory limit that a single shuffle can consume.
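
Here is the sketch referred to above, showing how these parameters can be set from the driver; the values are examples only and the right numbers depend on your job and cluster.

// In the job driver, before submitting the job (example values, not recommendations)
Configuration conf = job.getConfiguration();
// Map side: a bigger sort buffer and a higher spill threshold mean fewer spills to disk
conf.setInt("mapreduce.task.io.sort.mb", 256);
conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);
// Reduce side: keep more of the shuffled map output in memory
conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.80f);
conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.50f);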

9-Improvements in MapReduce coding- You should also optimize your MapReduce code so that it runs efficiently.

  • Reusing objects- Since the map method is called many times, creating new objects judiciously will help you reduce the overhead associated with object creation. Try to reuse objects as much as you can. One very frequent mistake is writing code like the following, which creates a new Text object on every call.
    String[] stringArr = value.toString().split("\\s+");
    Text outValue = new Text(stringArr[0]); // new Text object created on every map() call
    context.write(key, outValue);
    

    You should write it as follows, reusing a single Text field-

    private Text outValue = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      String[] stringArr = value.toString().split("\\s+");
      outValue.set(stringArr[0]); // reusing the same Text object
      context.write(key, outValue);
    }
    
  • String concatenation- Since String in Java is immutable, String concatenation results in new String objects being created. For appending, prefer StringBuilder (or StringBuffer if thread safety is needed) instead.

That's all for the topic How to Improve Map-Reduce Performance. If something is missing or you have something to share about the topic please write a comment.


You may also like

OutputCommitter in Hadoop MapReduce

In the Hadoop framework, processing is distributed: map and reduce tasks are spawned on different nodes and process part of the data. In this type of distributed processing it is important to ensure that the framework knows when a particular task finishes, or whether there is a need to abort the task, and when the overall job finishes. For that purpose, like many other distributed systems, Hadoop also uses a commit protocol. The class that implements it in Hadoop is OutputCommitter.

OutputCommitter class in Hadoop describes the commit of task output for a Map-Reduce job. In Hadoop 2 OutputCommitter implementation can be set using the getOutputCommitter() method of the OutputFormat class. FileOutputCommitter is the default OutputCommitter. Note that OutputCommitter is an abstract class in Hadoop framework which can be extended to provide OutputCommitter implementation.

Tasks performed by OutputCommitter in Hadoop Map-Reduce

  1. Setup the job during initialization. For example, create the temporary output directory for the job during the initialization of the job. For that the setupJob() method is used. This is called from the application master process for the entire job. This will be called multiple times, once per job attempt.
  2. Cleanup the job after job completion. For example, remove the temporary output directory after the job completes.

    Cleaning up is done when either commitJob() or abortJob() method is called.

    • commitJob() method is invoked for jobs with final runstate as SUCCESSFUL. This is called from the application master process for the entire job. This method is guaranteed to only be called once to maintain atomicity.
    • abortJob() method is invoked for jobs with final runstate as JobStatus.State.FAILED or JobStatus.State.KILLED. This is called from the application master process for the entire job. This may be called multiple times.
  3. Setup the task temporary output. This is done by invoking setupTask() method. This method is called from each individual task's process that will output to HDFS, and it is called just for that task. This may be called multiple times for the same task, but for different task attempts.
  4. Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need commit.
    Checking if the task needs commit is done using needsTaskCommit() method. This method returning false means commit phase is disabled for the tasks.
  5. Commit of the task output. During this step the task's temporary output is promoted to the final output location.
    The method used is commitTask(). There may be multiple task attempts for the same task; the Hadoop framework ensures that failed task attempts are aborted and only one task attempt is committed.
  6. Discard the task commit. If a task attempt doesn't finish successfully, the abortTask() method is called. This method may be called multiple times for the same task, but for different task attempts. A minimal skeleton covering these methods is sketched after this list.
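
The skeleton below is only a sketch with no-op bodies to show where each of the methods described above fits; a real committer, like the default FileOutputCommitter, creates temporary directories and promotes the task output in commitTask().

import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Skeleton only - method bodies left empty to show the lifecycle hooks
public class MyOutputCommitter extends OutputCommitter {
  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    // e.g. create the temporary output directory for the job
  }
  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    // e.g. clean up the temporary directory and mark the job output as complete
  }
  @Override
  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
    // cleanup for jobs that ended as FAILED or KILLED
  }
  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    // per task-attempt setup, e.g. a temporary directory for this attempt
  }
  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    return true; // returning false disables the commit phase for this task attempt
  }
  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    // promote the task attempt's temporary output to the final output location
  }
  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    // discard the task attempt's temporary output
  }
}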


That's all for the topic OutputCommitter in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

GenericOptionsParser And ToolRunner in Hadoop

When you run a MapReduce program from the command line you provide the jar name, the class that has the code, and the input and output paths in HDFS. That’s the bare minimum you have to provide to run a MapReduce job. There may be other configurations that you can set within your driver class using the conf.set() method. But there is a drawback to setting configurations within the code: any configuration change requires a code change, repackaging the jar and running it again. To avoid that you can opt to provide configurations through the command line at the time of execution. For that purpose you can use the GenericOptionsParser class in Hadoop.

GenericOptionsParser class in Hadoop

GenericOptionsParser is a utility class within the org.apache.hadoop.util package. This class parses the standard command line arguments and sets them on a configuration object which can then be used within the application.

The conventional way to use the GenericOptionsParser class is to implement the Tool interface and then use ToolRunner to run your application. ToolRunner internally uses GenericOptionsParser to parse the generic Hadoop command line arguments and then modifies the Configuration of the Tool by setting the command line arguments.
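
The pattern looks like the following sketch, with a hypothetical driver class; ToolRunner parses and strips the generic options before your run() method sees the remaining arguments.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver showing the Tool/ToolRunner pattern
public class MyDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already contains anything passed with -conf, -D, -files, -libjars etc.
    Job job = Job.getInstance(getConf(), "my job");
    job.setJarByClass(getClass());
    // set mapper, reducer and output key/value classes here as usual
    // The remaining (non-generic) arguments are the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner uses GenericOptionsParser internally to handle the generic options
    System.exit(ToolRunner.run(new Configuration(), new MyDriver(), args));
  }
}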

Supported generic options

Options that are supported by ToolRunner through GenericOptionsParser are as follows-

  • -conf <configuration file>- Specify an application configuration file. You can prepare an XML file and set it using the -conf option; that way you can set many properties at once.
  • -D <property>=<value>- Sets a value for the given property. Specifying a property with the -D option will override any property with the same name in the configuration file or within the driver code.
  • -fs <file:///> or <hdfs://namenode:port>- This generic option is used to specify the default filesystem URL to use. Overrides the 'fs.defaultFS' property from the configuration.
  • -jt <local> or <resourcemanager:port>- Used to set the YARN ResourceManager.
  • -files <comma separated list of files>- Specify comma separated files to be copied to the MapReduce cluster. Applies only to the job. If you want to add a file to the distributed cache then, rather than hardcoding it within your driver code using the job.addCacheFile() method, you can specify it using the -files generic option.
  • -libjars <comma separated list of jars>- Specify comma separated jar files to include in the classpath. Applies only to the job. If you want to add a jar to the distributed cache then, rather than hardcoding it within your driver using the job.addFileToClassPath() method, you can specify it using the -libjars generic option.
  • -archives <comma separated list of archives>- Specify comma separated archives to be unarchived on the compute machines. Applies only to the job. If you want to add an archived file (zip, tar and tgz/tar.gz files) then, rather than hardcoding it within your driver using the job.addCacheArchive() method, you can specify it using the -archives generic option.

Examples using generic options

1- If you want to set a configuration file.

hadoop jar test.jar com.knpcode.MyClass -conf hadoop/conf/my-hadoop-config.xml /inputfile /outputfile

2- If you want to set value for any configuration. As example setting the number of reducers to 10.

hadoop jar test.jar com.knpcode.MyClass -D mapreduce.job.reduces=10 /inputfile /outputfile

Note that the mapred.reduce.tasks property is deprecated; the mapreduce.job.reduces property should be used instead.

3- Setting files, jars and archived file in distributed cache.

hadoop jar test.jar com.knpcode.MyClass -files /input/test.txt -libjars /lib/test.jar /inputfile /outputfile

That's all for the topic GenericOptionsParser And ToolRunner in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

How to Chain MapReduce Job in Hadoop

In many scenarios you would like to create a sequence of MapReduce jobs to completely transform and process the data. This is better than putting everything in a single MapReduce job and making it very complex.

In fact you can get your data through various sources and use a sequence of various applications too. That can be done by creating a workflow using Oozie, but that is a topic for another post. In this post we’ll see how to chain MapReduce jobs in Hadoop using ChainMapper and ChainReducer.

ChainMapper in Hadoop

ChainMapper is one of the predefined MapReduce classes in Hadoop. The ChainMapper class allows you to use multiple Mapper classes within a single Map task. The Mapper classes are invoked in a chained fashion, where the output of the first mapper becomes the input of the second, and so on until the last Mapper; the output of the last Mapper is written to the task's output.

You can add mappers to a ChainMapper using addMapper() method.

ChainReducer in Hadoop

The ChainReducer class allows you to chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper, and the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper is written to the task's output.

To add a Mapper class to the chain reducer you can use addMapper() method.

To set the Reducer class to the chain job you can use setReducer() method.

Chaining MapReduce job

Using the ChainMapper and the ChainReducer classes it is possible to compose MapReduce jobs that look like [MAP+ / REDUCE MAP*].

When you are using chained MapReduce you can have a combination as follows-

  1. One or more mappers
  2. Single Reducer
  3. Zero or more mappers (optional and to be used only if chained reducer is used)

When you are using a chained MapReduce job the data passed between the chained mappers (and the reducer) is kept in memory rather than on disk, which reduces disk I/O to a large extent.

MapReduce chaining example

There is stock data with the stock symbol, price and transactions for a day in the following format.

AAA		23	5677
BBB		23	12800
aaa		26	23785
.....
.....

In the data, symbols are not always in uppercase. So there are two mappers: in the first, the relevant fields (symbol and transaction) are extracted, and in the second mapper symbols are converted to uppercase.

Then there is a reducer that adds up the transactions per symbol. Then, within the reduce task, there is an InverseMapper that inverts the (key, value) pair. Note that InverseMapper is a predefined Mapper class within the Hadoop framework, which is why there is no implementation of it in the example code.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockTrans extends Configured implements Tool{
  // Mapper 1
  public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text symbol = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      //Setting symbol and transaction values
      symbol.set(stringArr[0]);
      Integer trans = Integer.parseInt(stringArr[2]);
      context.write(symbol, new IntWritable(trans));
    }
  }
	
  // Mapper 2
  public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable>{
    public void map(Text key, IntWritable value, Context context) 
        throws IOException, InterruptedException {
    
      String symbol = key.toString().toUpperCase();       
      context.write(new Text(symbol), value);
    }
  }
	
  // Reduce function
  public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }	

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockTrans(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock transaction");
    job.setJarByClass(getClass());
    // MapReduce chaining
    Configuration map1Conf = new Configuration(false);
    ChainMapper.addMapper(job, StockFieldMapper.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class,  map1Conf);
    
    Configuration map2Conf = new Configuration(false);
    ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class,
           Text.class, IntWritable.class, map2Conf);
    
    Configuration reduceConf = new Configuration(false);		
    ChainReducer.setReducer(job, TotalTransReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
     
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

Run this code after creating the jar.

hadoop jar /home/knpcode/Documents/knpcode/knpcodehadoop.jar org.knpcode.StockTrans /user/input/StockTrans.txt /user/output/stock

Output

hdfs dfs -cat /user/output/stock/part-r-00000

50483	AAA
180809	BBB

That's all for the topic How to Chain MapReduce Job in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

Distributed Cache in Hadoop

In this post we’ll see what Distributed cache in Hadoop is.

What is a distributed cache

As the name suggests distributed cache in Hadoop is a cache where you can store a file (text, archives, jars etc.) which is distributed across the nodes where mappers and reducers for the MapReduce job are running. That way the cached files are localized for the running map and reduce tasks.

Methods for adding the files in Distributed Cache

There is a DistributedCache class with relevant methods but the whole class is deprecated in Hadoop2. You should be using the methods in Job class instead.

  • public void addCacheFile(URI uri)- Add a file to be localized.
  • public void addCacheArchive(URI uri)- Add archives to be localized.
  • public void addFileToClassPath(Path file)- Adds file path to the current set of classpath entries. It adds the file to cache as well. Files added with this method will not be unpacked while being added to the classpath.
  • public void addArchiveToClassPath(Path archive)- Adds an archive path to the current set of classpath entries. It adds the archive to cache as well. Archive files will be unpacked and added to the classpath when being distributed.

How to use distributed cache

In order to make a file available through the distributed cache in Hadoop-

  1. Copy the file you want to make available through distributed cache to HDFS if it is not there already.
  2. Based on the file type use the relevant method to add it to distributed cache.

As example if you want to add a text file to distributed cache then you can use the following statement in your driver class.

job.addCacheFile(new URI("/user/input/test.txt#test"));

If you want to add a jar to the class path then you can do it as follows-

job.addFileToClassPath(new Path("/myapp/mylib.jar"));

Distributed cache example MapReduce code

Here is an Avro MapReduce word count example program. Output file is an Avro data file which uses an Avro schema. This Avro schema is added to the distributed cache using the addCacheFile() method and used by the mappers and reducers.

import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroWordCount extends Configured implements Tool{
	
  // Map function
  public static class AvroWordMapper extends Mapper<LongWritable, Text, AvroKey, AvroValue>{
    private Text word = new Text();
    private GenericRecord record;
     
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // That's where file stored in distributed cache is used
      Schema AVRO_SCHEMA = new Schema.Parser().parse(new File("./wcschema"));
      record = new GenericData.Record(AVRO_SCHEMA);
    }
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        // creating Avro record
        record.put("word", str);
        record.put("count", 1);
        context.write(new AvroKey(word), new AvroValue(record));
      }
    }
  }
	
  // Reduce function
  public static class AvroWordReducer extends Reducer<AvroKey, AvroValue,
      AvroKey, NullWritable>{	  
    Schema AVRO_SCHEMA;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // That's where file stored in distributed cache is used
      AVRO_SCHEMA = new Schema.Parser().parse(new File("./wcschema"));
    }
    public void reduce(AvroKey key, Iterable<AvroValue> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue value : values) {
        GenericRecord	record = value.datum();
        sum += (Integer)record.get("count");
      }
      GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
      record.put("word", key.datum());
      record.put("count", sum);
      context.write(new AvroKey(record), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroWordCount(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "AvroWC");
    job.setJarByClass(getClass());
    job.setMapperClass(AvroWordMapper.class);    
    job.setReducerClass(AvroWordReducer.class);
    // Name after the # sign in the file location
    // will be used as the file name in Mapper/Reducer
    job.addCacheFile(new URI("/user/input/wcschema.avsc#wcschema"));
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    FileSystem fs = FileSystem.get(conf);
    // Need schema file stored in HDFS here also
    Path path = new Path("/user/input/wcschema.avsc");
    Schema sc = new Schema.Parser().parse((fs.open(path)));
    AvroJob.setMapOutputValueSchema(job, sc);
    AvroJob.setOutputKeySchema(job,	sc);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}


That's all for the topic Distributed Cache in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

Combiner in Hadoop MapReduce

This post shows what a combiner is in Hadoop MapReduce and how the combiner function can be used to reduce the memory, I/O and network requirements of the overall MapReduce execution.

Why is combiner needed in MapReduce

When a MapReduce job is executed and the mappers start producing output, a lot of processing happens within the Hadoop framework, known as the shuffling and sorting phase.

Map output is partitioned based on the number of reducers, those partitions are also sorted and then written to local disk.

Then the data, from the nodes where maps are running, is transferred to the nodes where reducers are running. Since a single reducer gets its input from several mappers, all that data from several maps is transferred to the reducer and merged again to form the complete input for the reduce task.

As you can see all this processing requires memory, network bandwidth and I/O. That is where combiner in Hadoop can help by minimizing the data that is sent to the reducer.

Combiner function in MapReduce

A combiner in Hadoop is an optimization that can aggregate data at the map side itself. The combiner function runs on the map output, aggregates the data (so the data size becomes smaller), and the output of the combiner function becomes the input for the reduce task. Note that using a combiner is optional.

Most of the time you will use your Reducer class as the combiner class too. If you don't, your combiner class implementation must still extend Reducer and implement the reduce method.

Since the combiner has the same semantics as the reducer, the input and output types follow the same requirements. In a MapReduce job the reduce input types must match the map output types; in the same way the combiner input types must match the map output types. Since the output of the combiner becomes the input to the reducer, the output types of the combiner must match the reduce input types.


For example– Suppose you are trying to get maximum price for a stock. There are two input splits which are processed by two different maps.

Split 1
AAA		23
AAA		26
AAA		21
AAA		19
Split 2-
AAA		27
AAA		28
AAA		25
AAA		24
Output of Map-1
(AAA, 23)
(AAA, 26)
(AAA, 21)
(AAA, 19)
Output of Map-2
(AAA, 27)
(AAA, 28)
(AAA, 25)
(AAA, 24)

After the shuffle and sort phase reduce task will get its input as follows-

[AAA, (23, 26, 21, 19, 27, 28, 25, 24)] 

And the reduce output– (AAA, 28)

Here, if you specify the combiner class to be the same as the reducer, then the combiner will aggregate the respective map outputs.

Combiner for Map-1 output
(AAA, 26)
Combiner for Map-2 output
(AAA, 28)

Now the input to the reduce is as follows-

[AAA, (26, 28)]

So you can see how the data that is transferred to the reducer is minimized.

How to specify a combiner in MapReduce job

You can specify a combiner using the setCombinerClass() method of the Job class in your MapReduce driver. For example if your Reducer class is MaxStockPriceReducer and you want to set the Reducer class as the Combiner class too then it can be done as follows.

job.setCombinerClass(MaxStockPriceReducer.class);

One thing you will have to ensure when using a combiner is that however the inputs are combined, the end result should be identical.

For example, suppose you are calculating an average where Map-1 gets (3, 4, 5) and Map-2 gets (6, 8).

Then the reduce function will calculate the average as- (3, 4, 5, 6, 8) = 5.2

With combiner-

Average of (3,4,5) = 4

Average of (6, 8) = 7

Then in reduce function– Average of (4, 7) = 5.5

In this example you can see that with the combiner the result is different, so you’ll have to write your logic in such a way that even if the combiner is used the result remains identical.
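
One common way to keep the result identical, shown here only as a sketch with hypothetical class names, is to make the map and combiner emit partial (sum, count) pairs (encoded below as a "sum,count" Text value) and compute the actual average only in the reducer; that way it does not matter whether, or how many times, the combiner runs.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageWithCombiner {
  // Assumes tab separated "symbol<TAB>value" input records with integer values
  public static class ValueMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split("\t");
      outKey.set(fields[0]);
      outValue.set(fields[1] + ",1"); // partial sum = value, partial count = 1
      context.write(outKey, outValue);
    }
  }

  // Combiner: folds partial (sum, count) pairs; its output type matches its input type
  public static class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0, count = 0;
      for (Text val : values) {
        String[] parts = val.toString().split(",");
        sum += Long.parseLong(parts[0]);
        count += Long.parseLong(parts[1]);
      }
      context.write(key, new Text(sum + "," + count));
    }
  }

  // Reducer: merges the remaining partials and only then divides
  public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0, count = 0;
      for (Text val : values) {
        String[] parts = val.toString().split(",");
        sum += Long.parseLong(parts[0]);
        count += Long.parseLong(parts[1]);
      }
      context.write(key, new DoubleWritable((double) sum / count));
    }
  }
}

In the driver you would then set SumCountCombiner with setCombinerClass(), AverageReducer with setReducerClass(), and declare the map output value class as Text and the final output value class as DoubleWritable.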

MapReduce Example using combiner

Here is a MapReduce example where the max price per stock symbol is calculated. The input file has tab separated data comprising the stock symbol and price.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockPrice extends Configured implements Tool{
  // Map function
  public static class MaxStockPriceMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
      
    private final static IntWritable one = new IntWritable(1);
    private Text symbol = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      symbol.set(stringArr[0]);
      Integer price = Integer.parseInt(stringArr[1]);
      context.write(symbol, new IntWritable(price));
    }
  }
	
  // Reduce function
  public static class MaxStockPriceReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {

      int maxValue = Integer.MIN_VALUE;
      for (IntWritable val : values) {
        maxValue = Math.max(maxValue, val.get());
      }      
      context.write(key, new IntWritable(maxValue));
    }
  }
	
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockPrice(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock price");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxStockPriceMapper.class);    
    job.setReducerClass(MaxStockPriceReducer.class);		
    //job.setCombinerClass(MaxStockPriceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

As you can see, initially the line where the combiner class is set is commented out. If you run this MapReduce job without specifying any combiner, you will see the following counters in the console.

Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=106
Reduce input records=10
Reduce output records=2
Spilled Records=20

Now the line setting the combiner is uncommented and the MapReduce job is run again; this time the same counters are as follows.

Combine input records=10
Combine output records=2
Reduce input groups=2
Reduce shuffle bytes=26
Reduce input records=2
Reduce output records=2
Spilled Records=4

So you can see combiner itself minimized the data sent to reducer and the shuffled bytes are also reduced in the process.


That's all for the topic Combiner in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

Mapper Only Job in Hadoop MapReduce

Generally when we think of MapReduce job in Hadoop we think of both mappers and reducers doing their share of processing. That is true for most of the cases but you can have scenarios where you want to have a mapper only job in Hadoop.

When do you need map only job

You may opt for a map only job in Hadoop when you do need to process the data and get the output as (key, value) pairs but don’t want to aggregate those (key, value) pairs.

For example, if you are converting a text file to a sequence file using MapReduce, you just want to read a line from the text file and write it to the sequence file, so you can opt for a MapReduce job with only a map method.

Same way if you are converting a text file to parquet file using MapReduce you can opt for a mapper only job in Hadoop.

What you need to do for mapper only job

For a mapper only job you need to write only map method in the code, which will do the processing. Number of reducers is set to zero.

In order to set the number of reducers to zero you can use the setNumReduceTasks() method of the Job class. So you need to add the following in your job configuration in your MapReduce code driver.

job.setNumReduceTasks(0);

Benefits of Mapper only job

As already stated, if you just want to process the data without any aggregation then it is better to go for a mapper only job, as you can save on some of the processing done internally by the Hadoop framework.

Since there is no reducer, there is no need for the shuffle and sort phase, and transferring data to the nodes where reducers are running is also not required.

Also note that in a MapReduce job the output of the map phase is written to the local disk on the node rather than to HDFS, whereas in the case of a mapper only job the map output is written to HDFS.

Mapper only job in Hadoop example

If you have to convert a text file to a sequence file, that can be done using only a map function, so you can set the number of reducers to zero.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileWriter extends	Configured implements Tool{
  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
      context.write(key, value);
    }
  }
  public static void main(String[] args)  throws Exception{
    int exitFlag = ToolRunner.run(new SequenceFileWriter(), args);
    System.exit(exitFlag);	   
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "sfwrite");
    job.setJarByClass(SequenceFileWriter.class);
    job.setMapperClass(SFMapper.class);
    // Setting reducer to zero
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    // Compression related settings
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}

You can run the MapReduce job using the following command.

$ hadoop jar /pathto/jar/knpcodehadoop.jar org.knpcode.SequenceFileWriter /user/input/count /user/output/seq

By listing the output directory you can see that a sequence file is created.

hdfs dfs -ls /user/output/seq

Found 2 items
-rw-r--r--   1 knpcode supergroup          0 2018-06-14 12:26 /user/output/seq/_SUCCESS
-rw-r--r--   1 knpcode supergroup        287 2018-06-14 12:26 /user/output/seq/part-m-00000


That's all for the topic Mapper Only Job in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

How to See Logs And Sysouts in Hadoop MapReduce

While writing a program, in order to debug it we put in some log statements or System.out calls to display messages. In your MapReduce program too, you can use a logger or sysouts for debugging purposes. In this post we’ll see how you can access those logs or System.out.print messages in Hadoop MR2.

How to see log messages in MapReduce2

The first thing, of course, is to put logs in your code. Then, at the time of running your MapReduce job, you can note the application_id of the job from the console. Once you run your MapReduce job you will see a line like the following displayed on the console showing the application id.

18/06/13 15:20:59 INFO impl.YarnClientImpl: Submitted application application_1528883210739_0001

A folder with the same application_id will be created in the location HADOOP_INSTALLATION_DIR/logs/userlogs/; there you will find folders containing the logs for your mappers and reducers. In those folders you can check the stdout file for any System.out.print output and the syslog file for log messages.
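
If YARN log aggregation is enabled on the cluster (the yarn.log-aggregation-enable property set to true), you can also pull the aggregated logs for the whole application from the command line using the application id, for example-

bin/yarn logs -applicationId application_1528883210739_0001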

Example MapReduce showing how to put logs

You can use Apache commons logging which comes with the Hadoop bundle for logging purposes. Here is a simple word count MapReduce program with some log.info and sysout messages put in.

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount{
  public static final Log log = LogFactory.getLog(WordCount.class);
  // Map function
  public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
				
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      log.info("in map method");
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        System.out.println("word -- " + word.toString());
        context.write(word, one);
      }	 
    }
  }
	
  // Reduce function
  public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{		   
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      log.info("in reducer ");
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      log.info(key + " -- Sum is --- " + sum);
      result.set(sum);
      context.write(key, result);
    }
  }
	
  public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordMapper.class);  
    //job.setNumReduceTasks(0);
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Once you run it and you know the application id just go to the location and check the stdout and syslog files.

For example, after running this code I can access the stdout at the path- HADOOP_INSTALLATION_DIR/logs/userlogs/application_1528883210739_0001/container_1528883210739_0001_01_000002/stdout and see my sysouts there-

word -- This
word -- is
word -- a
word -- test
word -- file.
word -- This
word -- is
word -- a
word -- Hadoop
word -- MapReduce
word -- program
word -- file.

Or I can access syslog at the path- HADOOP_INSTALLATION_DIR/logs/userlogs/application_1528883210739_0001/container_1528883210739_0001_01_000003/syslog and see the loggers for the reducer.

2018-06-13 15:21:15,321 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,321 INFO [main] org.knpcode.WordCount$WordMapper: Hadoop -- Sum is --- 1
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: MapReduce -- Sum is --- 1
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: This -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: a -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: file. -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: is -- Sum is --- 2
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,322 INFO [main] org.knpcode.WordCount$WordMapper: program -- Sum is --- 1
2018-06-13 15:21:15,323 INFO [main] org.knpcode.WordCount$WordMapper: in reducer 
2018-06-13 15:21:15,323 INFO [main] org.knpcode.WordCount$WordMapper: test -- Sum is --- 1

That's all for the topic How to See Logs And Sysouts in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like