View Javadoc

1   /*
2      Copyright 2010 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package mirrormap.nio;
17  
18  import java.io.IOException;
19  import java.net.InetSocketAddress;
20  import java.nio.ByteBuffer;
21  import java.nio.channels.SelectableChannel;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.Selector;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.concurrent.Executor;
31  import java.util.logging.Level;
32  import java.util.logging.Logger;
33  
34  import mirrormap.Utils;
35  import mirrormap.lifecycle.AbstractLifeCycle;
36  
37  /**
38   * Handles the socket processing for a TCP end point.
39   * 
40   * @author Ramon Servadei
41   */
42  public final class TcpConnectionEndPoint extends AbstractLifeCycle implements
43      IConnectionEndPoint, ISelectionKeyTask
44  {
45      /**
46       * Bit shifts to access the byte ordinal that the index represents. E.g.
47       * bitShiftForByteOrdinal[2] has a value of 8 which is the number of bits to
48       * right shift an integral to access the 2nd byte.
49       */
50      private static final long[] bitShiftForByteOrdinal = new long[9];
51      static
52      {
53          int i = 0;
54          // 0 doesn't count
55          bitShiftForByteOrdinal[i] = 0;
56          bitShiftForByteOrdinal[++i] = 8 * (i - 1); // 0
57          bitShiftForByteOrdinal[++i] = 8 * (i - 1); // 8
58          bitShiftForByteOrdinal[++i] = 8 * (i - 1); // 16
59          bitShiftForByteOrdinal[++i] = 8 * (i - 1);
60          bitShiftForByteOrdinal[++i] = 8 * (i - 1);
61          bitShiftForByteOrdinal[++i] = 8 * (i - 1);
62          bitShiftForByteOrdinal[++i] = 8 * (i - 1);
63          bitShiftForByteOrdinal[++i] = 8 * (i - 1);
64      }
65  
66      private final static Logger LOGGER =
67          Logger.getLogger(TcpConnectionEndPoint.class.getName());
68  
69      /** The receive buffer, 65535 bytes */
70      private final static byte[] recvBuffer = new byte[65535];
71  
72      /** The pending messages to send */
73      final List<ByteBuffer> messages = new ArrayList<ByteBuffer>();
74  
75      /**
76       * The selection key associated with the {@link SelectableChannel} for this
77       * end point
78       */
79      private SelectionKey selectionKey;
80  
81      /** Reference required during {@link #doComponentDestroy()} */
82      private final SelectorTasks selectorTasks;
83  
84      /** Handles data received from the remote end point */
85      private final IConnectionEndPointReceiver receiver;
86  
87      /** The socket channel to the remote {@link IConnectionEndPoint} */
88      private SocketChannel socketChannel;
89  
90      /**
91       * Flag to indicate the socket direction, by default connections are
92       * outbound
93       */
94      private boolean outbound = true;
95  
96      /** The TCP port for the socket connection of the other end point */
97      private final int remotePort;
98  
99      /** The IP address or resolvable host name of the other end point */
100     private final String remoteHost;
101 
102     /** Handles the socket writing */
103     private Executor writer;
104 
105     /**
106      * Standard constructor for a TCP end point.
107      * 
108      * @param receiver
109      *            the object that will handle data received from the remote end
110      *            point
111      * @param selectorTasks
112      *            used to register and unregister this end point against the
113      *            {@link SelectionKey} of the underlying
114      *            {@link SelectableChannel} backing this end point
115      * @param writer
116      *            handles writing data, optional. If <code>null</code> then the
117      *            {@link Thread} running the {@link SelectorTasks#process()}
118      *            method will do the writing
119      * @param host
120      *            the IP address or resolvable host name of the other end point
121      * @param port
122      *            the TCP port for the socket connection of the other end point
123      * @throws IllegalStateException
124      *             if the socket could not be constructed
125      */
126     public TcpConnectionEndPoint(IConnectionEndPointReceiver receiver,
127         SelectorTasks selectorTasks, Executor writer, String host, int port)
128     {
129         this.remoteHost = host;
130         this.remotePort = port;
131         this.receiver = receiver;
132         this.selectorTasks = selectorTasks;
133         this.writer = writer;
134     }
135 
136     /**
137      * Add the data to the pending messages list
138      * 
139      * @param data
140      *            the data to add to the pending messages
141      */
142     public void send(byte[] data)
143     {
144         if (!isActive())
145         {
146             if (LOGGER.isLoggable(Level.FINE))
147             {
148                 LOGGER.fine("inactive");
149             }
150             return;
151         }
152         synchronized (this)
153         {
154             byte[] actual = encode(data);
155 
156             final ByteBuffer wrapped = ByteBuffer.wrap(actual);
157             messages.add(wrapped);
158             // we're interested in writing now
159             this.selectionKey.interestOps(this.selectionKey.interestOps()
160                 | SelectionKey.OP_WRITE);
161             this.selectionKey.selector().wakeup();
162         }
163     }
164 
165     public IConnectionEndPointReceiver getReceiver()
166     {
167         return this.receiver;
168     }
169 
170     @Override
171     protected void doStart()
172     {
173         try
174         {
175             if (isOutbound())
176             {
177                 final SocketChannel localSocketChannel = SocketChannel.open();
178                 localSocketChannel.configureBlocking(false);
179                 // as the channel is in non-blocking mode, we need some extra
180                 // work to connect
181                 if (!localSocketChannel.connect(new InetSocketAddress(
182                     this.remoteHost, this.remotePort)))
183                 {
184                     // the connection has not completed, so wait on the selector
185                     // until we can complete the connection
186                     final Selector selector = Selector.open();
187                     final SelectionKey key =
188                         localSocketChannel.register(selector,
189                             SelectionKey.OP_CONNECT);
190                     selector.select();
191                     localSocketChannel.finishConnect();
192                     key.cancel();
193                     selector.close();
194                 }
195                 if (LOGGER.isLoggable(Level.FINE))
196                 {
197                     LOGGER.fine("(->) Connected to " + this);
198                 }
199                 setSocketChannel(localSocketChannel);
200             }
201             else
202             {
203                 if (LOGGER.isLoggable(Level.FINE))
204                 {
205                     LOGGER.fine("(<-) Connected to " + this);
206                 }
207             }
208         }
209         catch (Exception e)
210         {
211             this.socketChannel = null;
212             final String warning =
213                 "Could not create socket to " + this.remoteHost + ":"
214                     + this.remotePort + ". Calling destroy().";
215             destroy();
216             throw new RuntimeException(warning, e);
217         }
218         this.selectorTasks.register(SelectionKey.OP_READ, getSocketChannel(),
219             this);
220     }
221 
222     public boolean isConnected()
223     {
224         return this.socketChannel != null
225             && this.socketChannel.socket() != null
226             && this.socketChannel.socket().isConnected();
227     }
228 
229     /**
230      * Set the direction of the socket connection.
231      * 
232      * @param outbound
233      *            <code>true</code> for an outbound connecting socket (connects
234      *            to a server socket), <code>false</code> for an inbound
235      *            connecting one
236      */
237     void setOutbound(boolean outbound)
238     {
239         this.outbound = outbound;
240     }
241 
242     /**
243      * Indicates the connecting direction.
244      * 
245      * @return <code>true</code> if this socket connects to a server socket,
246      *         <code>false</code> if it was an inbound connection from another
247      *         end point
248      */
249     public boolean isOutbound()
250     {
251         return outbound;
252     }
253 
254     /**
255      * Destroy the internal TCP I/O components.
256      */
257     @Override
258     protected void doDestroy()
259     {
260         if (this.receiver != null)
261         {
262             this.receiver.destroy();
263         }
264         if (this.socketChannel != null)
265         {
266             try
267             {
268                 if (this.socketChannel.socket() != null)
269                 {
270                     this.socketChannel.socket().getOutputStream().flush();
271                     this.socketChannel.socket().shutdownInput();
272                     this.socketChannel.socket().shutdownOutput();
273                     this.socketChannel.socket().close();
274                 }
275                 this.socketChannel.close();
276             }
277             catch (IOException e)
278             {
279                 if (LOGGER.isLoggable(Level.FINE))
280                 {
281                     LOGGER.log(Level.FINE, "Could not close "
282                         + Utils.safeToString(this), e);
283                 }
284             }
285         }
286         try
287         {
288             if (this.selectionKey != null
289                 && this.selectionKey.channel() != null)
290             {
291                 this.selectorTasks.unregister(this.selectionKey);
292                 this.selectionKey.channel().close();
293             }
294         }
295         catch (Exception e)
296         {
297             Utils.logException(LOGGER, "Could not close channel for "
298                 + Utils.safeToString(this), e);
299         }
300     }
301 
302     protected SocketChannel getSocketChannel()
303     {
304         return this.socketChannel;
305     }
306 
307     void setSocketChannel(SocketChannel socketChannel)
308     {
309         this.socketChannel = socketChannel;
310     }
311 
312     @Override
313     public String toString()
314     {
315         return Utils.string(this, this.remoteHost + ":" + this.remotePort);
316     }
317 
318     /**
319      * Add a 4 byte prefix to the data that includes the length of the data as a
320      * big-endian integer.
321      * 
322      * @param data
323      *            the data
324      * @return an array equal in length to <code>4 + data.length</code> with a
325      *         copy of data starting from element 4
326      */
327     private byte[] encode(byte[] data)
328     {
329         // add the integer length of the data, always the first 4 bytes
330         byte[] actual = new byte[data.length + 4];
331         System.arraycopy(data, 0, actual, 4, data.length);
332         int startFrom = 0;
333         for (int i = 4; i > 0; i--)
334         {
335             actual[startFrom++] =
336                 (byte) (data.length >>> bitShiftForByteOrdinal[i]);
337         }
338         return actual;
339     }
340 
341     /**
342      * Split the data into its sub-messages. Messages have a 4 byte prefix that
343      * is the message length in bytes, excluding the 4 bytes, as a big-endian
344      * integer. This allows multiple messages to be received as a single TCP
345      * frame.
346      * 
347      * @param message
348      *            the full TCP frame data
349      * @return a list of the individual messages contained in the message
350      *         argument
351      */
352     private List<byte[]> decode(byte[] message)
353     {
354         // the TCP message could be a concatenation of multiple ones
355         // the format is: <4 bytes len><subdata><4 bytes len><subdata>
356         List<byte[]> values = new LinkedList<byte[]>();
357         int lenPtr = 0;
358         int len = 0;
359         while (lenPtr < message.length)
360         {
361             len = readInteger(message, lenPtr, 4);
362             // copy the sub data into the new array
363             byte[] subData = new byte[len];
364             lenPtr += 4;
365             System.arraycopy(message, lenPtr, subData, 0, len);
366             lenPtr += len;
367             values.add(subData);
368         }
369         return values;
370     }
371 
372     /**
373      * Reconstruct an integer from its byte form in the byte[].
374      * 
375      * @param bytes
376      *            the byte[] holding the data in byte form
377      * @param start
378      *            the position in bytes where the data starts
379      * @param numberOfBytes
380      *            the number of bytes that form the data to read
381      * @return an integer
382      */
383     private int readInteger(final byte[] bytes, final int start,
384         int numberOfBytes)
385     {
386         long output = 0;
387         final int end = numberOfBytes + start;
388         // identify if this is a -ve number, has ramifications for building up
389         // the output
390         boolean negative = false;
391         if ((bytes[start] & 0x80) == 0x80)
392         {
393             output = -1;
394             negative = true;
395         }
396         for (int i = start; i < end; i++)
397         {
398             // cast to a char as it is unsigned, and also AND with 00FF to
399             // correctly promote whilst keeping the byte unsigned
400             final char dataByte = (char) (bytes[i] & 0x00FF);
401             if (negative)
402             {
403                 // clear out this byte position with 0, prior to bitwise ORing
404                 // the byte
405                 output ^=
406                     ((long) 0xff << bitShiftForByteOrdinal[numberOfBytes]);
407             }
408             // bitwise OR the byte after shifting it by the correct number of
409             // bits
410             output |=
411                 ((long) dataByte << bitShiftForByteOrdinal[numberOfBytes--]);
412         }
413         return (int) output;
414     }
415 
416     public void run()
417     {
418         if (isActive())
419         {
420             if (this.selectionKey.isWritable())
421             {
422                 write();
423             }
424             if (this.selectionKey.isReadable())
425             {
426                 read();
427             }
428         }
429     }
430 
431     /**
432      * Reads data from the socket
433      */
434     private void read()
435     {
436         try
437         {
438             final ByteBuffer recv = ByteBuffer.wrap(recvBuffer);
439             final SocketChannel socketChannel =
440                 ((SocketChannel) this.selectionKey.channel());
441             final int size = socketChannel.read(recv);
442             if (size == -1)
443             {
444                 /*
445                  * end of stream reached (see
446                  * ReadableByteChannel.read(ByteBuffer dst)
447                  */
448                 throw new IOException("end-of-stream");
449             }
450             final byte[] bufferData = new byte[recv.position()];
451             System.arraycopy(recv.array(), 0, bufferData, 0, recv.position());
452             // find any sub-messages
453             final List<byte[]> decoded = decode(bufferData);
454             for (byte[] byteData : decoded)
455             {
456                 if (LOGGER.isLoggable(Level.FINEST))
457                 {
458                     LOGGER.finest(this + " received '"
459                         + Arrays.toString(byteData) + "'");
460                 }
461                 try
462                 {
463                     this.receiver.receive(byteData);
464                 }
465                 catch (Exception e)
466                 {
467                     Utils.logException(LOGGER, "Could not handle data", e);
468                 }
469             }
470         }
471         catch (IOException e)
472         {
473             Utils.logException(LOGGER, "Destroying " + Utils.safeToString(this)
474                 + " because data could not be read from "
475                 + Utils.safeToString(this.selectionKey.channel()), e);
476             destroy();
477         }
478     }
479 
480     /**
481      * Writes any data in the message buffer to the socket
482      */
483     private void write()
484     {
485         final List<ByteBuffer> copy =
486             new ArrayList<ByteBuffer>(this.messages.size());
487         synchronized (this)
488         {
489             copy.addAll(this.messages);
490             this.messages.clear();
491         }
492         Runnable task = new Runnable()
493         {
494             public void run()
495             {
496                 for (Iterator<ByteBuffer> iterator = copy.iterator(); iterator.hasNext();)
497                 {
498                     final ByteBuffer data = iterator.next();
499                     try
500                     {
501                         if (LOGGER.isLoggable(Level.FINEST))
502                         {
503                             LOGGER.finest(TcpConnectionEndPoint.this
504                                 + " sending '" + Arrays.toString(data.array())
505                                 + "'");
506                         }
507                         ((SocketChannel) TcpConnectionEndPoint.this.selectionKey.channel()).write(data);
508                     }
509                     catch (IOException e)
510                     {
511                         Utils.logException(LOGGER,
512                             Utils.safeToString(TcpConnectionEndPoint.this)
513                                 + " could not write data '"
514                                 + new String(data.array()) + "'", e);
515                     }
516                 }
517             }
518         };
519         if (this.writer == null)
520         {
521             task.run();
522         }
523         else
524         {
525             this.writer.execute(task);
526         }
527         synchronized (this)
528         {
529             if (this.messages.size() == 0)
530             {
531                 // nothing more to write, so remove write interest
532                 this.selectionKey.interestOps(this.selectionKey.interestOps()
533                     ^ SelectionKey.OP_WRITE);
534             }
535         }
536     }
537 
538     public void setSelectionKey(SelectionKey selectionKey)
539     {
540         this.selectionKey = selectionKey;
541     }
542 
543 }