Yuta HIGUCHI

Wrapper around Hazelcast IMap

Change-Id: I43353b97483353be83b49214970a5fceedaea980
1 +package org.onlab.onos.store.common;
2 +
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
5 +import java.util.ArrayList;
6 +import java.util.Collection;
7 +import java.util.Collections;
8 +import java.util.HashMap;
9 +import java.util.HashSet;
10 +import java.util.IdentityHashMap;
11 +import java.util.Map;
12 +import java.util.Set;
13 +import java.util.concurrent.Future;
14 +import java.util.concurrent.TimeUnit;
15 +
16 +import org.apache.commons.lang3.tuple.Pair;
17 +import org.onlab.onos.store.serializers.StoreSerializer;
18 +
19 +import com.google.common.base.Function;
20 +import com.google.common.util.concurrent.Futures;
21 +import com.hazelcast.core.EntryEvent;
22 +import com.hazelcast.core.EntryListener;
23 +import com.hazelcast.core.EntryView;
24 +import com.hazelcast.core.ExecutionCallback;
25 +import com.hazelcast.core.IMap;
26 +import com.hazelcast.core.MapEvent;
27 +import com.hazelcast.map.EntryProcessor;
28 +import com.hazelcast.map.MapInterceptor;
29 +import com.hazelcast.mapreduce.JobTracker;
30 +import com.hazelcast.mapreduce.aggregation.Aggregation;
31 +import com.hazelcast.mapreduce.aggregation.Supplier;
32 +import com.hazelcast.monitor.LocalMapStats;
33 +import com.hazelcast.query.Predicate;
34 +
35 +// TODO: implement Predicate, etc. if we need them.
36 +/**
37 + * Wrapper around IMap<byte[], byte[]> which serializes/deserializes
38 + * Key and Value using StoreSerializer.
39 + *
40 + * @param <K> key type
41 + * @param <V> value type
42 + */
43 +public class SMap<K, V> implements IMap<K, V> {
44 +
45 + private final IMap<byte[], byte[]> m;
46 + private final StoreSerializer serializer;
47 +
48 + /**
49 + * Creates a SMap instance.
50 + *
51 + * @param baseMap base IMap to use
52 + * @param serializer serializer to use for both key and value
53 + */
54 + public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
55 + this.m = checkNotNull(baseMap);
56 + this.serializer = checkNotNull(serializer);
57 + }
58 +
59 + @Override
60 + public int size() {
61 + return m.size();
62 + }
63 +
64 + @Override
65 + public boolean isEmpty() {
66 + return m.isEmpty();
67 + }
68 +
69 + @Override
70 + public void putAll(Map<? extends K, ? extends V> map) {
71 + Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
72 + for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
73 + sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
74 + }
75 + m.putAll(sm);
76 + }
77 +
78 + @Deprecated
79 + @Override
80 + public Object getId() {
81 + return m.getId();
82 + }
83 +
84 + @Override
85 + public String getPartitionKey() {
86 + return m.getPartitionKey();
87 + }
88 +
89 + @Override
90 + public String getName() {
91 + return m.getName();
92 + }
93 +
94 + @Override
95 + public String getServiceName() {
96 + return m.getServiceName();
97 + }
98 +
99 + @Override
100 + public void destroy() {
101 + m.destroy();
102 + }
103 +
104 + @Override
105 + public boolean containsKey(Object key) {
106 + return m.containsKey(serializeKey(key));
107 + }
108 +
109 + @Override
110 + public boolean containsValue(Object value) {
111 + return m.containsValue(serializeVal(value));
112 + }
113 +
114 + @Override
115 + public V get(Object key) {
116 + return deserializeVal(m.get(serializeKey(key)));
117 + }
118 +
119 + @Override
120 + public V put(K key, V value) {
121 + return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
122 + }
123 +
124 + @Override
125 + public V remove(Object key) {
126 + return deserializeVal(m.remove(serializeKey(key)));
127 + }
128 +
129 + @Override
130 + public boolean remove(Object key, Object value) {
131 + return m.remove(serializeKey(key), serializeVal(value));
132 + }
133 +
134 + @Override
135 + public void delete(Object key) {
136 + m.delete(serializeKey(key));
137 + }
138 +
139 + @Override
140 + public void flush() {
141 + m.flush();
142 + }
143 +
144 + @Override
145 + public Map<K, V> getAll(Set<K> keys) {
146 + Set<byte[]> sk = serializeKeySet(keys);
147 + Map<byte[], byte[]> bm = m.getAll(sk);
148 + Map<K, V> dsm = new HashMap<>(bm.size());
149 + for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
150 + dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
151 + }
152 + return dsm;
153 + }
154 +
155 + @Override
156 + public void loadAll(boolean replaceExistingValues) {
157 + m.loadAll(replaceExistingValues);
158 + }
159 +
160 + @Override
161 + public void loadAll(Set<K> keys, boolean replaceExistingValues) {
162 + Set<byte[]> sk = serializeKeySet(keys);
163 + m.loadAll(sk, replaceExistingValues);
164 + }
165 +
166 + @Override
167 + public void clear() {
168 + m.clear();
169 + }
170 +
171 + @Override
172 + public Future<V> getAsync(K key) {
173 + Future<byte[]> f = m.getAsync(serializeKey(key));
174 + return Futures.lazyTransform(f, new DeserializeVal());
175 + }
176 +
177 + @Override
178 + public Future<V> putAsync(K key, V value) {
179 + Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
180 + return Futures.lazyTransform(f, new DeserializeVal());
181 + }
182 +
183 + @Override
184 + public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
185 + Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
186 + return Futures.lazyTransform(f, new DeserializeVal());
187 + }
188 +
189 + @Override
190 + public Future<V> removeAsync(K key) {
191 + Future<byte[]> f = m.removeAsync(serializeKey(key));
192 + return Futures.lazyTransform(f, new DeserializeVal());
193 + }
194 +
195 + @Override
196 + public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
197 + return m.tryRemove(serializeKey(key), timeout, timeunit);
198 + }
199 +
200 + @Override
201 + public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
202 + return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
203 + }
204 +
205 + @Override
206 + public V put(K key, V value, long ttl, TimeUnit timeunit) {
207 + return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
208 + }
209 +
210 + @Override
211 + public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
212 + m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
213 + }
214 +
215 + @Override
216 + public V putIfAbsent(K key, V value) {
217 + return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
218 + }
219 +
220 + @Override
221 + public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
222 + return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
223 + }
224 +
225 + @Override
226 + public boolean replace(K key, V oldValue, V newValue) {
227 + return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
228 + }
229 +
230 + @Override
231 + public V replace(K key, V value) {
232 + return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
233 + }
234 +
235 + @Override
236 + public void set(K key, V value) {
237 + m.set(serializeKey(key), serializeVal(value));
238 + }
239 +
240 + @Override
241 + public void set(K key, V value, long ttl, TimeUnit timeunit) {
242 + m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
243 + }
244 +
245 + @Override
246 + public void lock(K key) {
247 + m.lock(serializeKey(key));
248 + }
249 +
250 + @Override
251 + public void lock(K key, long leaseTime, TimeUnit timeUnit) {
252 + m.lock(serializeKey(key), leaseTime, timeUnit);
253 + }
254 +
255 + @Override
256 + public boolean isLocked(K key) {
257 + return m.isLocked(serializeKey(key));
258 + }
259 +
260 + @Override
261 + public boolean tryLock(K key) {
262 + return m.tryLock(serializeKey(key));
263 + }
264 +
265 + @Override
266 + public boolean tryLock(K key, long time, TimeUnit timeunit)
267 + throws InterruptedException {
268 + return m.tryLock(serializeKey(key), time, timeunit);
269 + }
270 +
271 + @Override
272 + public void unlock(K key) {
273 + m.unlock(serializeKey(key));
274 + }
275 +
276 + @Override
277 + public void forceUnlock(K key) {
278 + m.forceUnlock(serializeKey(key));
279 + }
280 +
281 + @Override
282 + public String addLocalEntryListener(EntryListener<K, V> listener) {
283 + return m.addLocalEntryListener(new BaseEntryListener(listener));
284 + }
285 +
286 + @Deprecated // marking method not implemented
287 + @Override
288 + public String addLocalEntryListener(EntryListener<K, V> listener,
289 + Predicate<K, V> predicate, boolean includeValue) {
290 + throw new UnsupportedOperationException();
291 + }
292 +
293 + @Deprecated // marking method not implemented
294 + @Override
295 + public String addLocalEntryListener(EntryListener<K, V> listener,
296 + Predicate<K, V> predicate, K key, boolean includeValue) {
297 + throw new UnsupportedOperationException();
298 + }
299 +
300 + @Deprecated // marking method not implemented
301 + @Override
302 + public String addInterceptor(MapInterceptor interceptor) {
303 + throw new UnsupportedOperationException();
304 + }
305 +
306 + @Override
307 + public void removeInterceptor(String id) {
308 + m.removeInterceptor(id);
309 + }
310 +
311 + @Override
312 + public String addEntryListener(EntryListener<K, V> listener,
313 + boolean includeValue) {
314 + return m.addEntryListener(new BaseEntryListener(listener), includeValue);
315 + }
316 +
317 + @Override
318 + public boolean removeEntryListener(String id) {
319 + return m.removeEntryListener(id);
320 + }
321 +
322 + @Override
323 + public String addEntryListener(EntryListener<K, V> listener, K key,
324 + boolean includeValue) {
325 + return m.addEntryListener(new BaseEntryListener(listener),
326 + serializeKey(key), includeValue);
327 + }
328 +
329 + @Deprecated // marking method not implemented
330 + @Override
331 + public String addEntryListener(EntryListener<K, V> listener,
332 + Predicate<K, V> predicate, boolean includeValue) {
333 + throw new UnsupportedOperationException();
334 + }
335 +
336 + @Deprecated // marking method not implemented
337 + @Override
338 + public String addEntryListener(EntryListener<K, V> listener,
339 + Predicate<K, V> predicate, K key, boolean includeValue) {
340 + throw new UnsupportedOperationException();
341 + }
342 +
343 + @Deprecated // marking method not implemented
344 + @Override
345 + public EntryView<K, V> getEntryView(K key) {
346 + throw new UnsupportedOperationException();
347 + }
348 +
349 + @Override
350 + public boolean evict(K key) {
351 + return m.evict(serializeKey(key));
352 + }
353 +
354 + @Override
355 + public void evictAll() {
356 + m.evictAll();
357 + }
358 +
359 + @Override
360 + public Set<K> keySet() {
361 + return deserializeKeySet(m.keySet());
362 + }
363 +
364 + @Override
365 + public Collection<V> values() {
366 + return deserializeVal(m.values());
367 + }
368 +
369 + @Override
370 + public Set<java.util.Map.Entry<K, V>> entrySet() {
371 + return deserializeEntrySet(m.entrySet());
372 + }
373 +
374 + @Deprecated // marking method not implemented
375 + @SuppressWarnings("rawtypes")
376 + @Override
377 + public Set<K> keySet(Predicate predicate) {
378 + throw new UnsupportedOperationException();
379 + }
380 +
381 + @Deprecated // marking method not implemented
382 + @SuppressWarnings("rawtypes")
383 + @Override
384 + public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
385 + throw new UnsupportedOperationException();
386 + }
387 +
388 + @Deprecated // marking method not implemented
389 + @SuppressWarnings("rawtypes")
390 + @Override
391 + public Collection<V> values(Predicate predicate) {
392 + throw new UnsupportedOperationException();
393 + }
394 +
395 + @Override
396 + public Set<K> localKeySet() {
397 + return deserializeKeySet(m.localKeySet());
398 + }
399 +
400 + @Deprecated // marking method not implemented
401 + @SuppressWarnings("rawtypes")
402 + @Override
403 + public Set<K> localKeySet(Predicate predicate) {
404 + throw new UnsupportedOperationException();
405 + }
406 +
407 + @Deprecated // marking method not implemented
408 + @Override
409 + public void addIndex(String attribute, boolean ordered) {
410 + throw new UnsupportedOperationException();
411 + }
412 +
413 + @Override
414 + public LocalMapStats getLocalMapStats() {
415 + return m.getLocalMapStats();
416 + }
417 +
418 + @Deprecated // marking method not implemented
419 + @SuppressWarnings("rawtypes")
420 + @Override
421 + public Object executeOnKey(K key, EntryProcessor entryProcessor) {
422 + throw new UnsupportedOperationException();
423 + }
424 +
425 + @Deprecated // marking method not implemented
426 + @SuppressWarnings("rawtypes")
427 + @Override
428 + public Map<K, Object> executeOnKeys(Set<K> keys,
429 + EntryProcessor entryProcessor) {
430 + throw new UnsupportedOperationException();
431 + }
432 +
433 + @Deprecated // marking method not implemented
434 + @SuppressWarnings("rawtypes")
435 + @Override
436 + public void submitToKey(K key, EntryProcessor entryProcessor,
437 + ExecutionCallback callback) {
438 + throw new UnsupportedOperationException();
439 + }
440 +
441 + @Deprecated // marking method not implemented
442 + @SuppressWarnings("rawtypes")
443 + @Override
444 + public Future submitToKey(K key, EntryProcessor entryProcessor) {
445 + throw new UnsupportedOperationException();
446 + }
447 +
448 + @Deprecated // marking method not implemented
449 + @SuppressWarnings("rawtypes")
450 + @Override
451 + public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
452 + throw new UnsupportedOperationException();
453 + }
454 +
455 + @Deprecated // marking method not implemented
456 + @SuppressWarnings("rawtypes")
457 + @Override
458 + public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
459 + Predicate predicate) {
460 + throw new UnsupportedOperationException();
461 + }
462 +
463 + @Deprecated // marking method not implemented
464 + @Override
465 + public <SuppliedValue, Result> Result aggregate(
466 + Supplier<K, V, SuppliedValue> supplier,
467 + Aggregation<K, SuppliedValue, Result> aggregation) {
468 +
469 + throw new UnsupportedOperationException();
470 + }
471 +
472 + @Deprecated // marking method not implemented
473 + @Override
474 + public <SuppliedValue, Result> Result aggregate(
475 + Supplier<K, V, SuppliedValue> supplier,
476 + Aggregation<K, SuppliedValue, Result> aggregation,
477 + JobTracker jobTracker) {
478 +
479 + throw new UnsupportedOperationException();
480 + }
481 +
482 + private byte[] serializeKey(Object key) {
483 + return serializer.encode(key);
484 + }
485 +
486 + private K deserializeKey(byte[] key) {
487 + return serializer.decode(key);
488 + }
489 +
490 + private byte[] serializeVal(Object val) {
491 + return serializer.encode(val);
492 + }
493 +
494 + private V deserializeVal(byte[] val) {
495 + return serializer.decode(val);
496 + }
497 +
498 + private Set<byte[]> serializeKeySet(Set<K> keys) {
499 + Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
500 + for (K key : keys) {
501 + sk.add(serializeKey(key));
502 + }
503 + return sk;
504 + }
505 +
506 + private Set<K> deserializeKeySet(Set<byte[]> keys) {
507 + Set<K> dsk = new HashSet<>(keys.size());
508 + for (byte[] key : keys) {
509 + dsk.add(deserializeKey(key));
510 + }
511 + return dsk;
512 + }
513 +
514 + private Collection<V> deserializeVal(Collection<byte[]> vals) {
515 + Collection<V> dsl = new ArrayList<>(vals.size());
516 + for (byte[] val : vals) {
517 + dsl.add(deserializeVal(val));
518 + }
519 + return dsl;
520 + }
521 +
522 + private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
523 + Set<java.util.Map.Entry<byte[], byte[]>> entries) {
524 +
525 + Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
526 + for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
527 + dse.add(Pair.of(deserializeKey(entry.getKey()),
528 + deserializeVal(entry.getValue())));
529 + }
530 + return dse;
531 + }
532 +
533 + private final class BaseEntryListener
534 + implements EntryListener<byte[], byte[]> {
535 +
536 + private final EntryListener<K, V> listener;
537 +
538 + public BaseEntryListener(EntryListener<K, V> listener) {
539 + this.listener = listener;
540 + }
541 +
542 + @Override
543 + public void mapEvicted(MapEvent event) {
544 + listener.mapEvicted(event);
545 + }
546 +
547 + @Override
548 + public void mapCleared(MapEvent event) {
549 + listener.mapCleared(event);
550 + }
551 +
552 + @Override
553 + public void entryUpdated(EntryEvent<byte[], byte[]> event) {
554 + EntryEvent<K, V> evt = new EntryEvent<K, V>(
555 + event.getSource(),
556 + event.getMember(),
557 + event.getEventType().getType(),
558 + deserializeKey(event.getKey()),
559 + deserializeVal(event.getOldValue()),
560 + deserializeVal(event.getValue()));
561 +
562 + listener.entryUpdated(evt);
563 + }
564 +
565 + @Override
566 + public void entryRemoved(EntryEvent<byte[], byte[]> event) {
567 + EntryEvent<K, V> evt = new EntryEvent<K, V>(
568 + event.getSource(),
569 + event.getMember(),
570 + event.getEventType().getType(),
571 + deserializeKey(event.getKey()),
572 + deserializeVal(event.getOldValue()),
573 + null);
574 +
575 + listener.entryRemoved(evt);
576 + }
577 +
578 + @Override
579 + public void entryEvicted(EntryEvent<byte[], byte[]> event) {
580 + EntryEvent<K, V> evt = new EntryEvent<K, V>(
581 + event.getSource(),
582 + event.getMember(),
583 + event.getEventType().getType(),
584 + deserializeKey(event.getKey()),
585 + deserializeVal(event.getOldValue()),
586 + deserializeVal(event.getValue()));
587 +
588 + listener.entryEvicted(evt);
589 + }
590 +
591 + @Override
592 + public void entryAdded(EntryEvent<byte[], byte[]> event) {
593 + EntryEvent<K, V> evt = new EntryEvent<K, V>(
594 + event.getSource(),
595 + event.getMember(),
596 + event.getEventType().getType(),
597 + deserializeKey(event.getKey()),
598 + null,
599 + deserializeVal(event.getValue()));
600 +
601 + listener.entryAdded(evt);
602 + }
603 + }
604 +
605 + private final class DeserializeVal implements Function<byte[], V> {
606 + @Override
607 + public V apply(byte[] input) {
608 + return deserializeVal(input);
609 + }
610 + }
611 +
612 +}