View Javadoc

1   /**
2    *
3    */
4   package org.sourceforge.jemm.comm.shared;
5   
6   import java.io.BufferedInputStream;
7   import java.io.BufferedOutputStream;
8   import java.io.IOException;
9   import java.io.NotSerializableException;
10  import java.io.ObjectInputStream;
11  import java.io.ObjectOutputStream;
12  import java.lang.reflect.InvocationTargetException;
13  import java.lang.reflect.Method;
14  import java.lang.reflect.Proxy;
15  import java.util.ArrayList;
16  import java.util.HashMap;
17  import java.util.Map;
18  import java.util.concurrent.ConcurrentHashMap;
19  import java.util.concurrent.CountDownLatch;
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.SynchronousQueue;
22  
23  import org.apache.log4j.Logger;
24  import org.sourceforge.jemm.comm.connection.Connection;
25  import org.sourceforge.jemm.comm.connection.ConnectionException;
26  
27  /**
28   * RPCHandler is used to implement the shared logic used by both {@link org.sourceforge.jemm.comm.server.RPCClient} 
29   * and {@link org.sourceforge.jemm.comm.server.RPCServer}.
30   * Given a socket RPCHandler basically manages the client and server interfaces, the proxies and the
31   * actual message passing.
32   *
33   *  N.B. A client/server flag is passed to the constructor because {@link ObjectInputStream}/{@link ObjectOutputStream}
34   *  must be invoked in alternate order on the two sides of the connection or it deadlocks.
35   * @author Rory Graves
36   *
37   */
38  public class RPCHandler implements Runnable {
39      private static final Logger LOG = Logger.getLogger(RPCHandler.class);
40      
41      private static ThreadLocal<Object> connectionIdTL = new ThreadLocal<Object>();
42  
43      protected Connection connection;
44      
45      protected CountDownLatch initialisationLatch = new CountDownLatch(1);
46  
47      protected volatile boolean closing;
48  
49      protected ObjectInputStream is;
50  
51      protected ObjectOutputStream os;
52  
53      // interfaces offered on the remote side
54      protected Map<Class<?>, Object> remoteInterfaces;
55  
56      // interfaces we offer to the rest of the world
57      protected Map<Class<?>, Object> localInterfaces;
58  
59      protected ExecutorService requestExecutor;
60  
61      protected ThreadLocal<String> threadIdTL = new ThreadLocal<String>();
62  
63      protected ThreadLocal<SynchronousQueue<Message>> syncQueueTL = new ThreadLocal<SynchronousQueue<Message>>();
64  
65      protected ConcurrentHashMap<String, SynchronousQueue<Message>> msgSyncPoints = 
66      	new ConcurrentHashMap<String, SynchronousQueue<Message>>();
67  
68      protected final boolean isClient;
69  
70      protected RPCHandlerListener listener = null;
71  
72      protected Object connectionId;
73  
74      /**
75       * Create an RPCHandler
76       * @param isClient Is this the client side of the connection (false for server)
77       * @param connection The socket connection.
78       * @param localInterfaces The interfaces this side offers.
79       * @param requestExecutor The service to use for running requests.
80       * @param connectionId The id of the connection.
81       */
82      public RPCHandler(boolean isClient,Connection connection, Map<Class<?>, Object> localInterfaces,
83              ExecutorService requestExecutor,Object connectionId) {
84          this.isClient = isClient;
85          this.connection = connection;
86          this.closing = false;
87  
88          this.localInterfaces = localInterfaces;
89          this.requestExecutor = requestExecutor;
90          this.connectionId = connectionId;
91      }
92  
93      /**
94       * Set the listener for this handler for event notification.
95       * @param newListener The listener to inform of events.
96       */
97      public synchronized void setHandlerListener(RPCHandlerListener newListener) {
98          this.listener = newListener;
99      }
100 
101     /**
102      * Returns an local interface for calling the remote interface.
103      * @param ifClass The interface to get.
104      * @return An object which offers the interface.
105      * @throws IllegalArgumentException If the requested class is not an interface or not supported.
106      */
107     public synchronized Object getRemoteIF(Class<?> ifClass) {
108         try {
109             initialisationLatch.await();
110         } catch (InterruptedException e) {
111             // do nothing
112         }
113         
114         if (!ifClass.isInterface())
115             throw new IllegalArgumentException(
116                     "given class is not an interface");
117 
118         Object obj = remoteInterfaces.get(ifClass);
119         if (obj == null) {
120             if (!remoteInterfaces.keySet().contains(ifClass))
121                 throw new IllegalArgumentException("Interface " + ifClass
122                         + " not offered by server");
123 
124             obj = createProxyClass(ifClass);
125             remoteInterfaces.put(ifClass, obj);
126         }
127 
128         return obj;
129     }
130 
131     protected Object createProxyClass(Class<?> ifClass) {
132         Class<?>[] ifs = {ifClass};
133         RPCProxyHandler ph = new RPCProxyHandler(this, ifClass);
134         Object obj = Proxy.newProxyInstance(this.getClass().getClassLoader(),
135                 ifs, ph);
136         return obj;
137     }
138 
139     public void initialise() {
140         try {
141         	// n.b. order of initialisation is reversed from client to server as otherwise
142         	// object streams fail to initialise
143         	
144             if(isClient) {
145                 is = new ObjectInputStream(new BufferedInputStream(connection.getInputStream()));
146                 os = new ObjectOutputStream(new BufferedOutputStream(connection.getOutputStream()));
147                 
148                 // this is needed otherwise buffering means that ObjectOutputStream fails to initialise
149                 os.flush();  
150             } else {
151                 os = new ObjectOutputStream(new BufferedOutputStream(connection.getOutputStream()));
152                 
153                 // this is needed otherwise buffering means that ObjectOutputStream fails to initialise
154                 os.flush();  
155                 is = new ObjectInputStream(new BufferedInputStream(connection.getInputStream()));
156             }
157 
158             AvailableIFsMessage ifOMsg = new AvailableIFsMessage(
159                     localInterfaces.keySet().toArray(new Class<?>[0]));
160             os.writeObject(ifOMsg);
161             os.flush();
162 
163             AvailableIFsMessage ifIMsg = (AvailableIFsMessage) is .readObject();
164 
165             synchronized(this) {
166             	remoteInterfaces = new HashMap<Class<?>, Object>();
167             	for (Class<?> ifClass : ifIMsg.getOfferedIFs())
168             		remoteInterfaces.put(ifClass, null);
169             }
170             
171             if(isClient) {
172             	// read the ready message
173             	is.readObject();
174             }
175 
176         } catch (ClassNotFoundException e) {
177         	LOG.error("Caught exception whilst initialising ",e);
178             connectionTerminated();
179         } catch (IOException ioe) {
180         	LOG.error("Caught exception whilst initialising ",ioe);
181             connectionTerminated();
182         }
183     }
184     
185     public void run() {
186         try {
187             initialisationLatch.countDown();
188             if(!isClient) {
189             	os.writeObject(new ServerReadyMessage());
190             	os.flush();
191             }
192                         
193             while (true) {
194                 Object o = null;
195                 try {
196                     o = is.readObject();
197                 } catch (ClassNotFoundException e1) {
198                     e1.printStackTrace();
199                 }
200 
201                 if (o instanceof Message) {
202                     Message message = (Message) o;
203                     try {
204                         receiveMessage(message);
205                     } catch (Exception e) {
206                         LOG.warn("exception thrown whilst sending message to receiver",e);
207                     }
208                 } else
209                     LOG.warn("Invalid object on stream " + o);
210             }
211         } catch (IOException se) {
212             connectionTerminated();
213         }
214     }
215 
216     protected void receiveMessage(Message message) {
217         if (message instanceof RPCCallRespMessage) {
218             final String threadId = message.getThreadId();
219             SynchronousQueue<Message> replyQueue = msgSyncPoints.get(threadId);
220             if (replyQueue != null) {
221                 try {
222                     replyQueue.put(message);
223                 } catch (InterruptedException e) {
224                     LOG.info("Receive thread interrupted");
225                 }
226             } else {
227                 LOG.error("No client thread found for sync message to "
228                         + threadId);
229             }
230         } else if (message instanceof RPCCallMessage) {
231             processCallMessage((RPCCallMessage) message);
232         } else if (message instanceof ErrorMessage) {
233             LOG.warn("Error message recieved from server "
234                     + ((ErrorMessage) message).getUserErrorMessage());
235         } else
236             LOG.warn("Invalid message type received by client: "
237                     + message.getClass());
238     }
239 
240     /**
241      * Internal method to handle RPC call method.
242      * This will invoke a thread using the executor supplied in the constructor.
243      * @param message The message to process.
244      */
245 
246     protected void processCallMessage(final RPCCallMessage message) {
247     	if(closing) {
248     		LOG.warn("Message processing skipped - client is shutting down");
249     		return;
250     	}
251     	
252         requestExecutor.execute(new Runnable() {
253             public void run() {
254                 connectionIdTL.set(connectionId);
255                 try {
256                     Class<?> targetIF  = message.getIfClass();
257                     Object targetIFImpl = localInterfaces.get(message.getIfClass());
258                     if(targetIF != null) {
259                         Method method = targetIF.getMethod(message.getMethodName(), message.getParameterTypes());
260                         if(method == null)
261                             throw new IllegalArgumentException("Interface method does not exist");
262                         Object resp = method.invoke(targetIFImpl,message.getParameters());
263                         if(!message.isAsyncCall())
264                             writeMessage(new RPCCallRespMessage(message.threadId,true,resp));
265                     } else
266                         throw new IllegalArgumentException("Interface not supported");
267                 } catch(Exception e) {
268                     Throwable cause = e instanceof InvocationTargetException?e.getCause():e;
269                     if(!message.isAsyncCall())
270                         writeMessage(new RPCCallRespMessage(message.threadId,false,cause));
271                     else
272                         LOG.warn("Exception caught whilst processing async call to " 
273                         		+ message.getIfClass() + "." + message.getMethodName() + "()",e);
274                 }
275                 connectionIdTL.set(null);
276             }
277         });
278     }
279 
280     protected synchronized void connectionTerminated() {
281         if (!closing) {
282             if(isClient)
283                 LOG.error("Lost connection to Server");
284             else
285                 LOG.info("Lost connection to client");
286         }
287 
288         closing = true;
289         if (msgSyncPoints.size() > 0) {
290             LOG.error("Client connection closed with waiters active");
291             ArrayList<String> list = new ArrayList<String>();
292             list.addAll(msgSyncPoints.keySet());
293 
294             ErrorMessage errorMessage = new ErrorMessage(
295                     "Server connection lost");
296             for (String threadId : list) {
297                 SynchronousQueue<Message> queue = msgSyncPoints.get(threadId);
298                 if (queue != null && !queue.offer(errorMessage))
299                 	LOG.warn("Unable to inform thread " + threadId
300                 			+ " of connection close");
301             }
302         }
303 
304         if(listener != null)
305             listener.connectionTerminated();
306     }
307 
308 
309     /**
310      * Close the connection to the server.
311      */
312     public void close() {
313         closing = true;
314         try {
315         	connection.close();
316         } catch (final ConnectionException ce) {
317             LOG.warn("Exception thrown whilst closing client connection", ce);
318         }
319     }
320 
321     protected synchronized void writeMessage(Message message) {
322         try {
323             os.writeObject(message);
324             os.flush();
325         } catch(NotSerializableException nse) {
326             LOG.error("Sent message not serializable ",nse);
327         } catch (final IOException e) {
328             LOG.warn("error caught writing object", e);
329         }
330     }
331 
332     /**
333      * Send a synchronous message to the server. This method sends the given message 
334      * and waits for a response message.
335      *
336      * @param message The message to send
337      * @return The message received in reply
338      */
339     public Message sendSyncMessage(Message message) {
340         final String threadId = ThreadUtil.getThreadId();
341 
342         SynchronousQueue<Message> sq = syncQueueTL.get();
343         if (sq == null) {
344             sq = new SynchronousQueue<Message>();
345             syncQueueTL.set(sq);
346         }
347 
348         msgSyncPoints.put(threadId, sq);
349 
350         writeMessage(message);
351         Message replyMsg = null;
352         try {
353             replyMsg = sq.take();
354         } catch (InterruptedException ie) {
355             replyMsg = new ErrorMessage(
356                     "InterruptedException received whilst waiting for reply");
357         }
358 
359         msgSyncPoints.remove(threadId);
360 
361         return replyMsg;
362     }
363 
364 
365     /**
366      * Internal method to make an asynchronous call to a given method.
367      * @param ifClass The interface class being called.
368      * @param methodName The method being called.
369      * @param parameterTypes The parameter types of the method
370      * @param args The actual call arguments (based types wrapped by Proxy.invoke).
371      */
372     protected void makeAsyncCall(Class<?> ifClass, String methodName,Class<?>[] parameterTypes, Object[] args) {
373         RPCCallMessage callMessage = new RPCCallMessage(ThreadUtil.getThreadId(),true,
374         		ifClass,methodName,parameterTypes,args);
375         writeMessage(callMessage);
376     }
377 
378     /**
379      * Internal method to make a make a synchronous call to a given method.
380      * @param ifClass The interface class being called.
381      * @param methodName The method being called.
382      * @param parameterTypes The parameter types of the method
383      * @param args The actual call arguments (based types wrapped by Proxy.invoke).
384      * @return The response message received  from the target.
385      */
386     protected RPCCallRespMessage makeSyncCall(Class<?> ifClass, String methodName,Class<?>[] parameterTypes,
387             Object[] args) {
388         RPCCallMessage callMessage = new RPCCallMessage(ThreadUtil.getThreadId(),false,
389         		ifClass,methodName,parameterTypes,args);
390         Message replyMsg = sendSyncMessage(callMessage);
391         if(replyMsg instanceof RPCCallRespMessage)
392             return (RPCCallRespMessage) replyMsg;
393         else if(replyMsg instanceof ErrorMessage)
394             return new RPCCallRespMessage(ThreadUtil.getThreadId(),false,
395             		new IllegalStateException("Error message returned:" 
396             				+ ((ErrorMessage) replyMsg).getUserErrorMessage()));
397             
398         else
399             return new RPCCallRespMessage(ThreadUtil.getThreadId(),false,
400             		new IllegalStateException("Unexpected message returned " + replyMsg.getClass()));
401     }
402 
403     /**
404      * Retrieve the connection id object associated with the current processing thread.
405      * A connection id is associated with each execution by the thread-pool.
406      * @return The connection id object supplied.
407      */
408     public static Object getConnectionId() {
409         return connectionIdTL.get();
410     }
411 
412     /**
413      * Start the RPCHandler.  This will trigger the handler to initialise
414      * the connection and be ready to start serving calls.
415      */
416     public void start() {
417         (new Thread(this)).start();
418     }
419 }