1 package org.sourceforge.jemm.database.remote.server;
2
3 import java.util.concurrent.ConcurrentHashMap;
4
5 import org.apache.log4j.Logger;
6 import org.sourceforge.jemm.comm.connection.ConnectionException;
7 import org.sourceforge.jemm.comm.connection.ServerConnectionFactory;
8 import org.sourceforge.jemm.comm.server.ClientConnectionListener;
9 import org.sourceforge.jemm.comm.server.RPCClientId;
10 import org.sourceforge.jemm.comm.server.RPCServer;
11 import org.sourceforge.jemm.database.ClientId;
12 import org.sourceforge.jemm.database.ClientThreadId;
13 import org.sourceforge.jemm.database.Database;
14 import org.sourceforge.jemm.database.LockAcquiredListener;
15 import org.sourceforge.jemm.database.remote.shared.RDbClientIF;
16 import org.sourceforge.jemm.database.remote.shared.RDbServerIF;
17 import org.sourceforge.jemm.types.ID;
18
19 public class RemoteDatabaseServer {
20
21 private static final Logger LOG = Logger.getLogger(RemoteDatabaseServer.class);
22
23 private final Database underlyingDatabase;
24 private final ClientIdGenerator clientIdGenerator;
25
26 private final RPCServer rpcServer;
27 private final RDbServerIFImpl ifImpl;
28
29 private final ConcurrentHashMap<RPCClientId, ClientId> rpcIdToClientIdMap =
30 new ConcurrentHashMap<RPCClientId, ClientId>();
31
32 public RemoteDatabaseServer(ServerConnectionFactory serverConnectionFactory,
33 Database underlyingDatabase) throws ConnectionException {
34 this.underlyingDatabase = underlyingDatabase;
35 clientIdGenerator = new DefaultClientIdGenerator();
36 rpcServer = new RPCServer(serverConnectionFactory,25);
37 rpcServer.setClientListener(new ClientConnectionListener()
38 {
39
40 @Override
41 public void clientConnected(RPCClientId clientId,
42 String clientAddress) {
43 handleClientConnection(clientId,clientAddress);
44 }
45
46 @Override
47 public void clientDisconnected(RPCClientId clientId) {
48 handleClientDisconnect(clientId);
49 }
50
51 });
52
53 ClientReferenceHandler crh = new ClientReferenceHandler(rpcServer,rpcIdToClientIdMap);
54 ifImpl = new RDbServerIFImpl(underlyingDatabase,crh);
55 rpcServer.registerInterface(RDbServerIF.class, ifImpl);
56
57 rpcServer.start();
58 rpcServer.waitForInitialisation();
59 }
60
61 protected void handleClientDisconnect(RPCClientId rpcClientId) {
62
63 ClientId clientId = rpcIdToClientIdMap.get(rpcClientId);
64 LOG.info("Client " + clientId + " disconnected");
65 rpcIdToClientIdMap.remove(rpcClientId);
66
67 underlyingDatabase.removeLockAcquiredListener(clientId);
68 underlyingDatabase.clientDisconnect(clientId);
69 }
70
71 private void handleClientConnection(final RPCClientId rpcClientId,
72 String clientAddress) {
73 ClientId clientId = clientIdGenerator.nextId();
74 LOG.info("Got connection from " + clientAddress + " assigned id " + clientId);
75
76 rpcIdToClientIdMap.put(rpcClientId, clientId);
77 underlyingDatabase.setClientLockAcquiredListener(clientId,new LockAcquiredListener()
78 {
79 RDbClientIF clientIf;
80 @Override
81 public void lockAcquired(ClientThreadId threadId, ID objectId) {
82 if(clientIf == null)
83 clientIf = (RDbClientIF) rpcServer.getClientIF(rpcClientId, RDbClientIF.class);
84 clientIf.lockAcquired(threadId.getThreadId(), objectId);
85 }
86 });
87 }
88
89 public void shutdown() {
90 rpcServer.shutdown();
91 }
92 }