View Javadoc

1   /*
2      Copyright 2010 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package mirrormap;
17  
18  import java.io.Serializable;
19  import java.util.HashMap;
20  import java.util.Iterator;
21  import java.util.Map;
22  import java.util.logging.Level;
23  import java.util.logging.Logger;
24  
25  import mirrormap.collections.NotifyingMap;
26  import mirrormap.commands.RegisterCommand;
27  import mirrormap.commands.UnregisterCommand;
28  import mirrormap.nio.SelectorTasks;
29  import mirrormap.nio.TcpConnectionEndPoint;
30  import mirrormap.nio.TcpServer;
31  
32  /**
33   * MirrorMap is a replicating Java map implementation. Its key features are:
34   * <ul>
35   * <li>Replicates changes to remote instances
36   * <li>Can detect when the map contents change
37   * <li>Can detect when a local map is connected and disconnected from remote
38   * instances
39   * <li>Can detect when peers of a map are connected and disconnected
40   * </ul>
41   * A MirrorMap wraps any standard Java map implementation and decorates it with
42   * the ability to replicate changes with other remote MirrorMap instances. A
43   * MirrorMap is given a name, replication occurs with remote instances of the
44   * same name. Both the map key and value need to implement the Serializable
45   * interface.
46   * <p>
47   * MirrorMap is built on top of a {@link NotifyingMap}; this is a Map
48   * implementation that has the ability to notify observer objects when map
49   * entries are added, updated or removed. The operations to the map are further
50   * categorised into a 'before' and 'after' events e.g. when adding an entry to a
51   * NotifyingMap, observers receive a 'will add' notification, the entry is
52   * added, then the observers receive a 'did add' notification. A similar for
53   * pattern follows for update and remove operations.
54   * <p>
55   * Logically, replication is peer-to-peer. Physically, all peer communication is
56   * done using a server component. The server holds the master copy of the named
57   * MirrorMap. New peers register with the server for replication and will
58   * receive the full map image upon registration. The connection with the server
59   * is point-to-point using TCP; the server performs the fan out of changes to
60   * all peers.
61   * <p>
62   * MirrorMap is written using the Java NIO library. A mirror map has a single
63   * I/O thread dealing with network communication. The mirror map server (
64   * {@link #server(String, int)}) creates 2 threads; one to handle reading and
65   * one to handle writing.
66   * 
67   * @param <K>
68   *            The key type for the map
69   * @param <V>
70   *            The value type for the map
71   * @author Ramon Servadei
72   */
73  public class MirrorMap<K extends Serializable, V extends Serializable> extends
74      NotifyingMap<K, V>
75  {
76      private final static Logger LOGGER =
77          Logger.getLogger(MirrorMap.class.getName());
78  
79      /** A no-op implementation */
80      private static final IMirrorMapConnectionListener NULL_CONNECTION_LISTENER =
81          new IMirrorMapConnectionListener()
82          {
83  
84              public void localConnected(String mirrorMapName)
85              {
86              }
87  
88              public void localDisconnected(String mirrorMapName)
89              {
90              }
91  
92              public void peerConnected(String mirrorMapName)
93              {
94              }
95  
96              public void peerDisconnected(String mirrorMapName)
97              {
98              }
99          };
100 
101     /** The cache of all {@link MirrorMap} instances */
102     @SuppressWarnings("unchecked")
103     private static final Map<String, MirrorMap> mirrorMaps =
104         new HashMap<String, MirrorMap>();
105 
106     /**
107      * Create a {@link MirrorMap} identified by a unique name that wraps the
108      * passed in {@link Map}. If the mirror map already exists, the current
109      * instance is returned and nothing is done with the {@link Map} argument.
110      * 
111      * @param <Key>
112      * @param <Value>
113      * @param name
114      *            the name identifying the {@link MirrorMap}
115      * @param data
116      *            the underlying {@link Map} for the {@link MirrorMap}
117      * @return a {@link MirrorMap} or <code>null</code> if there is none for
118      *         this name
119      */
120     @SuppressWarnings("unchecked")
121     public static synchronized <Key extends Serializable, Value extends Serializable> MirrorMap<Key, Value> create(
122         String name, Map<Key, Value> data)
123     {
124         MirrorMap mirrorMap = mirrorMaps.get(name);
125         if (mirrorMap == null)
126         {
127             mirrorMap = new MirrorMap(name, data);
128             mirrorMaps.put(name, mirrorMap);
129         }
130         return mirrorMap;
131     }
132 
133     /**
134      * Get the {@link MirrorMap} identified by the name.
135      * 
136      * @param <Key>
137      * @param <Value>
138      * @param name
139      *            the name identifying the {@link MirrorMap}
140      * @return a {@link MirrorMap} or <code>null</code> if there is none for
141      *         this name
142      */
143     @SuppressWarnings("unchecked")
144     public static synchronized <Key extends Serializable, Value extends Serializable> MirrorMap<Key, Value> get(
145         String name)
146     {
147         return mirrorMaps.get(name);
148     }
149 
150     /**
151      * Holds the {@link TcpConnectionEndPoint} instances per named
152      * {@link MirrorMap}. This is used for the client outbound connections.
153      */
154     final static Map<String, TcpConnectionEndPoint> mapConnections =
155         new HashMap<String, TcpConnectionEndPoint>();
156 
157     /**
158      * Holds the {@link TcpConnectionEndPoint} instances keyed against the
159      * string host and port of the server socket it is attached to. This is used
160      * for the client outbound connections.
161      */
162     final static Map<String, TcpConnectionEndPoint> addressConnections =
163         new HashMap<String, TcpConnectionEndPoint>();
164 
165     /**
166      * Handles the I/O for client connections in conjunction with
167      * {@link #clientThread}
168      */
169     static SelectorTasks selectorTasks;
170 
171     /** The client thread handling I/O from the server */
172     private static Thread clientThread;
173 
174     /**
175      * @return the client thread name
176      */
177     static String getClientThreadName()
178     {
179         return "MirrorMap I/O thread";
180     }
181 
182     /**
183      * Initialise client-side components
184      */
185     private synchronized static void initClientComponents()
186     {
187         if (selectorTasks != null)
188         {
189             return;
190         }
191         selectorTasks = new SelectorTasks();
192         try
193         {
194             selectorTasks.start();
195         }
196         catch (Exception e)
197         {
198             LOGGER.log(Level.SEVERE, "Could not initialise selector tasks", e);
199             System.exit(1);
200         }
201         clientThread =
202             new Thread(selectorTasks.getRunnable(), getClientThreadName());
203         try
204         {
205             clientThread.setDaemon(true);
206             clientThread.start();
207         }
208         catch (Exception e)
209         {
210             LOGGER.log(Level.SEVERE, "Could not start client I/O thread", e);
211             System.exit(1);
212         }
213     }
214 
215     /**
216      * Connects the passed in {@link MirrorMap} to the mirror map server
217      * identified by the host and port. After this method completes, the
218      * instance will receive changes from remote instances and local updates
219      * will be sent to the server for distribution to other instances.
220      * 
221      * @param map
222      *            the map to connect
223      * @param host
224      *            the host or IP of the mirror map server to connect to
225      * @param port
226      *            the port of the mirror map server to connect to
227      * @return <code>true</code> if the map was connected, <code>false</code>
228      *         otherwise
229      */
230     public static synchronized boolean connect(
231         MirrorMap<? extends Serializable, ? extends Serializable> map,
232         String host, int port)
233     {
234         initClientComponents();
235         TcpConnectionEndPoint endPoint = null;
236         if (mapConnections.containsKey(map.name))
237         {
238             if (LOGGER.isLoggable(Level.FINE))
239             {
240                 LOGGER.fine("Already connected: " + map.name);
241             }
242             return false;
243         }
244 
245         // see what connections we have
246         final String address = host + ":" + port;
247         if (addressConnections.containsKey(address))
248         {
249             // found one, lets use it....
250             endPoint = addressConnections.get(address);
251         }
252         if (endPoint == null)
253         {
254             // we need to create a new connection
255             try
256             {
257                 final ClientCommandReceiver receiver =
258                     new ClientCommandReceiver();
259                 endPoint =
260                     new TcpConnectionEndPoint(receiver, selectorTasks, null,
261                         host, port);
262                 receiver.setEndPointConnection(endPoint);
263                 endPoint.start();
264                 addressConnections.put(address, endPoint);
265             }
266             catch (Exception e)
267             {
268                 LOGGER.log(Level.INFO, "Could not connect to " + address, e);
269                 return false;
270             }
271         }
272         ClientCommandReceiver receiver =
273             (ClientCommandReceiver) endPoint.getReceiver();
274         receiver.addListener(map.name);
275         mapConnections.put(map.name, endPoint);
276         endPoint.send(new RegisterCommand(map.name).getBytes());
277 
278         if (LOGGER.isLoggable(Level.FINE))
279         {
280             LOGGER.fine("Connections " + mapConnections);
281         }
282         map.connected();
283         return true;
284     }
285 
286     /**
287      * Disconnects the passed in {@link MirrorMap} from the mirror map server.
288      * After this method completes, the instance will no longer receive changes
289      * from remote instances and local updates will no longer be sent to the
290      * server for distribution to other instances.
291      * 
292      * @param map
293      *            the instance to disconnect
294      * @return <code>true</code> if the map was disconnected, <code>false</code>
295      *         otherwise
296      */
297     public static synchronized boolean disconnect(
298         MirrorMap<? extends Serializable, ? extends Serializable> map)
299     {
300         TcpConnectionEndPoint endPoint = null;
301         if (mapConnections.containsKey(map.name))
302         {
303             endPoint = mapConnections.remove(map.name);
304             endPoint.send(new UnregisterCommand(map.name).getBytes());
305             ClientCommandReceiver receiver =
306                 (ClientCommandReceiver) endPoint.getReceiver();
307             receiver.removeListener(map.name);
308             // if there are no more maps being reflected by this end point then
309             // destroy it
310             if (receiver.listeners.size() == 0)
311             {
312                 final Iterator<Entry<String, TcpConnectionEndPoint>> iterator =
313                     addressConnections.entrySet().iterator();
314                 while (iterator.hasNext())
315                 {
316                     final Entry<String, TcpConnectionEndPoint> next =
317                         iterator.next();
318                     if (next.getValue().equals(endPoint))
319                     {
320                         iterator.remove();
321                     }
322                 }
323                 endPoint.destroy();
324             }
325             if (LOGGER.isLoggable(Level.FINE))
326             {
327                 LOGGER.fine("Connections " + mapConnections);
328             }
329             map.disconnected();
330             return true;
331         }
332         if (LOGGER.isLoggable(Level.FINE))
333         {
334             LOGGER.fine("No connection for " + map.name);
335         }
336         return false;
337     }
338 
339     /**
340      * Create a {@link TcpServer} with the corresponding server socket details.
341      * The returned server will be started.
342      * 
343      * @param address
344      *            the IP address or host name for the server socket
345      * @param port
346      *            the TCP port for the server socket
347      * @return a {@link TcpServer}
348      */
349     public static TcpServer server(String address, int port)
350     {
351         TcpServer server =
352             new TcpServer(
353                 new ServerCommandReceiver.ServerCommandReceiverFactory(),
354                 address, port);
355         server.start();
356         return server;
357     }
358 
359     /** The name of this instance */
360     private final String name;
361 
362     /**
363      * Tracks the listener registered for connection events
364      */
365     private volatile IMirrorMapConnectionListener connectionListener =
366         NULL_CONNECTION_LISTENER;
367 
368     private MirrorMap(String name, Map<K, V> data)
369     {
370         super(data);
371         this.name = name;
372     }
373 
374     /**
375      * The name of this instance
376      * 
377      * @return the name of this {@link MirrorMap}
378      */
379     public String getName()
380     {
381         return name;
382     }
383 
384     @Override
385     public String toString()
386     {
387         return getClass().getSimpleName() + ":" + this.name + ","
388             + super.toString();
389     }
390 
391     /**
392      * Set the {@link IMirrorMapConnectionListener} on this mirror map
393      * 
394      * @param listener
395      *            the listener to set
396      */
397     public void setConnectionListener(IMirrorMapConnectionListener listener)
398     {
399         this.connectionListener = (listener);
400     }
401 
402     /**
403      * Get the connection listener attached to this mirror map
404      * 
405      * @return the {@link IMirrorMapConnectionListener} attached to this
406      *         instance
407      */
408     public IMirrorMapConnectionListener getConnectionListener()
409     {
410         return this.connectionListener;
411     }
412 
413     private void connected()
414     {
415         try
416         {
417             this.connectionListener.localConnected(this.name);
418         }
419         catch (Exception e)
420         {
421             if (LOGGER.isLoggable(Level.FINE))
422             {
423                 LOGGER.log(Level.FINE, "Could not call connected on "
424                     + Utils.safeToString(this.connectionListener) + " with "
425                     + Utils.safeToString(this), e);
426             }
427         }
428     }
429 
430     private void disconnected()
431     {
432         try
433         {
434             this.connectionListener.localDisconnected(this.name);
435         }
436         catch (Exception e)
437         {
438             if (LOGGER.isLoggable(Level.FINE))
439             {
440                 LOGGER.log(Level.FINE, "Could not call disconnected on "
441                     + Utils.safeToString(this.connectionListener) + " with "
442                     + Utils.safeToString(this), e);
443             }
444         }
445     }
446 }