1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package de.bea.domingo.threadpool;
24
25 import de.bea.domingo.DNotesMonitor;
26 import de.bea.domingo.i18n.ResourceManager;
27 import de.bea.domingo.i18n.Resources;
28 import de.bea.domingo.monitor.ConsoleMonitor;
29 import de.bea.domingo.proxy.DNotesThread;
30 import de.bea.domingo.queue.MTQueue;
31
32 /***
33 * A simple implementation of a ThreadPool which is constructed with a given
34 * number of threads.
35 *
36 * @author <a href="mailto:kriede@users.sourceforge.net">Kurt Riede</a>
37 */
38 public final class SimpleThreadPool implements Runnable, ThreadPool {
39
40 /*** Maximum amount of time to wait for a task from the queue in milli seconds. */
41 public static final int MAX_WAIT_FOR_TASK = 10000;
42
43 /*** Maximum amount of time to wait for a thread to stop. */
44 public static final int MAX_WAIT_FOR_STOP = 5000;
45
46 /*** Default number of threads in the thread pool. */
47 public static final int DEFAULT_NUM_THREAD = 1;
48
49 /*** Internationalized resources. */
50 private static final Resources RESOURCES = ResourceManager.getPackageResources(SimpleThreadPool.class);
51
52 /*** Reference to the associated queue. */
53 private final MTQueue queue;
54
55 /*** Indicates if the ThreadPool has been stopped. */
56 private boolean stopped = false;
57
58 /*** Number of threads that should be running. */
59 private int size = 0;
60
61 /*** Number of threads currently running. */
62 private int threadCount = 0;
63
64 /*** Priority of threads in the pool. */
65 private int threadPriority = 0;
66
67 /*** Reference to the associated monitor. */
68 private final DNotesMonitor monitor;
69
70 /*** Reference to the thread factory. */
71 private ThreadFactory threadFactory;
72
73 /*** First exception during initialization. */
74 private Throwable initException;
75
76 private boolean initialized;
77
78 private Object mutex = new Object();
79
80 /***
81 * Constructor.
82 * @throws ThreadPoolException if any error occurs during starting threads
83 */
84 public SimpleThreadPool() throws ThreadPoolException {
85 this(null, DEFAULT_NUM_THREAD);
86 }
87
88 /***
89 * Constructor.
90 *
91 * @param monitor the monitor
92 * @throws ThreadPoolException if any error occurs during starting threads
93 */
94 public SimpleThreadPool(final DNotesMonitor monitor) throws ThreadPoolException {
95 this(monitor, DEFAULT_NUM_THREAD);
96 }
97
98 /***
99 * Constructor.
100 *
101 * @param numberOfThreads number of threads in pool
102 * @throws ThreadPoolException if any error occurs during starting threads
103 */
104 public SimpleThreadPool(final int numberOfThreads) throws ThreadPoolException {
105 this(1, numberOfThreads);
106 }
107
108 /***
109 * Constructor.
110 *
111 * @param monitor ThreadPool Monitor
112 * @param numberOfThreads number of threads in pool
113 * @throws ThreadPoolException if any error occurs during starting threads
114 * @throws ThreadPoolException if any error occurs during starting threads
115 */
116 public SimpleThreadPool(final DNotesMonitor monitor, final int numberOfThreads) throws ThreadPoolException {
117 this(monitor, numberOfThreads, Thread.NORM_PRIORITY);
118 }
119
120 /***
121 * Constructor.
122 *
123 * @param numberOfThreads number of threads in pool
124 * @param threadPriority priority of threads in pool
125 * @throws ThreadPoolException if any error occurs during starting threads
126 */
127 public SimpleThreadPool(final int numberOfThreads, final int threadPriority) throws ThreadPoolException {
128 this(new ConsoleMonitor(), numberOfThreads, threadPriority);
129 }
130
131 /***
132 * Constructor.
133 *
134 * @param monitor ThreadPool Monitor
135 * @param numberOfThreads number of threads in pool
136 * @param threadPriority priority of threads in pool
137 * @throws ThreadPoolException if any error occurs during starting threads
138 */
139 public SimpleThreadPool(final DNotesMonitor monitor, final int numberOfThreads,
140 final int threadPriority) throws ThreadPoolException {
141 this(monitor, null, numberOfThreads, threadPriority);
142 }
143
144 /***
145 * Constructor.
146 *
147 * @param monitor ThreadPool Monitor
148 * @param threadFactory ThreadFactory to us to create the new Threads
149 * @param numberOfThreads number of threads in pool
150 * @throws ThreadPoolException if any error occurs during starting threads
151 */
152 public SimpleThreadPool(final DNotesMonitor monitor, final ThreadFactory threadFactory,
153 final int numberOfThreads) throws ThreadPoolException {
154 this(monitor, threadFactory, numberOfThreads, Thread.NORM_PRIORITY);
155 }
156
157 /***
158 * Constructor.
159 *
160 * @param theMonitor ThreadPool monitor
161 * @param theThreadFactory ThreadFactory to us to create the new Threads
162 * @param theNumberOfThreads number of threads in pool
163 * @param theThreadPriority priority of threads in pool
164 * @throws ThreadPoolException if any error occurs during starting threads
165 */
166 public SimpleThreadPool(final DNotesMonitor theMonitor, final ThreadFactory theThreadFactory,
167 final int theNumberOfThreads, final int theThreadPriority) throws ThreadPoolException {
168 threadFactory = theThreadFactory;
169 size = theNumberOfThreads;
170 threadPriority = theThreadPriority;
171 if (theMonitor != null) {
172 monitor = theMonitor;
173 } else {
174 monitor = new ConsoleMonitor();
175 }
176 if (theThreadFactory != null) {
177 threadFactory = theThreadFactory;
178 } else {
179 threadFactory = new DefaultThreadFactory(theMonitor);
180 }
181 queue = new MTQueue(mutex, monitor);
182 initThreads(theNumberOfThreads, threadPriority);
183 }
184
185
186
187
188
189
190 /***
191 * Initialize a number of threads in the pool.
192 *
193 * @param count number of threads in pool
194 * @throws ThreadPoolException if any error occurs during starting threads
195 */
196 private void initThreads(final int count) throws ThreadPoolException {
197 initThreads(count, Thread.NORM_PRIORITY);
198 }
199
200 /***
201 * Initialize a number of threads in the pool with given priority.
202 *
203 * @param count number of threads in pool
204 * @param priority priority of threads in pool
205 * @throws ThreadPoolException if any error occurs during starting threads
206 */
207 private void initThreads(final int count, final int priority) throws ThreadPoolException {
208 for (int i = 0; i < count; i++) {
209 startThread(priority);
210 }
211 }
212
213 /***
214 * Create and start a new thread.
215 *
216 * @param priority priority of new thread
217 * @return new Thread
218 * @throws ThreadPoolException if any error occurs during starting threads
219 */
220 private Thread startThread(final int priority) throws ThreadPoolException {
221 initialized = false;
222 initException = null;
223 final DNotesThread thread = (DNotesThread) threadFactory.createThread(this);
224 if (priority != Thread.NORM_PRIORITY) {
225 thread.setPriority(priority);
226 }
227 try {
228 thread.start();
229 } catch (Throwable t) {
230 throw new ThreadPoolException("Thread cannot be started", t);
231 }
232 synchronized (mutex) {
233 while (!isInitialized() && !isFailed()) {
234 try {
235 mutex.wait();
236 } catch (InterruptedException e) {
237
238 }
239 }
240 }
241 if (isFailed()) {
242 throw new ThreadPoolException("Cannot start thread", initException);
243 }
244 return thread;
245 }
246
247 private boolean isInitialized() {
248 synchronized (mutex) {
249 return initialized;
250 }
251 }
252
253 private boolean isFailed() {
254 synchronized (mutex) {
255 return initException != null;
256 }
257 }
258
259
260
261
262
263 /***
264 * Returns the number of currently active threads.
265 *
266 * @return number of currently active threads
267 */
268 private int getThreadCount() {
269 return threadCount;
270 }
271
272 /***
273 * Returns number of runnable objects in the queue.
274 * @return number of objects in the queue
275 */
276 public int getRunnableCount() {
277 return queue.size();
278 }
279
280
281
282
283
284 /***
285 * Dispatch a new task onto this pool to be invoked asynchronously later.
286 *
287 * @param task the task to execute
288 */
289 public void invokeLater(final Runnable task) {
290 if (stopped) {
291 throw new IllegalStateException(RESOURCES.getString("threadpool.not.started"));
292 }
293 queue.enqueue(task);
294 }
295
296 /***
297 * @see de.bea.domingo.threadpool.ThreadPool#stop()
298 */
299 public void stop() {
300 synchronized (mutex) {
301 stopped = true;
302 synchronized (queue) {
303 mutex.notifyAll();
304 }
305 while (getThreadCount() > 0) {
306 try {
307 mutex.wait(MAX_WAIT_FOR_STOP);
308 } catch (InterruptedException e) {
309 monitor.debug(RESOURCES.getString("threadpool.wait.stop"));
310 }
311 }
312 monitor.info(RESOURCES.getString("threadpool.stopped"));
313 }
314 }
315
316 /***
317 * {@inheritDoc}
318 * @see de.bea.domingo.threadpool.ThreadPool#resize(int)
319 */
320 public void resize(final int newSize) throws ThreadPoolException {
321 synchronized (mutex) {
322 size = newSize;
323 if (size > threadCount) {
324 initThreads(size - threadCount);
325 } else if (size < threadCount) {
326 while (threadCount > size) {
327 try {
328 mutex.wait(MAX_WAIT_FOR_STOP);
329 } catch (InterruptedException e) {
330 monitor.debug(RESOURCES.getString("threadpool.wait.resize"));
331 }
332 }
333 }
334 }
335 }
336
337
338
339
340
341 /***
342 * The method ran by the pool of background threads.
343 */
344 public void run() {
345 monitor.info(RESOURCES.getString("thread.started"));
346 try {
347 threadFactory.initThread();
348 } catch (Throwable t) {
349 synchronized (mutex) {
350 initException = t;
351 mutex.notifyAll();
352 }
353 return;
354 }
355 synchronized (mutex) {
356 initialized = true;
357 threadCount++;
358 mutex.notifyAll();
359 }
360 while (!stopped) {
361 synchronized (mutex) {
362 if (threadCount > size) {
363 break;
364 }
365 }
366 final Runnable task = (Runnable) queue.dequeue(MAX_WAIT_FOR_TASK);
367 if (task != null) {
368 try {
369 task.run();
370 } catch (Throwable t) {
371 monitor.fatalError(RESOURCES.getString("task.execute.failed"), t);
372 }
373 synchronized (task) {
374 task.notifyAll();
375 }
376 }
377 }
378 synchronized (mutex) {
379 threadCount--;
380 }
381 try {
382 threadFactory.termThread();
383 } catch (Throwable t) {
384 threadFactory.handleThrowable(t);
385 }
386 monitor.info(RESOURCES.getString("thread.stopped"));
387 synchronized (mutex) {
388 mutex.notifyAll();
389 }
390 }
391 }