1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package mirrormap.nio;
17
18 import java.io.IOException;
19 import java.net.InetAddress;
20 import java.net.InetSocketAddress;
21 import java.net.UnknownHostException;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30
31 import mirrormap.Utils;
32 import mirrormap.lifecycle.AbstractLifeCycle;
33 import mirrormap.nio.IConnectionEndPointReceiver.IFactory;
34
35
36
37
38
39
40
41
42 public class TcpServer extends AbstractLifeCycle
43 {
44 private final static Logger LOGGER =
45 Logger.getLogger(TcpServer.class.getName());
46
47
48
49
50
51 public static final String CONTEXT_RCV_BUFFER = "TcpServer.bufferSize";
52
53
54 public static final int DEFAULT_TCP_PORT = 16025;
55
56
57 public final static String HOST_NAME;
58
59 static
60 {
61 String name = null;
62 try
63 {
64 name = InetAddress.getLocalHost().getHostName();
65 }
66 catch (UnknownHostException e)
67 {
68 LOGGER.log(Level.SEVERE, "Could not get host name", e);
69 System.exit(1);
70 }
71 HOST_NAME = name;
72 }
73
74
75
76
77
78
79
80 private final class ConnectionAcceptTask implements
81 ISelectorTasksStateListener, ISelectionKeyTask
82 {
83
84
85 private SelectionKey selectionKey;
86
87 public ConnectionAcceptTask()
88 {
89 super();
90 }
91
92 public void run()
93 {
94 if (TcpServer.this.isActive() && getSelectionKey().isAcceptable())
95 {
96 try
97 {
98 SocketChannel socketChannel =
99 TcpServer.this.serverSocketChannel.accept();
100 if (LOGGER.isLoggable(Level.INFO))
101 {
102 LOGGER.info("(<-) Accepted inbound " + socketChannel);
103 }
104 socketChannel.configureBlocking(false);
105 final String address =
106 socketChannel.socket().getInetAddress().getHostName();
107 final int port = socketChannel.socket().getPort();
108 final IConnectionEndPointReceiver createEndPointRecevier =
109 TcpServer.this.endPointReceiverFactory.createEndPointRecevier();
110 final TcpConnectionEndPoint endPoint =
111 new TcpConnectionEndPoint(createEndPointRecevier,
112 TcpServer.this.selectorTasks,
113 TcpServer.this.writer, address, port);
114 createEndPointRecevier.setEndPointConnection(endPoint);
115 endPoint.setOutbound(false);
116 endPoint.setSocketChannel(socketChannel);
117 endPoint.start();
118 }
119 catch (Exception e)
120 {
121 Utils.logException(LOGGER, Utils.safeToString(this)
122 + " could not accept connection", e);
123 }
124 }
125 }
126
127 public void selectorOpened(SelectorTasks tasks)
128 {
129 if (TcpServer.this.isActive()
130 && TcpServer.this.serverSocketChannel != null
131 && TcpServer.this.connectionAcceptTask != null)
132 {
133
134 TcpServer.this.selectorTasks.register(SelectionKey.OP_ACCEPT,
135 TcpServer.this.serverSocketChannel,
136 TcpServer.this.connectionAcceptTask);
137 }
138 }
139
140 public SelectionKey getSelectionKey()
141 {
142 return this.selectionKey;
143 }
144
145 public void setSelectionKey(SelectionKey selectionKey)
146 {
147 this.selectionKey = selectionKey;
148 }
149
150 public final void destroy()
151 {
152 this.selectionKey.cancel();
153 }
154
155 @Override
156 public String toString()
157 {
158 return getClass().getSimpleName();
159 }
160 }
161
162
163 private final String address;
164
165
166 private final int port;
167
168
169 ServerSocketChannel serverSocketChannel;
170
171
172 private final SelectorTasks selectorTasks;
173
174
175 private final ConnectionAcceptTask connectionAcceptTask;
176
177
178 private Thread connectionThread;
179
180
181 private final ExecutorService writer =
182 Executors.newFixedThreadPool(1, new ThreadFactory()
183 {
184 public Thread newThread(Runnable r)
185 {
186 Thread thread =
187 new Thread(r, TcpServer.class.getSimpleName()
188 + " NIO writer");
189 thread.setDaemon(true);
190 return thread;
191 }
192 });
193
194
195 private final IFactory endPointReceiverFactory;
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210 public TcpServer(
211 IConnectionEndPointReceiver.IFactory endPointReceiverFactory,
212 String address, int port)
213 {
214 super();
215 this.address = address == null ? TcpServer.HOST_NAME : address;
216 this.port = port <= 0 ? TcpServer.DEFAULT_TCP_PORT : port;
217 this.selectorTasks = new SelectorTasks();
218 this.connectionAcceptTask = new ConnectionAcceptTask();
219 this.selectorTasks.setStateListener(this.connectionAcceptTask);
220 this.endPointReceiverFactory = endPointReceiverFactory;
221 }
222
223 @Override
224 public String toString()
225 {
226 return Utils.string(this, this.address + ":" + this.port);
227 }
228
229 @Override
230 protected void doDestroy()
231 {
232 this.writer.shutdownNow();
233 this.selectorTasks.destroy();
234 try
235 {
236 this.serverSocketChannel.close();
237 }
238 catch (IOException e)
239 {
240 Utils.logException(LOGGER, "Could not close "
241 + Utils.safeToString(this.serverSocketChannel), e);
242 }
243 }
244
245 @Override
246 protected void doStart()
247 {
248 try
249 {
250 this.serverSocketChannel = ServerSocketChannel.open();
251 this.serverSocketChannel.configureBlocking(false);
252 if (System.getProperty(CONTEXT_RCV_BUFFER) != null)
253 {
254 this.serverSocketChannel.socket().setReceiveBufferSize(
255 Integer.parseInt(System.getProperty(CONTEXT_RCV_BUFFER)));
256 }
257 final InetSocketAddress endpoint =
258 new InetSocketAddress(this.address, this.port);
259 this.serverSocketChannel.socket().bind(endpoint);
260
261 this.selectorTasks.start();
262 this.connectionThread =
263 new Thread(this.selectorTasks.getRunnable(), this.toString());
264 this.connectionThread.setDaemon(true);
265 this.connectionThread.start();
266 }
267 catch (IOException e)
268 {
269 LOGGER.log(Level.SEVERE, "Could not open channel for " + this, e);
270
271 System.exit(1);
272 }
273 }
274 }