View Javadoc

1   package org.sourceforge.jemm.client;
2   
3   import java.util.ArrayList;
4   import java.util.Arrays;
5   import java.util.Collections;
6   import java.util.HashMap;
7   import java.util.HashSet;
8   import java.util.List;
9   import java.util.Map;
10  import java.util.Map.Entry;
11  import java.util.Set;
12  import java.util.concurrent.locks.Lock;
13  import java.util.concurrent.locks.ReadWriteLock;
14  import java.util.concurrent.locks.ReentrantReadWriteLock;
15  
16  import org.sourceforge.jemm.client.id.TrackedID;
17  import org.sourceforge.jemm.client.id.TrackedIDFactory;
18  import org.sourceforge.jemm.client.id.TrackedIDListener;
19  import org.sourceforge.jemm.client.shared.ValueEncoder;
20  import org.sourceforge.jemm.database.ClassId;
21  import org.sourceforge.jemm.database.ClientId;
22  import org.sourceforge.jemm.database.ClientThreadId;
23  import org.sourceforge.jemm.database.Database;
24  import org.sourceforge.jemm.database.FieldInfo;
25  import org.sourceforge.jemm.database.GetObjectResp;
26  import org.sourceforge.jemm.database.LockAcquiredListener;
27  import org.sourceforge.jemm.database.ObjectSyncData;
28  import org.sourceforge.jemm.database.ObjectSyncResp;
29  import org.sourceforge.jemm.lifecycle.TypeRequest;
30  import org.sourceforge.jemm.lifecycle.TypeResponse;
31  import org.sourceforge.jemm.types.ID;
32  
33  /**
34   * This Database implementation tracks maps any ids from the delegated database into
35   * singleton tracked ids (i.e. for any given object, there will be one singleton instance 
36   * of a tracked id.  All down calls to the delegate will have the tracked ids mapped back to 
37   * regular ids. 
38   * <P> It is responsible for detecting live use of the ids within the client code, once a client
39   * has finished using a tracked id, it will be garbage collected and the database will inform the
40   * underlying database that the id is no longer referenced.
41   * <P> To avoid a race condition between marking an id as unused and it being passed back in 
42   * another call, the database users locking
43   * <P> <B> N.b. </B> This is heavily based upon the original ReferenceCleaner implementation and
44   * effectively is moving that code down into the Database rather than ObjectDatabase layer.
45   * 
46   * @author Paul Keeble
47   * @author Rory Graves
48   */
49  public class TrackingDatabase extends DelegatingDatabase {
50  	/** Sleep time between reference cleaning cycles (in ms) */
51  	public static final long SLEEP_TIME=10;
52  
53  	private Set<ID> queued;
54  	private Thread clearingThread;
55  	
56  	private final ClientId refClearedClientId = new ClientId("CLIENT");
57  
58  	private final TrackedIDFactory trackedIDFactory;
59  	
60  	/** Handlers for listener */
61  	private ClientId callerListenerClientId;
62  	private LockAcquiredListener callerLockListener;
63  	
64  	private final ReadWriteLock lock;
65  	private final Lock updateLock;
66  	private final Lock gcLock;
67  
68  	private final WrappingMapper wrappingMapper = new WrappingMapper();
69  	private final UnwrappingMapper unwrappingMapper = new UnwrappingMapper();
70  	
71  	public TrackingDatabase(Database delegate,TrackedIDFactory idFactory) {
72  		super(delegate);
73  		lock = new ReentrantReadWriteLock();
74  		updateLock = lock.readLock();
75  		gcLock = lock.writeLock();
76  		queued = Collections.synchronizedSet(new HashSet<ID>());
77  
78  		this.trackedIDFactory = idFactory;
79  		trackedIDFactory.addListener(new TrackedIDListener() {
80  			@Override
81  			public void expired(ID id) {
82  				queued.add(id);
83  			}			
84  		});
85  		
86  		clearingThread = new Thread(new QueueReader(),"Reference Cleaner");
87  		clearingThread.setDaemon(true);
88  		clearingThread.start();
89  	}
90  	
91  	private ID unwrapId(ID wrappedId) {
92  		
93  		if(wrappedId == null)
94  			return null;
95  		
96  		// unwrap an id for passing down
97  		if(wrappedId instanceof TrackedID) 
98  			return new ID(wrappedId.getIDValue());
99  		else
100 			// no raw ids should have been passed in by the layer above
101 			throw new IllegalStateException("All ids passed in must be TrackedIDs");
102 	}
103 
104 	/**
105 	 * Wrap an id as a TrackedID for passing up to higher levels.
106 	 * @param unwrappedId The id to wrap.
107 	 * @return
108 	 */
109 	private ID wrapId(ID unwrappedId) {
110 		if(unwrappedId == null)
111 			return null;
112 		else if(unwrappedId instanceof TrackedID)
113 			throw new IllegalStateException("TrackedID should not occur in lower levels");
114 		else
115 			return trackedIDFactory.create(unwrappedId);
116 	}
117 	
118 	private class WrappingMapper extends ValueEncoder implements ValueMapper {
119 
120 		@Override public ID mapID(ID id) {
121 			return wrapId((ID) id);
122 		}
123 
124 		@Override public Object map(Object o) {
125 			if(o instanceof ID)
126 				return wrapId((ID) o);
127 			else
128 				return o;
129 		}
130 
131 		@SuppressWarnings("unchecked")
132 		@Override
133 		public <K> K encode(K value) {
134 			if(value instanceof ID)
135 				return (K) wrapId((ID) value);
136 			else
137 				return value;
138 		}
139 	}
140 
141 	private class UnwrappingMapper extends ValueEncoder implements ValueMapper {
142 
143 		@Override public ID mapID(ID id) {
144 			return unwrapId((ID) id);
145 		}
146 		@Override public Object map(Object o) {
147 			if(o instanceof ID)
148 				return unwrapId((ID) o);
149 			else
150 				return o;
151 		}
152 		
153 		@SuppressWarnings("unchecked")
154 		@Override public <K> K encode(K value) {
155 			if(value instanceof ID)
156 				return (K) unwrapId((ID) value);
157 			else
158 				return value;
159 		}
160 	}
161 
162 	@Override public void acquireLock(ClientThreadId threadId, ID jemmId) {
163 		super.acquireLock(threadId, unwrapId(jemmId));
164 	}
165 	
166 	@Override public GetObjectResp getObject(ClientId clientId, ID jemmId) {
167 		try {
168 			updateLock.lock();
169 			GetObjectResp resp = super.getObject(clientId, unwrapId(jemmId));
170 			return mapGetObjResp(resp,wrappingMapper);
171 		} finally {
172 			updateLock.unlock();
173 		}		
174 	}
175 	@Override public ID getRoot(ClientId clientId, String rootName) {
176 		try {
177 			updateLock.lock();
178 			return wrapId(super.getRoot(clientId, rootName));
179 		} finally {
180 			updateLock.unlock();
181 		}		
182 	}
183 	@Override public ID newObject(ClientId clientId, ClassId classId) {
184 		return wrapId(super.newObject(clientId, classId));
185 	}
186 	@Override public void referenceCleared(ClientId clientId, ID... ids) {
187 		throw new IllegalStateException("Higher level database should not call this method");
188 	}	
189 	@Override public void releaseLock(ClientThreadId threadId, ID jemmId) {
190 		super.releaseLock(threadId, unwrapId(jemmId));
191 	}
192 	@Override public void setRoot(ClientId clientId, String rootName, ID newValue) {
193 		super.setRoot(clientId, rootName, unwrapId(newValue));
194 	}
195 	@Override public ID setRootIfNull(ClientId clientId, String rootName, ID newValue) {
196 		try {
197 			updateLock.lock();
198 			return wrapId(super.setRootIfNull(clientId, rootName, unwrapId(newValue)));
199 		} finally {
200 			updateLock.unlock();
201 		}		
202 	}
203 	@Override public ObjectSyncResp synchroniseObject(ClientId clientId, ID jemmId,
204 			ObjectSyncData syncData) {
205 		try {
206 			updateLock.lock();
207 			// wrap the request fields
208 			ObjectSyncData toSendSyncData = mapSyncData(syncData,unwrappingMapper);
209 			ObjectSyncResp resp = super.synchroniseObject(clientId, unwrapId(jemmId), toSendSyncData);
210 			return mapSyncRespData(resp,wrappingMapper);
211 		} finally {
212 			updateLock.unlock();
213 		}		
214 	}
215 	@Override public TypeResponse<?> processTypeRequest(ClientId clientId,
216 			ClassId classId, ID objId, TypeRequest<?> request) {
217 		try {
218 			updateLock.lock();
219 			TypeRequest<?> toSendRequest = (TypeRequest<?>) request.encode(unwrappingMapper);
220 			TypeResponse<?> resp = super.processTypeRequest(clientId, classId, unwrapId(objId), toSendRequest);
221 			return (TypeResponse<?>) resp.encode(wrappingMapper);
222 		} finally {
223 			updateLock.unlock();
224 		}
225 	}
226 
227 	@Override public synchronized void removeLockAcquiredListener(ClientId clientId) {
228 		super.removeLockAcquiredListener(clientId);
229 		callerListenerClientId = null;
230 		callerLockListener = null;
231 	}
232 	@Override public synchronized void setClientLockAcquiredListener(ClientId clientId,
233 			LockAcquiredListener listener) {
234 		if(this.callerListenerClientId != null)
235 			throw new IllegalStateException("client lock acquire listener already set");
236 	
237 		this.callerListenerClientId = clientId;
238 		this.callerLockListener = listener;
239 		super.setClientLockAcquiredListener(clientId, new LockAcquiredListener() {
240 
241 			@Override
242 			public void lockAcquired(ClientThreadId threadId, ID objectId) {
243 				callerLockListener.lockAcquired(threadId, wrapId(objectId));
244 			}
245 			
246 		});
247 	}
248 
249 	// methods to create new instances of the requests with all of the fields
250 	// mapped. 
251 	
252 	private GetObjectResp mapGetObjResp(GetObjectResp src, ValueMapper mapper) {
253 		
254 		return new GetObjectResp(mapper.mapID(src.jemmId),src.getClassId(),src.clientVersion,
255 				mapFieldValues(src.fieldValues,mapper));
256 	}
257 
258 	private Map<FieldInfo, Object> mapFieldValues(Map<FieldInfo, Object> src,ValueMapper mapper) {
259 		Map<FieldInfo,Object> newFields = new HashMap<FieldInfo, Object>();
260 		for (Entry<FieldInfo, Object> srcField : src.entrySet())
261 			newFields.put(srcField.getKey(), mapper.map(srcField.getValue()));
262 		
263 		return newFields;
264 	}
265 
266 	private ObjectSyncResp mapSyncRespData(ObjectSyncResp resp, ValueMapper mapper) {
267 		return new ObjectSyncResp(resp.getNewVersionNo(),
268 				mapFieldValues(resp.getUpdatedFields(), mapper));
269 	}
270 
271 	private ObjectSyncData mapSyncData(ObjectSyncData syncData,ValueMapper mapper) {
272 		return new ObjectSyncData(mapper.mapID(syncData.jemmId),
273 				syncData.clientVersion,mapFieldValues(syncData.fieldValues, mapper));
274 	}
275 	
276 	/**
277 	 * An implementation that watches for elements in the
278 	 * queue and takes a batch of IDs and passes them down
279 	 * the stack if they are no longer tracked.
280 	 * 
281 	 * @author Paul Keeble
282 	 *
283 	 */
284 	class QueueReader implements Runnable {
285 		public void run() {
286 			try {
287 				while(true) {
288 					if(queued.size()==0) {
289 						Thread.sleep(SLEEP_TIME);
290 					} else {
291 						acquireLockAndClear();
292 					}
293 				}
294 			} catch (InterruptedException e) {
295 				// do nothing
296 			}
297 		}
298 		
299 		public void acquireLockAndClear() {
300 			try {
301 				gcLock.lock();
302 				ID[] candidates = queued.toArray(new ID[0]);
303 				ID[] toClear = removeCurrentlyTracked(candidates);
304 				if(toClear.length>0)
305 					internalClearRererence(toClear);
306 				
307 				queued.removeAll(Arrays.asList(candidates));
308 			} finally {
309 				gcLock.unlock();
310 			}
311 		}
312 		
313 		public ID[] removeCurrentlyTracked(ID[] candidateIds) {
314 			List<ID> toClearIds = new ArrayList<ID>();
315 			for(ID id : candidateIds) {
316 				if(!trackedIDFactory.contains(id))
317 					toClearIds.add(id);
318 			}
319 			
320 			return toClearIds.toArray(new ID[0]);
321 		}		
322 	}
323 	
324 	public void internalClearRererence(ID[] toClear) {
325 		super.referenceCleared(refClearedClientId, toClear);
326 	}
327 	
328 	/**
329 	 * TrackingDatabase uses a separate thread to pass ID information for cleaning,
330 	 * shutdown must be called to recover that native resource to avoid leaks.
331 	 * 
332 	 */
333 	public void shutdown() {
334 		clearingThread.interrupt();
335 	}
336 }