September 1, 2021

Java Parallel Streams With Examples

When you create a stream using the Java Stream API, it is a serial (sequential) stream by default. You can also create a parallel stream in Java to execute a stream in parallel. In that case the Java runtime partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results.

How to create Parallel streams in Java

There are two ways to create a parallel stream-

1- Using the parallelStream() method of the Collection interface, which returns a possibly parallel Stream with this collection as its source.

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);  
long count = myList.parallelStream().count();

2- Using the parallel() method of BaseStream.

int value = Stream.of(1, 2, 3, 4, 5).parallel().reduce(0, (a, b) -> a+b);
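BaseStream also provides isParallel(), which tells you which mode a stream is in. The sketch below compares the two ways of creating a parallel stream and shows that sequential() switches a stream back to serial mode. The class name ParallelStreamCreation is only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelStreamCreation {

    // Sums 1..5 using a stream made parallel with BaseStream.parallel()
    static int parallelSum() {
        return Stream.of(1, 2, 3, 4, 5).parallel().reduce(0, (a, b) -> a + b);
    }

    public static void main(String[] args) {
        List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // stream() returns a serial stream by default
        System.out.println("stream(): " + myList.stream().isParallel());
        // parallelStream() returns a possibly parallel stream over the collection
        System.out.println("parallelStream(): " + myList.parallelStream().isParallel());
        // parallel() turns an existing stream into a parallel one
        System.out.println("parallel(): " + Stream.of(1, 2).parallel().isParallel());
        // sequential() switches back; the mode set last applies to the whole pipeline
        System.out.println("parallel().sequential(): "
                + Stream.of(1, 2).parallel().sequential().isParallel());

        System.out.println("Sum: " + parallelSum());
    }
}
```

With the JDK's own List implementations this should print false, true, true, false and then "Sum: 15". The sum does not depend on the mode because addition is associative.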

Points about parallel stream

  1. When a parallel stream is used, multiple substreams are processed in parallel by separate threads and the partial results are combined later.
  2. By default, a parallel stream obtains its threads from the common fork-join pool (ForkJoinPool.commonPool()).
  3. Operations applied to a parallel stream must be stateless and non-interfering.
  4. Parallel streams let you implement parallelism with non-thread-safe collections, provided that you do not modify the collection while you are operating on it. Operations in a parallel stream should also not update any shared variable.
  5. Note that parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores.
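A small experiment can illustrate points 2 to 4. The sketch below compares two approaches:

- Adding to a shared ArrayList from forEach, which is an interfering side effect on a non-thread-safe collection.
- Letting collect() build the list.

It also records the names of the threads that did the work. With the default setup these are typically the calling thread plus ForkJoinPool.commonPool worker threads, but thread names are an implementation detail. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SharedStateDemo {

    // Anti-pattern: several threads add to the same non-thread-safe ArrayList,
    // so elements can be lost or an exception can be thrown.
    static List<Integer> unsafeCollect(int n) {
        List<Integer> shared = new ArrayList<>();
        IntStream.range(0, n).parallel().forEach(i -> shared.add(i)); // interfering side effect
        return shared;
    }

    // Safe alternative: collect() gives each substream its own container
    // and merges the partial results, so no shared state is touched.
    static List<Integer> safeCollect(int n) {
        return IntStream.range(0, n).parallel()
                .boxed()
                .collect(Collectors.toList());
    }

    // Names of the threads that processed elements (diagnostic only)
    static Set<String> workerThreadNames() {
        return IntStream.range(0, 10_000).parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("safe size: " + safeCollect(10_000).size());
        try {
            // May print a number below 10000 or fail; the result varies between runs
            System.out.println("unsafe size: " + unsafeCollect(10_000).size());
        } catch (RuntimeException e) {
            System.out.println("unsafe run failed: " + e);
        }
        System.out.println("threads used: " + workerThreadNames());
    }
}
```

The unsafe version is included only to show the failure mode. Its output is nondeterministic, so do not rely on it.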

Java parallel stream examples

Let’s say you have an Employee class with name, dept and salary fields, and you want to calculate the average salary for the Finance department.

public class Employee {
  private String name;
  private String dept;
  private int salary;

  Employee(String name, String dept, int salary){
    this.name = name;
    this.dept = dept;
    this.salary = salary;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public int getSalary() {
    return salary;
  }
  public void setSalary(int salary) {
    this.salary = salary;
  }
  public String getDept() {
    return dept;
  }
  public void setDept(String dept) {
    this.dept = dept;
  }
}

To calculate the average salary in parallel-

List<Employee> employeeList = new ArrayList<>(); 
  
employeeList.add(new Employee("Jack", "Finance", 5500)); 
employeeList.add(new Employee("Lisa", "Finance", 5600)); 
employeeList.add(new Employee("Scott", "Finance", 7000));
employeeList.add(new Employee("Nikita", "IT", 4500));
employeeList.add(new Employee("Tony", "IT", 8000)); 
		  
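double avgSalary = employeeList.parallelStream()
                               // compare strings with equals(), not ==
                               .filter(e -> "Finance".equals(e.getDept()))
                               .mapToInt(Employee::getSalary)
                               .average()
                               .getAsDouble(); // throws NoSuchElementException if no employee matches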
  
System.out.println("Average salary in Finance dept- " + avgSalary);
Output
Average salary in Finance dept- 6033.333333333333
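average() returns an OptionalDouble. Calling getAsDouble() on it throws NoSuchElementException when no employee passes the filter. The self-contained sketch below falls back to 0.0 with orElse() instead. Treating an empty department as 0.0 is an assumption made for illustration, and the nested Employee class is a trimmed stand-in for the one above.

```java
import java.util.Arrays;
import java.util.List;

public class AverageSalary {

    // Minimal stand-in for the Employee class above (getters only)
    static final class Employee {
        private final String name;
        private final String dept;
        private final int salary;

        Employee(String name, String dept, int salary) {
            this.name = name;
            this.dept = dept;
            this.salary = salary;
        }

        String getName() { return name; }
        String getDept() { return dept; }
        int getSalary() { return salary; }
    }

    static List<Employee> sampleEmployees() {
        return Arrays.asList(
                new Employee("Jack", "Finance", 5500),
                new Employee("Lisa", "Finance", 5600),
                new Employee("Scott", "Finance", 7000),
                new Employee("Nikita", "IT", 4500),
                new Employee("Tony", "IT", 8000));
    }

    // Average salary of one department, computed in parallel.
    // orElse(0.0) avoids the NoSuchElementException that getAsDouble()
    // throws when no employee matches the filter.
    static double averageSalary(List<Employee> employees, String dept) {
        return employees.parallelStream()
                .filter(e -> dept.equals(e.getDept()))
                .mapToInt(Employee::getSalary)
                .average()
                .orElse(0.0);
    }

    public static void main(String[] args) {
        List<Employee> employees = sampleEmployees();
        System.out.println("Finance: " + averageSalary(employees, "Finance"));
        System.out.println("HR: " + averageSalary(employees, "HR"));
    }
}
```

For the Finance department this gives the same 6033.333333333333 as above. For the non-existent "HR" department it returns 0.0 instead of throwing.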

Concurrent Reduction using Collectors.groupingByConcurrent

With a parallel stream, the groupingByConcurrent() method should be used rather than groupingBy() when the order of the grouped elements does not matter. The groupingBy() operation performs poorly with parallel streams because it works by merging two maps by key, which is computationally expensive.

The collector returned by groupingByConcurrent() produces a ConcurrentMap instead of a Map.

Collectors.groupingByConcurrent() example

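Here is an example that groups employees by department. It invokes the collect operation, and the grouping is done concurrently, reducing the collection into a ConcurrentMap. Because this collector does not preserve encounter order, the names within each department are not guaranteed to appear in list order.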

List<Employee> employeeList = new ArrayList<>(); 
  
employeeList.add(new Employee("Jack", "Finance", 5500)); 
employeeList.add(new Employee("Lisa", "Finance", 5600)); 
employeeList.add(new Employee("Scott", "Finance", 7000));
employeeList.add(new Employee("Nikita", "IT", 4500));
employeeList.add(new Employee("Tony", "IT", 8000)); 

ConcurrentMap<String, List<Employee>> departments = employeeList.parallelStream()
                                        .collect(Collectors.groupingByConcurrent(Employee::getDept));
departments.forEach((dept, employees) -> {
    System.out.println("Key- " + dept);
    System.out.println("Values");
    employees.forEach(e -> System.out.println(e.getName()));
});
Output
Key- Finance
Values
Scott
Lisa
Jack
Key- IT
Values
Tony
Nikita
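groupingByConcurrent() also accepts a downstream collector. Order-insensitive collectors such as summingInt() or counting() are a natural fit for an unordered concurrent grouping. The sketch below computes the total salary and the headcount per department. The class and method names are hypothetical, and Employee is a trimmed stand-in for the class above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class DeptTotals {

    // Minimal stand-in for the Employee class above (getters only)
    static final class Employee {
        private final String name;
        private final String dept;
        private final int salary;

        Employee(String name, String dept, int salary) {
            this.name = name;
            this.dept = dept;
            this.salary = salary;
        }

        String getName() { return name; }
        String getDept() { return dept; }
        int getSalary() { return salary; }
    }

    static List<Employee> sampleEmployees() {
        return Arrays.asList(
                new Employee("Jack", "Finance", 5500),
                new Employee("Lisa", "Finance", 5600),
                new Employee("Scott", "Finance", 7000),
                new Employee("Nikita", "IT", 4500),
                new Employee("Tony", "IT", 8000));
    }

    // Total salary per department; summing does not depend on arrival order
    static ConcurrentMap<String, Integer> totalSalaryByDept(List<Employee> employees) {
        return employees.parallelStream()
                .collect(Collectors.groupingByConcurrent(
                        Employee::getDept,
                        Collectors.summingInt(Employee::getSalary)));
    }

    // Number of employees per department
    static ConcurrentMap<String, Long> headcountByDept(List<Employee> employees) {
        return employees.parallelStream()
                .collect(Collectors.groupingByConcurrent(
                        Employee::getDept,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Employee> employees = sampleEmployees();
        System.out.println("Total salary: " + totalSalaryByDept(employees));
        System.out.println("Headcount: " + headcountByDept(employees));
    }
}
```

With the sample data, Finance totals 18100 across 3 employees and IT totals 12500 across 2, whatever order the threads process the employees in.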

Using reduce method with combiner

When you use a parallel stream, the stream is partitioned into multiple substreams. These substreams are processed in parallel, and their partial results are combined to get the final result. In such a scenario, the version of the reduce() method that also takes a combiner as an argument is very helpful.

  • reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)

In this version of the reduce() method, the combiner function is used to combine the partial results produced by the accumulator function.
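The combiner is essential when the result type U differs from the element type T, because an accumulator of type BiFunction<U, T, U> cannot merge two U values on its own. The minimal sketch below adds up the lengths of a list of strings. The class name and sample words are only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceWithCombiner {

    // Total number of characters across all strings.
    // identity:    0, a value of the result type U (Integer)
    // accumulator: folds one String (T) into a running Integer (U)
    // combiner:    merges two partial Integer results from different substreams
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .reduce(0,
                        (partial, word) -> partial + word.length(),
                        Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("parallel", "stream", "reduce", "combiner");
        System.out.println("Total length: " + totalLength(words)); // Total length: 28
    }
}
```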

Consider the scenario where you want to get the product of the squares of all the elements in a List.

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5); 
int value = myList.parallelStream().reduce(1, (a, b) -> a*(b*b), (a, b) -> a*b);
System.out.println("Product of square of elements in the list- " + value);
Output
Product of square of elements in the list- 14400

In the example you can see that the combiner function differs from the accumulator function. The accumulator function computes the product of squares, whereas the combiner function only has to combine partial results, which is done by multiplying them. That is why the combiner function is (a, b) -> a*b.

If you don’t specify a combiner function in such a scenario, and instead use the version of reduce() that takes only an identity and an accumulator (so the accumulator is also used as the combiner), you may get wrong results.

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5); 
int value = myList.parallelStream().reduce(1, (a, b) -> a*(b*b));
System.out.println("Product of square of elements in the list: " + value);
Output
Product of square of elements in the list: -1055916032

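As you can see, the result is now wrong. The same function (a, b) -> a*(b*b) is also used to combine partial results, so the right-hand partial result gets squared again. The product becomes so large that it overflows int, which is why the value is negative. This accumulator also breaks the reduce() contract, which requires an associative function.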
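Another way to avoid the problem is to move the squaring into a map step. The reduction is then a plain multiplication, which is associative with identity 1, so the same function can safely serve as both accumulator and combiner. The sketch below uses mapToLong() for extra headroom before overflow; that choice is an assumption, not part of the original example, and long can still overflow for large inputs.

```java
import java.util.Arrays;
import java.util.List;

public class ProductOfSquares {

    // Square each element first, then multiply. (a, b) -> a * b is associative
    // and 1 is its identity, so any split of the work gives the same result.
    static long productOfSquares(List<Integer> list) {
        return list.parallelStream()
                .mapToLong(x -> (long) x * x)
                .reduce(1L, (a, b) -> a * b);
    }

    public static void main(String[] args) {
        System.out.println(productOfSquares(Arrays.asList(1, 2, 3, 4, 5))); // 14400
    }
}
```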

Ordering of elements in Parallel stream

In the case of a serial stream, if the source is ordered then the stream is also ordered.

For example, if the source is a List, which is an ordered collection, the stream is also ordered.

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
myList.stream().forEach(System.out::println);
Output
1
2
3
4
5
6
7
8
9
10

When you execute a stream in parallel, the Java compiler and runtime determine the order in which to process the stream's elements so as to maximize the benefits of parallel computing. With an operation such as forEach, the processing order may therefore differ from the source order, even for an ordered collection.

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
myList.parallelStream().forEach(System.out::println);
Output
7
2
1
5
4
9
10
8
3
6
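The shuffled output comes from forEach, which does not respect encounter order on a parallel stream. The stream itself is still ordered, so an order-respecting terminal operation such as collect(Collectors.toList()) returns results in source order even when the mapping runs in parallel. The class name in this minimal sketch is only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EncounterOrder {

    // Squares each element in parallel; collect(toList()) still returns the
    // results in the source (encounter) order, unlike forEach.
    static List<Integer> squaresInOrder(List<Integer> list) {
        return list.parallelStream()
                .map(x -> x * x)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(squaresInOrder(myList));
        // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    }
}
```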

If you want to process the elements of the stream in the order specified by its source, regardless of whether the stream is executed serially or in parallel, you can use the forEachOrdered() method. Note that you may lose the benefits of parallelism if you use operations like forEachOrdered() with parallel streams.

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
myList.parallelStream().forEachOrdered(System.out::println);
Output
1
2
3
4
5
6
7
8
9
10

That's all for the topic Java Parallel Streams With Examples. If something is missing or you have something to share about the topic please write a comment.

