1 package org.sourceforge.jemm.comm.server;
2
3 import java.util.HashMap;
4 import java.util.HashSet;
5 import java.util.Map;
6 import java.util.Set;
7 import java.util.concurrent.CountDownLatch;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Executors;
10
11 import org.apache.log4j.Logger;
12 import org.sourceforge.jemm.comm.connection.Connection;
13 import org.sourceforge.jemm.comm.connection.ConnectionException;
14 import org.sourceforge.jemm.comm.connection.ServerConnectionFactory;
15 import org.sourceforge.jemm.comm.shared.IFUtilities;
16 import org.sourceforge.jemm.comm.shared.RPCHandler;
17
18
19
20
21
22
23
24
25
26
27
28
29
30 public class RPCServer extends Thread {
31 protected static final Logger LOG = Logger.getLogger(RPCServer.class);
32
33
34 protected Map<RPCClientId, ServerThread> clientThreads = new HashMap<RPCClientId, ServerThread>();
35
36
37 protected CountDownLatch initialisationLatch = new CountDownLatch(1);
38
39
40 protected boolean closing = false;
41
42
43 protected ExecutorService msgProccessingThreadPool;
44
45 protected final ServerConnectionFactory connectionFactory;
46
47
48 HashMap<Class<?>, Object> serverIFMap = new HashMap<Class<?>, Object>();
49
50 ClientConnectionListener clientConnListener;
51
52
53
54
55
56
57
58 public RPCServer(ServerConnectionFactory connectionFactory,ExecutorService executorService) {
59 this.connectionFactory = connectionFactory;
60 msgProccessingThreadPool = executorService;
61 }
62
63
64
65
66
67
68
69 public RPCServer(ServerConnectionFactory connectionFactory,int noProcessingThreads) {
70 this(connectionFactory,Executors.newFixedThreadPool(noProcessingThreads));
71 }
72
73
74
75
76
77
78
79 public void clientDisconnected(final RPCClientId clientId) {
80 LOG.info("client " + clientId + " disconnected");
81
82 final ClientConnectionListener listener = this.clientConnListener;
83 if(listener != null)
84 listener.clientDisconnected(clientId);
85
86 synchronized (clientThreads) {
87 clientThreads.remove(clientId);
88 }
89 }
90
91
92
93
94
95
96 public RPCClientId getClientId() {
97 return (RPCClientId) RPCHandler.getConnectionId();
98 }
99
100
101
102
103
104
105
106 public Object getClientIF(RPCClientId clientId,
107 Class<?> ifClass) {
108 ServerThread st = clientThreads.get(clientId);
109 return st.getClientIF(ifClass);
110 }
111
112
113
114
115
116 public void notifyNewClient(RPCClientId clientId,String clientAddress) {
117 ClientConnectionListener listener = this.clientConnListener;
118 if(listener != null)
119 listener.clientConnected(clientId,clientAddress);
120 else
121 LOG.debug("No listener set for new client notifications");
122 }
123
124
125
126
127
128
129 public void registerInterface(Class<?> targetIF,
130 Object targetIFImpl) {
131 IFUtilities.validateInterface(targetIF);
132
133 if(!targetIF.isInstance(targetIFImpl))
134 throw new IllegalArgumentException("Target impl not instance of interface");
135
136 serverIFMap.put(targetIF, targetIFImpl);
137 }
138
139 @Override
140 public void run() {
141 try {
142 LOG.info("Server started");
143 connectionFactory.initialise();
144 LOG.info("Listening for client connections");
145 initialisationLatch.countDown();
146
147 while (true) {
148 Connection connection = connectionFactory.getClientConnection();
149 RPCClientId clientId = new RPCClientId();
150 LOG.info("connection from " + connection.getConnectionName() + " assigned id=" + clientId);
151 ServerThread st = new ServerThread(this, connection, clientId,serverIFMap,msgProccessingThreadPool);
152 synchronized (clientThreads) {
153 clientThreads.put(clientId, st);
154 }
155
156
157
158 st.start();
159 }
160 } catch (ConnectionException e) {
161 if (!closing)
162 LOG.error("Unexpected exception whilst accepting connections",e);
163 }
164
165 LOG.info("Server terminated");
166 }
167
168
169
170
171
172 public void setClientListener(ClientConnectionListener listener) {
173 this.clientConnListener = listener;
174 }
175
176
177
178
179 public void shutdown() {
180 closing = true;
181 try {
182 LOG.info("Shutting down");
183
184
185 connectionFactory.close();
186 Set<ServerThread> toShutdown = new HashSet<ServerThread>();
187 toShutdown.addAll(clientThreads.values());
188
189
190 LOG.info("Notifying client threads");
191 for (ServerThread clientThread : toShutdown)
192 clientThread.shutdown();
193
194 for (ServerThread clientThread : toShutdown)
195 clientThread.waitForShutdown();
196
197 LOG.info("Client threads complete");
198 msgProccessingThreadPool.shutdown();
199
200 } catch (ConnectionException ce) {
201 LOG.warn("Exception thrown whilst terminating server", ce);
202 }
203 LOG.info("Shutdown complete");
204
205 }
206
207
208
209
210
211
212
213
214 public void waitForInitialisation() {
215 try {
216 initialisationLatch.await();
217 } catch (InterruptedException e) {
218 e.printStackTrace();
219 }
220 }
221 }