1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package mirrormap;
17
18 import java.io.ByteArrayInputStream;
19 import java.io.ObjectInputStream;
20 import java.io.Serializable;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.Map.Entry;
30 import java.util.logging.Level;
31 import java.util.logging.Logger;
32
33 import mirrormap.commands.AbstractCommand;
34 import mirrormap.commands.ICommand;
35 import mirrormap.commands.PeerConnectedCommand;
36 import mirrormap.commands.PeerDisconnectedCommand;
37 import mirrormap.commands.RegisterCommand;
38 import mirrormap.commands.UnregisterCommand;
39 import mirrormap.nio.IConnectionEndPoint;
40 import mirrormap.nio.IConnectionEndPointReceiver;
41
42
43
44
45
46
47
48
49
50
51
52
53
54 class ServerCommandReceiver implements IConnectionEndPointReceiver
55 {
56
57
58
59
60
61 static final class ServerCommandReceiverFactory implements IFactory
62 {
63 public IConnectionEndPointReceiver createEndPointRecevier()
64 {
65 return new ServerCommandReceiver();
66 }
67 }
68
69 private final static Logger LOGGER =
70 Logger.getLogger(ServerCommandReceiver.class.getName());
71
72
73
74
75 private final static List<IConnectionEndPoint> allEndPoints =
76 new ArrayList<IConnectionEndPoint>();
77
78
79 protected IConnectionEndPoint connectionEndPoint;
80
81
82
83
84
85 protected final Map<String, ConnectionEndPointMapListener> listeners =
86 Collections.synchronizedMap(new HashMap<String, ConnectionEndPointMapListener>());
87
88 ServerCommandReceiver()
89 {
90 super();
91 }
92
93 public final void receive(byte[] data)
94 {
95 ObjectInputStream ois;
96 try
97 {
98 ois = new ObjectInputStream(new ByteArrayInputStream(data));
99 Object received = ois.readObject();
100 if (received instanceof ICommand)
101 {
102 doReceive((ICommand) received);
103 }
104 else
105 {
106 if (LOGGER.isLoggable(Level.INFO))
107 {
108 LOGGER.info("Unhandled object " + received);
109 }
110 }
111 }
112 catch (Exception e)
113 {
114 Utils.logException(LOGGER, "Could not handle received data from "
115 + Utils.safeToString(this.connectionEndPoint), e);
116 }
117 }
118
119
120
121
122
123
124
125 protected void doReceive(ICommand command)
126 {
127 if (LOGGER.isLoggable(Level.FINEST))
128 {
129 LOGGER.finest(command.toString());
130 }
131 if (command instanceof RegisterCommand)
132 {
133 final String mirrorMapName = command.getMirrorMapName();
134 MirrorMap<Serializable, Serializable> target =
135 MirrorMap.create(mirrorMapName,
136 new HashMap<Serializable, Serializable>());
137 final ConnectionEndPointMapListener listener =
138 new ConnectionEndPointMapListener(this.connectionEndPoint,
139 mirrorMapName);
140 if (target.addListener(listener))
141 {
142 this.listeners.put(mirrorMapName, listener);
143 }
144 sendCommandToAll(new PeerConnectedCommand(mirrorMapName));
145 }
146 else
147 {
148 if (command instanceof UnregisterCommand)
149 {
150 final String mirrorMapName = command.getMirrorMapName();
151 final ConnectionEndPointMapListener listener =
152 this.listeners.remove(mirrorMapName);
153 if (listener != null)
154 {
155 MirrorMap<Serializable, Serializable> target =
156 MirrorMap.get(mirrorMapName);
157 if (target != null)
158 {
159 target.removeListener(listener);
160 }
161 }
162 sendCommandToAll(new PeerDisconnectedCommand(mirrorMapName));
163 }
164 else
165 {
166 executeCommand(command);
167 }
168 }
169 }
170
171
172
173
174
175
176
177
178 private void sendCommandToAll(AbstractCommand command)
179 {
180 synchronized (allEndPoints)
181 {
182 for (Iterator<IConnectionEndPoint> iterator =
183 allEndPoints.iterator(); iterator.hasNext();)
184 {
185 IConnectionEndPoint endPoint = iterator.next();
186 if (!endPoint.isActive())
187 {
188 iterator.remove();
189 }
190 else
191 {
192 endPoint.send(command.getBytes());
193 }
194 }
195 }
196 }
197
198
199
200
201
202
203
204
205
206 protected void executeCommand(ICommand command)
207 {
208 final String mirrorMapName = command.getMirrorMapName();
209 final ConnectionEndPointMapListener listener =
210 this.listeners.get(mirrorMapName);
211 if (listener != null)
212 {
213 command.execute();
214 }
215 }
216
217 public void setEndPointConnection(IConnectionEndPoint connectionEndPoint)
218 {
219 this.connectionEndPoint = connectionEndPoint;
220 synchronized (allEndPoints)
221 {
222 allEndPoints.add(this.connectionEndPoint);
223 }
224 }
225
226 public void destroy()
227 {
228 synchronized (this.listeners)
229 {
230
231 final Set<Entry<String, ConnectionEndPointMapListener>> entrySet =
232 new HashSet<Entry<String, ConnectionEndPointMapListener>>(
233 this.listeners.entrySet());
234 for (Entry<String, ConnectionEndPointMapListener> entry : entrySet)
235 {
236 destroyListener(entry);
237 }
238 }
239 synchronized (allEndPoints)
240 {
241 allEndPoints.remove(this.connectionEndPoint);
242 }
243 this.connectionEndPoint = null;
244 }
245
246
247
248
249 protected void destroyListener(
250 Entry<String, ConnectionEndPointMapListener> entry)
251 {
252 MirrorMap.get(entry.getKey()).removeListener(entry.getValue());
253 }
254 }