1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package mirrormap.nio;
17
18 import java.io.IOException;
19 import java.nio.channels.Channel;
20 import java.nio.channels.ClosedChannelException;
21 import java.nio.channels.SelectableChannel;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.Selector;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.logging.Level;
30 import java.util.logging.Logger;
31
32 import mirrormap.Utils;
33 import mirrormap.lifecycle.AbstractLifeCycle;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public final class SelectorTasks extends AbstractLifeCycle
85 {
86 private final static Logger LOGGER =
87 Logger.getLogger(SelectorTasks.class.getName());
88
89
90
91
92
93 private final Map<SelectionKey, ISelectionKeyTask> tasks;
94
95
96 private Selector bo;
97
98
99 private ISelectorTasksStateListener listener;
100
101
102
103
104 public SelectorTasks()
105 {
106 super();
107 this.tasks = new ConcurrentHashMap<SelectionKey, ISelectionKeyTask>();
108 }
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 @SuppressWarnings("boxing")
127 public void register(int op, SelectableChannel channel,
128 ISelectionKeyTask task)
129 {
130 checkSelector();
131 try
132 {
133 SelectionKey selectionKey = null;
134
135
136
137 synchronized (this)
138 {
139
140 this.bo.wakeup();
141 selectionKey = channel.register(this.bo, op);
142 }
143 if (LOGGER.isLoggable(Level.FINE))
144 {
145 LOGGER.fine("Registered " + Utils.safeToString(task)
146 + " against " + Utils.safeToString(channel));
147 }
148 task.setSelectionKey(selectionKey);
149 this.tasks.put(selectionKey, task);
150 }
151 catch (ClosedChannelException e)
152 {
153 Utils.logException(LOGGER, "Could not register operation "
154 + Utils.safeToString(op) + " for "
155 + Utils.safeToString(channel) + " with "
156 + Utils.safeToString(task), e);
157 }
158 }
159
160
161
162
163
164
165
166
167
168
169 public ISelectionKeyTask unregister(SelectionKey key)
170 {
171 key.cancel();
172 final ISelectionKeyTask task = this.tasks.remove(key);
173 if (LOGGER.isLoggable(Level.FINE))
174 {
175 LOGGER.fine("Unregistered " + Utils.safeToString(task));
176 }
177 return task;
178 }
179
180
181
182
183
184
185
186
187 public boolean process()
188 {
189 checkSelector();
190 try
191 {
192 this.bo.select();
193 if (this.bo.isOpen())
194 {
195 Set<SelectionKey> selectedKeys = null;
196
197
198 synchronized (this)
199 {
200 selectedKeys = this.bo.selectedKeys();
201 }
202 for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();)
203 {
204 SelectionKey selectionKey = iterator.next();
205 iterator.remove();
206 try
207 {
208 if (selectionKey.isValid())
209 {
210 final ISelectionKeyTask selectionKeyTask =
211 this.tasks.get(selectionKey);
212 if (selectionKeyTask != null)
213 {
214 selectionKeyTask.run();
215 }
216 else
217 {
218 if (LOGGER.isLoggable(Level.INFO))
219 {
220 LOGGER.info("No registered task for handling selection key for "
221 + Utils.safeToString(selectionKey.channel()));
222 }
223 }
224 }
225 else
226 {
227 final ISelectionKeyTask selectionKeyTask =
228 this.tasks.remove(selectionKey);
229 if (selectionKeyTask != null)
230 {
231 selectionKeyTask.destroy();
232 }
233 }
234 }
235 catch (Exception e)
236 {
237 Utils.logException(LOGGER,
238 "Could not process selection key event for "
239 + Utils.safeToString(selectionKey.channel()), e);
240 }
241 }
242
243 }
244 else
245 {
246 openSelector();
247 }
248 }
249 catch (Exception e)
250 {
251 Utils.logException(LOGGER, "Error processing selector "
252 + Utils.safeToString(this.bo), e);
253 }
254 return isActive();
255 }
256
257
258
259
260 private void openSelector()
261 {
262 if (isActive())
263 {
264 try
265 {
266 this.bo = Selector.open();
267 }
268 catch (IOException e)
269 {
270 throw new IllegalStateException("Could not open selector", e);
271 }
272 if (this.listener != null)
273 {
274 this.listener.selectorOpened(this);
275 }
276 }
277 }
278
279 @Override
280 protected void doStart()
281 {
282 openSelector();
283 }
284
285 @Override
286 protected final void doDestroy()
287 {
288 try
289 {
290 destroyTasks();
291 if (this.bo != null)
292 {
293 this.bo.close();
294 }
295 }
296 catch (Exception e)
297 {
298 Utils.logException(LOGGER, "Error destroying selector "
299 + Utils.safeToString(this.bo), e);
300 }
301 }
302
303
304
305
306
307
308 public void setStateListener(ISelectorTasksStateListener listener)
309 {
310 this.listener = listener;
311 }
312
313
314
315
316
317
318
319 public Runnable getRunnable()
320 {
321 checkSelector();
322 return this.new SelectorTaskProcessor();
323 }
324
325 private void checkSelector()
326 {
327 if (this.bo == null)
328 {
329 throw new IllegalStateException("Selector not open");
330 }
331 }
332
333
334
335
336
337 private void destroyTasks()
338 {
339 if (this.bo != null)
340 {
341 Set<SelectionKey> keys = null;
342
343
344 keys = new HashSet<SelectionKey>(this.bo.keys());
345 for (SelectionKey selectionKey : keys)
346 {
347 if (selectionKey.isValid())
348 {
349 selectionKey.cancel();
350 final ISelectionKeyTask task =
351 this.tasks.remove(selectionKey);
352 if (task != null)
353 {
354 try
355 {
356 task.destroy();
357 }
358 catch (Exception e)
359 {
360 Utils.logException(LOGGER,
361 "Could not destroy task "
362 + Utils.safeToString(task), e);
363 }
364 }
365 }
366 }
367 }
368 }
369
370
371
372
373
374
375
376
377 private final class SelectorTaskProcessor implements Runnable
378 {
379 public void run()
380 {
381 while (SelectorTasks.this.process())
382 {
383
384 }
385 }
386 }
387 }