
BlockingQueue example: a background logger thread

A simple use of a BlockingQueue is where we want one or more threads to pass jobs to some "background" or "processing" thread. The processing thread will sit waiting for jobs and execute them one at a time. On a server, for example, we might want to perform "lazy logging":

  • we want a busy thread to be able to add a string to the queue of "things to be logged";
  • at moments when the server is less busy, a logger thread will then pick strings off the queue and actually log them to the console (or disk or "place where things are logged"...).

With a BlockingQueue, the task becomes simple. The logger thread holds a queue instance, and its run() method effectively sits waiting for things to log until it is told to shut down. Then from other threads, we call a method to add an item to the queue. The BlockingQueue implementation will handle thread-safety and the actual notification to the logger thread when an item is added to the queue.
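Before building the logger itself, the hand-off described above can be sketched in isolation. This is only an illustration: the class HandOffDemo and its handOff() method are hypothetical names, not part of the logger developed below, and the consumer here simply stops after a known number of jobs rather than waiting for a shutdown signal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the logger built in this article.
class HandOffDemo {
  // Passes the given jobs from the calling thread to a consumer thread and
  // returns them in the order in which the consumer received them.
  static List<String> handOff(List<String> jobs) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    List<String> received = new ArrayList<>();
    int expected = jobs.size();

    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < expected; i++) {
          received.add(queue.take()); // waits while the queue is empty
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    try {
      for (String job : jobs) {
        queue.put(job); // the queue wakes the consumer if it is waiting
      }
      consumer.join(); // after join(), 'received' is safely visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(handOff(Arrays.asList("job 1", "job 2", "job 3")));
  }
}
```

Note that neither thread contains any explicit synchronization or wait/notify call: the queue does all of that for us.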

Creating the thread

Our logger will run in its own thread and so our logger class will be an extension of Thread. We won't dwell too much on thread creation in Java, which is covered in a separate section of this web site. However, we will mention that we want our logger to be a singleton: there'll be a maximum of one static instance of it. So the outer shell of the class looks as follows:

public class LoggerThread extends Thread {
  private static final LoggerThread instance = new LoggerThread();

  public static LoggerThread getLogger() {
    return instance;
  }
  private LoggerThread() {
    start();
  }
}

With this pattern, all accesses to the logger thread must be made via the getLogger() method. And the first caller to getLogger() will actually cause the logger thread to be started.

Inside the LoggerThread constructor, we could consider setting options such as the thread name and thread priority. Bear in mind, though, that thread priority actually means different things on different systems, as discussed elsewhere on this site.
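As a rough sketch of what setting those options might look like, the following uses a hypothetical NamedWorker class as a stand-in for LoggerThread. The name "logger" and the choice of minimum priority are assumptions made purely for illustration.

```java
// Hypothetical stand-in for LoggerThread, used only to show the options.
class NamedWorker extends Thread {
  NamedWorker() {
    setName("logger");                // appears in thread dumps and debuggers
    setPriority(Thread.MIN_PRIORITY); // a hint only; the effect is platform-dependent
  }

  @Override
  public void run() {
    // the logging loop would go here
  }

  public static void main(String[] args) {
    NamedWorker w = new NamedWorker();
    System.out.println(w.getName() + " has priority " + w.getPriority());
  }
}
```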

Constructing the queue

First, we need to create our queue. There are two flavours of "normal" blocking queue: a LinkedBlockingQueue, which uses a linked list as its internal structure and thus has no fixed capacity by default (although an optional bound can be passed to its constructor), and an ArrayBlockingQueue, which uses a fixed array and thus always has a maximum capacity. In this case, we go for a fixed capacity, but that's just an arbitrary design decision. We'll create a queue with space for up to 100 strings:

public class LoggerThread extends Thread {
  private BlockingQueue<String> itemsToLog =
    new ArrayBlockingQueue<String>(100);
  ...
}

In this case, if the queue filled up quicker than the logger thread could process the strings, then further threads trying to add strings to the queue would either hang until the logger caught up, or just drop the surplus strings. Which behaviour is adopted depends on which method we decide to call when adding a string to the queue, and is thus a design decision we have to make. Arguably, ArrayBlockingQueue is also slightly more efficient in terms of object overhead, though that's not such a great concern here: we won't be logging so many things per second!
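To make the "drop the surplus" option concrete, here is a small sketch of what happens when we try to add to a full ArrayBlockingQueue without blocking. The class and method names (FullQueueDemo, offerSurplus() and so on) are hypothetical. The offer() methods are the standard BlockingQueue alternatives to put(): one returns immediately, the other gives up after a timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are ours, not part of the logger.
class FullQueueDemo {
  // Creates a queue of the given capacity and fills it completely.
  static BlockingQueue<String> fullQueue(int capacity) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
    for (int i = 0; i < capacity; i++) {
      queue.add("item " + i);
    }
    return queue;
  }

  // offer() never waits: on a full queue it returns false and the item is dropped.
  static boolean offerSurplus(int capacity) {
    return fullQueue(capacity).offer("surplus");
  }

  // The timed offer() waits up to the given time for space, then gives up.
  static boolean offerSurplusWithTimeout(int capacity, long millis) {
    try {
      return fullQueue(capacity).offer("surplus", millis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(offerSurplus(100));                // false
    System.out.println(offerSurplusWithTimeout(100, 50)); // false, after about 50 ms
  }
}
```

By contrast, put() on the same full queue would simply wait until some other thread removed an item.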

Notice that the queue, like much of the collections framework, is parametrised: thanks to Java 5 generics, we can declare it as a queue of Strings and from then on avoid ugly casts.

Notice that, like blocking methods in Java in general, the take() method that we use below to pull items off the queue could be interrupted. If that ever happens, we just let the interruption cause the thread's run() method to terminate.

Pulling items off the queue

Now we need a run() method with a loop that continually takes the next item off the queue and logs it. We need to solve two main issues:

  • when there's no item on the queue, we need to wait for one to appear;
  • we need a mechanism for shutting down the logger.

First, the problem of waiting. Recall that in a non-blocking queue, the possibilities would have been remove() or poll(). But these methods return immediately: either with an item if there's one on the queue or else, if the queue is empty, with a null return value in the case of poll() or a NoSuchElementException in the case of remove(). To get round this, the BlockingQueue provides a take() method. This method will wait for an item to appear if there is none on the queue. It then returns the item at the head of the queue.
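The difference between the three methods can be seen in a short sketch. The class EmptyQueueDemo and its method names are hypothetical, and the 200 millisecond delay is an arbitrary value chosen for the demonstration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; the names are ours, not part of the logger.
class EmptyQueueDemo {
  // poll() on an empty queue returns null straight away.
  static String pollEmpty() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    return queue.poll();
  }

  // remove() on an empty queue throws NoSuchElementException straight away.
  static boolean removeThrowsWhenEmpty() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    try {
      queue.remove();
      return false;
    } catch (NoSuchElementException e) {
      return true;
    }
  }

  // take() waits: here another thread supplies an item after a delay.
  static String takeAfterDelay(long delayMillis) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    Thread producer = new Thread(() -> {
      try {
        Thread.sleep(delayMillis);
        queue.put("late item");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    try {
      return queue.take(); // blocks until the producer's put()
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(pollEmpty());             // null
    System.out.println(removeThrowsWhenEmpty()); // true
    System.out.println(takeAfterDelay(200));     // late item
  }
}
```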

At some point, we assume that the logger will be shut down "cleanly" in response to the user quitting the application. At that moment, we want the logger to log all pending strings on the queue before shutting down. One way to handle this is to post a special object to the queue that is a signal for the logger thread to shut down. Our run() method then looks as follows:

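public class LoggerThread extends Thread {
  // A distinct String instance: a plain "SHUTDOWN" literal would be interned,
  // so a caller logging that same literal would pass the identity test below
  private static final String SHUTDOWN_REQ = new String("SHUTDOWN");
  private volatile boolean shuttingDown, loggerTerminated;
  ...
  // Sit in a loop, pulling strings off the queue and logging
  public void run() {
    try {
      String item;
      // Comparing references (!=) is deliberate: only the sentinel object ends the loop
      while ((item = itemsToLog.take()) != SHUTDOWN_REQ) {
        System.out.println(item);
      }
    } catch (InterruptedException iex) {
      // Interrupted while waiting: fall through and let the thread terminate
    } finally {
      loggerTerminated = true;
    }
  }
}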

So in each iteration of the while loop, we wait for and take the next item on the queue. If that's the special "shutdown request" string, then we exit the while loop and let the run() method exit (thus terminating the logger thread). Otherwise, we print the string. When the logger eventually does terminate, we set a flag to say so, which we'll come back to in a moment. (See the separate section for information on why this flag is declared as a volatile variable.)

Adding items to the queue

Now we need to write our log() method, which we can call from any thread to add a string to the queue for subsequent logging. The method we use on the blocking queue for this is put(). This adds an item immediately to the queue if it has space, else waits for space to become available:

  public void log(String str) {
    if (shuttingDown || loggerTerminated) return;
    try {
      itemsToLog.put(str);
    } catch (InterruptedException iex) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Unexpected interruption");
    }
  }

When we add an item to the queue, the logic of BlockingQueue will automatically handle "waking up" the logger thread (or any thread waiting for a job on the queue). This means that some time in the future, when the logger thread is scheduled in, it will pick up the item that we added to the queue (and if already running, say, on another processor, it is likely to pick it up immediately).

Again, because it's a blocking operation, we must handle the possibility of put() being interrupted. We could declare our log() method to throw InterruptedException. But requiring every log operation to handle this exception makes for a slightly messy API for what will be an unlikely scenario in practice. So we just rethrow it as a RuntimeException (which we don't have to explicitly declare and which the caller doesn't have to explicitly catch), remembering to re-mark the current thread as interrupted, in case the handler that eventually catches the RuntimeException wants to know this. (For more details on why this is good practice, see the section on thread interruption.)
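The effect of re-marking the thread as interrupted can be checked with a small experiment. In this sketch, the class InterruptDemo and its methods are hypothetical. We simulate an interruption by setting the current thread's interrupt flag before calling put(), which then throws InterruptedException as soon as it tries to acquire the queue's lock.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; the names are ours, not part of the logger.
class InterruptDemo {
  // Same pattern as log(): rethrow as unchecked, but restore the flag first.
  static void add(BlockingQueue<String> queue, String s) {
    try {
      queue.put(s);
    } catch (InterruptedException iex) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Unexpected interruption");
    }
  }

  // Returns true if the interrupt flag was still set at the point where
  // the RuntimeException was caught.
  static boolean flagSurvivesRethrow() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    Thread.currentThread().interrupt(); // simulate an interruption arriving
    try {
      add(queue, "x");
      Thread.interrupted(); // not expected to get here; clear the flag anyway
      return false;
    } catch (RuntimeException e) {
      // Thread.interrupted() reads AND clears the flag, leaving the thread clean
      return Thread.interrupted();
    }
  }

  public static void main(String[] args) {
    System.out.println(flagSurvivesRethrow()); // true
  }
}
```

Without the call to Thread.currentThread().interrupt() in the catch block, the flag would already have been cleared by the time the RuntimeException reached its handler, and the interruption would effectively be lost.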

Notice the first line of the method, where we check if the logger has shut down: if it has, then nothing is going to pull jobs off the queue, and the put() method would block forever once the queue was full.

Shutting down the queue

We've really already dealt with this issue: we saw that our run() method waits for a special "shutdown" notification to be posted to the queue. So we just need our shutdown method to post this object to the queue:

public void shutDown() throws InterruptedException {
  shuttingDown = true;
  itemsToLog.put(SHUTDOWN_REQ);
}

We also set a "shutting down" flag, checked from the log() method, so that more messages won't be posted to the queue from this moment onwards.
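Putting the pieces so far together gives something like the following sketch. It is not a definitive version of the logger. To keep the example self-contained and easy to check, it makes a few assumptions that differ from the listings above: the class is called SketchLogger, it is not a singleton, it is started explicitly, and it writes to a PrintStream passed to its constructor rather than directly to System.out. The shutdown signal is created with new String() so that an ordinary message with the text "SHUTDOWN" cannot be mistaken for it. The demo() method is purely illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, non-singleton variant of the logger, for demonstration only.
class SketchLogger extends Thread {
  // A distinct instance, so that only this exact object ends the loop
  private static final String SHUTDOWN_REQ = new String("SHUTDOWN");
  private final BlockingQueue<String> itemsToLog = new ArrayBlockingQueue<>(100);
  private final PrintStream out; // assumption: destination is injectable
  private volatile boolean shuttingDown, loggerTerminated;

  SketchLogger(PrintStream out) {
    this.out = out;
  }

  @Override
  public void run() {
    try {
      String item;
      while ((item = itemsToLog.take()) != SHUTDOWN_REQ) {
        out.println(item);
      }
    } catch (InterruptedException iex) {
      // interrupted while waiting: just let the thread terminate
    } finally {
      loggerTerminated = true;
    }
  }

  public void log(String str) {
    if (shuttingDown || loggerTerminated) return;
    try {
      itemsToLog.put(str);
    } catch (InterruptedException iex) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Unexpected interruption");
    }
  }

  public void shutDown() throws InterruptedException {
    shuttingDown = true;
    itemsToLog.put(SHUTDOWN_REQ);
  }

  // Logs two messages, shuts down, and returns everything that was written.
  static String demo() {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    SketchLogger logger = new SketchLogger(new PrintStream(buffer, true));
    logger.start();
    try {
      logger.log("first");
      logger.log("second");
      logger.shutDown();
      logger.log("too late"); // ignored: the shuttingDown flag is already set
      logger.join();          // wait for pending messages to be written
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return buffer.toString();
  }

  public static void main(String[] args) {
    System.out.print(demo()); // prints "first" then "second", one per line
  }
}
```

Because the shutdown request travels through the same queue as the messages, everything posted before it is logged before the thread exits.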

In the design we've chosen, we assume that there'll be a well-defined "shutdown routine" in our application which will call the logger's shutDown() method, presumably towards the end of the shutdown process. Other things we could think about in a more sophisticated implementation include:

  • what happens if a string is posted to the queue after a shutdown request has been issued? (in our implementation, such strings will effectively be ignored)
  • what if the application is shut down at arbitrary moments? We could consider using a shutdown hook, and/or making the logger thread a daemon thread.
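As a rough sketch of those last two options, the following shows how a daemon thread and a shutdown hook are set up using the standard Thread and Runtime methods. The class ShutdownHookDemo, its helper methods and the thread names are hypothetical. In a real logger, the hook would presumably call shutDown() and then wait for the logger thread to finish.

```java
// Hypothetical demo class; the names are ours, not part of the logger.
class ShutdownHookDemo {
  // A daemon thread does not keep the JVM alive once all other threads finish.
  static Thread newDaemonWorker(Runnable task) {
    Thread t = new Thread(task, "logger");
    t.setDaemon(true); // must be called before start()
    return t;
  }

  // Registers a hook that the JVM runs as it begins to shut down.
  static Thread registerHook(Runnable cleanUp) {
    Thread hook = new Thread(cleanUp, "logger-shutdown-hook");
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }

  public static void main(String[] args) {
    registerHook(() -> System.out.println("flushing pending log messages"));
    System.out.println("main exiting");
  }
}
```

The two options pull in different directions: a daemon logger thread may be stopped while strings are still on the queue, whereas a shutdown hook gives us a chance to flush them first, provided the JVM is shut down in an orderly way.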

Article written by Neil Coffey (@BitterCoffey).

Copyright © Neil Coffey 2014. All rights reserved.