View Javadoc

1   /*
2      Copyright 2010 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package mirrormap;
17  
18  import java.io.ByteArrayInputStream;
19  import java.io.ObjectInputStream;
20  import java.io.Serializable;
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.Map.Entry;
30  import java.util.logging.Level;
31  import java.util.logging.Logger;
32  
33  import mirrormap.commands.AbstractCommand;
34  import mirrormap.commands.ICommand;
35  import mirrormap.commands.PeerConnectedCommand;
36  import mirrormap.commands.PeerDisconnectedCommand;
37  import mirrormap.commands.RegisterCommand;
38  import mirrormap.commands.UnregisterCommand;
39  import mirrormap.nio.IConnectionEndPoint;
40  import mirrormap.nio.IConnectionEndPointReceiver;
41  
42  /**
43   * The end point receiver on the server-side that handles the serialised
44   * {@link ICommand} objects sent from a remote client-side end point.
45   * <p>
46   * Each {@link RegisterCommand} that is processed has a new
47   * {@link ConnectionEndPointMapListener} created and attached to the
48   * corresponding {@link MirrorMap}. Changes in the map will be notified to the
49   * listener that will broadcast the change back to the client via the
50   * {@link IConnectionEndPoint} of this receiver.
51   * 
52   * @author Ramon Servadei
53   */
54  class ServerCommandReceiver implements IConnectionEndPointReceiver
55  {
56      /**
57       * Implementation that returns ServerCommandReceiver instances
58       * 
59       * @author Ramon Servadei
60       */
61      static final class ServerCommandReceiverFactory implements IFactory
62      {
63          public IConnectionEndPointReceiver createEndPointRecevier()
64          {
65              return new ServerCommandReceiver();
66          }
67      }
68  
69      private final static Logger LOGGER =
70          Logger.getLogger(ServerCommandReceiver.class.getName());
71  
72      /**
73       * Tracks all the connection end points to this server instance.
74       */
75      private final static List<IConnectionEndPoint> allEndPoints =
76          new ArrayList<IConnectionEndPoint>();
77  
78      /** The connection end point this receiver is associated with */
79      protected IConnectionEndPoint connectionEndPoint;
80  
81      /**
82       * The listeners per named {@link MirrorMap} that the client-side end point
83       * has requested a registration for. Synchronized.
84       */
85      protected final Map<String, ConnectionEndPointMapListener> listeners =
86          Collections.synchronizedMap(new HashMap<String, ConnectionEndPointMapListener>());
87  
88      ServerCommandReceiver()
89      {
90          super();
91      }
92  
93      public final void receive(byte[] data)
94      {
95          ObjectInputStream ois;
96          try
97          {
98              ois = new ObjectInputStream(new ByteArrayInputStream(data));
99              Object received = ois.readObject();
100             if (received instanceof ICommand)
101             {
102                 doReceive((ICommand) received);
103             }
104             else
105             {
106                 if (LOGGER.isLoggable(Level.INFO))
107                 {
108                     LOGGER.info("Unhandled object " + received);
109                 }
110             }
111         }
112         catch (Exception e)
113         {
114             Utils.logException(LOGGER, "Could not handle received data from "
115                 + Utils.safeToString(this.connectionEndPoint), e);
116         }
117     }
118 
119     /**
120      * Execute the actual logic to handle received {@link ICommand}
121      * 
122      * @param command
123      *            the command received
124      */
125     protected void doReceive(ICommand command)
126     {
127         if (LOGGER.isLoggable(Level.FINEST))
128         {
129             LOGGER.finest(command.toString());
130         }
131         if (command instanceof RegisterCommand)
132         {
133             final String mirrorMapName = command.getMirrorMapName();
134             MirrorMap<Serializable, Serializable> target =
135                 MirrorMap.create(mirrorMapName,
136                     new HashMap<Serializable, Serializable>());
137             final ConnectionEndPointMapListener listener =
138                 new ConnectionEndPointMapListener(this.connectionEndPoint,
139                     mirrorMapName);
140             if (target.addListener(listener))
141             {
142                 this.listeners.put(mirrorMapName, listener);
143             }
144             sendCommandToAll(new PeerConnectedCommand(mirrorMapName));
145         }
146         else
147         {
148             if (command instanceof UnregisterCommand)
149             {
150                 final String mirrorMapName = command.getMirrorMapName();
151                 final ConnectionEndPointMapListener listener =
152                     this.listeners.remove(mirrorMapName);
153                 if (listener != null)
154                 {
155                     MirrorMap<Serializable, Serializable> target =
156                         MirrorMap.get(mirrorMapName);
157                     if (target != null)
158                     {
159                         target.removeListener(listener);
160                     }
161                 }
162                 sendCommandToAll(new PeerDisconnectedCommand(mirrorMapName));
163             }
164             else
165             {
166                 executeCommand(command);
167             }
168         }
169     }
170 
171     /**
172      * Helper method to send the command to all end points known to this VM's
173      * server
174      * 
175      * @param command
176      *            the command to send
177      */
178     private void sendCommandToAll(AbstractCommand command)
179     {
180         synchronized (allEndPoints)
181         {
182             for (Iterator<IConnectionEndPoint> iterator =
183                 allEndPoints.iterator(); iterator.hasNext();)
184             {
185                 IConnectionEndPoint endPoint = iterator.next();
186                 if (!endPoint.isActive())
187                 {
188                     iterator.remove();
189                 }
190                 else
191                 {
192                     endPoint.send(command.getBytes());
193                 }
194             }
195         }
196     }
197 
198     /**
199      * Executes the commend and also informs the
200      * {@link ConnectionEndPointMapListener} to ignore sending out this change
201      * so we don't end up with infinite looping notifications.
202      * 
203      * @param command
204      *            the command
205      */
206     protected void executeCommand(ICommand command)
207     {
208         final String mirrorMapName = command.getMirrorMapName();
209         final ConnectionEndPointMapListener listener =
210             this.listeners.get(mirrorMapName);
211         if (listener != null)
212         {
213             command.execute();
214         }
215     }
216 
217     public void setEndPointConnection(IConnectionEndPoint connectionEndPoint)
218     {
219         this.connectionEndPoint = connectionEndPoint;
220         synchronized (allEndPoints)
221         {
222             allEndPoints.add(this.connectionEndPoint);
223         }
224     }
225 
226     public void destroy()
227     {
228         synchronized (this.listeners)
229         {
230             // iterate over a copy
231             final Set<Entry<String, ConnectionEndPointMapListener>> entrySet =
232                 new HashSet<Entry<String, ConnectionEndPointMapListener>>(
233                     this.listeners.entrySet());
234             for (Entry<String, ConnectionEndPointMapListener> entry : entrySet)
235             {
236                 destroyListener(entry);
237             }
238         }
239         synchronized (allEndPoints)
240         {
241             allEndPoints.remove(this.connectionEndPoint);
242         }
243         this.connectionEndPoint = null;
244     }
245 
246     /**
247      * Destroy the {@link ConnectionEndPointMapListener} in the entry
248      */
249     protected void destroyListener(
250         Entry<String, ConnectionEndPointMapListener> entry)
251     {
252         MirrorMap.get(entry.getKey()).removeListener(entry.getValue());
253     }
254 }