1 /*
2 Copyright 2010 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package mirrormap.nio;
17
18 import java.io.IOException;
19 import java.nio.channels.Channel;
20 import java.nio.channels.ClosedChannelException;
21 import java.nio.channels.SelectableChannel;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.Selector;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.logging.Level;
30 import java.util.logging.Logger;
31
32 import mirrormap.Utils;
33 import mirrormap.lifecycle.AbstractLifeCycle;
34
35 /**
36 * A collection of {@link ISelectionKeyTask} objects that handle the logic for
37 * the ready operations raised from registered NIO {@link Channel} objects. This
38 * class exists to provide a mechanism to manage the tasks that should be run
39 * for each channel that is registered against a single {@link Selector}. Each
40 * instance has an internal {@link Selector} that is used to register with the
41 * {@link Channel} supplied in the
42 * {@link #register(int, SelectableChannel, ISelectionKeyTask)} method.
43 * <p>
44 * This object invokes the {@link Selector#select()} method and blocks until a
45 * channel is available for one of the registered operations. When this happens
46 * the appropriate task is located and executed.
47 *
48 * <pre>
49 * // create and register
50 * SelectorTasks tasks = new SelectorTasks();
51 * // create an optional listener to get notified when the selector is opened
52 * ISelectorTasksStateListener = ...
53 * tasks.setStateListener(listener);
54 *
55 * // OPEN THE CHANNEL - no other operation is valid until this is run
56 * tasks.openSelector();
57 *
58 * SelectableChannel channel = ...;
59 * ISelectionKeyTask task = ...;
60 * tasks.register(SelectionKey.OP_READ, channel, task);
61 *
62 * // process - this blocks until a ready operation occurs,
63 * // in the real-world, this would be called in a separate thread
64 * while(tasks.process());
65 * </pre>
66 *
67 * There should only be 1 thread running the process method. The
68 * {@link #getRunnable()} method returns an {@link Runnable} object for a
69 * dedicated processing thread. e.g.
70 *
71 * <pre>
72 * // create and register
73 * SelectorTasks tasks = new SelectorTasks();
74 * ...
75 * // kick off a thread to run the process() method
76 * new Thread(tasks.getRunnable()).start();
77 * </pre>
78 *
79 * <b>This is not thread aware.</b>
80 *
81 * @author Ramon Servadei
82 *
83 */
84 public final class SelectorTasks extends AbstractLifeCycle
85 {
86 private final static Logger LOGGER =
87 Logger.getLogger(SelectorTasks.class.getName());
88
89 /**
90 * The {@link ISelectionKeyTask} objects registered via
91 * {@link #register(int, SelectableChannel, ISelectionKeyTask)}
92 */
93 private final Map<SelectionKey, ISelectionKeyTask> tasks;
94
95 /** The selector */
96 private Selector bo;
97
98 /** The state listener */
99 private ISelectorTasksStateListener listener;
100
101 /**
102 * Standard constructor
103 */
104 public SelectorTasks()
105 {
106 super();
107 this.tasks = new ConcurrentHashMap<SelectionKey, ISelectionKeyTask>();
108 }
109
110 /**
111 * Register the internal {@link Selector} with the channel and map the
112 * supplied task to the {@link SelectionKey} that is returned from the
113 * {@link SelectableChannel#register(Selector, int)} method.
114 *
115 * @param op
116 * the operation to register for
117 * @param channel
118 * the channel that will register with the internal selector
119 * @param task
120 * the task to run when the operation occurs in the channel. This
121 * task should be efficient; if it does any blocking this affects
122 * processing of other registered {@link ISelectionKeyTask}
123 * objects.
124 * @see SelectionKey
125 */
126 @SuppressWarnings("boxing")
127 public void register(int op, SelectableChannel channel,
128 ISelectionKeyTask task)
129 {
130 checkSelector();
131 try
132 {
133 SelectionKey selectionKey = null;
134 // synchronized before we wake up the selector so we prevent any
135 // thread in the process() method from re-aquiring the selector lock
136 // via selector.select()
137 synchronized (this)
138 {
139 // register blocks until the selector is available so wake it up
140 this.bo.wakeup();
141 selectionKey = channel.register(this.bo, op);
142 }
143 if (LOGGER.isLoggable(Level.FINE))
144 {
145 LOGGER.fine("Registered " + Utils.safeToString(task)
146 + " against " + Utils.safeToString(channel));
147 }
148 task.setSelectionKey(selectionKey);
149 this.tasks.put(selectionKey, task);
150 }
151 catch (ClosedChannelException e)
152 {
153 Utils.logException(LOGGER, "Could not register operation "
154 + Utils.safeToString(op) + " for "
155 + Utils.safeToString(channel) + " with "
156 + Utils.safeToString(task), e);
157 }
158 }
159
160 /**
161 * Unregister the selection key and remove the associated
162 * {@link ISelectionKeyTask}. The key is also cancelled via the
163 * {@link SelectionKey#cancel()} method.
164 *
165 * @param key
166 * the selection key to unregister and cancel
167 * @return the previously registered task
168 */
169 public ISelectionKeyTask unregister(SelectionKey key)
170 {
171 key.cancel();
172 final ISelectionKeyTask task = this.tasks.remove(key);
173 if (LOGGER.isLoggable(Level.FINE))
174 {
175 LOGGER.fine("Unregistered " + Utils.safeToString(task));
176 }
177 return task;
178 }
179
180 /**
181 * Call {@link Selector#select()} and wait for a one of the operations in
182 * any of the registered channels to occur. When one does, the appropriate
183 * {@link ISelectionKeyTask} is run.
184 *
185 * @return <code>false</code> if this becomes inactive
186 */
187 public boolean process()
188 {
189 checkSelector();
190 try
191 {
192 this.bo.select();
193 if (this.bo.isOpen())
194 {
195 Set<SelectionKey> selectedKeys = null;
196 // synchronized to prevent dead-lock if a thread is running the
197 // register() method and has woken up the selector
198 synchronized (this)
199 {
200 selectedKeys = this.bo.selectedKeys();
201 }
202 for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();)
203 {
204 SelectionKey selectionKey = iterator.next();
205 iterator.remove();
206 try
207 {
208 if (selectionKey.isValid())
209 {
210 final ISelectionKeyTask selectionKeyTask =
211 this.tasks.get(selectionKey);
212 if (selectionKeyTask != null)
213 {
214 selectionKeyTask.run();
215 }
216 else
217 {
218 if (LOGGER.isLoggable(Level.INFO))
219 {
220 LOGGER.info("No registered task for handling selection key for "
221 + Utils.safeToString(selectionKey.channel()));
222 }
223 }
224 }
225 else
226 {
227 final ISelectionKeyTask selectionKeyTask =
228 this.tasks.remove(selectionKey);
229 if (selectionKeyTask != null)
230 {
231 selectionKeyTask.destroy();
232 }
233 }
234 }
235 catch (Exception e)
236 {
237 Utils.logException(LOGGER,
238 "Could not process selection key event for "
239 + Utils.safeToString(selectionKey.channel()), e);
240 }
241 }
242
243 }
244 else
245 {
246 openSelector();
247 }
248 }
249 catch (Exception e)
250 {
251 Utils.logException(LOGGER, "Error processing selector "
252 + Utils.safeToString(this.bo), e);
253 }
254 return isActive();
255 }
256
257 /**
258 * Open the internal {@link Selector}
259 */
260 private void openSelector()
261 {
262 if (isActive())
263 {
264 try
265 {
266 this.bo = Selector.open();
267 }
268 catch (IOException e)
269 {
270 throw new IllegalStateException("Could not open selector", e);
271 }
272 if (this.listener != null)
273 {
274 this.listener.selectorOpened(this);
275 }
276 }
277 }
278
279 @Override
280 protected void doStart()
281 {
282 openSelector();
283 }
284
285 @Override
286 protected final void doDestroy()
287 {
288 try
289 {
290 destroyTasks();
291 if (this.bo != null)
292 {
293 this.bo.close();
294 }
295 }
296 catch (Exception e)
297 {
298 Utils.logException(LOGGER, "Error destroying selector "
299 + Utils.safeToString(this.bo), e);
300 }
301 }
302
303 /**
304 * Set the state listener
305 *
306 * @param listener
307 */
308 public void setStateListener(ISelectorTasksStateListener listener)
309 {
310 this.listener = listener;
311 }
312
313 /**
314 * Get a {@link Runnable} to continually execute the {@link #process()}
315 * method.
316 *
317 * @return a runnable to execute this
318 */
319 public Runnable getRunnable()
320 {
321 checkSelector();
322 return this.new SelectorTaskProcessor();
323 }
324
325 private void checkSelector()
326 {
327 if (this.bo == null)
328 {
329 throw new IllegalStateException("Selector not open");
330 }
331 }
332
333 /**
334 * Destroy all registered {@link ISelectionKeyTask} objects and cancel their
335 * associated {@link SelectionKey}
336 */
337 private void destroyTasks()
338 {
339 if (this.bo != null)
340 {
341 Set<SelectionKey> keys = null;
342 // iterate over a copy because the iteration loop modifies the keys
343 // associated with the selector (when it cancels the key)
344 keys = new HashSet<SelectionKey>(this.bo.keys());
345 for (SelectionKey selectionKey : keys)
346 {
347 if (selectionKey.isValid())
348 {
349 selectionKey.cancel();
350 final ISelectionKeyTask task =
351 this.tasks.remove(selectionKey);
352 if (task != null)
353 {
354 try
355 {
356 task.destroy();
357 }
358 catch (Exception e)
359 {
360 Utils.logException(LOGGER,
361 "Could not destroy task "
362 + Utils.safeToString(task), e);
363 }
364 }
365 }
366 }
367 }
368 }
369
370 /**
371 * A {@link Runnable} implementation to execute the
372 * {@link SelectorTasks#process()} method.
373 *
374 * @author Ramon Servadei
375 *
376 */
377 private final class SelectorTaskProcessor implements Runnable
378 {
379 public void run()
380 {
381 while (SelectorTasks.this.process())
382 {
383
384 }
385 }
386 }
387 }