1 package org.sourceforge.jemm.database.remote.server; 2 3 import java.util.concurrent.ConcurrentHashMap; 4 5 import org.apache.log4j.Logger; 6 import org.sourceforge.jemm.comm.connection.ConnectionException; 7 import org.sourceforge.jemm.comm.connection.ServerConnectionFactory; 8 import org.sourceforge.jemm.comm.server.ClientConnectionListener; 9 import org.sourceforge.jemm.comm.server.RPCClientId; 10 import org.sourceforge.jemm.comm.server.RPCServer; 11 import org.sourceforge.jemm.database.ClientId; 12 import org.sourceforge.jemm.database.ClientThreadId; 13 import org.sourceforge.jemm.database.Database; 14 import org.sourceforge.jemm.database.LockAcquiredListener; 15 import org.sourceforge.jemm.database.remote.shared.RDbClientIF; 16 import org.sourceforge.jemm.database.remote.shared.RDbServerIF; 17 import org.sourceforge.jemm.types.ID; 18 19 public class RemoteDatabaseServer { 20 21 private static final Logger LOG = Logger.getLogger(RemoteDatabaseServer.class); 22 23 private final Database underlyingDatabase; 24 private final ClientIdGenerator clientIdGenerator; 25 26 private final RPCServer rpcServer; 27 private final RDbServerIFImpl ifImpl; 28 29 private final ConcurrentHashMap<RPCClientId, ClientId> rpcIdToClientIdMap = 30 new ConcurrentHashMap<RPCClientId, ClientId>(); 31 32 public RemoteDatabaseServer(ServerConnectionFactory serverConnectionFactory, 33 Database underlyingDatabase) throws ConnectionException { 34 this.underlyingDatabase = underlyingDatabase; 35 clientIdGenerator = new DefaultClientIdGenerator(); 36 rpcServer = new RPCServer(serverConnectionFactory,25); 37 rpcServer.setClientListener(new ClientConnectionListener() 38 { 39 40 @Override 41 public void clientConnected(RPCClientId clientId, 42 String clientAddress) { 43 handleClientConnection(clientId,clientAddress); 44 } 45 46 @Override 47 public void clientDisconnected(RPCClientId clientId) { 48 handleClientDisconnect(clientId); 49 } 50 51 }); 52 53 ClientReferenceHandler crh = new ClientReferenceHandler(rpcServer,rpcIdToClientIdMap); 54 ifImpl = new RDbServerIFImpl(underlyingDatabase,crh); 55 rpcServer.registerInterface(RDbServerIF.class, ifImpl); 56 57 rpcServer.start(); 58 rpcServer.waitForInitialisation(); 59 } 60 61 protected void handleClientDisconnect(RPCClientId rpcClientId) { 62 63 ClientId clientId = rpcIdToClientIdMap.get(rpcClientId); 64 LOG.info("Client " + clientId + " disconnected"); 65 rpcIdToClientIdMap.remove(rpcClientId); 66 67 underlyingDatabase.removeLockAcquiredListener(clientId); 68 underlyingDatabase.clientDisconnect(clientId); 69 } 70 71 private void handleClientConnection(final RPCClientId rpcClientId, 72 String clientAddress) { 73 ClientId clientId = clientIdGenerator.nextId(); 74 LOG.info("Got connection from " + clientAddress + " assigned id " + clientId); 75 76 rpcIdToClientIdMap.put(rpcClientId, clientId); 77 underlyingDatabase.setClientLockAcquiredListener(clientId,new LockAcquiredListener() 78 { 79 RDbClientIF clientIf; 80 @Override 81 public void lockAcquired(ClientThreadId threadId, ID objectId) { 82 if(clientIf == null) 83 clientIf = (RDbClientIF) rpcServer.getClientIF(rpcClientId, RDbClientIF.class); 84 clientIf.lockAcquired(threadId.getThreadId(), objectId); 85 } 86 }); 87 } 88 89 public void shutdown() { 90 rpcServer.shutdown(); 91 } 92 }