1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package mirrormap.nio;
17
18 import java.io.IOException;
19 import java.net.InetSocketAddress;
20 import java.nio.ByteBuffer;
21 import java.nio.channels.SelectableChannel;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.Selector;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.concurrent.Executor;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33
34 import mirrormap.Utils;
35 import mirrormap.lifecycle.AbstractLifeCycle;
36
37
38
39
40
41
42 public final class TcpConnectionEndPoint extends AbstractLifeCycle implements
43 IConnectionEndPoint, ISelectionKeyTask
44 {
45
46
47
48
49
50 private static final long[] bitShiftForByteOrdinal = new long[9];
51 static
52 {
53 int i = 0;
54
55 bitShiftForByteOrdinal[i] = 0;
56 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
57 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
58 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
59 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
60 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
61 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
62 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
63 bitShiftForByteOrdinal[++i] = 8 * (i - 1);
64 }
65
66 private final static Logger LOGGER =
67 Logger.getLogger(TcpConnectionEndPoint.class.getName());
68
69
70 private final static byte[] recvBuffer = new byte[65535];
71
72
73 final List<ByteBuffer> messages = new ArrayList<ByteBuffer>();
74
75
76
77
78
79 private SelectionKey selectionKey;
80
81
82 private final SelectorTasks selectorTasks;
83
84
85 private final IConnectionEndPointReceiver receiver;
86
87
88 private SocketChannel socketChannel;
89
90
91
92
93
94 private boolean outbound = true;
95
96
97 private final int remotePort;
98
99
100 private final String remoteHost;
101
102
103 private Executor writer;
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 public TcpConnectionEndPoint(IConnectionEndPointReceiver receiver,
127 SelectorTasks selectorTasks, Executor writer, String host, int port)
128 {
129 this.remoteHost = host;
130 this.remotePort = port;
131 this.receiver = receiver;
132 this.selectorTasks = selectorTasks;
133 this.writer = writer;
134 }
135
136
137
138
139
140
141
142 public void send(byte[] data)
143 {
144 if (!isActive())
145 {
146 if (LOGGER.isLoggable(Level.FINE))
147 {
148 LOGGER.fine("inactive");
149 }
150 return;
151 }
152 synchronized (this)
153 {
154 byte[] actual = encode(data);
155
156 final ByteBuffer wrapped = ByteBuffer.wrap(actual);
157 messages.add(wrapped);
158
159 this.selectionKey.interestOps(this.selectionKey.interestOps()
160 | SelectionKey.OP_WRITE);
161 this.selectionKey.selector().wakeup();
162 }
163 }
164
165 public IConnectionEndPointReceiver getReceiver()
166 {
167 return this.receiver;
168 }
169
170 @Override
171 protected void doStart()
172 {
173 try
174 {
175 if (isOutbound())
176 {
177 final SocketChannel localSocketChannel = SocketChannel.open();
178 localSocketChannel.configureBlocking(false);
179
180
181 if (!localSocketChannel.connect(new InetSocketAddress(
182 this.remoteHost, this.remotePort)))
183 {
184
185
186 final Selector selector = Selector.open();
187 final SelectionKey key =
188 localSocketChannel.register(selector,
189 SelectionKey.OP_CONNECT);
190 selector.select();
191 localSocketChannel.finishConnect();
192 key.cancel();
193 selector.close();
194 }
195 if (LOGGER.isLoggable(Level.FINE))
196 {
197 LOGGER.fine("(->) Connected to " + this);
198 }
199 setSocketChannel(localSocketChannel);
200 }
201 else
202 {
203 if (LOGGER.isLoggable(Level.FINE))
204 {
205 LOGGER.fine("(<-) Connected to " + this);
206 }
207 }
208 }
209 catch (Exception e)
210 {
211 this.socketChannel = null;
212 final String warning =
213 "Could not create socket to " + this.remoteHost + ":"
214 + this.remotePort + ". Calling destroy().";
215 destroy();
216 throw new RuntimeException(warning, e);
217 }
218 this.selectorTasks.register(SelectionKey.OP_READ, getSocketChannel(),
219 this);
220 }
221
222 public boolean isConnected()
223 {
224 return this.socketChannel != null
225 && this.socketChannel.socket() != null
226 && this.socketChannel.socket().isConnected();
227 }
228
229
230
231
232
233
234
235
236
237 void setOutbound(boolean outbound)
238 {
239 this.outbound = outbound;
240 }
241
242
243
244
245
246
247
248
249 public boolean isOutbound()
250 {
251 return outbound;
252 }
253
254
255
256
257 @Override
258 protected void doDestroy()
259 {
260 if (this.receiver != null)
261 {
262 this.receiver.destroy();
263 }
264 if (this.socketChannel != null)
265 {
266 try
267 {
268 if (this.socketChannel.socket() != null)
269 {
270 this.socketChannel.socket().getOutputStream().flush();
271 this.socketChannel.socket().shutdownInput();
272 this.socketChannel.socket().shutdownOutput();
273 this.socketChannel.socket().close();
274 }
275 this.socketChannel.close();
276 }
277 catch (IOException e)
278 {
279 if (LOGGER.isLoggable(Level.FINE))
280 {
281 LOGGER.log(Level.FINE, "Could not close "
282 + Utils.safeToString(this), e);
283 }
284 }
285 }
286 try
287 {
288 if (this.selectionKey != null
289 && this.selectionKey.channel() != null)
290 {
291 this.selectorTasks.unregister(this.selectionKey);
292 this.selectionKey.channel().close();
293 }
294 }
295 catch (Exception e)
296 {
297 Utils.logException(LOGGER, "Could not close channel for "
298 + Utils.safeToString(this), e);
299 }
300 }
301
302 protected SocketChannel getSocketChannel()
303 {
304 return this.socketChannel;
305 }
306
307 void setSocketChannel(SocketChannel socketChannel)
308 {
309 this.socketChannel = socketChannel;
310 }
311
312 @Override
313 public String toString()
314 {
315 return Utils.string(this, this.remoteHost + ":" + this.remotePort);
316 }
317
318
319
320
321
322
323
324
325
326
327 private byte[] encode(byte[] data)
328 {
329
330 byte[] actual = new byte[data.length + 4];
331 System.arraycopy(data, 0, actual, 4, data.length);
332 int startFrom = 0;
333 for (int i = 4; i > 0; i--)
334 {
335 actual[startFrom++] =
336 (byte) (data.length >>> bitShiftForByteOrdinal[i]);
337 }
338 return actual;
339 }
340
341
342
343
344
345
346
347
348
349
350
351
352 private List<byte[]> decode(byte[] message)
353 {
354
355
356 List<byte[]> values = new LinkedList<byte[]>();
357 int lenPtr = 0;
358 int len = 0;
359 while (lenPtr < message.length)
360 {
361 len = readInteger(message, lenPtr, 4);
362
363 byte[] subData = new byte[len];
364 lenPtr += 4;
365 System.arraycopy(message, lenPtr, subData, 0, len);
366 lenPtr += len;
367 values.add(subData);
368 }
369 return values;
370 }
371
372
373
374
375
376
377
378
379
380
381
382
383 private int readInteger(final byte[] bytes, final int start,
384 int numberOfBytes)
385 {
386 long output = 0;
387 final int end = numberOfBytes + start;
388
389
390 boolean negative = false;
391 if ((bytes[start] & 0x80) == 0x80)
392 {
393 output = -1;
394 negative = true;
395 }
396 for (int i = start; i < end; i++)
397 {
398
399
400 final char dataByte = (char) (bytes[i] & 0x00FF);
401 if (negative)
402 {
403
404
405 output ^=
406 ((long) 0xff << bitShiftForByteOrdinal[numberOfBytes]);
407 }
408
409
410 output |=
411 ((long) dataByte << bitShiftForByteOrdinal[numberOfBytes--]);
412 }
413 return (int) output;
414 }
415
416 public void run()
417 {
418 if (isActive())
419 {
420 if (this.selectionKey.isWritable())
421 {
422 write();
423 }
424 if (this.selectionKey.isReadable())
425 {
426 read();
427 }
428 }
429 }
430
431
432
433
434 private void read()
435 {
436 try
437 {
438 final ByteBuffer recv = ByteBuffer.wrap(recvBuffer);
439 final SocketChannel socketChannel =
440 ((SocketChannel) this.selectionKey.channel());
441 final int size = socketChannel.read(recv);
442 if (size == -1)
443 {
444
445
446
447
448 throw new IOException("end-of-stream");
449 }
450 final byte[] bufferData = new byte[recv.position()];
451 System.arraycopy(recv.array(), 0, bufferData, 0, recv.position());
452
453 final List<byte[]> decoded = decode(bufferData);
454 for (byte[] byteData : decoded)
455 {
456 if (LOGGER.isLoggable(Level.FINEST))
457 {
458 LOGGER.finest(this + " received '"
459 + Arrays.toString(byteData) + "'");
460 }
461 try
462 {
463 this.receiver.receive(byteData);
464 }
465 catch (Exception e)
466 {
467 Utils.logException(LOGGER, "Could not handle data", e);
468 }
469 }
470 }
471 catch (IOException e)
472 {
473 Utils.logException(LOGGER, "Destroying " + Utils.safeToString(this)
474 + " because data could not be read from "
475 + Utils.safeToString(this.selectionKey.channel()), e);
476 destroy();
477 }
478 }
479
480
481
482
483 private void write()
484 {
485 final List<ByteBuffer> copy =
486 new ArrayList<ByteBuffer>(this.messages.size());
487 synchronized (this)
488 {
489 copy.addAll(this.messages);
490 this.messages.clear();
491 }
492 Runnable task = new Runnable()
493 {
494 public void run()
495 {
496 for (Iterator<ByteBuffer> iterator = copy.iterator(); iterator.hasNext();)
497 {
498 final ByteBuffer data = iterator.next();
499 try
500 {
501 if (LOGGER.isLoggable(Level.FINEST))
502 {
503 LOGGER.finest(TcpConnectionEndPoint.this
504 + " sending '" + Arrays.toString(data.array())
505 + "'");
506 }
507 ((SocketChannel) TcpConnectionEndPoint.this.selectionKey.channel()).write(data);
508 }
509 catch (IOException e)
510 {
511 Utils.logException(LOGGER,
512 Utils.safeToString(TcpConnectionEndPoint.this)
513 + " could not write data '"
514 + new String(data.array()) + "'", e);
515 }
516 }
517 }
518 };
519 if (this.writer == null)
520 {
521 task.run();
522 }
523 else
524 {
525 this.writer.execute(task);
526 }
527 synchronized (this)
528 {
529 if (this.messages.size() == 0)
530 {
531
532 this.selectionKey.interestOps(this.selectionKey.interestOps()
533 ^ SelectionKey.OP_WRITE);
534 }
535 }
536 }
537
538 public void setSelectionKey(SelectionKey selectionKey)
539 {
540 this.selectionKey = selectionKey;
541 }
542
543 }