1 /*
2 Copyright 2010 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package mirrormap.nio;
17
18 import java.io.IOException;
19 import java.net.InetSocketAddress;
20 import java.nio.ByteBuffer;
21 import java.nio.channels.SelectableChannel;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.Selector;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.concurrent.Executor;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33
34 import mirrormap.Utils;
35 import mirrormap.lifecycle.AbstractLifeCycle;
36
37 /**
38 * Handles the socket processing for a TCP end point.
39 *
40 * @author Ramon Servadei
41 */
42 public final class TcpConnectionEndPoint extends AbstractLifeCycle implements
43 IConnectionEndPoint, ISelectionKeyTask
44 {
45 /**
46 * Bit shifts to access the byte ordinal that the index represents. E.g.
47 * bitShiftForByteOrdinal[2] has a value of 8 which is the number of bits to
48 * right shift an integral to access the 2nd byte.
49 */
50 private static final long[] bitShiftForByteOrdinal = new long[9];
51 static
52 {
53 int i = 0;
54 // 0 doesn't count
55 bitShiftForByteOrdinal[i] = 0;
56 bitShiftForByteOrdinal[++i] = 8 * (i - 1); // 0
57 bitShiftForByteOrdinal[++i] = 8 * (i - 1); // 8
58 bitShiftForByteOrdinal[++i] = 8 * (i - 1); // 16
59 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
60 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
61 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
62 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
63 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
64 }
65
66 private final static Logger LOGGER =
67 Logger.getLogger(TcpConnectionEndPoint.class.getName());
68
69 /** The receive buffer, 65535 bytes */
70 private final static byte[] recvBuffer = new byte[65535];
71
72 /** The pending messages to send */
73 final List<ByteBuffer> messages = new ArrayList<ByteBuffer>();
74
75 /**
76 * The selection key associated with the {@link SelectableChannel} for this
77 * end point
78 */
79 private SelectionKey selectionKey;
80
81 /** Reference required during {@link #doComponentDestroy()} */
82 private final SelectorTasks selectorTasks;
83
84 /** Handles data received from the remote end point */
85 private final IConnectionEndPointReceiver receiver;
86
87 /** The socket channel to the remote {@link IConnectionEndPoint} */
88 private SocketChannel socketChannel;
89
90 /**
91 * Flag to indicate the socket direction, by default connections are
92 * outbound
93 */
94 private boolean outbound = true;
95
96 /** The TCP port for the socket connection of the other end point */
97 private final int remotePort;
98
99 /** The IP address or resolvable host name of the other end point */
100 private final String remoteHost;
101
102 /** Handles the socket writing */
103 private Executor writer;
104
105 /**
106 * Standard constructor for a TCP end point.
107 *
108 * @param receiver
109 * the object that will handle data received from the remote end
110 * point
111 * @param selectorTasks
112 * used to register and unregister this end point against the
113 * {@link SelectionKey} of the underlying
114 * {@link SelectableChannel} backing this end point
115 * @param writer
116 * handles writing data, optional. If <code>null</code> then the
117 * {@link Thread} running the {@link SelectorTasks#process()}
118 * method will do the writing
119 * @param host
120 * the IP address or resolvable host name of the other end point
121 * @param port
122 * the TCP port for the socket connection of the other end point
123 * @throws IllegalStateException
124 * if the socket could not be constructed
125 */
126 public TcpConnectionEndPoint(IConnectionEndPointReceiver receiver,
127 SelectorTasks selectorTasks, Executor writer, String host, int port)
128 {
129 this.remoteHost = host;
130 this.remotePort = port;
131 this.receiver = receiver;
132 this.selectorTasks = selectorTasks;
133 this.writer = writer;
134 }
135
136 /**
137 * Add the data to the pending messages list
138 *
139 * @param data
140 * the data to add to the pending messages
141 */
142 public void send(byte[] data)
143 {
144 if (!isActive())
145 {
146 if (LOGGER.isLoggable(Level.FINE))
147 {
148 LOGGER.fine("inactive");
149 }
150 return;
151 }
152 synchronized (this)
153 {
154 byte[] actual = encode(data);
155
156 final ByteBuffer wrapped = ByteBuffer.wrap(actual);
157 messages.add(wrapped);
158 // we're interested in writing now
159 this.selectionKey.interestOps(this.selectionKey.interestOps()
160 | SelectionKey.OP_WRITE);
161 this.selectionKey.selector().wakeup();
162 }
163 }
164
165 public IConnectionEndPointReceiver getReceiver()
166 {
167 return this.receiver;
168 }
169
170 @Override
171 protected void doStart()
172 {
173 try
174 {
175 if (isOutbound())
176 {
177 final SocketChannel localSocketChannel = SocketChannel.open();
178 localSocketChannel.configureBlocking(false);
179 // as the channel is in non-blocking mode, we need some extra
180 // work to connect
181 if (!localSocketChannel.connect(new InetSocketAddress(
182 this.remoteHost, this.remotePort)))
183 {
184 // the connection has not completed, so wait on the selector
185 // until we can complete the connection
186 final Selector selector = Selector.open();
187 final SelectionKey key =
188 localSocketChannel.register(selector,
189 SelectionKey.OP_CONNECT);
190 selector.select();
191 localSocketChannel.finishConnect();
192 key.cancel();
193 selector.close();
194 }
195 if (LOGGER.isLoggable(Level.FINE))
196 {
197 LOGGER.fine("(->) Connected to " + this);
198 }
199 setSocketChannel(localSocketChannel);
200 }
201 else
202 {
203 if (LOGGER.isLoggable(Level.FINE))
204 {
205 LOGGER.fine("(<-) Connected to " + this);
206 }
207 }
208 }
209 catch (Exception e)
210 {
211 this.socketChannel = null;
212 final String warning =
213 "Could not create socket to " + this.remoteHost + ":"
214 + this.remotePort + ". Calling destroy().";
215 destroy();
216 throw new RuntimeException(warning, e);
217 }
218 this.selectorTasks.register(SelectionKey.OP_READ, getSocketChannel(),
219 this);
220 }
221
222 public boolean isConnected()
223 {
224 return this.socketChannel != null
225 && this.socketChannel.socket() != null
226 && this.socketChannel.socket().isConnected();
227 }
228
229 /**
230 * Set the direction of the socket connection.
231 *
232 * @param outbound
233 * <code>true</code> for an outbound connecting socket (connects
234 * to a server socket), <code>false</code> for an inbound
235 * connecting one
236 */
237 void setOutbound(boolean outbound)
238 {
239 this.outbound = outbound;
240 }
241
242 /**
243 * Indicates the connecting direction.
244 *
245 * @return <code>true</code> if this socket connects to a server socket,
246 * <code>false</code> if it was an inbound connection from another
247 * end point
248 */
249 public boolean isOutbound()
250 {
251 return outbound;
252 }
253
254 /**
255 * Destroy the internal TCP I/O components.
256 */
257 @Override
258 protected void doDestroy()
259 {
260 if (this.receiver != null)
261 {
262 this.receiver.destroy();
263 }
264 if (this.socketChannel != null)
265 {
266 try
267 {
268 if (this.socketChannel.socket() != null)
269 {
270 this.socketChannel.socket().getOutputStream().flush();
271 this.socketChannel.socket().shutdownInput();
272 this.socketChannel.socket().shutdownOutput();
273 this.socketChannel.socket().close();
274 }
275 this.socketChannel.close();
276 }
277 catch (IOException e)
278 {
279 if (LOGGER.isLoggable(Level.FINE))
280 {
281 LOGGER.log(Level.FINE, "Could not close "
282 + Utils.safeToString(this), e);
283 }
284 }
285 }
286 try
287 {
288 if (this.selectionKey != null
289 && this.selectionKey.channel() != null)
290 {
291 this.selectorTasks.unregister(this.selectionKey);
292 this.selectionKey.channel().close();
293 }
294 }
295 catch (Exception e)
296 {
297 Utils.logException(LOGGER, "Could not close channel for "
298 + Utils.safeToString(this), e);
299 }
300 }
301
302 protected SocketChannel getSocketChannel()
303 {
304 return this.socketChannel;
305 }
306
307 void setSocketChannel(SocketChannel socketChannel)
308 {
309 this.socketChannel = socketChannel;
310 }
311
312 @Override
313 public String toString()
314 {
315 return Utils.string(this, this.remoteHost + ":" + this.remotePort);
316 }
317
318 /**
319 * Add a 4 byte prefix to the data that includes the length of the data as a
320 * big-endian integer.
321 *
322 * @param data
323 * the data
324 * @return an array equal in length to <code>4 + data.length</code> with a
325 * copy of data starting from element 4
326 */
327 private byte[] encode(byte[] data)
328 {
329 // add the integer length of the data, always the first 4 bytes
330 byte[] actual = new byte[data.length + 4];
331 System.arraycopy(data, 0, actual, 4, data.length);
332 int startFrom = 0;
333 for (int i = 4; i > 0; i--)
334 {
335 actual[startFrom++] =
336 (byte) (data.length >>> bitShiftForByteOrdinal[i]);
337 }
338 return actual;
339 }
340
341 /**
342 * Split the data into its sub-messages. Messages have a 4 byte prefix that
343 * is the message length in bytes, excluding the 4 bytes, as a big-endian
344 * integer. This allows multiple messages to be received as a single TCP
345 * frame.
346 *
347 * @param message
348 * the full TCP frame data
349 * @return a list of the individual messages contained in the message
350 * argument
351 */
352 private List<byte[]> decode(byte[] message)
353 {
354 // the TCP message could be a concatenation of multiple ones
355 // the format is: <4 bytes len><subdata><4 bytes len><subdata>
356 List<byte[]> values = new LinkedList<byte[]>();
357 int lenPtr = 0;
358 int len = 0;
359 while (lenPtr < message.length)
360 {
361 len = readInteger(message, lenPtr, 4);
362 // copy the sub data into the new array
363 byte[] subData = new byte[len];
364 lenPtr += 4;
365 System.arraycopy(message, lenPtr, subData, 0, len);
366 lenPtr += len;
367 values.add(subData);
368 }
369 return values;
370 }
371
372 /**
373 * Reconstruct an integer from its byte form in the byte[].
374 *
375 * @param bytes
376 * the byte[] holding the data in byte form
377 * @param start
378 * the position in bytes where the data starts
379 * @param numberOfBytes
380 * the number of bytes that form the data to read
381 * @return an integer
382 */
383 private int readInteger(final byte[] bytes, final int start,
384 int numberOfBytes)
385 {
386 long output = 0;
387 final int end = numberOfBytes + start;
388 // identify if this is a -ve number, has ramifications for building up
389 // the output
390 boolean negative = false;
391 if ((bytes[start] & 0x80) == 0x80)
392 {
393 output = -1;
394 negative = true;
395 }
396 for (int i = start; i < end; i++)
397 {
398 // cast to a char as it is unsigned, and also AND with 00FF to
399 // correctly promote whilst keeping the byte unsigned
400 final char dataByte = (char) (bytes[i] & 0x00FF);
401 if (negative)
402 {
403 // clear out this byte position with 0, prior to bitwise ORing
404 // the byte
405 output ^=
406 ((long) 0xff << bitShiftForByteOrdinal[numberOfBytes]);
407 }
408 // bitwise OR the byte after shifting it by the correct number of
409 // bits
410 output |=
411 ((long) dataByte << bitShiftForByteOrdinal[numberOfBytes--]);
412 }
413 return (int) output;
414 }
415
416 public void run()
417 {
418 if (isActive())
419 {
420 if (this.selectionKey.isWritable())
421 {
422 write();
423 }
424 if (this.selectionKey.isReadable())
425 {
426 read();
427 }
428 }
429 }
430
431 /**
432 * Reads data from the socket
433 */
434 private void read()
435 {
436 try
437 {
438 final ByteBuffer recv = ByteBuffer.wrap(recvBuffer);
439 final SocketChannel socketChannel =
440 ((SocketChannel) this.selectionKey.channel());
441 final int size = socketChannel.read(recv);
442 if (size == -1)
443 {
444 /*
445 * end of stream reached (see
446 * ReadableByteChannel.read(ByteBuffer dst)
447 */
448 throw new IOException("end-of-stream");
449 }
450 final byte[] bufferData = new byte[recv.position()];
451 System.arraycopy(recv.array(), 0, bufferData, 0, recv.position());
452 // find any sub-messages
453 final List<byte[]> decoded = decode(bufferData);
454 for (byte[] byteData : decoded)
455 {
456 if (LOGGER.isLoggable(Level.FINEST))
457 {
458 LOGGER.finest(this + " received '"
459 + Arrays.toString(byteData) + "'");
460 }
461 try
462 {
463 this.receiver.receive(byteData);
464 }
465 catch (Exception e)
466 {
467 Utils.logException(LOGGER, "Could not handle data", e);
468 }
469 }
470 }
471 catch (IOException e)
472 {
473 Utils.logException(LOGGER, "Destroying " + Utils.safeToString(this)
474 + " because data could not be read from "
475 + Utils.safeToString(this.selectionKey.channel()), e);
476 destroy();
477 }
478 }
479
480 /**
481 * Writes any data in the message buffer to the socket
482 */
483 private void write()
484 {
485 final List<ByteBuffer> copy =
486 new ArrayList<ByteBuffer>(this.messages.size());
487 synchronized (this)
488 {
489 copy.addAll(this.messages);
490 this.messages.clear();
491 }
492 Runnable task = new Runnable()
493 {
494 public void run()
495 {
496 for (Iterator<ByteBuffer> iterator = copy.iterator(); iterator.hasNext();)
497 {
498 final ByteBuffer data = iterator.next();
499 try
500 {
501 if (LOGGER.isLoggable(Level.FINEST))
502 {
503 LOGGER.finest(TcpConnectionEndPoint.this
504 + " sending '" + Arrays.toString(data.array())
505 + "'");
506 }
507 ((SocketChannel) TcpConnectionEndPoint.this.selectionKey.channel()).write(data);
508 }
509 catch (IOException e)
510 {
511 Utils.logException(LOGGER,
512 Utils.safeToString(TcpConnectionEndPoint.this)
513 + " could not write data '"
514 + new String(data.array()) + "'", e);
515 }
516 }
517 }
518 };
519 if (this.writer == null)
520 {
521 task.run();
522 }
523 else
524 {
525 this.writer.execute(task);
526 }
527 synchronized (this)
528 {
529 if (this.messages.size() == 0)
530 {
531 // nothing more to write, so remove write interest
532 this.selectionKey.interestOps(this.selectionKey.interestOps()
533 ^ SelectionKey.OP_WRITE);
534 }
535 }
536 }
537
538 public void setSelectionKey(SelectionKey selectionKey)
539 {
540 this.selectionKey = selectionKey;
541 }
542
543 }