October 15, 2022

CyclicBarrier in Java With Examples

The java.util.concurrent package provides many synchronizer utilities that cover the use cases where several thread communicate with each other and some sort of synchronizer is needed to regulate that interaction among the threads. CyclicBarrier in Java is one such synchronization aid that is useful when you want threads to wait at a common execution point until all the threads in the set have reached that common barrier point.

CyclicBarrier class in Java

CyclicBarrier class was added in Java 1.5 and it is part of java.util.concurrent package in Java. The class is named as CyclicBarrier because it can be reused after the waiting threads are released.

How CyclicBarrier works in Java

A CyclicBarrier is initialized with a given value and the barrier is tripped when the number of threads waiting at the barrier is equal to that value.

To make the thread wait at this barrier await() method is called upon the thread.

A thread calling the await() method signifies that the thread has reached the common barrier point and that thread is disabled until the required number of threads have invoked await() method at which point the barrier is tripped and the threads are unblocked. That way threads can synchronize their execution.

Java CyclicBarrier constructors

  • CyclicBarrier(int parties)- Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it.
  • CyclicBarrier(int parties, Runnable barrierAction)- Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.

Java CyclicBarrier methods

  1. await()- Waits until all parties have invoked await on this barrier.
  2. await(long timeout, TimeUnit unit)- Waits until all parties have invoked await on this barrier, or the specified waiting time elapses.
  3. getNumberWaiting()- Returns the number of parties currently waiting at the barrier.
  4. getParties()- Returns the number of parties required to trip this barrier.
  5. isBroken()- Queries if this barrier is in a broken state.
  6. reset()- Resets the barrier to its initial state.

CyclicBarrier Java example

In the example three threads are created and an ArrayList is shared among these threads. All of these threads process some data and put the results in the ArrayList. Further processing should start only after all the three threads have finished and called await().

For this scenario we’ll use a CyclicBarrier initialized with the value 3 and also a runnable action for further processing.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicDemo {
  public static void main(String[] args) {		
    List<Integer> dataList = Collections.synchronizedList(new ArrayList<Integer>());
    // Initializing cyclicbarrier
    CyclicBarrier cb = new CyclicBarrier(3, new ListBarrierAction(dataList));
    // starting threads
    for(int i = 0; i < 3; i++) {
      new Thread(new ListWorker(dataList, cb)).start();;
    }
  }    
}

class ListWorker implements Runnable{
  private CyclicBarrier cb;
  private List<Integer> dataList;
  ListWorker(List<Integer> dataList, CyclicBarrier cb) {
    this.dataList = dataList;
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("Executing run method for thread - " + Thread.currentThread().getName());
    for(int i = 0; i < 10; i++) {
      dataList.add(i);
    }
    
    try {
      System.out.println("Calling await.. " + Thread.currentThread().getName());
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}
// Barrier action to be executed when barrier is tripped
class ListBarrierAction implements Runnable {
  private List<Integer> dataList;
  ListBarrierAction(List<Integer> dataList){
    this.dataList = dataList;
  }
  @Override
  public void run() {
    System.out.println("In ListBarrierAction, start further processing on list with length " + dataList.size());
  }
}
Output
Executing run method for thread - Thread-0
Calling await.. Thread-0
Executing run method for thread - Thread-2
Executing run method for thread - Thread-1
Calling await.. Thread-2
Calling await.. Thread-1
In ListBarrierAction, start further processing on list with length 30

What happens when await method is called

When the await() method of CyclicBarrier class is called upon the current thread and the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens-

  • The last thread arrives; or
  • Some other thread interrupts the current thread; or
  • Some other thread interrupts one of the other waiting threads; or
  • Some other thread times out while waiting for barrier; or
  • Some other thread invokes reset() on this barrier.

If the current thread is the last thread to arrive then-

If a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue. If an exception occurs during the barrier action then that exception will be propagated in the current thread and the barrier is placed in the broken state.

BrokenBarrierException in CyclicBarrier

If any thread is interrupted while waiting at a barrier point, then all other waiting threads will throw BrokenBarrierException and the barrier is placed in the broken state.

If the barrier is reset() while any thread is waiting then BrokenBarrierException is thrown.

CyclicBarrier is reusable

Unlike one of the other synchronization aid CountDownLatch which can’t be reused, CyclicBarrier in Java can be reused after the waiting threads are released.

Let's reuse the same example as above but now the CyclicBarrier is initialized with the value 4 as we’ll wait for the main thread too. Three more threads will be started once the initial set of three threads are released from the barrier these new threads use the same CyclicBarrier object.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicDemo {
  public static void main(String[] args) {     
  List<Integer> dataList = Collections.synchronizedList(new ArrayList<Integer>());
  // Initializing cyclicbarrier
  CyclicBarrier cb = new CyclicBarrier(4, new ListBarrierAction(dataList));
  // starting threads
  for(int i = 0; i < 3; i++) {
    new Thread(new ListWorker(dataList, cb)).start();;
  }
  try {
    // Calling await for main thread
    cb.await();
  } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  } catch (BrokenBarrierException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }
  System.out.println("done with initial set of threads, starting again reusing the same cyclicbarrier object");
  dataList = Collections.synchronizedList(new ArrayList<Integer>());
  // Starting another set of threads
  for(int i = 0; i < 3; i++) {
    new Thread(new ListWorker(dataList, cb)).start();;
  }    
  try {
    // Calling await for main thread
    cb.await();
  } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  } catch (BrokenBarrierException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }
  System.out.println("In main thread...");
  }    
}

class ListWorker implements Runnable{
  private CyclicBarrier cb;
  private List<Integer> dataList;
  ListWorker(List<Integer> dataList, CyclicBarrier cb) {
    this.dataList = dataList;
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("Executing run method for thread - " + Thread.currentThread().getName());
    for(int i = 0; i < 10; i++) {
      dataList.add(i);
    }
    
    try {
      System.out.println("Calling await.. " + Thread.currentThread().getName());
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}
// Barrier action to be executed when barrier is tripped
class ListBarrierAction implements Runnable {
  private List<Integer> dataList;
  ListBarrierAction(List<Integer> dataList){
    this.dataList = dataList;
  }
  @Override
  public void run() {
    System.out.println("In ListBarrierAction, start further processing on list with length " + dataList.size());
  }
}
Output
Executing run method for thread - Thread-0
Executing run method for thread - Thread-1
Executing run method for thread - Thread-2
Calling await.. Thread-2
Calling await.. Thread-0
Calling await.. Thread-1
In ListBarrierAction, start further processing on list with length 30
done with initial set of threads, starting again reusing the same cyclicbarrier object
Executing run method for thread - Thread-4
Calling await.. Thread-4
Executing run method for thread - Thread-3
Executing run method for thread - Thread-5
Calling await.. Thread-5
Calling await.. Thread-3
In ListBarrierAction, start further processing on list with length 30
In main thread...

That's all for the topic CyclicBarrier in Java With Examples. If something is missing or you have something to share about the topic please write a comment.


You may also like

No comments:

Post a Comment