1 package org.sourceforge.jemm.client;
2
3 import java.util.ArrayList;
4 import java.util.Arrays;
5 import java.util.Collections;
6 import java.util.HashMap;
7 import java.util.HashSet;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Map.Entry;
11 import java.util.Set;
12 import java.util.concurrent.locks.Lock;
13 import java.util.concurrent.locks.ReadWriteLock;
14 import java.util.concurrent.locks.ReentrantReadWriteLock;
15
16 import org.sourceforge.jemm.client.id.TrackedID;
17 import org.sourceforge.jemm.client.id.TrackedIDFactory;
18 import org.sourceforge.jemm.client.id.TrackedIDListener;
19 import org.sourceforge.jemm.client.shared.ValueEncoder;
20 import org.sourceforge.jemm.database.ClassId;
21 import org.sourceforge.jemm.database.ClientId;
22 import org.sourceforge.jemm.database.ClientThreadId;
23 import org.sourceforge.jemm.database.Database;
24 import org.sourceforge.jemm.database.FieldInfo;
25 import org.sourceforge.jemm.database.GetObjectResp;
26 import org.sourceforge.jemm.database.LockAcquiredListener;
27 import org.sourceforge.jemm.database.ObjectSyncData;
28 import org.sourceforge.jemm.database.ObjectSyncResp;
29 import org.sourceforge.jemm.lifecycle.TypeRequest;
30 import org.sourceforge.jemm.lifecycle.TypeResponse;
31 import org.sourceforge.jemm.types.ID;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class TrackingDatabase extends DelegatingDatabase {
50
51 public static final long SLEEP_TIME=10;
52
53 private Set<ID> queued;
54 private Thread clearingThread;
55
56 private final ClientId refClearedClientId = new ClientId("CLIENT");
57
58 private final TrackedIDFactory trackedIDFactory;
59
60
61 private ClientId callerListenerClientId;
62 private LockAcquiredListener callerLockListener;
63
64 private final ReadWriteLock lock;
65 private final Lock updateLock;
66 private final Lock gcLock;
67
68 private final WrappingMapper wrappingMapper = new WrappingMapper();
69 private final UnwrappingMapper unwrappingMapper = new UnwrappingMapper();
70
71 public TrackingDatabase(Database delegate,TrackedIDFactory idFactory) {
72 super(delegate);
73 lock = new ReentrantReadWriteLock();
74 updateLock = lock.readLock();
75 gcLock = lock.writeLock();
76 queued = Collections.synchronizedSet(new HashSet<ID>());
77
78 this.trackedIDFactory = idFactory;
79 trackedIDFactory.addListener(new TrackedIDListener() {
80 @Override
81 public void expired(ID id) {
82 queued.add(id);
83 }
84 });
85
86 clearingThread = new Thread(new QueueReader(),"Reference Cleaner");
87 clearingThread.setDaemon(true);
88 clearingThread.start();
89 }
90
91 private ID unwrapId(ID wrappedId) {
92
93 if(wrappedId == null)
94 return null;
95
96
97 if(wrappedId instanceof TrackedID)
98 return new ID(wrappedId.getIDValue());
99 else
100
101 throw new IllegalStateException("All ids passed in must be TrackedIDs");
102 }
103
104
105
106
107
108
109 private ID wrapId(ID unwrappedId) {
110 if(unwrappedId == null)
111 return null;
112 else if(unwrappedId instanceof TrackedID)
113 throw new IllegalStateException("TrackedID should not occur in lower levels");
114 else
115 return trackedIDFactory.create(unwrappedId);
116 }
117
118 private class WrappingMapper extends ValueEncoder implements ValueMapper {
119
120 @Override public ID mapID(ID id) {
121 return wrapId((ID) id);
122 }
123
124 @Override public Object map(Object o) {
125 if(o instanceof ID)
126 return wrapId((ID) o);
127 else
128 return o;
129 }
130
131 @SuppressWarnings("unchecked")
132 @Override
133 public <K> K encode(K value) {
134 if(value instanceof ID)
135 return (K) wrapId((ID) value);
136 else
137 return value;
138 }
139 }
140
141 private class UnwrappingMapper extends ValueEncoder implements ValueMapper {
142
143 @Override public ID mapID(ID id) {
144 return unwrapId((ID) id);
145 }
146 @Override public Object map(Object o) {
147 if(o instanceof ID)
148 return unwrapId((ID) o);
149 else
150 return o;
151 }
152
153 @SuppressWarnings("unchecked")
154 @Override public <K> K encode(K value) {
155 if(value instanceof ID)
156 return (K) unwrapId((ID) value);
157 else
158 return value;
159 }
160 }
161
162 @Override public void acquireLock(ClientThreadId threadId, ID jemmId) {
163 super.acquireLock(threadId, unwrapId(jemmId));
164 }
165
166 @Override public GetObjectResp getObject(ClientId clientId, ID jemmId) {
167 try {
168 updateLock.lock();
169 GetObjectResp resp = super.getObject(clientId, unwrapId(jemmId));
170 return mapGetObjResp(resp,wrappingMapper);
171 } finally {
172 updateLock.unlock();
173 }
174 }
175 @Override public ID getRoot(ClientId clientId, String rootName) {
176 try {
177 updateLock.lock();
178 return wrapId(super.getRoot(clientId, rootName));
179 } finally {
180 updateLock.unlock();
181 }
182 }
183 @Override public ID newObject(ClientId clientId, ClassId classId) {
184 return wrapId(super.newObject(clientId, classId));
185 }
186 @Override public void referenceCleared(ClientId clientId, ID... ids) {
187 throw new IllegalStateException("Higher level database should not call this method");
188 }
189 @Override public void releaseLock(ClientThreadId threadId, ID jemmId) {
190 super.releaseLock(threadId, unwrapId(jemmId));
191 }
192 @Override public void setRoot(ClientId clientId, String rootName, ID newValue) {
193 super.setRoot(clientId, rootName, unwrapId(newValue));
194 }
195 @Override public ID setRootIfNull(ClientId clientId, String rootName, ID newValue) {
196 try {
197 updateLock.lock();
198 return wrapId(super.setRootIfNull(clientId, rootName, unwrapId(newValue)));
199 } finally {
200 updateLock.unlock();
201 }
202 }
203 @Override public ObjectSyncResp synchroniseObject(ClientId clientId, ID jemmId,
204 ObjectSyncData syncData) {
205 try {
206 updateLock.lock();
207
208 ObjectSyncData toSendSyncData = mapSyncData(syncData,unwrappingMapper);
209 ObjectSyncResp resp = super.synchroniseObject(clientId, unwrapId(jemmId), toSendSyncData);
210 return mapSyncRespData(resp,wrappingMapper);
211 } finally {
212 updateLock.unlock();
213 }
214 }
215 @Override public TypeResponse<?> processTypeRequest(ClientId clientId,
216 ClassId classId, ID objId, TypeRequest<?> request) {
217 try {
218 updateLock.lock();
219 TypeRequest<?> toSendRequest = (TypeRequest<?>) request.encode(unwrappingMapper);
220 TypeResponse<?> resp = super.processTypeRequest(clientId, classId, unwrapId(objId), toSendRequest);
221 return (TypeResponse<?>) resp.encode(wrappingMapper);
222 } finally {
223 updateLock.unlock();
224 }
225 }
226
227 @Override public synchronized void removeLockAcquiredListener(ClientId clientId) {
228 super.removeLockAcquiredListener(clientId);
229 callerListenerClientId = null;
230 callerLockListener = null;
231 }
232 @Override public synchronized void setClientLockAcquiredListener(ClientId clientId,
233 LockAcquiredListener listener) {
234 if(this.callerListenerClientId != null)
235 throw new IllegalStateException("client lock acquire listener already set");
236
237 this.callerListenerClientId = clientId;
238 this.callerLockListener = listener;
239 super.setClientLockAcquiredListener(clientId, new LockAcquiredListener() {
240
241 @Override
242 public void lockAcquired(ClientThreadId threadId, ID objectId) {
243 callerLockListener.lockAcquired(threadId, wrapId(objectId));
244 }
245
246 });
247 }
248
249
250
251
252 private GetObjectResp mapGetObjResp(GetObjectResp src, ValueMapper mapper) {
253
254 return new GetObjectResp(mapper.mapID(src.jemmId),src.getClassId(),src.clientVersion,
255 mapFieldValues(src.fieldValues,mapper));
256 }
257
258 private Map<FieldInfo, Object> mapFieldValues(Map<FieldInfo, Object> src,ValueMapper mapper) {
259 Map<FieldInfo,Object> newFields = new HashMap<FieldInfo, Object>();
260 for (Entry<FieldInfo, Object> srcField : src.entrySet())
261 newFields.put(srcField.getKey(), mapper.map(srcField.getValue()));
262
263 return newFields;
264 }
265
266 private ObjectSyncResp mapSyncRespData(ObjectSyncResp resp, ValueMapper mapper) {
267 return new ObjectSyncResp(resp.getNewVersionNo(),
268 mapFieldValues(resp.getUpdatedFields(), mapper));
269 }
270
271 private ObjectSyncData mapSyncData(ObjectSyncData syncData,ValueMapper mapper) {
272 return new ObjectSyncData(mapper.mapID(syncData.jemmId),
273 syncData.clientVersion,mapFieldValues(syncData.fieldValues, mapper));
274 }
275
276
277
278
279
280
281
282
283
284 class QueueReader implements Runnable {
285 public void run() {
286 try {
287 while(true) {
288 if(queued.size()==0) {
289 Thread.sleep(SLEEP_TIME);
290 } else {
291 acquireLockAndClear();
292 }
293 }
294 } catch (InterruptedException e) {
295
296 }
297 }
298
299 public void acquireLockAndClear() {
300 try {
301 gcLock.lock();
302 ID[] candidates = queued.toArray(new ID[0]);
303 ID[] toClear = removeCurrentlyTracked(candidates);
304 if(toClear.length>0)
305 internalClearRererence(toClear);
306
307 queued.removeAll(Arrays.asList(candidates));
308 } finally {
309 gcLock.unlock();
310 }
311 }
312
313 public ID[] removeCurrentlyTracked(ID[] candidateIds) {
314 List<ID> toClearIds = new ArrayList<ID>();
315 for(ID id : candidateIds) {
316 if(!trackedIDFactory.contains(id))
317 toClearIds.add(id);
318 }
319
320 return toClearIds.toArray(new ID[0]);
321 }
322 }
323
324 public void internalClearRererence(ID[] toClear) {
325 super.referenceCleared(refClearedClientId, toClear);
326 }
327
328
329
330
331
332
333 public void shutdown() {
334 clearingThread.interrupt();
335 }
336 }