
CyclicBarrier example: a parallel sort algorithm (ctd)

(Continued from our explanation of implementing a parallel sort with a CyclicBarrier.)

The second stage of sorting is for each thread to go through its subset of the data and put each item from that subset into the relevant "bucket". This does pretty much what it says on the tin, but a couple of points in the implementation below are worth noting. First, each thread initially builds up its own set of thread-local buckets, and only at the end adds their contents to the "master" buckets. This is simply to reduce contention on the master buckets during the main part of the operation.

The other point worthy of note is the use of Collections.binarySearch(). A common use of Collections.binarySearch() is to find where a new item should be inserted into an already-ordered list so that the list stays in order after the insertion. Here, we use it on the ordered list of split values (without ever inserting anything), and in effect it tells us the index of the first split value that a given item is less than or equal to. That index is the item's bucket number. The slightly strange comparison with zero is needed because Collections.binarySearch() returns the (non-negative) index of the item if it is already in the list, but otherwise returns a negative number, namely -(insertion point) - 1; the line bucket = (-bucket) - 1 converts this back into the insertion point. Any item greater than all of the split values is put in the last bucket.

private void assignItemsToBuckets(List<E> data,
    int threadNo, int startPos, int endPos) {
  // Take a private copy of the split points
  List<E> spl;
  synchronized (splitPoints) {
    spl = new ArrayList<E>(splitPoints);
  }
  // Thread-local buckets, to reduce contention on the master buckets
  List<List<E>> bucketData = new ArrayList<List<E>>(noThreads);
  for (int i = 0; i < noThreads; i++) {
    bucketData.add(new ArrayList<E>(dataSize /
        (noThreads * noThreads)));
  }

  // Read lock: other sorter threads may scan the data concurrently
  Lock lck = dataLock.readLock();
  lck.lock();
  try {
    for (int i = startPos; i < endPos; i++) {
      E item = data.get(i);
      int bucket = Collections.binarySearch(spl, item);
      if (bucket < 0)
        bucket = (-bucket) - 1;  // not found: recover the insertion point
      if (bucket >= noThreads)
        bucket = noThreads - 1;  // beyond all split values: last bucket
      bucketData.get(bucket).add(item);
    }
  } finally {
    lck.unlock();
  }
  // Merge the thread-local buckets into the shared master buckets
  for (int i = 0; i < noThreads; i++) {
    List<E> l = bucketsToSort.get(i);
    synchronized (l) {
      l.addAll(bucketData.get(i));
    }
  }
}
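To see the bucket-index calculation on its own, here is a small standalone sketch that applies the same Collections.binarySearch() steps as assignItemsToBuckets() to a hypothetical list of three split values. The class and method names (BucketIndexDemo, bucketFor()) are illustrative only and are not part of the sorter described in this article.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class: not part of the article's parallel sorter.
public class BucketIndexDemo {

    // Maps an item to a bucket using an ordered list of split values,
    // following the same steps as assignItemsToBuckets().
    public static <E extends Comparable<? super E>> int bucketFor(
            List<E> splitPoints, E item, int noBuckets) {
        int bucket = Collections.binarySearch(splitPoints, item);
        if (bucket < 0) {
            // Not found: binarySearch returned -(insertion point) - 1,
            // so convert it back into the insertion point.
            bucket = -bucket - 1;
        }
        // Items greater than every split value go in the last bucket.
        if (bucket >= noBuckets) {
            bucket = noBuckets - 1;
        }
        return bucket;
    }

    public static void main(String[] args) {
        // Illustrative split values giving four buckets.
        List<Integer> splits = Arrays.asList(10, 20, 30);
        int[] items = {5, 10, 15, 20, 25, 35};
        for (int item : items) {
            System.out.println(item + " -> bucket " + bucketFor(splits, item, 4));
        }
    }
}
```

With split values 10, 20 and 30 and four buckets, items up to and including 10 land in bucket 0, items above 10 and up to 20 land in bucket 1, and so on; 35 exceeds every split value and lands in bucket 3. If only three buckets were requested, the final clamp would move 35 into bucket 2.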

At the end of this second stage, the CyclicBarrier calls our sortStageComplete() method, which in turn calls clearData(). This simply calls clear() on the data list, taking care to acquire the write lock first:

  private void clearData() {
    Lock lck = dataLock.writeLock();
    lck.lock();
    try {
      data.clear();
    } finally {
      lck.unlock();
    }
  }

The only reason for clearing the data list at this point is that "we may as well": the references to the data objects are now safely held in the various buckets, so we can free up the memory used by the list while the sorting is taking place. Aside from the memory consideration, we could just leave data as it is and clear it at the beginning of the combineBuckets() method (see next paragraph).
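The division of labour between the read lock (held while sorter threads scan the data) and the write lock (held while the list is cleared or refilled) can be illustrated with a minimal standalone class. This sketch assumes that dataLock is something like a ReentrantReadWriteLock; the declaration is not shown in this section, and GuardedData and its methods are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical wrapper showing the read/write lock pattern applied to "data".
public class GuardedData<E> {
    private final List<E> data = new ArrayList<>();
    private final ReadWriteLock dataLock = new ReentrantReadWriteLock();

    // Structural change (compare combineBuckets()): exclusive write lock.
    public void addAll(List<E> items) {
        Lock lck = dataLock.writeLock();
        lck.lock();
        try {
            data.addAll(items);
        } finally {
            lck.unlock();
        }
    }

    // Read-only scan of a sub-range (compare the loop in assignItemsToBuckets()):
    // any number of threads may hold the read lock at the same time.
    public List<E> copyRange(int startPos, int endPos) {
        Lock lck = dataLock.readLock();
        lck.lock();
        try {
            return new ArrayList<>(data.subList(startPos, endPos));
        } finally {
            lck.unlock();
        }
    }

    // Structural change (compare clearData()): exclusive write lock again.
    public void clear() {
        Lock lck = dataLock.writeLock();
        lck.lock();
        try {
            data.clear();
        } finally {
            lck.unlock();
        }
    }

    public int size() {
        Lock lck = dataLock.readLock();
        lck.lock();
        try {
            return data.size();
        } finally {
            lck.unlock();
        }
    }
}
```

Because the read lock is shared, sorter threads scanning different ranges do not block one another; the write lock is exclusive and is only granted once every reader has released the read lock.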

The third stage, actually sorting each bucket, is disappointingly boring: we just call Collections.sort() on the given bucket, having remembered to synchronize first on that particular bucket (this is just the sortMyBucket() method given on the previous page). At the end of the sorting phase, the CyclicBarrier calls our sortStageComplete() method again, which this time calls combineBuckets(). This simply appends the contents of each bucket back to data in bucket order; because every item in one bucket is less than or equal to every item in the next, the result is the fully sorted list. Again, the only moderately tricky thing is remembering to acquire the appropriate locks:

  private void combineBuckets() {
    // Write lock: data is being modified
    Lock lck = dataLock.writeLock();
    lck.lock();
    try {
      // Append the buckets in order; each one is already sorted
      for (int i = 0; i < noThreads; i++) {
        List<E> l = bucketsToSort.get(i);
        synchronized (l) {
          data.addAll(l);
        }
      }
    } finally {
      lck.unlock();
    }
  }
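Stripped of the threads and locks, the bucket assignment, bucket sorting and combining stages amount to an ordinary bucket sort. The following single-threaded sketch (the class name SequentialBucketSort and the split values are made up for illustration) shows why simply concatenating the sorted buckets in order yields a sorted list. In the parallel version, each bucket would instead be sorted by its own thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical single-threaded sketch of assign / sort buckets / combine.
public class SequentialBucketSort {

    public static <E extends Comparable<? super E>> List<E> sort(
            List<E> data, List<E> splitPoints) {
        int noBuckets = splitPoints.size() + 1;
        List<List<E>> buckets = new ArrayList<>(noBuckets);
        for (int i = 0; i < noBuckets; i++) {
            buckets.add(new ArrayList<>());
        }

        // Assign each item to a bucket, as in assignItemsToBuckets()
        for (E item : data) {
            int bucket = Collections.binarySearch(splitPoints, item);
            if (bucket < 0) {
                bucket = -bucket - 1;
            }
            buckets.get(bucket).add(item);
        }

        // Sort each bucket independently
        for (List<E> b : buckets) {
            Collections.sort(b);
        }

        // Concatenate in bucket order: every item in bucket i is
        // less than or equal to every item in bucket i + 1
        List<E> result = new ArrayList<>(data.size());
        for (List<E> b : buckets) {
            result.addAll(b);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(42, 7, 19, 3, 88, 25, 61, 10, 33);
        List<Integer> splits = Arrays.asList(15, 40);
        System.out.println(sort(data, splits)); // [3, 7, 10, 19, 25, 33, 42, 61, 88]
    }
}
```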

And finally...

That's more or less it. We're just missing the controller method that the caller invokes to actually perform the sort. Essentially, this starts the sorter threads going, waits at the barrier three times (once per stage, since the calling thread is itself one of the barrier's parties), and handles any resulting exceptions. We also add a slight "sanity check" at the start of the sort method: if the list to be sorted is too small to warrant a parallel sort, we just call boring old Collections.sort() on it. The resulting code is as follows:

public void sort() throws InterruptedException {
  // See if it's not worth doing a parallel sort
  Lock l = dataLock.writeLock();
  l.lockInterruptibly();
  try {
    if (data.size() < noSamplesPerThread * 4 * noThreads) {
      Collections.sort(data);
      return;
    }
  } finally {
    l.unlock();
  }

  // Start sorter threads going
  List<SorterThread> threads = new ArrayList<SorterThread>(noThreads);
  for (int i = 0; i < noThreads; i++) {
    SorterThread thr = new SorterThread(i);
    threads.add(thr);
    thr.start();
  }

  // Wait for sorter threads: once at the barrier for each of the three stages
  try {
    barrier.await();
    barrier.await();
    barrier.await();
  } catch (BrokenBarrierException bb) {
    // Find the error that caused the broken barrier
    for (int i = 0; i < noThreads; i++) {
      SorterThread thr = threads.get(i);
      Throwable t = thr.error;
      if (t != null)
        throw new RuntimeException("Error during sort", t);
    }
    if (completionStageError != null)
      throw completionStageError;
    else
      throw new RuntimeException("Misc error during sort", bb);
  }
}
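The control flow of sort() relies on the calling thread being one of the CyclicBarrier's parties, so that each of its three await() calls returns only once every sorter thread has finished the corresponding stage and the barrier action has run. Here is a minimal, self-contained sketch of just that pattern; the class name, the stage counter and the log messages are hypothetical, and the barrier action merely stands in for sortStageComplete().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of a caller thread acting as an extra barrier party.
public class ThreeStageBarrierDemo {
    private static final int STAGES = 3;

    public static List<String> run(int noWorkers) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger stage = new AtomicInteger();

        // noWorkers + 1 parties: the workers plus the calling thread.
        // The action runs once per trip, in the last thread to arrive,
        // before any waiting party is released (like sortStageComplete()).
        CyclicBarrier barrier = new CyclicBarrier(noWorkers + 1,
            () -> log.add("stage " + stage.incrementAndGet() + " complete"));

        for (int i = 0; i < noWorkers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    for (int stageNo = 0; stageNo < STAGES; stageNo++) {
                        // A real sorter thread would do its work for this stage here.
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party failed or was interrupted; give up.
                }
            });
            worker.start();
        }

        // The caller waits at the barrier once per stage, as sort() does.
        try {
            for (int s = 0; s < STAGES; s++) {
                barrier.await();
            }
        } catch (BrokenBarrierException e) {
            throw new RuntimeException("Barrier broken during run", e);
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : run(4)) {
            System.out.println(line);
        }
    }
}
```

Calling run(4) returns the three "stage N complete" messages in order. Because the barrier action runs before any waiting party is released, the calling thread's final await() cannot return until the third stage's action has completed.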

And there you have it, finally! You may be interested in reviewing some of the topics that arose during this section:



Unless otherwise stated, the Java programming articles and tutorials on this site are written by Neil Coffey.
Copyright © Javamex UK 2016. All rights reserved.