View Javadoc

1   /*
2      Copyright 2010 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package mirrormap.nio;
17  
18  import java.io.IOException;
19  import java.nio.channels.Channel;
20  import java.nio.channels.ClosedChannelException;
21  import java.nio.channels.SelectableChannel;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.Selector;
24  import java.util.HashSet;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.logging.Level;
30  import java.util.logging.Logger;
31  
32  import mirrormap.Utils;
33  import mirrormap.lifecycle.AbstractLifeCycle;
34  
35  /**
36   * A collection of {@link ISelectionKeyTask} objects that handle the logic for
37   * the ready operations raised from registered NIO {@link Channel} objects. This
38   * class exists to provide a mechanism to manage the tasks that should be run
39   * for each channel that is registered against a single {@link Selector}. Each
40   * instance has an internal {@link Selector} that is used to register with the
41   * {@link Channel} supplied in the
42   * {@link #register(int, SelectableChannel, ISelectionKeyTask)} method.
43   * <p>
44   * This object invokes the {@link Selector#select()} method and blocks until a
45   * channel is available for one of the registered operations. When this happens
46   * the appropriate task is located and executed.
47   * 
48   * <pre>
49   *  // create and register
50   *  SelectorTasks tasks = new SelectorTasks();
51   *  // create an optional listener to get notified when the selector is opened
52   *  ISelectorTasksStateListener = ...
53   *  tasks.setStateListener(listener);
54   *  
55   *  // OPEN THE CHANNEL - no other operation is valid until this is run
56   *  tasks.openSelector();
57   *  
58   *  SelectableChannel channel = ...;
59   *  ISelectionKeyTask task = ...;
60   *  tasks.register(SelectionKey.OP_READ, channel, task);
61   *  
62   *  // process - this blocks until a ready operation occurs, 
63   *  // in the real-world, this would be called in a separate thread
64   *  while(tasks.process());
65   * </pre>
66   * 
67   * There should only be 1 thread running the process method. The
68   * {@link #getRunnable()} method returns an {@link Runnable} object for a
69   * dedicated processing thread. e.g.
70   * 
71   * <pre>
72   *  // create and register
73   *  SelectorTasks tasks = new SelectorTasks();
74   *  ...
75   *  // kick off a thread to run the process() method 
76   *  new Thread(tasks.getRunnable()).start();
77   * </pre>
78   * 
79   * <b>This is not thread aware.</b>
80   * 
81   * @author Ramon Servadei
82   * 
83   */
84  public final class SelectorTasks extends AbstractLifeCycle
85  {
86      private final static Logger LOGGER =
87          Logger.getLogger(SelectorTasks.class.getName());
88  
89      /**
90       * The {@link ISelectionKeyTask} objects registered via
91       * {@link #register(int, SelectableChannel, ISelectionKeyTask)}
92       */
93      private final Map<SelectionKey, ISelectionKeyTask> tasks;
94  
95      /** The selector */
96      private Selector bo;
97  
98      /** The state listener */
99      private ISelectorTasksStateListener listener;
100 
101     /**
102      * Standard constructor
103      */
104     public SelectorTasks()
105     {
106         super();
107         this.tasks = new ConcurrentHashMap<SelectionKey, ISelectionKeyTask>();
108     }
109 
110     /**
111      * Register the internal {@link Selector} with the channel and map the
112      * supplied task to the {@link SelectionKey} that is returned from the
113      * {@link SelectableChannel#register(Selector, int)} method.
114      * 
115      * @param op
116      *            the operation to register for
117      * @param channel
118      *            the channel that will register with the internal selector
119      * @param task
120      *            the task to run when the operation occurs in the channel. This
121      *            task should be efficient; if it does any blocking this affects
122      *            processing of other registered {@link ISelectionKeyTask}
123      *            objects.
124      * @see SelectionKey
125      */
126     @SuppressWarnings("boxing")
127     public void register(int op, SelectableChannel channel,
128         ISelectionKeyTask task)
129     {
130         checkSelector();
131         try
132         {
133             SelectionKey selectionKey = null;
134             // synchronized before we wake up the selector so we prevent any
135             // thread in the process() method from re-aquiring the selector lock
136             // via selector.select()
137             synchronized (this)
138             {
139                 // register blocks until the selector is available so wake it up
140                 this.bo.wakeup();
141                 selectionKey = channel.register(this.bo, op);
142             }
143             if (LOGGER.isLoggable(Level.FINE))
144             {
145                 LOGGER.fine("Registered " + Utils.safeToString(task)
146                     + " against " + Utils.safeToString(channel));
147             }
148             task.setSelectionKey(selectionKey);
149             this.tasks.put(selectionKey, task);
150         }
151         catch (ClosedChannelException e)
152         {
153             Utils.logException(LOGGER, "Could not register operation "
154                 + Utils.safeToString(op) + " for "
155                 + Utils.safeToString(channel) + " with "
156                 + Utils.safeToString(task), e);
157         }
158     }
159 
160     /**
161      * Unregister the selection key and remove the associated
162      * {@link ISelectionKeyTask}. The key is also cancelled via the
163      * {@link SelectionKey#cancel()} method.
164      * 
165      * @param key
166      *            the selection key to unregister and cancel
167      * @return the previously registered task
168      */
169     public ISelectionKeyTask unregister(SelectionKey key)
170     {
171         key.cancel();
172         final ISelectionKeyTask task = this.tasks.remove(key);
173         if (LOGGER.isLoggable(Level.FINE))
174         {
175             LOGGER.fine("Unregistered " + Utils.safeToString(task));
176         }
177         return task;
178     }
179 
180     /**
181      * Call {@link Selector#select()} and wait for a one of the operations in
182      * any of the registered channels to occur. When one does, the appropriate
183      * {@link ISelectionKeyTask} is run.
184      * 
185      * @return <code>false</code> if this becomes inactive
186      */
187     public boolean process()
188     {
189         checkSelector();
190         try
191         {
192             this.bo.select();
193             if (this.bo.isOpen())
194             {
195                 Set<SelectionKey> selectedKeys = null;
196                 // synchronized to prevent dead-lock if a thread is running the
197                 // register() method and has woken up the selector
198                 synchronized (this)
199                 {
200                     selectedKeys = this.bo.selectedKeys();
201                 }
202                 for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();)
203                 {
204                     SelectionKey selectionKey = iterator.next();
205                     iterator.remove();
206                     try
207                     {
208                         if (selectionKey.isValid())
209                         {
210                             final ISelectionKeyTask selectionKeyTask =
211                                 this.tasks.get(selectionKey);
212                             if (selectionKeyTask != null)
213                             {
214                                 selectionKeyTask.run();
215                             }
216                             else
217                             {
218                                 if (LOGGER.isLoggable(Level.INFO))
219                                 {
220                                     LOGGER.info("No registered task for handling selection key for "
221                                         + Utils.safeToString(selectionKey.channel()));
222                                 }
223                             }
224                         }
225                         else
226                         {
227                             final ISelectionKeyTask selectionKeyTask =
228                                 this.tasks.remove(selectionKey);
229                             if (selectionKeyTask != null)
230                             {
231                                 selectionKeyTask.destroy();
232                             }
233                         }
234                     }
235                     catch (Exception e)
236                     {
237                         Utils.logException(LOGGER,
238                             "Could not process selection key event for "
239                                 + Utils.safeToString(selectionKey.channel()), e);
240                     }
241                 }
242 
243             }
244             else
245             {
246                 openSelector();
247             }
248         }
249         catch (Exception e)
250         {
251             Utils.logException(LOGGER, "Error processing selector "
252                 + Utils.safeToString(this.bo), e);
253         }
254         return isActive();
255     }
256 
257     /**
258      * Open the internal {@link Selector}
259      */
260     private void openSelector()
261     {
262         if (isActive())
263         {
264             try
265             {
266                 this.bo = Selector.open();
267             }
268             catch (IOException e)
269             {
270                 throw new IllegalStateException("Could not open selector", e);
271             }
272             if (this.listener != null)
273             {
274                 this.listener.selectorOpened(this);
275             }
276         }
277     }
278 
279     @Override
280     protected void doStart()
281     {
282         openSelector();
283     }
284 
285     @Override
286     protected final void doDestroy()
287     {
288         try
289         {
290             destroyTasks();
291             if (this.bo != null)
292             {
293                 this.bo.close();
294             }
295         }
296         catch (Exception e)
297         {
298             Utils.logException(LOGGER, "Error destroying selector "
299                 + Utils.safeToString(this.bo), e);
300         }
301     }
302 
303     /**
304      * Set the state listener
305      * 
306      * @param listener
307      */
308     public void setStateListener(ISelectorTasksStateListener listener)
309     {
310         this.listener = listener;
311     }
312 
313     /**
314      * Get a {@link Runnable} to continually execute the {@link #process()}
315      * method.
316      * 
317      * @return a runnable to execute this
318      */
319     public Runnable getRunnable()
320     {
321         checkSelector();
322         return this.new SelectorTaskProcessor();
323     }
324 
325     private void checkSelector()
326     {
327         if (this.bo == null)
328         {
329             throw new IllegalStateException("Selector not open");
330         }
331     }
332 
333     /**
334      * Destroy all registered {@link ISelectionKeyTask} objects and cancel their
335      * associated {@link SelectionKey}
336      */
337     private void destroyTasks()
338     {
339         if (this.bo != null)
340         {
341             Set<SelectionKey> keys = null;
342             // iterate over a copy because the iteration loop modifies the keys
343             // associated with the selector (when it cancels the key)
344             keys = new HashSet<SelectionKey>(this.bo.keys());
345             for (SelectionKey selectionKey : keys)
346             {
347                 if (selectionKey.isValid())
348                 {
349                     selectionKey.cancel();
350                     final ISelectionKeyTask task =
351                         this.tasks.remove(selectionKey);
352                     if (task != null)
353                     {
354                         try
355                         {
356                             task.destroy();
357                         }
358                         catch (Exception e)
359                         {
360                             Utils.logException(LOGGER,
361                                 "Could not destroy task "
362                                     + Utils.safeToString(task), e);
363                         }
364                     }
365                 }
366             }
367         }
368     }
369 
370     /**
371      * A {@link Runnable} implementation to execute the
372      * {@link SelectorTasks#process()} method.
373      * 
374      * @author Ramon Servadei
375      * 
376      */
377     private final class SelectorTaskProcessor implements Runnable
378     {
379         public void run()
380         {
381             while (SelectorTasks.this.process())
382             {
383 
384             }
385         }
386     }
387 }