View Javadoc

1   /*
2    * This file is part of Domingo
3    * an Open Source Java-API to Lotus Notes/Domino
4    * hosted at http://domingo.sourceforge.net
5    *
6    * Copyright (c) 2003-2007 Beck et al. projects GmbH Munich, Germany (http://www.bea.de)
7    *
8    * This library is free software; you can redistribute it and/or
9    * modify it under the terms of the GNU Lesser General Public
10   * License as published by the Free Software Foundation; either
11   * version 2.1 of the License, or (at your option) any later version.
12   *
13   * This library is distributed in the hope that it will be useful,
14   * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   * Lesser General Public License for more details.
17   *
18   * You should have received a copy of the GNU Lesser General Public
19   * License along with this library; if not, write to the Free Software
20   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21   */
22  
23  package de.bea.domingo.threadpool;
24  
25  import de.bea.domingo.DNotesMonitor;
26  import de.bea.domingo.i18n.ResourceManager;
27  import de.bea.domingo.i18n.Resources;
28  import de.bea.domingo.monitor.ConsoleMonitor;
29  import de.bea.domingo.proxy.DNotesThread;
30  import de.bea.domingo.queue.MTQueue;
31  
32  /***
33   * A simple implementation of a ThreadPool which is constructed with a given
34   * number of threads.
35   *
36   * @author <a href="mailto:kriede@users.sourceforge.net">Kurt Riede</a>
37   */
38  public final class SimpleThreadPool implements Runnable, ThreadPool {
39  
40      /*** Maximum amount of time to wait for a task from the queue in milli seconds. */
41      public static final int MAX_WAIT_FOR_TASK = 10000;
42  
43      /*** Maximum amount of time to wait for a thread to stop. */
44      public static final int MAX_WAIT_FOR_STOP = 5000;
45  
46      /*** Default number of threads in the thread pool. */
47      public static final int DEFAULT_NUM_THREAD = 1;
48  
49      /*** Internationalized resources. */
50      private static final Resources RESOURCES = ResourceManager.getPackageResources(SimpleThreadPool.class);
51  
52      /*** Reference to the associated queue. */
53      private final MTQueue queue;
54  
55      /*** Indicates if the ThreadPool has been stopped. */
56      private boolean stopped = false;
57  
58      /*** Number of threads that should be running. */
59      private int size = 0;
60  
61      /*** Number of threads currently running. */
62      private int threadCount = 0;
63  
64      /*** Priority of threads in the pool. */
65      private int threadPriority = 0;
66  
67      /*** Reference to the associated monitor. */
68      private final DNotesMonitor monitor;
69  
70      /*** Reference to the thread factory. */
71      private ThreadFactory threadFactory;
72  
73      /*** First exception during initialization. */
74      private Throwable initException;
75  
76      private boolean initialized;
77  
78      private Object mutex = new Object();
79  
80      /***
81       * Constructor.
82       * @throws ThreadPoolException if any error occurs during starting threads
83       */
84      public SimpleThreadPool() throws ThreadPoolException {
85          this(null, DEFAULT_NUM_THREAD);
86      }
87  
88      /***
89       * Constructor.
90       *
91       * @param monitor the monitor
92       * @throws ThreadPoolException if any error occurs during starting threads
93       */
94      public SimpleThreadPool(final DNotesMonitor monitor) throws ThreadPoolException {
95          this(monitor, DEFAULT_NUM_THREAD);
96      }
97  
98      /***
99       * Constructor.
100      *
101      * @param numberOfThreads number of threads in pool
102      * @throws ThreadPoolException if any error occurs during starting threads
103      */
104     public SimpleThreadPool(final int numberOfThreads) throws ThreadPoolException {
105         this(1, numberOfThreads);
106     }
107 
108     /***
109      * Constructor.
110      *
111      * @param monitor ThreadPool Monitor
112      * @param numberOfThreads number of threads in pool
113      * @throws ThreadPoolException if any error occurs during starting threads
114      * @throws ThreadPoolException if any error occurs during starting threads
115      */
116     public SimpleThreadPool(final DNotesMonitor monitor, final int numberOfThreads) throws ThreadPoolException {
117         this(monitor, numberOfThreads, Thread.NORM_PRIORITY);
118     }
119 
120     /***
121      * Constructor.
122      *
123      * @param numberOfThreads number of threads in pool
124      * @param threadPriority priority of threads in pool
125      * @throws ThreadPoolException if any error occurs during starting threads
126      */
127     public SimpleThreadPool(final int numberOfThreads, final int threadPriority) throws ThreadPoolException {
128         this(new ConsoleMonitor(), numberOfThreads, threadPriority);
129     }
130 
131     /***
132      * Constructor.
133      *
134      * @param monitor ThreadPool Monitor
135      * @param numberOfThreads number of threads in pool
136      * @param threadPriority priority of threads in pool
137      * @throws ThreadPoolException if any error occurs during starting threads
138      */
139     public SimpleThreadPool(final DNotesMonitor monitor, final int numberOfThreads,
140                             final int threadPriority) throws ThreadPoolException {
141         this(monitor, null, numberOfThreads, threadPriority);
142     }
143 
144     /***
145      * Constructor.
146      *
147      * @param monitor ThreadPool Monitor
148      * @param threadFactory ThreadFactory to us to create the new Threads
149      * @param numberOfThreads number of threads in pool
150      * @throws ThreadPoolException if any error occurs during starting threads
151      */
152     public SimpleThreadPool(final DNotesMonitor monitor, final ThreadFactory threadFactory,
153                             final int numberOfThreads) throws ThreadPoolException {
154         this(monitor, threadFactory, numberOfThreads, Thread.NORM_PRIORITY);
155     }
156 
157     /***
158      * Constructor.
159      *
160      * @param theMonitor ThreadPool monitor
161      * @param theThreadFactory ThreadFactory to us to create the new Threads
162      * @param theNumberOfThreads number of threads in pool
163      * @param theThreadPriority priority of threads in pool
164      * @throws ThreadPoolException if any error occurs during starting threads
165      */
166     public SimpleThreadPool(final DNotesMonitor theMonitor, final ThreadFactory theThreadFactory,
167                             final int theNumberOfThreads, final int theThreadPriority) throws ThreadPoolException {
168         threadFactory = theThreadFactory;
169         size = theNumberOfThreads;
170         threadPriority = theThreadPriority;
171         if (theMonitor != null) {
172             monitor = theMonitor;
173         } else {
174             monitor = new ConsoleMonitor();
175         }
176         if (theThreadFactory != null) {
177             threadFactory = theThreadFactory;
178         } else {
179             threadFactory = new DefaultThreadFactory(theMonitor);
180         }
181         queue = new MTQueue(mutex, monitor);
182         initThreads(theNumberOfThreads, threadPriority);
183     }
184 
185 
186     ////////////////////////////////////////////////
187     //    private helper methods
188     ////////////////////////////////////////////////
189 
190     /***
191      * Initialize a number of threads in the pool.
192      *
193      * @param count number of threads in pool
194      * @throws ThreadPoolException if any error occurs during starting threads
195      */
196     private void initThreads(final int count) throws ThreadPoolException {
197         initThreads(count, Thread.NORM_PRIORITY);
198     }
199 
200     /***
201      * Initialize a number of threads in the pool with given priority.
202      *
203      * @param count number of threads in pool
204      * @param priority priority of threads in pool
205      * @throws ThreadPoolException if any error occurs during starting threads
206      */
207     private void initThreads(final int count, final int priority) throws ThreadPoolException {
208         for (int i = 0; i < count; i++) {
209             startThread(priority);
210         }
211     }
212 
213     /***
214      * Create and start a new thread.
215      *
216      * @param priority priority of new thread
217      * @return new Thread
218      * @throws ThreadPoolException if any error occurs during starting threads
219      */
220     private Thread startThread(final int priority) throws ThreadPoolException {
221         initialized = false;
222         initException = null;
223         final DNotesThread thread = (DNotesThread) threadFactory.createThread(this);
224         if (priority != Thread.NORM_PRIORITY) {
225             thread.setPriority(priority);
226         }
227         try {
228             thread.start();
229         } catch (Throwable t) {
230             throw new ThreadPoolException("Thread cannot be started", t);
231         }
232         synchronized (mutex) {
233             while (!isInitialized() && !isFailed()) {
234                 try {
235                     mutex.wait();
236                 } catch (InterruptedException e) {
237                     // ignore the interrupt exception an continue
238                 }
239             }
240         }
241         if (isFailed()) {
242             throw new ThreadPoolException("Cannot start thread", initException);
243         }
244         return thread;
245     }
246 
247     private boolean isInitialized() {
248         synchronized (mutex) {
249             return initialized;
250         }
251     }
252 
253     private boolean isFailed() {
254         synchronized (mutex) {
255             return initException != null;
256         }
257     }
258 
259     ////////////////////////////////////////////////
260     //    public methods
261     ////////////////////////////////////////////////
262 
263     /***
264      * Returns the number of currently active threads.
265      *
266      * @return number of currently active threads
267      */
268     private int getThreadCount() {
269         return threadCount;
270     }
271 
272     /***
273      * Returns number of runnable objects in the queue.
274      * @return number of objects in the queue
275      */
276     public int getRunnableCount() {
277         return queue.size();
278     }
279 
280     ////////////////////////////////////////////////
281     //    interface ThreadPool
282     ////////////////////////////////////////////////
283 
284     /***
285      * Dispatch a new task onto this pool to be invoked asynchronously later.
286      *
287      * @param task the task to execute
288      */
289     public void invokeLater(final Runnable task) {
290         if (stopped) {
291             throw new IllegalStateException(RESOURCES.getString("threadpool.not.started"));
292         }
293         queue.enqueue(task);
294     }
295 
296     /***
297      * @see de.bea.domingo.threadpool.ThreadPool#stop()
298      */
299     public void stop() {
300         synchronized (mutex) {
301             stopped = true;
302             synchronized (queue) {
303                 mutex.notifyAll();
304             }
305             while (getThreadCount() > 0) {
306                 try {
307                     mutex.wait(MAX_WAIT_FOR_STOP);
308                 } catch (InterruptedException e) {
309                     monitor.debug(RESOURCES.getString("threadpool.wait.stop"));
310                 }
311             }
312             monitor.info(RESOURCES.getString("threadpool.stopped"));
313         }
314     }
315 
316     /***
317      * {@inheritDoc}
318      * @see de.bea.domingo.threadpool.ThreadPool#resize(int)
319      */
320     public void resize(final int newSize) throws ThreadPoolException {
321         synchronized (mutex) {
322             size = newSize;
323             if (size > threadCount) {
324                 initThreads(size - threadCount);
325             } else if (size < threadCount) {
326                 while (threadCount > size) {
327                     try {
328                         mutex.wait(MAX_WAIT_FOR_STOP);
329                     } catch (InterruptedException e) {
330                         monitor.debug(RESOURCES.getString("threadpool.wait.resize"));
331                     }
332                 }
333             }
334         }
335     }
336 
337     ////////////////////////////////////////////////
338     //    interface Runnable
339     ////////////////////////////////////////////////
340 
341     /***
342      * The method ran by the pool of background threads.
343      */
344     public void run() {
345         monitor.info(RESOURCES.getString("thread.started"));
346         try {
347             threadFactory.initThread();
348         } catch (Throwable t) {
349             synchronized (mutex) {
350                 initException = t;
351                 mutex.notifyAll();
352             }
353             return;
354         }
355         synchronized (mutex) {
356             initialized = true;
357             threadCount++;
358             mutex.notifyAll();
359         }
360         while (!stopped) {
361             synchronized (mutex) {
362                 if (threadCount > size) {
363                     break;
364                 }
365             }
366             final Runnable task = (Runnable) queue.dequeue(MAX_WAIT_FOR_TASK);
367             if (task != null) {
368                 try {
369                     task.run();
370                 } catch (Throwable t) {
371                     monitor.fatalError(RESOURCES.getString("task.execute.failed"), t);
372                 }
373                 synchronized (task) {
374                     task.notifyAll();
375                 }
376             }
377         }
378         synchronized (mutex) {
379             threadCount--;
380         }
381         try {
382             threadFactory.termThread();
383         } catch (Throwable t) {
384             threadFactory.handleThrowable(t);
385         }
386         monitor.info(RESOURCES.getString("thread.stopped"));
387         synchronized (mutex) {
388             mutex.notifyAll();
389         }
390     }
391 }