Thread Pools

Thread pools are useful when you need to limit the number of threads running in your application at the same time. Starting a new thread carries a performance overhead, and each thread is also allocated memory for its stack, among other things.

Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads, the task is assigned to one of them and executed. Internally, the tasks are inserted into a Blocking Queue which the threads in the pool are dequeuing from. When a new task is inserted into the queue, one of the idle threads will dequeue it successfully and execute it. The rest of the idle threads in the pool will be blocked waiting to dequeue tasks.

Thread pools are often used in multithreaded servers. Each connection arriving at the server via the network is wrapped as a task and passed on to a thread pool. The threads in the thread pool will process the requests on the connections concurrently. A later trail will get into detail about implementing multithreaded servers in Java.

Since Java 5, Java has come with built-in thread pools in the java.util.concurrent package, so you don't have to implement your own thread pool. You can read more about it in my text on the java.util.concurrent.ExecutorService. Still, it can be useful to know a bit about the implementation of a thread pool anyway.
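To give an idea of what the built-in route looks like, here is a minimal sketch using Executors.newFixedThreadPool() from java.util.concurrent. The class name ExecutorServiceSketch, the runTasks() helper, and the pool size and task count are only illustrative assumptions, not part of any library.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and numbers are illustrative only.
class ExecutorServiceSketch {

    // Submits taskCount trivial tasks to a pool of poolSize threads
    // and returns how many of them ran to completion.
    static int runTasks(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on tasks that are still pending
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(3, 10));
    }
}
```

Note that shutdown() lets already submitted tasks finish, whereas shutdownNow() interrupts the worker threads.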

Here is a simple thread pool implementation. Please note that this implementation uses my own BlockingQueue class as explained in my Blocking Queues tutorial. In a real life implementation you would probably use one of Java's built-in blocking queues instead.

import java.util.ArrayList;
import java.util.List;

public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<PoolThread> threads = new ArrayList<PoolThread>();
    private boolean isStopped = false;

    public ThreadPool(int noOfThreads, int maxNoOfTasks){
        taskQueue = new BlockingQueue(maxNoOfTasks);

        for(int i=0; i<noOfThreads; i++){
            threads.add(new PoolThread(taskQueue));
        }
        for(PoolThread thread : threads){
            thread.start();
        }
    }

    public synchronized void execute(Runnable task) throws Exception {
        if(this.isStopped) {
            throw new IllegalStateException("ThreadPool is stopped");
        }

        this.taskQueue.enqueue(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThread thread : threads){
            thread.doStop();
        }
    }

}

public class PoolThread extends Thread {

    private BlockingQueue taskQueue = null;
    private boolean       isStopped = false;

    public PoolThread(BlockingQueue queue){
        taskQueue = queue;
    }

    public void run(){
        while(!isStopped()){
            try{
                Runnable runnable = (Runnable) taskQueue.dequeue();
                runnable.run();
            } catch(Exception e){
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        this.interrupt(); //break pool thread out of dequeue() call.
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}

The thread pool implementation consists of two parts: a ThreadPool class, which is the public interface to the thread pool, and a PoolThread class, which implements the threads that execute the tasks.

To execute a task, the method ThreadPool.execute(Runnable r) is called with a Runnable implementation as parameter. The Runnable is enqueued in the blocking queue internally, waiting to be dequeued.

The Runnable will be dequeued by an idle PoolThread and executed. You can see this in the PoolThread.run() method. After execution, the PoolThread loops and tries to dequeue a task again, until stopped.
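The blocking queue itself is not shown in this text. As a rough idea of what the pool relies on, here is a minimal sketch of a bounded queue with blocking enqueue() and dequeue() methods built on wait() and notifyAll(). It is an assumption based only on the calls the ThreadPool makes, not the actual class from the Blocking Queues tutorial, and it is named SimpleBlockingQueue here to avoid confusion with java.util.concurrent.BlockingQueue. The size() method is added purely for illustration.

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical stand-in for the BlockingQueue class used by the pool.
class SimpleBlockingQueue {

    private final Queue<Object> queue = new LinkedList<>();
    private final int limit;

    SimpleBlockingQueue(int limit) {
        this.limit = limit;
    }

    // Blocks while the queue is full, then appends the item.
    public synchronized void enqueue(Object item) throws InterruptedException {
        while (queue.size() == limit) {
            wait();
        }
        queue.add(item);
        notifyAll(); // wake up threads waiting in dequeue()
    }

    // Blocks while the queue is empty, then removes and returns the head.
    public synchronized Object dequeue() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        Object item = queue.remove();
        notifyAll(); // wake up threads waiting in enqueue()
        return item;
    }

    // Illustrative helper, not required by the pool.
    public synchronized int size() {
        return queue.size();
    }
}
```

The while loops around wait() matter: a woken thread re-checks the condition, because another thread may have taken the item or filled the free slot first.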

To stop the ThreadPool, the method ThreadPool.stop() is called. The stop call is noted internally in the isStopped member. Then each thread in the pool is stopped by calling doStop() on it. Notice how the execute() method will throw an IllegalStateException if it is called after stop() has been called.
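To see the execute() and stop() contract in action without the custom queue, here is a condensed, self-contained variant of the pool. It is only a sketch under some assumptions: MiniThreadPool and its demo() method are hypothetical names, the worker loop is a private method instead of a separate PoolThread class, and java.util.concurrent.ArrayBlockingQueue stands in for the BlockingQueue used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical condensed pool with the same execute()/stop() contract.
class MiniThreadPool {

    private final BlockingQueue<Runnable> taskQueue;
    private final List<Thread> threads = new ArrayList<>();
    private volatile boolean isStopped = false;

    MiniThreadPool(int noOfThreads, int maxNoOfTasks) {
        taskQueue = new ArrayBlockingQueue<>(maxNoOfTasks);
        for (int i = 0; i < noOfThreads; i++) {
            Thread thread = new Thread(this::workerLoop);
            threads.add(thread);
            thread.start();
        }
    }

    private void workerLoop() {
        while (!isStopped) {
            try {
                taskQueue.take().run(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                // woken up by stop(); the loop condition is re-checked
            } catch (RuntimeException e) {
                // a failing task must not kill the worker thread
            }
        }
    }

    synchronized void execute(Runnable task) throws InterruptedException {
        if (isStopped) {
            throw new IllegalStateException("ThreadPool is stopped");
        }
        taskQueue.put(task); // blocks while the queue is full
    }

    synchronized void stop() {
        isStopped = true;
        for (Thread thread : threads) {
            thread.interrupt();
        }
    }

    // Returns true if all tasks ran and execute() was rejected after stop().
    static boolean demo() {
        MiniThreadPool pool = new MiniThreadPool(3, 10);
        CountDownLatch done = new CountDownLatch(5);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(done::countDown);
            }
            boolean allRan = done.await(5, TimeUnit.SECONDS);
            pool.stop();
            try {
                pool.execute(() -> { });
                return false; // should not be reached
            } catch (IllegalStateException expected) {
                return allRan;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

One thing to be aware of in this sketch: because execute() is synchronized and may block on a full queue, a call to stop() has to wait until that execute() call returns.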

The threads will stop after finishing any task they are currently executing. Notice the this.interrupt() call in PoolThread.doStop(). This makes sure that a thread blocked in a wait() call inside the taskQueue.dequeue() call breaks out of the wait() call, and leaves the dequeue() method call with an InterruptedException thrown. This exception is caught in the PoolThread.run() method, reported, and then the isStopped variable is checked. Since isStopped is now true, the PoolThread.run() will exit and the thread dies.
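The effect of interrupt() on a waiting thread can be tried out in isolation. The following sketch, with the hypothetical class name WaitInterruptSketch, starts a thread that waits on a monitor nobody ever notifies, interrupts it, and checks that it left wait() with an InterruptedException.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the thread pool itself.
class WaitInterruptSketch {

    // Returns true if the waiting thread saw an InterruptedException
    // and terminated after being interrupted.
    static boolean interruptBreaksWait() {
        final Object monitor = new Object();
        final AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                try {
                    while (true) {
                        monitor.wait(); // nobody ever calls notify()
                    }
                } catch (InterruptedException e) {
                    sawInterrupt.set(true); // wait() was broken by interrupt()
                }
            }
        });

        waiter.start();
        waiter.interrupt(); // works whether it arrives before or during wait()

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get() && !waiter.isAlive();
    }
}
```

Without the interrupt() call, the waiter thread in this sketch would stay blocked forever, which is exactly the situation doStop() avoids for idle pool threads.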
