1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package mirrormap;
17
18 import java.io.Serializable;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.Map;
22 import java.util.logging.Level;
23 import java.util.logging.Logger;
24
25 import mirrormap.collections.NotifyingMap;
26 import mirrormap.commands.RegisterCommand;
27 import mirrormap.commands.UnregisterCommand;
28 import mirrormap.nio.SelectorTasks;
29 import mirrormap.nio.TcpConnectionEndPoint;
30 import mirrormap.nio.TcpServer;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public class MirrorMap<K extends Serializable, V extends Serializable> extends
74 NotifyingMap<K, V>
75 {
76 private final static Logger LOGGER =
77 Logger.getLogger(MirrorMap.class.getName());
78
79
80 private static final IMirrorMapConnectionListener NULL_CONNECTION_LISTENER =
81 new IMirrorMapConnectionListener()
82 {
83
84 public void localConnected(String mirrorMapName)
85 {
86 }
87
88 public void localDisconnected(String mirrorMapName)
89 {
90 }
91
92 public void peerConnected(String mirrorMapName)
93 {
94 }
95
96 public void peerDisconnected(String mirrorMapName)
97 {
98 }
99 };
100
101
102 @SuppressWarnings("unchecked")
103 private static final Map<String, MirrorMap> mirrorMaps =
104 new HashMap<String, MirrorMap>();
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120 @SuppressWarnings("unchecked")
121 public static synchronized <Key extends Serializable, Value extends Serializable> MirrorMap<Key, Value> create(
122 String name, Map<Key, Value> data)
123 {
124 MirrorMap mirrorMap = mirrorMaps.get(name);
125 if (mirrorMap == null)
126 {
127 mirrorMap = new MirrorMap(name, data);
128 mirrorMaps.put(name, mirrorMap);
129 }
130 return mirrorMap;
131 }
132
133
134
135
136
137
138
139
140
141
142
143 @SuppressWarnings("unchecked")
144 public static synchronized <Key extends Serializable, Value extends Serializable> MirrorMap<Key, Value> get(
145 String name)
146 {
147 return mirrorMaps.get(name);
148 }
149
150
151
152
153
154 final static Map<String, TcpConnectionEndPoint> mapConnections =
155 new HashMap<String, TcpConnectionEndPoint>();
156
157
158
159
160
161
162 final static Map<String, TcpConnectionEndPoint> addressConnections =
163 new HashMap<String, TcpConnectionEndPoint>();
164
165
166
167
168
169 static SelectorTasks selectorTasks;
170
171
172 private static Thread clientThread;
173
174
175
176
177 static String getClientThreadName()
178 {
179 return "MirrorMap I/O thread";
180 }
181
182
183
184
185 private synchronized static void initClientComponents()
186 {
187 if (selectorTasks != null)
188 {
189 return;
190 }
191 selectorTasks = new SelectorTasks();
192 try
193 {
194 selectorTasks.start();
195 }
196 catch (Exception e)
197 {
198 LOGGER.log(Level.SEVERE, "Could not initialise selector tasks", e);
199 System.exit(1);
200 }
201 clientThread =
202 new Thread(selectorTasks.getRunnable(), getClientThreadName());
203 try
204 {
205 clientThread.setDaemon(true);
206 clientThread.start();
207 }
208 catch (Exception e)
209 {
210 LOGGER.log(Level.SEVERE, "Could not start client I/O thread", e);
211 System.exit(1);
212 }
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230 public static synchronized boolean connect(
231 MirrorMap<? extends Serializable, ? extends Serializable> map,
232 String host, int port)
233 {
234 initClientComponents();
235 TcpConnectionEndPoint endPoint = null;
236 if (mapConnections.containsKey(map.name))
237 {
238 if (LOGGER.isLoggable(Level.FINE))
239 {
240 LOGGER.fine("Already connected: " + map.name);
241 }
242 return false;
243 }
244
245
246 final String address = host + ":" + port;
247 if (addressConnections.containsKey(address))
248 {
249
250 endPoint = addressConnections.get(address);
251 }
252 if (endPoint == null)
253 {
254
255 try
256 {
257 final ClientCommandReceiver receiver =
258 new ClientCommandReceiver();
259 endPoint =
260 new TcpConnectionEndPoint(receiver, selectorTasks, null,
261 host, port);
262 receiver.setEndPointConnection(endPoint);
263 endPoint.start();
264 addressConnections.put(address, endPoint);
265 }
266 catch (Exception e)
267 {
268 LOGGER.log(Level.INFO, "Could not connect to " + address, e);
269 return false;
270 }
271 }
272 ClientCommandReceiver receiver =
273 (ClientCommandReceiver) endPoint.getReceiver();
274 receiver.addListener(map.name);
275 mapConnections.put(map.name, endPoint);
276 endPoint.send(new RegisterCommand(map.name).getBytes());
277
278 if (LOGGER.isLoggable(Level.FINE))
279 {
280 LOGGER.fine("Connections " + mapConnections);
281 }
282 map.connected();
283 return true;
284 }
285
286
287
288
289
290
291
292
293
294
295
296
297 public static synchronized boolean disconnect(
298 MirrorMap<? extends Serializable, ? extends Serializable> map)
299 {
300 TcpConnectionEndPoint endPoint = null;
301 if (mapConnections.containsKey(map.name))
302 {
303 endPoint = mapConnections.remove(map.name);
304 endPoint.send(new UnregisterCommand(map.name).getBytes());
305 ClientCommandReceiver receiver =
306 (ClientCommandReceiver) endPoint.getReceiver();
307 receiver.removeListener(map.name);
308
309
310 if (receiver.listeners.size() == 0)
311 {
312 final Iterator<Entry<String, TcpConnectionEndPoint>> iterator =
313 addressConnections.entrySet().iterator();
314 while (iterator.hasNext())
315 {
316 final Entry<String, TcpConnectionEndPoint> next =
317 iterator.next();
318 if (next.getValue().equals(endPoint))
319 {
320 iterator.remove();
321 }
322 }
323 endPoint.destroy();
324 }
325 if (LOGGER.isLoggable(Level.FINE))
326 {
327 LOGGER.fine("Connections " + mapConnections);
328 }
329 map.disconnected();
330 return true;
331 }
332 if (LOGGER.isLoggable(Level.FINE))
333 {
334 LOGGER.fine("No connection for " + map.name);
335 }
336 return false;
337 }
338
339
340
341
342
343
344
345
346
347
348
349 public static TcpServer server(String address, int port)
350 {
351 TcpServer server =
352 new TcpServer(
353 new ServerCommandReceiver.ServerCommandReceiverFactory(),
354 address, port);
355 server.start();
356 return server;
357 }
358
359
/** The name identifying this map; fixed at construction. */
private final String name;

/**
 * Receives connect/disconnect notifications for this map. Starts out as the
 * do-nothing listener; volatile because it is replaced through
 * {@code setConnectionListener} and read when notifications are delivered.
 */
private volatile IMirrorMapConnectionListener connectionListener = NULL_CONNECTION_LISTENER;

/**
 * Creates a named map. Private: instances are obtained through
 * {@code create(String, Map)}.
 *
 * @param name the name identifying the map
 * @param data the map passed on to the superclass constructor
 */
private MirrorMap(String name, Map<K, V> data)
{
    super(data);
    this.name = name;
}
373
374
375
376
377
378
379 public String getName()
380 {
381 return name;
382 }
383
384 @Override
385 public String toString()
386 {
387 return getClass().getSimpleName() + ":" + this.name + ","
388 + super.toString();
389 }
390
391
392
393
394
395
396
397 public void setConnectionListener(IMirrorMapConnectionListener listener)
398 {
399 this.connectionListener = (listener);
400 }
401
402
403
404
405
406
407
408 public IMirrorMapConnectionListener getConnectionListener()
409 {
410 return this.connectionListener;
411 }
412
413 private void connected()
414 {
415 try
416 {
417 this.connectionListener.localConnected(this.name);
418 }
419 catch (Exception e)
420 {
421 if (LOGGER.isLoggable(Level.FINE))
422 {
423 LOGGER.log(Level.FINE, "Could not call connected on "
424 + Utils.safeToString(this.connectionListener) + " with "
425 + Utils.safeToString(this), e);
426 }
427 }
428 }
429
430 private void disconnected()
431 {
432 try
433 {
434 this.connectionListener.localDisconnected(this.name);
435 }
436 catch (Exception e)
437 {
438 if (LOGGER.isLoggable(Level.FINE))
439 {
440 LOGGER.log(Level.FINE, "Could not call disconnected on "
441 + Utils.safeToString(this.connectionListener) + " with "
442 + Utils.safeToString(this), e);
443 }
444 }
445 }
446 }