December 31, 2023

Java HashMap With Examples

HashMap in Java is the HashTable implementation of the Map interface and it is part of the Java Collections framework. HashMap class in Java extends AbstractMap class and implements Map, Cloneable and Serializable inerfaces.

HashMap stores its elements as (key, value) pairs and to get a value you will need to provide the key paired with that value. For storing values in HashMap, hashing technique is used where a hash is calculated using the key and that hash value decides in which bucket the value will be stored.

Features of HashMap

Some of the features of the HashMap in Java which are discussed in this post are as follows-

  1. In HashMap values may be duplicate but a key has to be unique. If same key is used then the value will be overwritten.
  2. HashMap uses hashing technique to store values.
  3. HashMap storage is unordered, which means insertion order is not maintained as in the case of ArrayList.
  4. HashMap in Java permits both null values and null keys.
  5. HashMap is not thread safe.
  6. The iterators returned by all of HashMap's "collection view methods" are fail-fast. Which means, if the map is structurally modified at any time after the iterator is created, in any way except through the iterator's own remove method, the Iterator throws a ConcurrentModificationException.

Java HashMap constructors

  • HashMap()- This constructor constructs an empty HashMap with the default initial capacity (16) and the default load factor (0.75).
  • HashMap(int initialCapacity)- This constructor constructs an empty HashMap with the specified initial capacity and the default load factor (0.75).
  • HashMap(int initialCapacity, float loadFactor)- This constructor constructs an empty HashMap with the specified initial capacity and load factor.
  • HashMap(Map<? extends K,? extends V> m)- Constructs a new HashMap with the same mappings as the specified Map.

Initial capacity, load factor and buckets in HashMap

HashMap in Java internally uses an array of type Node to store elements. Where Node<K, V> is an inner class with in HashMap class. You should have clear understanding of the terms initial capacity, load factor and buckets to understand HashMaps better.

  • Capacity- If you don’t specify any capacity while creating HashMap then the array will have default initial capacity of 16. If you use the constructor where initial capacity is also passed then the array will have the specified initial capacity.
  • Bucket- In HashMap concept of bucket is used so each index of array is conceptualized as one bucket. So, total there are 16 buckets. For every (key, value) pair that is added to HashMap a hash is calculated using the key, based on that hash value one of these buckets is chosen to store the element. That way HashMap is able to offer constant time performance for basic operations like get and put.
  • Load factor- Load factor is the threshold for the HashMap storage. Once the threshold is reached the capacity of the HashMap is doubled. Default load factor is 0.75 which means if the 75% of the capacity is reached the HashMap is resized.

Refer HashMap Internal Implementation in Java to get a better understanding of how does HashMap internally works in Java.

Java example creating a HashMap

This example shows how HashMap is created and elements added to it.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class HashMapDemo {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put(null, "Volga");
    carMap.put(null, "Volks Wagon");
    carMap.put("4", null);
    carMap.put("3", "Mini Cooper");
        
    Set<String> carSet =  carMap.keySet();
    for(String key : carSet){
      System.out.println("Key is " + key + " Value is " + carMap.get(key));
    }
  }
}
Output
Key is null Value is Volks Wagon
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Mini Cooper
Key is 4 Value is null

In the code HashMap of default capacity is created using this statement.

Map<String, String> carMap = new HashMap<String, String>();

All Collection classes are generic now, so you can specify in the beginning itself what type of elements will be stored in the Map. The Map used in this example can store only Strings as both keys and values.

From the output you can see some of the points as already mentioned above.

  1. Insertion order is not maintained in HashMap. Values are not displayed in the order they were inserted.
  2. Two values are inserted with null as key, second insertion overwrites the first one as only one null key is allowed in Java HashMap.
  3. One null value is also inserted.
  4. Two values are inserted with the same key “3”. The second insertion overwrites the first one in case of same key.

Methods in the HashMap class

Here is a list of some of the methods in the HashMap class in Java.

  1. put(K key, V value)- Associates the specified value with the specified key in this map.
  2. putAll(Map<? extends K,? extends V> m)- Copies all of the mappings from the specified map to this map.
  3. get(Object key)- Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key.
  4. containsKey(Object key)- Returns true if this map contains a mapping for the specified key.
  5. containsValue(Object value)- Returns true if this map maps one or more keys to the specified value.
  6. remove(Object key)- Removes the mapping for the specified key from this map if present.
  7. clear()- Removes all of the mappings from this map.
  8. entrySet()- Returns a Set view of the mappings contained in this map.
  9. keySet()- Returns a Set view of the keys contained in this map.
  10. values()- Returns a Collection view of the values contained in this map.
  11. size()- Returns the number of key-value mappings in this map.
  12. isEmpty()- Returns true if this map contains no key-value mappings.
  13. compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)- Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping).
  14. computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction)- If the specified key is not already associated with a value (or is mapped to null), attempts to compute its value using the given mapping function and enters it into this map unless null.
  15. computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)- If the value for the specified key is present and non-null, attempts to compute a new mapping given the key and its current mapped value.

Java Example removing and replacing elements from HashMap

public class HashMapDemo {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
    // removing element
    carMap.remove("2");
        
    // replacing element
    carMap.replace("3", "Land Rover");
    Set<String> carSet =  carMap.keySet();
    for(String key : carSet){
        System.out.println("Key is " + key + " Value is " + carMap.get(key));
    }
  }
}
Output
Key is 1 Value is Audi
Key is 3 Value is Land Rover
Key is 4 Value is Mini Cooper

Example with computeIfPresent and computeIfAbsent using lambdas

public class HashMapDemo {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
    // returns value for new key
    carMap.computeIfAbsent("5", k -> {return "Land Rover";});
    // change value for existing key
    carMap.computeIfPresent("4", (String k, String v) -> {
    if (carMap.get(k).equals("Mini Cooper")){
        return "Mazda";} 
    else
        return v;});
        
    Set<String> carSet =  carMap.keySet();
    for(String key : carSet){
      System.out.println("Key is " + key + " Value is " + carMap.get(key));
    }
  }
}
Output
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mazda
Key is 5 Value is Land Rover

Java HashMap iterator example

You can’t directly use an iterator with HashMap. You will have to get the collection view of the Map and then iterate it. The iterators returned by the iterator methods are fail-fast. If the Map is modified at any time after the iterator is created, in any way except through the iterator's own remove method, the Iterator throws a ConcurrentModificationException.

Refer Different Ways to Iterate a HashMap in Java to see your options for iterating a HahsMap.

Let’s try to clarify it with an example. In the code while iterating the HashMap after getting it’s set view using keySet() we’ll try to remove an element using the HashMap’s remove() method not the iterator’s remove method. All these methods like entrySet() or keySet() are fail-fast. Which means, if the map is structurally modified at any time after the iterator is created, in any way except through the iterator's own remove method, the Iterator throws a ConcurrentModificationException.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class HashMapDemo {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
        
    Set<String> carSet =  carMap.keySet();
    Iterator<String> itr = carSet.iterator();
    while (itr.hasNext()) {
      String key = itr.next();
      System.out.println("Key is " + key + " Value is " + carMap.get(key));    
      // removing value using HashMap's remove method
      if(key.equals("2")){
          carMap.remove(key);
      }
    }
  }
}
Output
Key is 1 Value is Audi
Key is 2 Value is BMW
Exception in thread "main" java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(Unknown Source)
	at java.util.HashMap$KeyIterator.next(Unknown Source)
	at com.knpcode.HashMapDemo.main(HashMapDemo.java:22)

As you can see ConcurrentModificationException exception is thrown as you are trying to structurally modify the HashMap while it is iterated using an iterator.

Using iterator’s remove method

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class HashMapDemo {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
        
    Set<String> carSet =  carMap.keySet();
    Iterator<String> itr = carSet.iterator();
    while (itr.hasNext()) {
      String key = itr.next();
      System.out.println("Key is " + key + " Value is " + carMap.get(key));    
      // removing value using HashMap's remove method
      if(key.equals("2")){
          itr.remove();
      }
    }
        
    System.out.println("** After element removal **");
    for(String key : carMap.keySet()){
      System.out.println("Key is " + key + " Value is " + carMap.get(key));
    }
  }
}
Output
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper
** After element removal **
Key is 1 Value is Audi
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper

HashMap is not threadsafe

HashMap in Java is not threadsafe. If you are using HashMap in multithreaded environment where instance of HashMap is shared among many threads, you should synchronize it externally. In order to synchronize Map you can use Collections.synchronizedMap() method which returns a synchronized Map backed by the specified map.

As example-
Map<String, String> tempMap = Collections.synchronizedMap(carMap);

That's all for the topic Java HashMap With Examples. If something is missing or you have something to share about the topic please write a comment.


You may also like

Spring Boot With Docker Example

In this tutorial you’ll see how to build a Docker image for running a Spring Boot application. We’ll create a basic DockerFile to dockerize a Spring Boot MVC application where view is created using Thymeleaf.

Maven Dependencies

Since we are creating a web application so we need a spring-boot-starter-web, for Thymeleaf we need spring-boot-starter-thymeleaf, spring-boot-maven-plugin is also added to our pom.xml. This plugin provides many convenient features-

  • It helps to create an executable jar (über-jar), which makes it more convenient to execute and transport your service.
  • It also searches for the public static void main() method to flag the class having this method as a runnable class.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.knpcode</groupId>
  <artifactId>SprinBootProject</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>SpringBootProject</name>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
  </parent>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
     </dependency>
     <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

Classes for Spring Boot Web Application

We’ll add a simple controller for our web application.

import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;

@Controller
public class MessageController {
  @GetMapping("/")
  public String showMessage(Model model) { 
    model.addAttribute("msg", "Welome to Docker");
    return "message";
  }
}
View class (Thymeleaf template)

In src/main/resources added a new folder Templates and in that created a message.html file.

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Spring Boot With Docker</title>
</head>
<body>
 <div>
    <p th:text="${msg}"></p>
 </div>
</body>
</html>

Application Class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootProjectApp {
  public static void main(String[] args) {
    SpringApplication.run(SpringBootProjectApp.class, args);
  }
}
Running the application

You can run this Spring Boot web application as a stand alone Java application but we'll run it by creating an executable jar.

For creating a completely self-contained executable jar file run mvn package from the command line. Note that you should be in your Spring Boot project directory.

knpcode:SprinBootProject$ mvn package

To run application using the created jar, you can use the java -jar command, as follows-

java -jar target/SprinBootProject-0.0.1-SNAPSHOT.jar

But we’ll do the samething by creating a DockerFile.

DockerFile

For running in your application in Docker container you need to create an image which is a read-only template with instructions for creating a Docker container.

For creating Docker image you create a Dockerfile which is a text file with a simple syntax for defining the steps needed to create the image and run it. Each instruction in a Dockerfile creates a layer in the image.

Create a text file with in your project directory named DockerFile and copy the following text in it.

FROM openjdk:8-jdk-alpine

ARG JAR_FILE=target/SprinBootProject-0.0.1-SNAPSHOT.jar

COPY ${JAR_FILE} app.jar

ENTRYPOINT ["java","-jar","/app.jar"]
  1. Often, an image is based on another image, with some additional customization. This is true in our case too and the base image used here is openjdk:8-jdk-alpine This image is based on the popular Alpine Linux project which is much smaller than most distribution base images (~5MB), and thus leads to much slimmer images in general.
  2. Then assign a name to the jar path.
  3. Copy jar file.
  4. Execute jar using the ENTRYPOINT instruction by providing arguments in the following form- ENTRYPOINT ["executable", "param1", "param2"] Which makes it equivalent to java -jar target/SprinBootProject-0.0.1-SNAPSHOT.jar

Create a docker image

You can create a Docker image by running command in the following form-

sudo docker build -t name:tag .

For our project command to create a docker image-

sudo docker build -t sbexample:1.0 .

. means using the current directory as context

tag the image as sbexample:1.0

To create a container (run an image)

The docker run command must specify an image to derive the container from.

sudo docker run -d -p 8080:8080 sbexample:1.0

Here options are-

-d To start a container in detached mode (to run the container in the background)

-p Publish all exposed ports to the host interfaces

If every thing works fine then you will have a dockerized Spring Boot application at this point which you can access by typing URL http://localhost:8080/ in a browser

Spring Boot With Docker

If you want to see the running containers use following command

sudo docker ps

To stop a running container use following command

sudo docker stop container_id

That's all for the topic Spring Boot With Docker Example. If something is missing or you have something to share about the topic please write a comment.


You may also like

December 30, 2023

How to Iterate a Java HashMap

In this post we’ll see different ways to iterate a Map or HashMap in Java. One thing you should know is you can’t directly loop a Map in Java (except when you use forEach statement). There are methods that return a "collection view" of the Map using that view you can iterate a HashMap in Java.

The methods that can be used for getting a "collection view" of the Map are as follows-

  • Set<Map.Entry<K,V>> entrySet()- Returns a Set view of the mappings contained in this map.
  • Set<K> keySet()- Returns a Set view of the keys contained in this map.
  • Collection<V> values()- Returns a Collection view of the values contained in this map.

Options for iterating a Java HashMap

As you can see from the above methods you either get a Set with Map.entry elements, a Set of keys of the maps or a Collection view of the Map values.

Using that view you can iterate a Map in Java using one of the following options-

  1. You can use For-Each loop (enhanced for loop), available from Java 5.
  2. You can iterate using Iterator. Using iterator() method you can get an iterator and then using the hashNext() and next() method of the iterator you can iterate a HashMap.
  3. You can also use forEach statement available from Java 8 to loop through Map.

Iterate a HashMap in Java - Examples

Here are some examples using all of the above mentioned methods for iterating a HashMap.

1. Using entrySet() method

Using entrySet() method you get the set view of the mappings stored in the HashMap in the form of Map.entry elements. Using that set view you can iterate a HashMap and get both key and value.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class HashMapIteration {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
    System.out.println("***Looping using entrySet***");
    Set<Map.Entry<String, String>> carSet =  carMap.entrySet();
        
    System.out.println("***Using for-each loop***");
    for(Map.Entry<String, String> entry : carSet){
      System.out.println("Key is " + entry.getKey() + " Value is " + entry.getValue());
    }
        
    System.out.println("***Using iterator***");
    Iterator<Map.Entry<String, String>> itr = carSet.iterator();
    while (itr.hasNext()) {
      Map.Entry<String, String> entry = itr.next();
      System.out.println("Key is " + entry.getKey() + " Value is " + entry.getValue());    
    }
        
    System.out.println("***Using forEach statement***");
    carSet.forEach((c)->System.out.println("Key is " + c.getKey() + " Value is " + c.getValue()));
  }
}
Output
***Looping using entrySet***
***Using for-each loop***
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper
***Using iterator***
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper
***Using forEach statement***
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper
2. Using keySet() method

Using keySet() method you get the set view of the HashMap keys. Once you have the keys you can also get the values mapped to those keys using the get() method but that makes the looping of the Map slow in comparison to other ways.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class HashMapIteration {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
    System.out.println("***Looping using keySet***");
    Set<String> carSet =  carMap.keySet();
    System.out.println("***Using for-each loop***");
    for(String key : carSet){
      System.out.println("Key is " + key + " Value is " + carMap.get(key));
    }
    System.out.println("***Using iterator***");
    Iterator<String> itr = carSet.iterator();
    while (itr.hasNext()) {
      String key = itr.next();
      System.out.println("Key is " + key + " Value is " + carMap.get(key));    
    }
    System.out.println("***Using forEach statement***");
    carSet.forEach((c)->System.out.println("Key is " + c + " Value is " + carMap.get(c)));
  }
}
Output
***Looping using keySet***
***Using for-each loop***
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper
***Using iterator***
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper
***Using forEach statement***
Key is 1 Value is Audi
Key is 2 Value is BMW
Key is 3 Value is Jaguar
Key is 4 Value is Mini Cooper
3. Using values() method

If you just want to iterate over the values of the HashMap you can use the values() method.

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class HashMapIteration {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
        
    System.out.println("***Looping using values***");
    Collection<String> cars = carMap.values();
    System.out.println("***Using for-each loop***");
    for(String car : cars){
      System.out.println("Value is " + car);
    }
    System.out.println("***Using iterator***");
    Iterator<String> itr = cars.iterator();
    while (itr.hasNext()) {
      System.out.println("Value is " + itr.next());
    }
    System.out.println("***Using forEach statement***");
    cars.forEach((c)->System.out.println("Value is " + c));
    // forEach with method reference
    cars.forEach(System.out::println);
  }
}
Output
***Looping using values***
***Using for-each loop***
Value is Audi
Value is BMW
Value is Jaguar
Value is Mini Cooper
***Using iterator***
Value is Audi
Value is BMW
Value is Jaguar
Value is Mini Cooper
***Using forEach statement***
Value is Audi
Value is BMW
Value is Jaguar
Value is Mini Cooper
Audi
BMW
Jaguar
Mini Cooper
4. Iterating a Map directly using forEach

As the saying goes “save the best for the last” here is a way to iterate a HashMap in Java directly using forEach statement (Java 8 onward).

public class HashMapIteration {
  public static void main(String[] args) {
    // Creating HashMap
    Map<String, String> carMap = new HashMap<String, String>();
    // Storing elements
    carMap.put("1", "Audi");
    carMap.put("2", "BMW");
    carMap.put("3", "Jaguar");
    carMap.put("4", "Mini Cooper");
    
    carMap.forEach((K, V) -> System.out.println("Key is " + K + " value is " + V));
  }
}
Output
Key is 1 value is Audi
Key is 2 value is BMW
Key is 3 value is Jaguar
Key is 4 value is Mini Cooper

That's all for the topic How to Iterate a Java HashMap. If something is missing or you have something to share about the topic please write a comment.


You may also like

How MapReduce Works in Hadoop

In the post WordCount MapReduce program we have seen how to write a MapReduce program in Java, create a jar and run it. There are a lot of things that you do to create a MapReduce job and Hadoop framework also do a lot of processing internally. In this post we’ll see in detail how MapReduce works in Hadoop internally using the word count MapReduce program as example.

What is MapReduce

Hadoop MapReduce is a framework for writing applications that can process huge data in parallel, by working on small chunks of data in parallel on cluster of nodes. The framework ensures that this distributed processing happens in a reliable, fault-tolerant manner.

Map and Reduce

A MapReduce job in Hadoop consists of two phases-

  • Map phase– It has a Mapper class which has a map function specified by the developer. The input and output for Map phase is a (key, value) pair. When you copy the file that has to be processed to HDFS it is split into independent chunks. Hadoop framework creates one map task for each chunk and these map tasks run in parallel.
  • Reduce phase- It has a Reducer class which has a reduce function specified by the developer. The input and output for Reduce phase is also a (key, value) pair. The output of Map phase after some further processing by Hadoop framework (known as sorting and shuffling) becomes the input for reduce phase. So the output of Map phase is the intermediate output and it is processed by Reduce phase to generate the final output.

Since input and output for both map and reduce functions are key, value pair so if we say input for map is (K1, V1) and output is (K2, V2) then map function input and output will have the following form-

(K1, V1) -> list(K2, V2)

The intermediate output of the map function goes through some further processing with in the framework, known as shuffle and sort phase, before inputting to reduce function. The general form for the reduce function can be depicted as follows-

(K2, list(V2)) -> list(K3, V3)

Here note that the types of the reduce input matches the types of map output.

MapReduce explanation with example

Let’s take Word count MapReduce code as example and see what all happens in both Map and Reduce phases and how MapReduce works in Hadoop.

When we put the input text file into HDFS it is split into chunks of data. For simplicity sake let’s say we have two lines in the file and it is split into two parts with each part having one line.

If the text file has following two lines-

This is a test file
This is a Hadoop MapReduce program file

Then there will be two splits and two map tasks will get those two splits as input.

Mapper class

// Map function
public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
    // Splitting the line on spaces
    String[] stringArr = value.toString().split("\\s+");
    for (String str : stringArr) {
      word.set(str);
      context.write(word, one);
    }
  }
}

In the Mapper class you can see that it has four parameters first two specify the input to the map function and other to specify the output of the map function.

In this Word count program input key value pair will be as follows-

key- byte offset into the file at which the line starts.

Value– Content of the line.

As we assumed there will be two splits (each having one line of the file) and two map tasks let’s say Map-1 and Map-2, so input to Map-1 and Map-2 will be as follows.

Map-1– (0, This is a test file)

Map-2– (0, This is a Hadoop MapReduce program file)

Logic in map function is to split the line on spaces and the write each word to the context with value as 1.

So output from Map-1 will be as follows-

(This, 1)
(is, 1)
( a, 1)
(test, 1)
(file, 1)

And output from Map-2 will be as follows-

(This, 1)
(is, 1)
(a, 1)
(Hadoop, 1)
(MapReduce, 1)
(program, 1)
(file, 1)
Reducer class
// Reduce function
public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{	   
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

In the Reducer class again there are four parameters two for input types and two for output types of the reduce function.

Note that input type of the reduce function must match the output types of the map function.

This intermediate output from Map will be further processed by the Hadoop framework in the shuffle phase where it will be sorted and grouped as per keys, after this internal processing input to reduce will look like this-

[Hadoop, (1)]
[MapReduce, (1)]
[This, (1, 1)]
[a, (1, 1)]
[file, (1, 1)]
[is, (1, 1)]
[program, (1)]
[test, (1)]

You can see that the input to the reduce function is in the form (key, list(values)). In the logic of the reduce function, for each key value pair list of values is iterated and values are added. That will be the final output.

Hadoop 1
MapReduce 1
This 2
a 2
file. 2
is 2
program 1
test 1

That's all for the topic How MapReduce Works in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

December 29, 2023

OutputCommitter in Hadoop MapReduce

In Hadoop framework distributed processing happens where map and reduce tasks are spawned on different nodes and process part of the data. In this type of distributed processing it is important to ensure that framework knows when a particular task finishes or there is a need to abort the task and when does the over all job finish. For that purpose, like many other distributed systems, Hadoop also uses commit protocol. Class that implements it in Hadoop is OutputCommitter.

OutputCommitter class in Hadoop describes the commit of task output for a Map-Reduce job. In Hadoop 2 OutputCommitter implementation can be set using the getOutputCommitter() method of the OutputFormat class. FileOutputCommitter is the default OutputCommitter. Note that OutputCommitter is an abstract class in Hadoop framework which can be extended to provide OutputCommitter implementation.

Tasks performed by OutputCommitter in Hadoop Map-Reduce

  1. Setup the job during initialization. For example, create the temporary output directory for the job during the initialization of the job. For that setupJob() method is used. This is called from the application master process for the entire job. This will be called multiple times, once per job attempt.
  2. Cleanup the job after the job completion. For example, remove the temporary output directory after the job completion.

    Cleaning up is done when either commitJob() or abortJob() method is called.

    • commitJob() method is invoked for jobs with final runstate as SUCCESSFUL. This is called from the application master process for the entire job. This method is guaranteed to only be called once to maintain atomicity.
    • abortJob() method is invoked for jobs with final runstate as JobStatus.State.FAILED or JobStatus.State.KILLED. This is called from the application master process for the entire job. This may be called multiple times.
  3. Setup the task temporary output. This is done by invoking setupTask() method. This method is called from each individual task's process that will output to HDFS, and it is called just for that task. This may be called multiple times for the same task, but for different task attempts.
  4. Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need commit.
    Checking if the task needs commit is done using needsTaskCommit() method. This method returning false means commit phase is disabled for the tasks.
  5. Commit of the task output. During this step task's temporary output is promoted to final output location.
    Method used is commitTask(). There may be multiple task attempts for the same task, Hadoop framework ensures that the failed task attempts are aborted and only one task is committed.
  6. Discard the task commit. If a task doesn't finish abortTask() method is called. This method may be called multiple times for the same task, but for different task attempts.

That's all for the topic OutputCommitter in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

December 27, 2023

Distributed Cache in Hadoop

In this post we’ll see what Distributed cache in Hadoop is.

What is a distributed cache

As the name suggests distributed cache in Hadoop is a cache where you can store a file (text, archives, jars etc.) which is distributed across the nodes where mappers and reducers for the MapReduce job are running. That way the cached files are localized for the running map and reduce tasks.

Methods for adding the files in Distributed Cache

There is a DistributedCache class with relevant methods but the whole class is deprecated in Hadoop2. You should be using the methods in Job class instead.

  • public void addCacheFile(URI uri)- Add a file to be localized.
  • public void addCacheArchive(URI uri)- Add archives to be localized.
  • public void addFileToClassPath(Path file)- Adds file path to the current set of classpath entries. It adds the file to cache as well. Files added with this method will not be unpacked while being added to the classpath.
  • public void addArchiveToClassPath(Path archive)- Adds an archive path to the current set of classpath entries. It adds the archive to cache as well. Archive files will be unpacked and added to the classpath when being distributed.

How to use distributed cache

In order to make available a file through distributed cache in Hadoop.

  1. Copy the file you want to make available through distributed cache to HDFS if it is not there already.
  2. Based on the file type use the relevant method to add it to distributed cache.

As example if you want to add a text file to distributed cache then you can use the following statement in your driver class.

job.addCacheFile(new URI("/user/input/test.txt#test"));

If you want to add a jar to the class path then you can do it as follows-

job.addFileToClassPath(new Path("/myapp/mylib.jar"));

Distributed cache example MapReduce code

Here is an Avro MapReduce word count example program. Output file is an Avro data file which uses an Avro schema. This Avro schema is added to the distributed cache using the addCacheFile() method and used by the mappers and reducers.

import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroWordCount extends Configured implements Tool{
	
  // Map function
  public static class AvroWordMapper extends Mapper<LongWritable, Text, AvroKey, AvroValue>{
    private Text word = new Text();
    private GenericRecord record;
     
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // That's where file stored in distributed cache is used
      Schema AVRO_SCHEMA = new Schema.Parser().parse(new File("./wcschema"));
      record = new GenericData.Record(AVRO_SCHEMA);
    }
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        // creating Avro record
        record.put("word", str);
        record.put("count", 1);
        context.write(new AvroKey(word), new AvroValue(record));
      }
    }
  }
	
  // Reduce function
  public static class AvroWordReducer extends Reducer<AvroKey, AvroValue,
      AvroKey, NullWritable>{	  
    Schema AVRO_SCHEMA;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // That's where file stored in distributed cache is used
      AVRO_SCHEMA = new Schema.Parser().parse(new File("./wcschema"));
    }
    public void reduce(AvroKey key, Iterable<AvroValue> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue value : values) {
        GenericRecord	record = value.datum();
        sum += (Integer)record.get("count");
      }
      GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
      record.put("word", key.datum());
      record.put("count", sum);
      context.write(new AvroKey(record), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroWordCount(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "AvroWC");
    job.setJarByClass(getClass());
    job.setMapperClass(AvroWordMapper.class);    
    job.setReducerClass(AvroWordReducer.class);
    // Name after the # sign in the file location
    // will be used as the file name in Mapper/Reducer
    job.addCacheFile(new URI("/user/input/wcschema.avsc#wcschema"));
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    FileSystem fs = FileSystem.get(conf);
    // Need schema file stored in HDFS here also
    Path path = new Path("/user/input/wcschema.avsc".toString());
    Schema sc = new Schema.Parser().parse((fs.open(path)));
    AvroJob.setMapOutputValueSchema(job, sc);
    AvroJob.setOutputKeySchema(job,	sc);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for the topic Distributed Cache in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

December 26, 2023

How to Improve Map-Reduce Performance

In this post we’ll see some of the ways to improve performance of the Map-Reduce job in Hadoop.

The tips given here for improving the performance of MapReduce job are more from the MapReduce code and configuration perspective rather than cluster and hardware perspective.

1- Enabling uber mode– Like Hadoop 1 there is no JVM resuse feature in YARN Hadoop but you can enable the task to run in Uber mode, by default uber is not enabled. If uber mode is enabled ApplicationMaster can calculate that the overhead of negotiating resources with ResourceManager, communicating with NodeManagers on different nodes to launch the containers and running the tasks on those containers is much more that running MapReduce job sequentially in the same JVM, it can run a job as uber task.

2- For compression try to use native library- When using compression and decompression in Hadoop it is better to use native library as native library will outperform codec written in programming language like Java.

3- Increasing the block size- In case input file is of very large size you can consider improving the hdfs block size to 512 M. That can be done by setting the parameter dfs.blocksize. If you set the dfs.blocksize to a higher value input split size will increase to same size because the input size is calculated using the formula.

Math.max(mapreduce.input.fileinputformat.split.minsize, Math.min(mapreduce.input.fileinputformat.split.maxsize, dfs.blocksize))

thus making it of same size as HDFS block size. By increasing the block size you will have less overhead in terms of metadata as there will be less number of blocks.

If input split is larger, Map tasks will get more data to process. In Hadoop as many map tasks are started as there are input splits so having less input splits means the overhead to initialize map tasks is reduced.

4- Time taken by map tasks- A map task should run for at least a minute (1-3 minutes) if it is finishing with in less than a minute that means input data to a map task is less. If there are many small files in your map reduce job then try to use a container file format like Sequence file or Avro that contains those small files.

You can also use CombineFileInputFormat which put many files into an input split so that there is more data for mapper to process.

5- Input data compression is splittable or not- If input data is compressed then the compression format used is splittable or not is also one of the thing to consider. If input data is not splittable there would only be a single split processed by a single map task making the processing very slow and no parallelism at all.

For compressing input data compress using bzip2 which is splittable or using lzo with indexing to make it splittable.

6- Setting number of reduce tasks- The number of maps is usually driven by the number of input splits but number of reducers can be controlled. As per the documentation; the right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).

7- Data skew at reducer side- If data is skewed in such a way that more values are grouped with a single key rather than having an even distribution of values then reduce tasks which process keys with more values will take more time to finish where as other reducers will get less data because of the uneven distribution and finish early.

In this type of scenario try to analyze the partition of data and look at the possibility of writing a custom partitioner so that data is evenly distributed among keys.

8- Shuffle phase performance improvements- Shuffle phase in Hadoop framework is very network intensive as files are transferred from mappers to reducers. There is lots of IO involve as map output is written to local disk, there is lots of processing also in form of partitioning the data as per reducers, sorting data by keys, merging.

Optimization for reducing the shuffle phase time helps in reducing the overall job time. Some of the performance improvement tips are as follows-

  • Compressing the map output- Since Map output is written to disk and also transferred to the reducer, compressing map output saves storage space, makes it faster to write to disk and reduces data that has to be transferred to reducer node.
  • Filtering data- See how you can cut down on data emitted by Map tasks. Filter the records to remove unwanted records entirely. Also, reduce the record size by taking only the relevant record fields.
  • Using Combiner- Using combiner in MapReduce is a good way to improve performance of the overall MapReduce job. By using combiner you can aggregate data in the map phase itself and reduce the number of records sent to the reducer.
  • Raw Comparator- During sorting and merging Hadoop framework uses comparator to compare keys. If you are using a custom comparator then try to write it to be a raw comparator so that comparison can be done at the byte level itself. Otherwise keys in the map tasks are to be deserialized to create an object and then compare making the process time consuming.
  • Setting parameters with optimum values- Another action you can take to improve performance of the MapReduce job is to change values of some of the configuration parameters.

    Your goal is to reduce the records spilled to disk at map as well as reduce side. At map side you can change the setting for the following parameters to try to reduce the number of spills to disk.

    • mapreduce.task.io.sort.mb- The total amount of buffer memory to use while sorting files, in megabytes.
    • mapreduce.map.sort.spill.percent- The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background.At reduce side you can change the setting for the following parameters to try to keep data in memory itself.
    • mapreduce.reduce.shuffle.input.buffer.percent- The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.
    • mapreduce.reduce.input.buffer.percent- The percentage of memory- relative to the maximum heap size- to retain map outputs during the reduce.
    • mapreduce.reduce.shuffle.memory.limit.percent- Maximum percentage of the in-memory limit that a single shuffle can consume.

9-Improvements in MapReduce coding- You should also optimize your MapReduce code so that it runs efficiently.

  • Reusing objects- Since map method is called many times so creating new objects judiciously will help you to reduce overhead associated with object creation. Try to reuse objects as much as you can. One of the mistake which is very frequent is writing code as follows.
    String[] stringArr = value.toString().split("\\s+");
    Text value = new Text(stringArr[0]);
    context.write(key, value);
    

    You should write it as following-

    private Text value = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      String[] stringArr = value.toString().split("\\s+");
      value.set(stringArr[0]);// reusing object
      context.write(key, value);
    }
    
  • String concatenation- Since String in Java is immutable so String concatenation results in String object creation. For appending prefer StringBuffer or StringBuilder instead.

That's all for the topic How to Improve Map-Reduce Performance. If something is missing or you have something to share about the topic please write a comment.


You may also like

December 25, 2023

Hadoop MapReduce Word Count Program

Once you have installed Hadoop on your systemand initial verification is done you would be looking to write your first MapReduce program. Before digging deeper into the intricacies of MapReduce programming first step is the word count MapReduce program in Hadoop which is also known as the "Hello World" of the Hadoop framework.

So here is a simple Hadoop MapReduce word count program written in Java to get you started with MapReduce programming.

What you need

  1. It will be good if you have any IDE like Eclipse to write the Java code.
  2. A text file which is your input file. It should be copied to HDFS. This is the file which Map task will process and produce output in (key, value) pairs. This Map task output becomes input for the Reduce task.

Process

These are the steps you need for executing your Word count MapReduce program in Hadoop.

  1. Start daemons by executing the start-dfs and start-yarn scripts.
  2. Create an input directory in HDFS where you will keep your text file.
    bin/hdfs dfs -mkdir /user
    
    bin/hdfs dfs -mkdir /user/input
    
  3. Copy the text file you created to /usr/input directory.
    bin/hdfs dfs -put /home/knpcode/Documents/knpcode/Hadoop/count /user/input
    

    I have created a text file called count with the following content

    This is a test file.
    This is a test file.
    

    If you want to verify that the file is copied or not, you can run the following command-

    bin/hdfs dfs -ls /user/input
    
    Found 1 items
    -rw-r--r--   1 knpcode supergroup         42 2017-12-22 18:12 /user/input/count
    

Word count MapReduce Java code

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  // Map function
  public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        context.write(word, one);
      }       
    }
  }
	
  // Reduce function
  public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{		   
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
	
  public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordMapper.class);    
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

You will need at least the given jars to compile your MapReduce code, you will find them in the share directory of your Hadoop installation.

Word count MapReduce program jars

Running the word count MapReduce program

Once your code is successfully compiled, create a jar. If you are using eclipse IDE you can use it to create the jar by Right clicking on project – export – Java (Jar File)

Once jar is created you need to run the following command to execute your MapReduce code.

bin/hadoop jar /home/knpcode/Documents/knpcode/Hadoop/wordcount.jar org.knpcode.WordCount /user/input /user/output

In the above command

/home/knpcode/Documents/knpcode/Hadoop/wordcount.jar is the path to your jar.

org.knpcode.WordCount is the fully qualified name of Java class that you need to run.

/user/input is the path to input file.

/user/output is the path to output

In the java program in the main method there were these two lines-

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

That’s where input and output directories will be set.

To see an explanation of word count MapReduce program working in detail, check this post- How MapReduce Works in Hadoop

After execution you can check the output directory for the output.

bin/hdfs dfs -ls /user/output

Found 2 items
-rw-r--r--   1 knpcode supergroup          0 2017-12-22 18:15 /user/output/_SUCCESS
-rw-r--r--   1 knpcode supergroup         31 2017-12-22 18:15 /user/output/part-r-00000

The output can be verified by listing the content of the created output file.

bin/hdfs dfs -cat /user/output/part-r-00000
This	2
a	2
file.	2
is	2
test	2

That's all for the topic Hadoop MapReduce Word Count Program. If something is missing or you have something to share about the topic please write a comment.


You may also like

December 24, 2023

How to Chain MapReduce Job in Hadoop

In many scenarios you would like to create a sequence of MapReduce jobs to completely transform and process the data. This is better than putting every thing in a single MapReduce job job and making it very complex.

In fact you can get your data through various sources and use a sequence of various applications too. That can be done by creating a work flow using Oozie but that is a topic for another post. In this post we’ll see how to chain MapReduce job in Hadoop using ChainMapper and ChainReducer.

ChainMapper in Hadoop

ChainMapper is one of the predefined MapReduce class in Hadoop. ChainMapper class allows you to use multiple Mapper classes within a single Map task. The Mapper classes are invoked in a chained fashion where the output of the first mapper becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.

You can add mappers to a ChainMapper using addMapper() method.

ChainReducer in Hadoop

The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper and output of first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.

To add a Mapper class to the chain reducer you can use addMapper() method.

To set the Reducer class to the chain job you can use setReducer() method.

Chaining MapReduce job

Using the ChainMapper and the ChainReducer classes it is possible to compose MapReduce jobs that look like [MAP+ / REDUCE MAP*].

When you are using chained MapReduce you can have a combination as follows-

  1. One or more mappers
  2. Single Reducer
  3. Zero or more mappers (optional and to be used only if chained reducer is used)

When you are using chained MapReduce job the data from mappers or reducer is stored (and used) in the memory rather than on disk that reduces the disk IO to a large extent.

MapReduce chaining example

There is data of stocks with stock symbol, price and transaction in a day in the following format.

AAA		23	5677
BBB		23	12800
aaa		26	23785
.....
.....

In the data symbols are not always in the uppercase. So there are two mappers, in first relevant fields are extracted (symbol and transaction). In the second mapper symbols are converted to upper case.

Then there is a reducer that adds the transaction per symbol. Then with in the reduce task there is an InverseMapper that inverses the key, value pair. Note that InverseMapper is a predefined Mapper class with in the Hadoop framework that is why there is no implementation of it in the example code.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockTrans extends Configured implements Tool{
  // Mapper 1
  public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text symbol = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      //Setting symbol and transaction values
      symbol.set(stringArr[0]);
      Integer trans = Integer.parseInt(stringArr[2]);
      context.write(symbol, new IntWritable(trans));
    }
  }
	
  // Mapper 2
  public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable>{
    public void map(Text key, IntWritable value, Context context) 
        throws IOException, InterruptedException {
    
      String symbol = key.toString().toUpperCase();       
      context.write(new Text(symbol), value);
    }
  }
	
  // Reduce function
  public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    public void reduce(Text key, Iterable values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }	

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockTrans(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock transactio");
    job.setJarByClass(getClass());
    // MapReduce chaining
    Configuration map1Conf = new Configuration(false);
    ChainMapper.addMapper(job, StockFieldMapper.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class,  map1Conf);
    
    Configuration map2Conf = new Configuration(false);
    ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class,
           Text.class, IntWritable.class, map2Conf);
    
    Configuration reduceConf = new Configuration(false);		
    ChainReducer.setReducer(job, TotalTransReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
     
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

On running this code after creating the jar.

hadoop jar /home/knpcode/Documents/knpcode/knpcodehadoop.jar org.knpcode.StockTrans /user/input/StockTrans.txt /user/output/stock
Output
hdfs dfs -cat /user/output/stock/part-r-00000

50483	AAA
180809	BBB

That's all for the topic How to Chain MapReduce Job in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

December 23, 2023

Mapper Only Job in Hadoop MapReduce

Generally when we think of MapReduce job in Hadoop we think of both mappers and reducers doing their share of processing. That is true for most of the cases but you can have scenarios where you want to have a mapper only job in Hadoop.

When do you need map only job

You may opt for a map only job in Hadoop when you do need to process the data and get the output as (key, value) pairs but don’t want to aggregate those (key, value) pairs.

For example– If you are converting a text file to sequence file using MapReduce. In this case you just want to read a line from text file and write it to a sequence file so you can opt for a MapReduce with only map method.

Same way if you are converting a text file to parquet file using MapReduce you can opt for a mapper only job in Hadoop.

What you need to do for mapper only job

For a mapper only job you need to write only map method in the code, which will do the processing. Number of reducers is set to zero.

In order to set the number of reducers to zero you can use the setNumReduceTasks() method of the Job class. So you need to add the following in your job configuration in your MapReduce code driver.

job.setNumReduceTasks(0);

Benefits of Mapper only job

As already stated if you just want to process the data with out any aggregation then better go for a mapper only job as you can save on some of the processing done internally by the Hadoop framework.

Since reducer is not there so no need of shuffle and sort phase also transferring of data to the nodes where reducers are running is not required.

Also note that in a MapReduce job the output of map phase is written to the local disk on the node rather than to HDFS. Where as, in the case of Mapper only job, Map output is written to the HDFS.

Mapper only job in Hadoop example

If you have to convert a text file to sequence file that can be done using only a map function, you can set the number of reducers to zero.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileWriter extends	Configured implements Tool{
  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
      context.write(key, value);
    }
  }
  public static void main(String[] args)  throws Exception{
    int exitFlag = ToolRunner.run(new SequenceFileWriter(), args);
    System.exit(exitFlag);	   
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "sfwrite");
    job.setJarByClass(SequenceFileWriter.class);
    job.setMapperClass(SFMapper.class);
    // Setting reducer to zero
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    // Compression related settings
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}

You can run the MapReduce job using the followng command.

$ hadoop jar /pathto/jar/knpcodehadoop.jar org.knpcode.SequenceFileWriter /user/input/count /user/output/seq

By listing the output directory you can see that a sequence file is created.

hdfs dfs -ls /user/output/seq

Found 2 items
-rw-r--r--   1 knpcode supergroup          0 2018-06-14 12:26 /user/output/seq/_SUCCESS
-rw-r--r--   1 knpcode supergroup        287 2018-06-14 12:26 /user/output/seq/part-m-00000

That's all for the topic Mapper Only Job in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like

Combiner in Hadoop MapReduce

This post shows what is combiner in Hadoop MapReduce and how combiner function can be used to reduce the overall memory, I/O and network requirement of the overall MapReduce execution.

Why is combiner needed in MapReduce

When a MapReduce job is executed and the mappers start producing output a lot of processing happens with in the Hadoop framework knows as the shuffling and sorting phase.

Map output is partitioned based on the number of reducers, those partitions are also sorted and then written to local disk.

Then the data, from the nodes where maps are running, is transferred to the nodes where reducers are running. Since a single reducer will get its input from several mappers so all that data from several maps is transferred to the reducer and merged again to form the complete input for the reduce task.

As you can see all this processing requires memory, network bandwidth and I/O. That is where combiner in Hadoop can help by minimizing the data that is sent to the reducer.

Combiner function in MapReduce

Combiner in Hadoop is an optimization that can aggregate data at the map-side itself. Combiner function runs on the map output, aggregates the data (so the data size becomes less) and the output of combiner function becomes the input for reduce task. Note that using combiner is optional.

Most of the times you will use your Reducer class as the combiner class too. If you are not, then also your Combiner class implementation must extend the Reducer and implement the reduce method.

Since combiner has the same semantics as reducer so the input and output types follows the same requirement. In a MapReduce job reduce input types must match the map output types, same way combiner input types must match the map output types. Since output of combiner becomes the input to reducer so output types of combiner must match the reduce input types.

combiner in hadoop

For example– Suppose you are trying to get maximum price for a stock. There are two input splits which are processed by two different maps.

Split 1
AAA		23
AAA		26
AAA		21
AAA		19
Split 2-
AAA		27
AAA		28
AAA		25
AAA		24
Output of Map-1
(AAA, 23)
(AAA, 26)
(AAA, 21)
(AAA, 19)
Output of Map-2
(AAA, 27)
(AAA, 28)
(AAA, 25)
(AAA, 24)

After the shuffle and sort phase reduce task will get its input as follows-

[AAA, (23, 26, 21, 19, 27, 28, 25, 24)] 

And the reduce output– (AAA, 28)

Here if you specify the combiner class same as reducer then the combiner will aggregate the respective map outputs.

Combiner for Map-1 output
(AAA, 26)
Combiner for Map-2 output
(AAA, 28)
Now the input to the reduce is as follows-
[AAA, (26, 28)]

So you can see how the data that is transferred to the reducer is minimized.

How to specify a combiner in MapReduce job

You can specify a combiner using the setCombinerClass() method of the Job class in your MapReduce driver. For example if your Reducer class is MaxStockPriceReducer and you want to set the Reducer class as the Combiner class too then it can be done as follows.

job.setCombinerClass(MaxStockPriceReducer.class);

One thing you will have to ensure when using combiner is; however inputs are combined the end result should be identical.

As example if you are calculating average where map-1 (3,4,5) and map-2 (6, 8)

Then reduce function will calculate average as- (3, 4, 5, 6, 8) = 5.2

With combiner-

Average of (3,4,5) = 4

Average of (6, 8) = 7

Then in reduce function– Average of (4, 7) = 5.5

In this example you can see with combiner the result is different, so you’ll have to write your logic in such a way that even if combiner is used the result should be identical.

MapReduce Example using combiner

Here is a MapReduce example where the max price per stock symbol is calculated using MapReduce. Input file has tab separated data comprising of Stock symbol and price.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockPrice extends Configured implements Tool{
  // Map function
  public static class MaxStockPriceMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
      
    private final static IntWritable one = new IntWritable(1);
    private Text symbol = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      symbol.set(stringArr[0]);
      Integer price = Integer.parseInt(stringArr[1]);
      context.write(symbol, new IntWritable(price));
    }
  }
	
  // Reduce function
  public static class MaxStockPriceReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce(Text key, Iterable values, Context context) 
        throws IOException, InterruptedException {

      int maxValue = Integer.MIN_VALUE;
      for (IntWritable val : values) {
        maxValue = Math.max(maxValue, val.get());
      }      
      context.write(key, new IntWritable(maxValue));
    }
  }
	
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockPrice(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock price");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxStockPriceMapper.class);    
    job.setReducerClass(MaxStockPriceReducer.class);		
    //job.setCombinerClass(MaxStockPriceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

As you can see initially the line where combiner class is set is commented. If you run this MapReduce job without specifying any combiner and see the counters in the console.

Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=106
Reduce input records=10
Reduce output records=2
Spilled Records=20

Now the line setting the combiner is uncommented and the MapReduce job is run again now the same counters are as follows.

Combine input records=10
Combine output records=2
Reduce input groups=2
Reduce shuffle bytes=26
Reduce input records=2
Reduce output records=2
Spilled Records=4

So you can see combiner itself minimized the data sent to reducer and the shuffled bytes are also reduced in the process.

That's all for the topic Combiner in Hadoop MapReduce. If something is missing or you have something to share about the topic please write a comment.


You may also like