View Javadoc

1   /*
2      Copyright 2010 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package mirrormap.nio;
17  
18  import java.io.IOException;
19  import java.net.InetAddress;
20  import java.net.InetSocketAddress;
21  import java.net.UnknownHostException;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.logging.Level;
29  import java.util.logging.Logger;
30  
31  import mirrormap.Utils;
32  import mirrormap.lifecycle.AbstractLifeCycle;
33  import mirrormap.nio.IConnectionEndPointReceiver.IFactory;
34  
35  /**
36   * A server that opens a TCP server socket and spawns
37   * {@link TcpConnectionEndPoint} instances to handle each of the client
38   * connections.
39   * 
40   * @author Ramon Servadei
41   */
42  public class TcpServer extends AbstractLifeCycle
43  {
44      private final static Logger LOGGER =
45          Logger.getLogger(TcpServer.class.getName());
46  
47      /**
48       * The system property for overriding the default receive buffer size for
49       * the server port. This must be specified in units of bytes.
50       */
51      public static final String CONTEXT_RCV_BUFFER = "TcpServer.bufferSize";
52  
53      /** The default TCP port */
54      public static final int DEFAULT_TCP_PORT = 16025;
55  
56      /** The host name */
57      public final static String HOST_NAME;
58  
59      static
60      {
61          String name = null;
62          try
63          {
64              name = InetAddress.getLocalHost().getHostName();
65          }
66          catch (UnknownHostException e)
67          {
68              LOGGER.log(Level.SEVERE, "Could not get host name", e);
69              System.exit(1);
70          }
71          HOST_NAME = name;
72      }
73  
74      /**
75       * Task that handles new connections to this server. It creates a
76       * {@link TcpConnectionEndPoint} for each connection.
77       * 
78       * @author Ramon Servadei
79       */
80      private final class ConnectionAcceptTask implements
81          ISelectorTasksStateListener, ISelectionKeyTask
82      {
83  
84          /** The selection key */
85          private SelectionKey selectionKey;
86  
87          public ConnectionAcceptTask()
88          {
89              super();
90          }
91  
92          public void run()
93          {
94              if (TcpServer.this.isActive() && getSelectionKey().isAcceptable())
95              {
96                  try
97                  {
98                      SocketChannel socketChannel =
99                          TcpServer.this.serverSocketChannel.accept();
100                     if (LOGGER.isLoggable(Level.INFO))
101                     {
102                         LOGGER.info("(<-) Accepted inbound " + socketChannel);
103                     }
104                     socketChannel.configureBlocking(false);
105                     final String address =
106                         socketChannel.socket().getInetAddress().getHostName();
107                     final int port = socketChannel.socket().getPort();
108                     final IConnectionEndPointReceiver createEndPointRecevier =
109                         TcpServer.this.endPointReceiverFactory.createEndPointRecevier();
110                     final TcpConnectionEndPoint endPoint =
111                         new TcpConnectionEndPoint(createEndPointRecevier,
112                             TcpServer.this.selectorTasks,
113                             TcpServer.this.writer, address, port);
114                     createEndPointRecevier.setEndPointConnection(endPoint);
115                     endPoint.setOutbound(false);
116                     endPoint.setSocketChannel(socketChannel);
117                     endPoint.start();
118                 }
119                 catch (Exception e)
120                 {
121                     Utils.logException(LOGGER, Utils.safeToString(this)
122                         + " could not accept connection", e);
123                 }
124             }
125         }
126 
127         public void selectorOpened(SelectorTasks tasks)
128         {
129             if (TcpServer.this.isActive()
130                 && TcpServer.this.serverSocketChannel != null
131                 && TcpServer.this.connectionAcceptTask != null)
132             {
133                 // (re)register
134                 TcpServer.this.selectorTasks.register(SelectionKey.OP_ACCEPT,
135                     TcpServer.this.serverSocketChannel,
136                     TcpServer.this.connectionAcceptTask);
137             }
138         }
139 
140         public SelectionKey getSelectionKey()
141         {
142             return this.selectionKey;
143         }
144 
145         public void setSelectionKey(SelectionKey selectionKey)
146         {
147             this.selectionKey = selectionKey;
148         }
149 
150         public final void destroy()
151         {
152             this.selectionKey.cancel();
153         }
154 
155         @Override
156         public String toString()
157         {
158             return getClass().getSimpleName();
159         }
160     }
161 
162     /** The IP address for the TCP/IP socket of this server */
163     private final String address;
164 
165     /** The port number for the TCP/IP socket of this server */
166     private final int port;
167 
168     /** The server socket channel of this server */
169     ServerSocketChannel serverSocketChannel;
170 
171     /** The selector tasks */
172     private final SelectorTasks selectorTasks;
173 
174     /** The task that handles new socket connections to the server socket */
175     private final ConnectionAcceptTask connectionAcceptTask;
176 
177     /** The thread that handles all the socket IO activity */
178     private Thread connectionThread;
179 
180     /** Handles socket writing activity */
181     private final ExecutorService writer =
182         Executors.newFixedThreadPool(1, new ThreadFactory()
183         {
184             public Thread newThread(Runnable r)
185             {
186                 Thread thread =
187                     new Thread(r, TcpServer.class.getSimpleName()
188                         + " NIO writer");
189                 thread.setDaemon(true);
190                 return thread;
191             }
192         });
193 
194     /** Creates {@link IConnectionEndPointReceiver} instances per client socket */
195     private final IFactory endPointReceiverFactory;
196 
197     /**
198      * Construct the server with the factory that will create the
199      * {@link IConnectionEndPointReceiver} per client socket connection and the
200      * server socket address and port
201      * 
202      * @param endPointReceiverFactory
203      *            the factory to create the {@link IConnectionEndPointReceiver}
204      *            for each client socket
205      * @param address
206      *            the server socket address or host name
207      * @param port
208      *            the server socket TCP port
209      */
210     public TcpServer(
211         IConnectionEndPointReceiver.IFactory endPointReceiverFactory,
212         String address, int port)
213     {
214         super();
215         this.address = address == null ? TcpServer.HOST_NAME : address;
216         this.port = port <= 0 ? TcpServer.DEFAULT_TCP_PORT : port;
217         this.selectorTasks = new SelectorTasks();
218         this.connectionAcceptTask = new ConnectionAcceptTask();
219         this.selectorTasks.setStateListener(this.connectionAcceptTask);
220         this.endPointReceiverFactory = endPointReceiverFactory;
221     }
222 
223     @Override
224     public String toString()
225     {
226         return Utils.string(this, this.address + ":" + this.port);
227     }
228 
229     @Override
230     protected void doDestroy()
231     {
232         this.writer.shutdownNow();
233         this.selectorTasks.destroy();
234         try
235         {
236             this.serverSocketChannel.close();
237         }
238         catch (IOException e)
239         {
240             Utils.logException(LOGGER, "Could not close "
241                 + Utils.safeToString(this.serverSocketChannel), e);
242         }
243     }
244 
245     @Override
246     protected void doStart()
247     {
248         try
249         {
250             this.serverSocketChannel = ServerSocketChannel.open();
251             this.serverSocketChannel.configureBlocking(false);
252             if (System.getProperty(CONTEXT_RCV_BUFFER) != null)
253             {
254                 this.serverSocketChannel.socket().setReceiveBufferSize(
255                     Integer.parseInt(System.getProperty(CONTEXT_RCV_BUFFER)));
256             }
257             final InetSocketAddress endpoint =
258                 new InetSocketAddress(this.address, this.port);
259             this.serverSocketChannel.socket().bind(endpoint);
260 
261             this.selectorTasks.start();
262             this.connectionThread =
263                 new Thread(this.selectorTasks.getRunnable(), this.toString());
264             this.connectionThread.setDaemon(true);
265             this.connectionThread.start();
266         }
267         catch (IOException e)
268         {
269             LOGGER.log(Level.SEVERE, "Could not open channel for " + this, e);
270             // super fatal
271             System.exit(1);
272         }
273     }
274 }