1 /*
2 Copyright 2010 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package mirrormap;
17
18 import java.io.Serializable;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.Map;
22 import java.util.logging.Level;
23 import java.util.logging.Logger;
24
25 import mirrormap.collections.NotifyingMap;
26 import mirrormap.commands.RegisterCommand;
27 import mirrormap.commands.UnregisterCommand;
28 import mirrormap.nio.SelectorTasks;
29 import mirrormap.nio.TcpConnectionEndPoint;
30 import mirrormap.nio.TcpServer;
31
32 /**
33 * MirrorMap is a replicating Java map implementation. Its key features are:
34 * <ul>
35 * <li>Replicates changes to remote instances
36 * <li>Can detect when the map contents change
37 * <li>Can detect when a local map is connected and disconnected from remote
38 * instances
39 * <li>Can detect when peers of a map are connected and disconnected
40 * </ul>
41 * A MirrorMap wraps any standard Java map implementation and decorates it with
42 * the ability to replicate changes with other remote MirrorMap instances. A
43 * MirrorMap is given a name, replication occurs with remote instances of the
44 * same name. Both the map key and value need to implement the Serializable
45 * interface.
46 * <p>
47 * MirrorMap is built on top of a {@link NotifyingMap}; this is a Map
48 * implementation that has the ability to notify observer objects when map
49 * entries are added, updated or removed. The operations to the map are further
50 * categorised into a 'before' and 'after' events e.g. when adding an entry to a
51 * NotifyingMap, observers receive a 'will add' notification, the entry is
52 * added, then the observers receive a 'did add' notification. A similar
53 * pattern follows for update and remove operations.
54 * <p>
55 * Logically, replication is peer-to-peer. Physically, all peer communication is
56 * done using a server component. The server holds the master copy of the named
57 * MirrorMap. New peers register with the server for replication and will
58 * receive the full map image upon registration. The connection with the server
59 * is point-to-point using TCP; the server performs the fan out of changes to
60 * all peers.
61 * <p>
62 * MirrorMap is written using the Java NIO library. A mirror map has a single
63 * I/O thread dealing with network communication. The mirror map server (
64 * {@link #server(String, int)}) creates 2 threads; one to handle reading and
65 * one to handle writing.
66 *
67 * @param <K>
68 * The key type for the map
69 * @param <V>
70 * The value type for the map
71 * @author Ramon Servadei
72 */
73 public class MirrorMap<K extends Serializable, V extends Serializable> extends
74 NotifyingMap<K, V>
75 {
76 private final static Logger LOGGER =
77 Logger.getLogger(MirrorMap.class.getName());
78
79 /** A no-op implementation */
80 private static final IMirrorMapConnectionListener NULL_CONNECTION_LISTENER =
81 new IMirrorMapConnectionListener()
82 {
83
84 public void localConnected(String mirrorMapName)
85 {
86 }
87
88 public void localDisconnected(String mirrorMapName)
89 {
90 }
91
92 public void peerConnected(String mirrorMapName)
93 {
94 }
95
96 public void peerDisconnected(String mirrorMapName)
97 {
98 }
99 };
100
101 /** The cache of all {@link MirrorMap} instances */
102 @SuppressWarnings("unchecked")
103 private static final Map<String, MirrorMap> mirrorMaps =
104 new HashMap<String, MirrorMap>();
105
106 /**
107 * Create a {@link MirrorMap} identified by a unique name that wraps the
108 * passed in {@link Map}. If the mirror map already exists, the current
109 * instance is returned and nothing is done with the {@link Map} argument.
110 *
111 * @param <Key>
112 * @param <Value>
113 * @param name
114 * the name identifying the {@link MirrorMap}
115 * @param data
116 * the underlying {@link Map} for the {@link MirrorMap}
117 * @return a {@link MirrorMap} or <code>null</code> if there is none for
118 * this name
119 */
120 @SuppressWarnings("unchecked")
121 public static synchronized <Key extends Serializable, Value extends Serializable> MirrorMap<Key, Value> create(
122 String name, Map<Key, Value> data)
123 {
124 MirrorMap mirrorMap = mirrorMaps.get(name);
125 if (mirrorMap == null)
126 {
127 mirrorMap = new MirrorMap(name, data);
128 mirrorMaps.put(name, mirrorMap);
129 }
130 return mirrorMap;
131 }
132
133 /**
134 * Get the {@link MirrorMap} identified by the name.
135 *
136 * @param <Key>
137 * @param <Value>
138 * @param name
139 * the name identifying the {@link MirrorMap}
140 * @return a {@link MirrorMap} or <code>null</code> if there is none for
141 * this name
142 */
143 @SuppressWarnings("unchecked")
144 public static synchronized <Key extends Serializable, Value extends Serializable> MirrorMap<Key, Value> get(
145 String name)
146 {
147 return mirrorMaps.get(name);
148 }
149
150 /**
151 * Holds the {@link TcpConnectionEndPoint} instances per named
152 * {@link MirrorMap}. This is used for the client outbound connections.
153 */
154 final static Map<String, TcpConnectionEndPoint> mapConnections =
155 new HashMap<String, TcpConnectionEndPoint>();
156
157 /**
158 * Holds the {@link TcpConnectionEndPoint} instances keyed against the
159 * string host and port of the server socket it is attached to. This is used
160 * for the client outbound connections.
161 */
162 final static Map<String, TcpConnectionEndPoint> addressConnections =
163 new HashMap<String, TcpConnectionEndPoint>();
164
165 /**
166 * Handles the I/O for client connections in conjunction with
167 * {@link #clientThread}
168 */
169 static SelectorTasks selectorTasks;
170
171 /** The client thread handling I/O from the server */
172 private static Thread clientThread;
173
174 /**
175 * @return the client thread name
176 */
177 static String getClientThreadName()
178 {
179 return "MirrorMap I/O thread";
180 }
181
182 /**
183 * Initialise client-side components
184 */
185 private synchronized static void initClientComponents()
186 {
187 if (selectorTasks != null)
188 {
189 return;
190 }
191 selectorTasks = new SelectorTasks();
192 try
193 {
194 selectorTasks.start();
195 }
196 catch (Exception e)
197 {
198 LOGGER.log(Level.SEVERE, "Could not initialise selector tasks", e);
199 System.exit(1);
200 }
201 clientThread =
202 new Thread(selectorTasks.getRunnable(), getClientThreadName());
203 try
204 {
205 clientThread.setDaemon(true);
206 clientThread.start();
207 }
208 catch (Exception e)
209 {
210 LOGGER.log(Level.SEVERE, "Could not start client I/O thread", e);
211 System.exit(1);
212 }
213 }
214
215 /**
216 * Connects the passed in {@link MirrorMap} to the mirror map server
217 * identified by the host and port. After this method completes, the
218 * instance will receive changes from remote instances and local updates
219 * will be sent to the server for distribution to other instances.
220 *
221 * @param map
222 * the map to connect
223 * @param host
224 * the host or IP of the mirror map server to connect to
225 * @param port
226 * the port of the mirror map server to connect to
227 * @return <code>true</code> if the map was connected, <code>false</code>
228 * otherwise
229 */
230 public static synchronized boolean connect(
231 MirrorMap<? extends Serializable, ? extends Serializable> map,
232 String host, int port)
233 {
234 initClientComponents();
235 TcpConnectionEndPoint endPoint = null;
236 if (mapConnections.containsKey(map.name))
237 {
238 if (LOGGER.isLoggable(Level.FINE))
239 {
240 LOGGER.fine("Already connected: " + map.name);
241 }
242 return false;
243 }
244
245 // see what connections we have
246 final String address = host + ":" + port;
247 if (addressConnections.containsKey(address))
248 {
249 // found one, lets use it....
250 endPoint = addressConnections.get(address);
251 }
252 if (endPoint == null)
253 {
254 // we need to create a new connection
255 try
256 {
257 final ClientCommandReceiver receiver =
258 new ClientCommandReceiver();
259 endPoint =
260 new TcpConnectionEndPoint(receiver, selectorTasks, null,
261 host, port);
262 receiver.setEndPointConnection(endPoint);
263 endPoint.start();
264 addressConnections.put(address, endPoint);
265 }
266 catch (Exception e)
267 {
268 LOGGER.log(Level.INFO, "Could not connect to " + address, e);
269 return false;
270 }
271 }
272 ClientCommandReceiver receiver =
273 (ClientCommandReceiver) endPoint.getReceiver();
274 receiver.addListener(map.name);
275 mapConnections.put(map.name, endPoint);
276 endPoint.send(new RegisterCommand(map.name).getBytes());
277
278 if (LOGGER.isLoggable(Level.FINE))
279 {
280 LOGGER.fine("Connections " + mapConnections);
281 }
282 map.connected();
283 return true;
284 }
285
286 /**
287 * Disconnects the passed in {@link MirrorMap} from the mirror map server.
288 * After this method completes, the instance will no longer receive changes
289 * from remote instances and local updates will no longer be sent to the
290 * server for distribution to other instances.
291 *
292 * @param map
293 * the instance to disconnect
294 * @return <code>true</code> if the map was disconnected, <code>false</code>
295 * otherwise
296 */
297 public static synchronized boolean disconnect(
298 MirrorMap<? extends Serializable, ? extends Serializable> map)
299 {
300 TcpConnectionEndPoint endPoint = null;
301 if (mapConnections.containsKey(map.name))
302 {
303 endPoint = mapConnections.remove(map.name);
304 endPoint.send(new UnregisterCommand(map.name).getBytes());
305 ClientCommandReceiver receiver =
306 (ClientCommandReceiver) endPoint.getReceiver();
307 receiver.removeListener(map.name);
308 // if there are no more maps being reflected by this end point then
309 // destroy it
310 if (receiver.listeners.size() == 0)
311 {
312 final Iterator<Entry<String, TcpConnectionEndPoint>> iterator =
313 addressConnections.entrySet().iterator();
314 while (iterator.hasNext())
315 {
316 final Entry<String, TcpConnectionEndPoint> next =
317 iterator.next();
318 if (next.getValue().equals(endPoint))
319 {
320 iterator.remove();
321 }
322 }
323 endPoint.destroy();
324 }
325 if (LOGGER.isLoggable(Level.FINE))
326 {
327 LOGGER.fine("Connections " + mapConnections);
328 }
329 map.disconnected();
330 return true;
331 }
332 if (LOGGER.isLoggable(Level.FINE))
333 {
334 LOGGER.fine("No connection for " + map.name);
335 }
336 return false;
337 }
338
339 /**
340 * Create a {@link TcpServer} with the corresponding server socket details.
341 * The returned server will be started.
342 *
343 * @param address
344 * the IP address or host name for the server socket
345 * @param port
346 * the TCP port for the server socket
347 * @return a {@link TcpServer}
348 */
349 public static TcpServer server(String address, int port)
350 {
351 TcpServer server =
352 new TcpServer(
353 new ServerCommandReceiver.ServerCommandReceiverFactory(),
354 address, port);
355 server.start();
356 return server;
357 }
358
359 /** The name of this instance */
360 private final String name;
361
362 /**
363 * Tracks the listener registered for connection events
364 */
365 private volatile IMirrorMapConnectionListener connectionListener =
366 NULL_CONNECTION_LISTENER;
367
368 private MirrorMap(String name, Map<K, V> data)
369 {
370 super(data);
371 this.name = name;
372 }
373
374 /**
375 * The name of this instance
376 *
377 * @return the name of this {@link MirrorMap}
378 */
379 public String getName()
380 {
381 return name;
382 }
383
384 @Override
385 public String toString()
386 {
387 return getClass().getSimpleName() + ":" + this.name + ","
388 + super.toString();
389 }
390
391 /**
392 * Set the {@link IMirrorMapConnectionListener} on this mirror map
393 *
394 * @param listener
395 * the listener to set
396 */
397 public void setConnectionListener(IMirrorMapConnectionListener listener)
398 {
399 this.connectionListener = (listener);
400 }
401
402 /**
403 * Get the connection listener attached to this mirror map
404 *
405 * @return the {@link IMirrorMapConnectionListener} attached to this
406 * instance
407 */
408 public IMirrorMapConnectionListener getConnectionListener()
409 {
410 return this.connectionListener;
411 }
412
413 private void connected()
414 {
415 try
416 {
417 this.connectionListener.localConnected(this.name);
418 }
419 catch (Exception e)
420 {
421 if (LOGGER.isLoggable(Level.FINE))
422 {
423 LOGGER.log(Level.FINE, "Could not call connected on "
424 + Utils.safeToString(this.connectionListener) + " with "
425 + Utils.safeToString(this), e);
426 }
427 }
428 }
429
430 private void disconnected()
431 {
432 try
433 {
434 this.connectionListener.localDisconnected(this.name);
435 }
436 catch (Exception e)
437 {
438 if (LOGGER.isLoggable(Level.FINE))
439 {
440 LOGGER.log(Level.FINE, "Could not call disconnected on "
441 + Utils.safeToString(this.connectionListener) + " with "
442 + Utils.safeToString(this), e);
443 }
444 }
445 }
446 }