1
2
3
4 package org.sourceforge.jemm.comm.shared;
5
6 import java.io.BufferedInputStream;
7 import java.io.BufferedOutputStream;
8 import java.io.IOException;
9 import java.io.NotSerializableException;
10 import java.io.ObjectInputStream;
11 import java.io.ObjectOutputStream;
12 import java.lang.reflect.InvocationTargetException;
13 import java.lang.reflect.Method;
14 import java.lang.reflect.Proxy;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.SynchronousQueue;
22
23 import org.apache.log4j.Logger;
24 import org.sourceforge.jemm.comm.connection.Connection;
25 import org.sourceforge.jemm.comm.connection.ConnectionException;
26
27
28
29
30
31
32
33
34
35
36
37
38 public class RPCHandler implements Runnable {
39 private static final Logger LOG = Logger.getLogger(RPCHandler.class);
40
41 private static ThreadLocal<Object> connectionIdTL = new ThreadLocal<Object>();
42
43 protected Connection connection;
44
45 protected CountDownLatch initialisationLatch = new CountDownLatch(1);
46
47 protected volatile boolean closing;
48
49 protected ObjectInputStream is;
50
51 protected ObjectOutputStream os;
52
53
54 protected Map<Class<?>, Object> remoteInterfaces;
55
56
57 protected Map<Class<?>, Object> localInterfaces;
58
59 protected ExecutorService requestExecutor;
60
61 protected ThreadLocal<String> threadIdTL = new ThreadLocal<String>();
62
63 protected ThreadLocal<SynchronousQueue<Message>> syncQueueTL = new ThreadLocal<SynchronousQueue<Message>>();
64
65 protected ConcurrentHashMap<String, SynchronousQueue<Message>> msgSyncPoints =
66 new ConcurrentHashMap<String, SynchronousQueue<Message>>();
67
68 protected final boolean isClient;
69
70 protected RPCHandlerListener listener = null;
71
72 protected Object connectionId;
73
74
75
76
77
78
79
80
81
82 public RPCHandler(boolean isClient,Connection connection, Map<Class<?>, Object> localInterfaces,
83 ExecutorService requestExecutor,Object connectionId) {
84 this.isClient = isClient;
85 this.connection = connection;
86 this.closing = false;
87
88 this.localInterfaces = localInterfaces;
89 this.requestExecutor = requestExecutor;
90 this.connectionId = connectionId;
91 }
92
93
94
95
96
97 public synchronized void setHandlerListener(RPCHandlerListener newListener) {
98 this.listener = newListener;
99 }
100
101
102
103
104
105
106
107 public synchronized Object getRemoteIF(Class<?> ifClass) {
108 try {
109 initialisationLatch.await();
110 } catch (InterruptedException e) {
111
112 }
113
114 if (!ifClass.isInterface())
115 throw new IllegalArgumentException(
116 "given class is not an interface");
117
118 Object obj = remoteInterfaces.get(ifClass);
119 if (obj == null) {
120 if (!remoteInterfaces.keySet().contains(ifClass))
121 throw new IllegalArgumentException("Interface " + ifClass
122 + " not offered by server");
123
124 obj = createProxyClass(ifClass);
125 remoteInterfaces.put(ifClass, obj);
126 }
127
128 return obj;
129 }
130
131 protected Object createProxyClass(Class<?> ifClass) {
132 Class<?>[] ifs = {ifClass};
133 RPCProxyHandler ph = new RPCProxyHandler(this, ifClass);
134 Object obj = Proxy.newProxyInstance(this.getClass().getClassLoader(),
135 ifs, ph);
136 return obj;
137 }
138
139 public void initialise() {
140 try {
141
142
143
144 if(isClient) {
145 is = new ObjectInputStream(new BufferedInputStream(connection.getInputStream()));
146 os = new ObjectOutputStream(new BufferedOutputStream(connection.getOutputStream()));
147
148
149 os.flush();
150 } else {
151 os = new ObjectOutputStream(new BufferedOutputStream(connection.getOutputStream()));
152
153
154 os.flush();
155 is = new ObjectInputStream(new BufferedInputStream(connection.getInputStream()));
156 }
157
158 AvailableIFsMessage ifOMsg = new AvailableIFsMessage(
159 localInterfaces.keySet().toArray(new Class<?>[0]));
160 os.writeObject(ifOMsg);
161 os.flush();
162
163 AvailableIFsMessage ifIMsg = (AvailableIFsMessage) is .readObject();
164
165 synchronized(this) {
166 remoteInterfaces = new HashMap<Class<?>, Object>();
167 for (Class<?> ifClass : ifIMsg.getOfferedIFs())
168 remoteInterfaces.put(ifClass, null);
169 }
170
171 if(isClient) {
172
173 is.readObject();
174 }
175
176 } catch (ClassNotFoundException e) {
177 LOG.error("Caught exception whilst initialising ",e);
178 connectionTerminated();
179 } catch (IOException ioe) {
180 LOG.error("Caught exception whilst initialising ",ioe);
181 connectionTerminated();
182 }
183 }
184
185 public void run() {
186 try {
187 initialisationLatch.countDown();
188 if(!isClient) {
189 os.writeObject(new ServerReadyMessage());
190 os.flush();
191 }
192
193 while (true) {
194 Object o = null;
195 try {
196 o = is.readObject();
197 } catch (ClassNotFoundException e1) {
198 e1.printStackTrace();
199 }
200
201 if (o instanceof Message) {
202 Message message = (Message) o;
203 try {
204 receiveMessage(message);
205 } catch (Exception e) {
206 LOG.warn("exception thrown whilst sending message to receiver",e);
207 }
208 } else
209 LOG.warn("Invalid object on stream " + o);
210 }
211 } catch (IOException se) {
212 connectionTerminated();
213 }
214 }
215
216 protected void receiveMessage(Message message) {
217 if (message instanceof RPCCallRespMessage) {
218 final String threadId = message.getThreadId();
219 SynchronousQueue<Message> replyQueue = msgSyncPoints.get(threadId);
220 if (replyQueue != null) {
221 try {
222 replyQueue.put(message);
223 } catch (InterruptedException e) {
224 LOG.info("Receive thread interrupted");
225 }
226 } else {
227 LOG.error("No client thread found for sync message to "
228 + threadId);
229 }
230 } else if (message instanceof RPCCallMessage) {
231 processCallMessage((RPCCallMessage) message);
232 } else if (message instanceof ErrorMessage) {
233 LOG.warn("Error message recieved from server "
234 + ((ErrorMessage) message).getUserErrorMessage());
235 } else
236 LOG.warn("Invalid message type received by client: "
237 + message.getClass());
238 }
239
240
241
242
243
244
245
246 protected void processCallMessage(final RPCCallMessage message) {
247 if(closing) {
248 LOG.warn("Message processing skipped - client is shutting down");
249 return;
250 }
251
252 requestExecutor.execute(new Runnable() {
253 public void run() {
254 connectionIdTL.set(connectionId);
255 try {
256 Class<?> targetIF = message.getIfClass();
257 Object targetIFImpl = localInterfaces.get(message.getIfClass());
258 if(targetIF != null) {
259 Method method = targetIF.getMethod(message.getMethodName(), message.getParameterTypes());
260 if(method == null)
261 throw new IllegalArgumentException("Interface method does not exist");
262 Object resp = method.invoke(targetIFImpl,message.getParameters());
263 if(!message.isAsyncCall())
264 writeMessage(new RPCCallRespMessage(message.threadId,true,resp));
265 } else
266 throw new IllegalArgumentException("Interface not supported");
267 } catch(Exception e) {
268 Throwable cause = e instanceof InvocationTargetException?e.getCause():e;
269 if(!message.isAsyncCall())
270 writeMessage(new RPCCallRespMessage(message.threadId,false,cause));
271 else
272 LOG.warn("Exception caught whilst processing async call to "
273 + message.getIfClass() + "." + message.getMethodName() + "()",e);
274 }
275 connectionIdTL.set(null);
276 }
277 });
278 }
279
280 protected synchronized void connectionTerminated() {
281 if (!closing) {
282 if(isClient)
283 LOG.error("Lost connection to Server");
284 else
285 LOG.info("Lost connection to client");
286 }
287
288 closing = true;
289 if (msgSyncPoints.size() > 0) {
290 LOG.error("Client connection closed with waiters active");
291 ArrayList<String> list = new ArrayList<String>();
292 list.addAll(msgSyncPoints.keySet());
293
294 ErrorMessage errorMessage = new ErrorMessage(
295 "Server connection lost");
296 for (String threadId : list) {
297 SynchronousQueue<Message> queue = msgSyncPoints.get(threadId);
298 if (queue != null && !queue.offer(errorMessage))
299 LOG.warn("Unable to inform thread " + threadId
300 + " of connection close");
301 }
302 }
303
304 if(listener != null)
305 listener.connectionTerminated();
306 }
307
308
309
310
311
312 public void close() {
313 closing = true;
314 try {
315 connection.close();
316 } catch (final ConnectionException ce) {
317 LOG.warn("Exception thrown whilst closing client connection", ce);
318 }
319 }
320
321 protected synchronized void writeMessage(Message message) {
322 try {
323 os.writeObject(message);
324 os.flush();
325 } catch(NotSerializableException nse) {
326 LOG.error("Sent message not serializable ",nse);
327 } catch (final IOException e) {
328 LOG.warn("error caught writing object", e);
329 }
330 }
331
332
333
334
335
336
337
338
339 public Message sendSyncMessage(Message message) {
340 final String threadId = ThreadUtil.getThreadId();
341
342 SynchronousQueue<Message> sq = syncQueueTL.get();
343 if (sq == null) {
344 sq = new SynchronousQueue<Message>();
345 syncQueueTL.set(sq);
346 }
347
348 msgSyncPoints.put(threadId, sq);
349
350 writeMessage(message);
351 Message replyMsg = null;
352 try {
353 replyMsg = sq.take();
354 } catch (InterruptedException ie) {
355 replyMsg = new ErrorMessage(
356 "InterruptedException received whilst waiting for reply");
357 }
358
359 msgSyncPoints.remove(threadId);
360
361 return replyMsg;
362 }
363
364
365
366
367
368
369
370
371
372 protected void makeAsyncCall(Class<?> ifClass, String methodName,Class<?>[] parameterTypes, Object[] args) {
373 RPCCallMessage callMessage = new RPCCallMessage(ThreadUtil.getThreadId(),true,
374 ifClass,methodName,parameterTypes,args);
375 writeMessage(callMessage);
376 }
377
378
379
380
381
382
383
384
385
386 protected RPCCallRespMessage makeSyncCall(Class<?> ifClass, String methodName,Class<?>[] parameterTypes,
387 Object[] args) {
388 RPCCallMessage callMessage = new RPCCallMessage(ThreadUtil.getThreadId(),false,
389 ifClass,methodName,parameterTypes,args);
390 Message replyMsg = sendSyncMessage(callMessage);
391 if(replyMsg instanceof RPCCallRespMessage)
392 return (RPCCallRespMessage) replyMsg;
393 else if(replyMsg instanceof ErrorMessage)
394 return new RPCCallRespMessage(ThreadUtil.getThreadId(),false,
395 new IllegalStateException("Error message returned:"
396 + ((ErrorMessage) replyMsg).getUserErrorMessage()));
397
398 else
399 return new RPCCallRespMessage(ThreadUtil.getThreadId(),false,
400 new IllegalStateException("Unexpected message returned " + replyMsg.getClass()));
401 }
402
403
404
405
406
407
408 public static Object getConnectionId() {
409 return connectionIdTL.get();
410 }
411
412
413
414
415
416 public void start() {
417 (new Thread(this)).start();
418 }
419 }