View Javadoc

1   package org.sourceforge.jemm.comm.server;
2   
3   import java.util.HashMap;
4   import java.util.HashSet;
5   import java.util.Map;
6   import java.util.Set;
7   import java.util.concurrent.CountDownLatch;
8   import java.util.concurrent.ExecutorService;
9   import java.util.concurrent.Executors;
10  
11  import org.apache.log4j.Logger;
12  import org.sourceforge.jemm.comm.connection.Connection;
13  import org.sourceforge.jemm.comm.connection.ConnectionException;
14  import org.sourceforge.jemm.comm.connection.ServerConnectionFactory;
15  import org.sourceforge.jemm.comm.shared.IFUtilities;
16  import org.sourceforge.jemm.comm.shared.RPCHandler;
17  
18  /**
19   * The RPCServer offers RPC interfaces to connecting clients.  The server can handle multiple
20   * client connections.
21   *
22   * See {@link org.sourceforge.jemm.comm.example.echo.EchoClient}/{@link 
23   * org.sourceforge.jemm.comm.example.echo.EchoServer} for an example of usage.
24   * 
25   * <i><P>Released under the <a href="http://www.apache.org/licenses/LICENSE-2.0.html">Apache License V2.0 license</a> 
26   * by the <a href="http://jemm.sourceforge.net">JEMM Project</a></i>
27   *
28   * @author Rory Graves
29   */
30  public class RPCServer extends Thread {
31      protected static final Logger LOG = Logger.getLogger(RPCServer.class);
32  
33      /** The map of clientIds to active clients */
34      protected Map<RPCClientId, ServerThread> clientThreads = new HashMap<RPCClientId, ServerThread>();
35  
36      /** A latch to ensure server is ready when initialisation call returns. */
37      protected CountDownLatch initialisationLatch = new CountDownLatch(1);
38  
39      /** Used for graceful shutdown. */
40      protected boolean closing = false;
41  
42      /** The thread pool for processing client call requests (shared among all clients) */
43      protected ExecutorService msgProccessingThreadPool;
44  
45      protected final ServerConnectionFactory connectionFactory;
46  
47      /** A map of the interfaces implemented by the server and their corresponding interface implementations. */
48      HashMap<Class<?>, Object> serverIFMap = new HashMap<Class<?>, Object>();
49  
50      ClientConnectionListener clientConnListener;
51  
52      /**
53       * Creates an RPCServer instance
54       *
55       * @param connectionFactory The connection factory for accepting client connections.
56       * @param executorService The executor used to service requests.
57       */
58      public RPCServer(ServerConnectionFactory connectionFactory,ExecutorService executorService) {
59      	this.connectionFactory = connectionFactory;
60          msgProccessingThreadPool = executorService;
61      }
62  
63      /**
64       * Creates an RPCServer instance
65       *
66       * @param connectionFactory The connection factory for accepting client connections.
67       * @param noProcessingThreads The number of processing threads.
68       */
69      public RPCServer(ServerConnectionFactory connectionFactory,int noProcessingThreads) {
70      	this(connectionFactory,Executors.newFixedThreadPool(noProcessingThreads));
71      }
72  
73      /**
74       * Notification that a given client has disconnected. The client is removed from the active
75       * pool.
76       *
77       * @param clientId The assigned id of the disconnected client.
78       */
79      public void clientDisconnected(final RPCClientId clientId) {
80          LOG.info("client " + clientId + " disconnected");
81  
82          final ClientConnectionListener listener = this.clientConnListener;
83          if(listener != null)
84              listener.clientDisconnected(clientId);
85  
86          synchronized (clientThreads) {
87              clientThreads.remove(clientId);
88          }
89      }
90  
91      /**
92       * Used by called methods to find out ID of calling client.
93       * This uses a ThreadLocal which is set before the server method is invoked.
94       * @return The id of the caller.
95       */
96      public RPCClientId getClientId() {
97          return (RPCClientId) RPCHandler.getConnectionId();
98      }
99  
100     /**
101      * Retrieve a proxy for the given interface on the given client.
102      * @param clientId The id of the client.
103      * @param ifClass The required interface class.
104      * @return An object proxying the interface to the given client.
105      */
106     public Object getClientIF(RPCClientId clientId,
107             Class<?> ifClass) {
108         ServerThread st = clientThreads.get(clientId);
109         return st.getClientIF(ifClass);
110     }
111 
112     /**
113      * @param clientId The id of the new client.
114      * @param clientHostname The clients hostname.
115      */
116     public void notifyNewClient(RPCClientId clientId,String clientAddress) {
117         ClientConnectionListener listener = this.clientConnListener;
118         if(listener != null)
119             listener.clientConnected(clientId,clientAddress);
120         else
121             LOG.debug("No listener set for new client notifications");
122     }
123 
124     /**
125      * Register an interface for RPC calls by clients.
126      * @param targetIF The interface to offer to clients.
127      * @param targetIFImpl The server side implementation of the given interface.
128      */
129     public void registerInterface(Class<?> targetIF,
130             Object targetIFImpl) {
131         IFUtilities.validateInterface(targetIF);
132         
133         if(!targetIF.isInstance(targetIFImpl))
134             throw new IllegalArgumentException("Target impl not instance of interface");
135 
136         serverIFMap.put(targetIF, targetIFImpl);
137     }
138 
139     @Override
140     public void run() {
141         try {
142             LOG.info("Server started");
143             connectionFactory.initialise();
144             LOG.info("Listening for client connections");
145             initialisationLatch.countDown();
146             
147             while (true) {
148                 Connection connection = connectionFactory.getClientConnection();
149                 RPCClientId clientId = new RPCClientId();
150                 LOG.info("connection from " + connection.getConnectionName() + " assigned id=" + clientId);
151                 ServerThread st = new ServerThread(this, connection, clientId,serverIFMap,msgProccessingThreadPool);
152                 synchronized (clientThreads) {
153                     clientThreads.put(clientId, st);
154                 }
155                 
156                 // starts reading messages after client thread has been added to list to avoid race
157                 // condition.
158                 st.start();
159             }
160         } catch (ConnectionException e) {
161             if (!closing)
162                 LOG.error("Unexpected exception whilst accepting connections",e);
163         }
164 
165         LOG.info("Server terminated");
166     }
167 
168     /**
169      * Sets the client listener to the given listener.
170      * @param listener The listener to receive disconnection events.
171      */
172     public void setClientListener(ClientConnectionListener listener) {
173         this.clientConnListener = listener;
174     }
175 
176     /**
177      * Request by the system for the server to be shutdown.
178      */
179     public void shutdown() {
180         closing = true;
181         try {
182             LOG.info("Shutting down");
183 
184             // close the server socket to stop accepting new connections
185             connectionFactory.close();
186             Set<ServerThread> toShutdown = new HashSet<ServerThread>();
187             toShutdown.addAll(clientThreads.values());
188 
189             // notify any remaining clients to shutdown
190             LOG.info("Notifying client threads");
191             for (ServerThread clientThread : toShutdown)
192                 clientThread.shutdown();
193 
194             for (ServerThread clientThread : toShutdown)
195                 clientThread.waitForShutdown();
196             
197             LOG.info("Client threads complete");
198             msgProccessingThreadPool.shutdown();
199 
200         } catch (ConnectionException ce) {
201             LOG.warn("Exception thrown whilst terminating server", ce);
202         }
203         LOG.info("Shutdown complete");
204 
205     }
206 
207     /**
208      * This method will pause the caller until the server is ready to start
209      * accepting connections.  If the server is already running it will return
210      * immediately.  Mostly used for testing when a client and server are
211      * run in the same VM to avoid the client connecting before the server is
212      * ready.
213      */
214     public void waitForInitialisation() {
215         try {
216             initialisationLatch.await();
217         } catch (InterruptedException e) {
218             e.printStackTrace();
219         }
220     }
221 }