1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package de.bea.domingo.queue;
24
25 import java.util.LinkedList;
26
27 import de.bea.domingo.DNotesMonitor;
28 import de.bea.domingo.monitor.ConsoleMonitor;
29
30 /***
31 * A multithreaded blocking queue which is very useful for
32 * implementing producer-consumer style threading patterns.
33 * <p>
34 * Multiple blocking threads can wait for items being added
35 * to the queue while other threads add to the queue.
36 * <p>
37 * Non blocking and timeout based modes of access are possible as well.
38 *
39 * @author <a href="mailto:kriede@users.sourceforge.net">Kurt Riede</a>
40 */
41 public final class MTQueue implements Queue {
42
43 /*** Default amount of time to wait for a task from the queue in milli seconds. */
44 private static final long DEFAULT_TIMEOUT = 1000;
45
46 /*** List of queued objects. */
47 private LinkedList list = new LinkedList();
48
49 /*** Reference to the associated monitor. */
50 private final DNotesMonitor monitor;
51
52 private Object fMutex = new Object();
53
54 /***
55 * Constructor.
56 *
57 * @param mutex a mutex for syncronization
58 * @param theMonitor the monitor
59 */
60 public MTQueue(final Object mutex, final DNotesMonitor theMonitor) {
61 fMutex = mutex;
62 if (theMonitor != null) {
63 monitor = theMonitor;
64 } else {
65 monitor = new ConsoleMonitor();
66 }
67 }
68
69 /***
70 * {@inheritDoc}
71 * @see de.bea.domingo.queue.Queue#size()
72 */
73 public int size() {
74 synchronized (fMutex) {
75 return list.size();
76 }
77 }
78
79 /***
80 * {@inheritDoc}
81 * @see de.bea.domingo.queue.Queue#isEmpty()
82 */
83 public boolean isEmpty() {
84 synchronized (fMutex) {
85 return size() == 0;
86 }
87 }
88 /***
89 * Adds a new object to the end of the queue.
90 * At least one thread will be notified.
91 * @param object the object to add to the queue
92 */
93 public void enqueue(final Object object) {
94 synchronized (fMutex) {
95 list.add(object);
96 fMutex.notifyAll();
97 }
98 }
99
100 /***
101 * Removes the first object from the queue, blocking until one is available.
102 * Note that this method will never return <code>null</code> and could block forever.
103 *
104 * @return next object from the queue
105 */
106 public Object dequeue() {
107 synchronized (fMutex) {
108 while (true) {
109 final Object answer = dequeueNoWait();
110 if (answer != null) {
111 return answer;
112 }
113 try {
114 fMutex.wait(DEFAULT_TIMEOUT);
115 } catch (InterruptedException e) {
116 monitor.fatalError(e.getMessage(), e);
117 }
118 }
119 }
120 }
121
122 /***
123 * Removes the first object from the queue, blocking only up to the given
124 * timeout time.
125 *
126 * A thread can also wake up without being notified, interrupted, or
127 * timing out, a so-called <i>spurious wakeup</i>. This will rarely
128 * occur in practice, but it can occur and in such a case, this method
129 * will not wait upto the timeout, but come back earlier without a
130 * dequeued object.
131 *
132 * @param timeout maximum time to wait for an object from the queue
133 * @return next object from the queue
134 */
135 public Object dequeue(final long timeout) {
136 synchronized (fMutex) {
137 Object answer = dequeueNoWait();
138 if (answer == null) {
139 try {
140 fMutex.wait(timeout);
141 } catch (InterruptedException e) {
142 monitor.fatalError(e.getMessage(), e);
143 }
144 answer = dequeueNoWait();
145 }
146 return answer;
147 }
148 }
149
150 /***
151 * Removes the first object from the queue without blocking.
152 * This method will return immediately with an item from the queue or <code>null</code>.
153 *
154 * @return the first object removed from the queue or null if the
155 * queue is empty
156 */
157 public Object dequeueNoWait() {
158 synchronized (fMutex) {
159 if (!list.isEmpty()) {
160 return list.removeFirst();
161 }
162 return null;
163 }
164 }
165 }