View Javadoc

1   package org.sourceforge.jemm.database.remote.server;
2   
3   import java.util.concurrent.ConcurrentHashMap;
4   
5   import org.apache.log4j.Logger;
6   import org.sourceforge.jemm.comm.connection.ConnectionException;
7   import org.sourceforge.jemm.comm.connection.ServerConnectionFactory;
8   import org.sourceforge.jemm.comm.server.ClientConnectionListener;
9   import org.sourceforge.jemm.comm.server.RPCClientId;
10  import org.sourceforge.jemm.comm.server.RPCServer;
11  import org.sourceforge.jemm.database.ClientId;
12  import org.sourceforge.jemm.database.ClientThreadId;
13  import org.sourceforge.jemm.database.Database;
14  import org.sourceforge.jemm.database.LockAcquiredListener;
15  import org.sourceforge.jemm.database.remote.shared.RDbClientIF;
16  import org.sourceforge.jemm.database.remote.shared.RDbServerIF;
17  import org.sourceforge.jemm.types.ID;
18  
19  public class RemoteDatabaseServer {
20  	
21  	private static final Logger LOG = Logger.getLogger(RemoteDatabaseServer.class);
22  	
23  	private final Database underlyingDatabase;
24  	private final ClientIdGenerator clientIdGenerator;
25  	
26  	private final RPCServer rpcServer;
27  	private final RDbServerIFImpl ifImpl;
28  	
29  	private final ConcurrentHashMap<RPCClientId, ClientId> rpcIdToClientIdMap = 
30  		new ConcurrentHashMap<RPCClientId, ClientId>();
31  	
32  	public RemoteDatabaseServer(ServerConnectionFactory serverConnectionFactory, 
33  			Database underlyingDatabase) throws ConnectionException {
34  		this.underlyingDatabase = underlyingDatabase;
35  		clientIdGenerator = new DefaultClientIdGenerator();
36  		rpcServer = new RPCServer(serverConnectionFactory,25);
37  		rpcServer.setClientListener(new ClientConnectionListener()
38  		{
39  
40  			@Override
41  			public void clientConnected(RPCClientId clientId,
42  					String clientAddress) {
43  				handleClientConnection(clientId,clientAddress);
44  			}
45  
46  			@Override
47  			public void clientDisconnected(RPCClientId clientId) {
48  				handleClientDisconnect(clientId);
49  			}
50  			
51  		});
52  		
53  		ClientReferenceHandler crh = new ClientReferenceHandler(rpcServer,rpcIdToClientIdMap);
54  		ifImpl = new RDbServerIFImpl(underlyingDatabase,crh);
55  		rpcServer.registerInterface(RDbServerIF.class, ifImpl);
56  		
57  		rpcServer.start();
58          rpcServer.waitForInitialisation();
59  	}
60  
61  	protected void handleClientDisconnect(RPCClientId rpcClientId) {
62  		
63  		ClientId clientId = rpcIdToClientIdMap.get(rpcClientId);
64  		LOG.info("Client " + clientId + " disconnected");
65  		rpcIdToClientIdMap.remove(rpcClientId);
66  		
67  		underlyingDatabase.removeLockAcquiredListener(clientId);
68  		underlyingDatabase.clientDisconnect(clientId);
69  	}
70  
71  	private void handleClientConnection(final RPCClientId rpcClientId,
72  			String clientAddress) {
73  		ClientId clientId = clientIdGenerator.nextId();
74  		LOG.info("Got connection from " + clientAddress + " assigned id " + clientId);
75  
76  		rpcIdToClientIdMap.put(rpcClientId, clientId);		
77  		underlyingDatabase.setClientLockAcquiredListener(clientId,new LockAcquiredListener()
78  		{
79  			RDbClientIF clientIf;
80  			@Override
81  			public void lockAcquired(ClientThreadId threadId, ID objectId) {
82  				if(clientIf == null)
83  					 clientIf = (RDbClientIF) rpcServer.getClientIF(rpcClientId, RDbClientIF.class);
84  				clientIf.lockAcquired(threadId.getThreadId(), objectId);
85  			}			
86  		});
87  	}
88  	
89  	public void shutdown() {
90  		rpcServer.shutdown();
91  	}
92  }