CyclicBarrier example: a parallel sort algorithm (ctd)

On the previous page, we outlined a technique for performing a multithreaded parallel sort. We end up with a sort that takes place in three stages, where each stage has a parallel part and a serial part. The three stages can be summarised as follows:

Stage 1. Find split points
  Parallel step (executed by all participating threads): Choose a random sample of (say) 16 values from a portion of the list. Each thread considers an equal-sized sublist (e.g. if there are 300 elements and 3 threads, thread 1 chooses samples from elements 0-99, thread 2 considers 100-199 etc).
  Serial step (executed by one random thread after the parallel step): Amalgamate and sort the list of samples, then make a new list of "split points" containing the 16th, 32nd etc element from the sorted list.

Stage 2. Allocate into buckets
  Parallel step: Each thread goes through a sublist of (size / noThreads) elements and, given the split points, puts each element in the sublist into its correct bucket.
  Serial step: Essentially none, although in our example we perform a small "memory clearing" task at this point.

Stage 3. Sort buckets
  Parallel step: Each thread calls Collections.sort() on its bucket.
  Serial step: Amalgamate the sorted buckets.
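
As a rough illustration of the serial part of stage 1 and the per-element decision made in stage 2, here is a minimal single-threaded sketch. The class and method names (SplitPointSketch, chooseSplitPoints, bucketFor) are hypothetical, and the use of Integer elements and Collections.binarySearch() is an assumption. So is the decision to skip the very last sample so that n threads end up with n - 1 split points. The actual implementation may well differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SplitPointSketch {
  // Serial step of stage 1: sort the pooled samples and keep every
  // samplesPerThread-th one (the 16th, 32nd etc. when samplesPerThread is 16).
  // Assumption: the final sample is skipped, since n buckets need n - 1 split points.
  static List<Integer> chooseSplitPoints(List<Integer> pooledSamples, int samplesPerThread) {
    List<Integer> sorted = new ArrayList<>(pooledSamples);
    Collections.sort(sorted);
    List<Integer> splitPoints = new ArrayList<>();
    for (int i = samplesPerThread; i < sorted.size(); i += samplesPerThread) {
      splitPoints.add(sorted.get(i - 1));
    }
    return splitPoints;
  }

  // Stage 2 decision for one element: bucket k receives values that are
  // greater than split point k - 1 and no greater than split point k.
  static int bucketFor(int value, List<Integer> splitPoints) {
    int pos = Collections.binarySearch(splitPoints, value);
    // A negative result encodes the insertion point as -(insertionPoint) - 1.
    return pos >= 0 ? pos : -(pos + 1);
  }
}
```

With two split points there are three buckets, so bucketFor() returns 0, 1 or 2. Each worker thread would call it for every element in its own sublist.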

Overall pattern

First, let's take a top-level view of how the above will be put together using a CyclicBarrier. That will give you an idea of how CyclicBarrier works in case your interest is really to adapt the pattern to another purpose. If you actually want to implement a parallel sort, then stay tuned, because we'll look at a possible implementation in a moment.

We'll start with an outline of a worker thread. Each thread will essentially be identical, and will have a structure roughly as follows:

class SorterThread extends Thread {
  ...
  public void run() {
    // Work out which portion of data our thread is working on
    double div = (double) data.size() / noThreads;
    int startPos = (int) (div * threadNo),
        endPos = (int) (div * (threadNo + 1));

    try {
      // Stage 1
      gatherSplitPointSample(data, startPos, endPos);
      barrier.await();

      // Stage 2
      assignItemsToBuckets(data, threadNo, startPos, endPos);
      barrier.await();

      // Stage 3
      sortMyBucket();
      barrier.await();
    } catch (InterruptedException e) {
      ...
    } catch (BrokenBarrierException e) {
      ...
    }
  }
}

Notice, then, that the essential pattern is that the thread executes each stage of the process, then calls barrier.await() at the end of that stage (where of course barrier is an instance of CyclicBarrier). In other applications where there weren't discrete stages but rather some arbitrary number of iterations, the thread would sit in a loop, performing work then calling await(). But the principle is essentially the same.
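
To illustrate the looping variant just mentioned, here is a minimal sketch in which each worker performs a fixed number of iterations and calls await() at the end of each one. The class name IterativeBarrierSketch, the method runIterations and the round counter are hypothetical, introduced purely for illustration. The "work" itself is left as a comment.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class IterativeBarrierSketch {
  // Runs noThreads workers for the given number of iterations and returns
  // how many times the barrier tripped (one trip per completed iteration).
  static int runIterations(int noThreads, int iterations) {
    AtomicInteger completedRounds = new AtomicInteger();
    // The barrier action runs once each time all workers have called await().
    CyclicBarrier barrier =
        new CyclicBarrier(noThreads, completedRounds::incrementAndGet);

    Thread[] workers = new Thread[noThreads];
    for (int t = 0; t < noThreads; t++) {
      workers[t] = new Thread(() -> {
        try {
          for (int i = 0; i < iterations; i++) {
            // ... perform this thread's share of the work for iteration i ...
            barrier.await();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
          // another participant was interrupted or failed; give up
        }
      });
      workers[t].start();
    }
    for (Thread w : workers) {
      try {
        w.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return completedRounds.get();
  }
}
```

Because the barrier is cyclic, the same instance is reused for every iteration: once all parties have arrived it resets itself ready for the next round.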

So much for the parallel parts. We then need a method that handles the serial parts of the sort: that is, a method that will be called by one of our threads each time all of them have reached the point of calling await(). Logically, we want three separate methods, one for each stage. But for reasons we'll see in a minute, we actually need to wrap them in a single method that will be called each time. A simple variable recording the current stage number is enough to decide which one to run:

private void sortStageComplete() {
  switch (stageNo) {
  case 0 :
    amalgamateSplitPointData();
    break;
  case 1 :
    // not necessarily any action at this point
    break;
  case 2 :
    combineBuckets();
    break;
  default :
    throw new RuntimeException("Don't expect to be called at stage " + stageNo);
  }
  stageNo++;
}

Putting things together with CyclicBarrier

Armed with our outline worker thread and our methods to amalgamate the data from each stage (OK, bar actually implementing the above methods...), we now put things together with a CyclicBarrier. Typically, we'll have some method that is called to actually start the process off, and we might want that method to return only once the sort is complete. So we'll actually make the caller thread an extra participant in the barrier. The process will thus be as follows: we construct the barrier with noThreads + 1 parties and a barrier action that calls sortStageComplete(); we start the worker threads; and the caller then calls await() once per stage, so that its third call returns only when the sort is complete.

The code looks something like this:

private final int noThreads = Runtime.getRuntime().availableProcessors();
private final CyclicBarrier barrier =
  new CyclicBarrier(noThreads + 1, new Runnable() {
    public void run() {
      sortStageComplete();
    }
  });

for (int i = 0; i < noThreads; i++) {
  SorterThread thr = new SorterThread(i);
  thr.start();
}
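try {
  // one await() per stage; the third returns once the sort is complete
  barrier.await();
  barrier.await();
  barrier.await();
} catch (InterruptedException ie) {
  // the calling thread was interrupted while waiting; restore its interrupt status
  Thread.currentThread().interrupt();
} catch (BrokenBarrierException bb) {
  // a participating thread was interrupted or failed, so the sort did not complete
}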

If the thread setting up the operation didn't specifically need to know about completion of the task or interruptions thereof, then it of course need not actually participate in the barrier. Any of the other regular means for communicating between threads could also be used if they serve your purpose, such as a CountDownLatch.
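
As a sketch of the CountDownLatch alternative, the following assumes that only the worker threads are parties to the barrier and that the barrier action counts the latch down after the final stage. The class name LatchCompletionSketch, the method runStages and the timeout parameter are all hypothetical. The timeout is there because, in this simple version, the latch is never released if the barrier is broken part-way through.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchCompletionSketch {
  static final int STAGES = 3;

  // Returns true if all stages completed within the timeout.
  static boolean runStages(int noThreads, long timeoutMillis) {
    CountDownLatch done = new CountDownLatch(1);
    AtomicInteger stageNo = new AtomicInteger();
    // Only the workers take part in the barrier; the caller does not.
    CyclicBarrier barrier = new CyclicBarrier(noThreads, () -> {
      // ... serial step for the stage that has just finished ...
      if (stageNo.incrementAndGet() == STAGES) {
        done.countDown(); // signal the caller after the final stage
      }
    });

    for (int t = 0; t < noThreads; t++) {
      new Thread(() -> {
        try {
          for (int s = 0; s < STAGES; s++) {
            // ... parallel step for stage s ...
            barrier.await();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
          // another participant was interrupted or failed; give up
        }
      }).start();
    }

    try {
      return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

The trade-off is that the caller learns only that the work finished (or that the timeout expired), not that the barrier was broken.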

Next: error handling and implementing the parallel sort

That's essentially it for the high-level overview of CyclicBarrier. An issue that you may wish to deal with is error handling: notifying the controller thread of what exception caused the barrier to be broken, rather than simply the fact that it was broken.
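
One possible way to let the controller see the underlying cause is sketched below; it is not necessarily the approach taken on the next page. It assumes a shared AtomicReference that records the first exception, and it relies on the fact that calling await() from a thread whose interrupt status is set breaks the barrier. The class BarrierFailureSketch, its methods and the simulated failure are hypothetical, and only a single stage is shown for brevity.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

class BarrierFailureSketch {
  // Hypothetical stage work: the thread numbered failingThreadNo fails.
  static void doStageWork(int threadNo, int failingThreadNo) {
    if (threadNo == failingThreadNo) {
      throw new IllegalStateException("stage failed in thread " + threadNo);
    }
  }

  // Returns the exception that caused the barrier to be broken,
  // or null if the stage completed normally.
  static Throwable runWithFailingWorker(int noThreads, int failingThreadNo) {
    AtomicReference<Throwable> failure = new AtomicReference<>();
    CyclicBarrier barrier = new CyclicBarrier(noThreads + 1);

    for (int t = 0; t < noThreads; t++) {
      final int threadNo = t;
      new Thread(() -> {
        try {
          doStageWork(threadNo, failingThreadNo);
        } catch (RuntimeException e) {
          failure.compareAndSet(null, e);      // remember the first cause only
          Thread.currentThread().interrupt();  // the await() below then breaks the barrier
        }
        try {
          barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
          // the barrier was broken; this worker simply stops
        }
      }).start();
    }

    try {
      barrier.await();
      return null;
    } catch (BrokenBarrierException e) {
      return failure.get(); // the recorded cause, not just "broken"
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return failure.get();
    }
  }
}
```

Since the failing worker records the exception before it breaks the barrier, the controller can rely on the cause being available by the time it catches BrokenBarrierException.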

Then, on the following pages, we consider how to implement the parallel sort, which basically means filling in the methods in the above skeleton code.



Editorial page content written by Neil Coffey. Copyright © Javamex UK 2021. All rights reserved.