Jonathan Hart

Unit tests for EventuallyConsistentMapImpl.

Most functionality is tested here, except for the anti-entropy code.

ONOS-859.

Change-Id: Ib9e83518f8a91d599364106bc0f7d869e62f5133
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
15 */ 15 */
16 package org.onosproject.store.cluster.messaging; 16 package org.onosproject.store.cluster.messaging;
17 17
18 -import java.io.IOException; 18 +import com.google.common.base.MoreObjects;
19 -
20 -import org.onosproject.cluster.NodeId;
21 import org.onlab.util.ByteArraySizeHashPrinter; 19 import org.onlab.util.ByteArraySizeHashPrinter;
20 +import org.onosproject.cluster.NodeId;
22 21
23 -import com.google.common.base.MoreObjects; 22 +import java.io.IOException;
23 +import java.util.Arrays;
24 +import java.util.Objects;
24 25
25 // TODO: Should payload type be ByteBuffer? 26 // TODO: Should payload type be ByteBuffer?
26 /** 27 /**
...@@ -79,7 +80,7 @@ public class ClusterMessage { ...@@ -79,7 +80,7 @@ public class ClusterMessage {
79 * @throws IOException when I/O exception of some sort has occurred 80 * @throws IOException when I/O exception of some sort has occurred
80 */ 81 */
81 public void respond(byte[] data) throws IOException { 82 public void respond(byte[] data) throws IOException {
82 - throw new IllegalStateException("One can only repond to message recived from others."); 83 + throw new IllegalStateException("One can only respond to message received from others.");
83 } 84 }
84 85
85 @Override 86 @Override
...@@ -90,4 +91,22 @@ public class ClusterMessage { ...@@ -90,4 +91,22 @@ public class ClusterMessage {
90 .add("payload", ByteArraySizeHashPrinter.of(payload)) 91 .add("payload", ByteArraySizeHashPrinter.of(payload))
91 .toString(); 92 .toString();
92 } 93 }
94 +
95 + @Override
96 + public boolean equals(Object o) {
97 + if (!(o instanceof ClusterMessage)) {
98 + return false;
99 + }
100 +
101 + ClusterMessage that = (ClusterMessage) o;
102 +
103 + return Objects.equals(this.sender, that.sender) &&
104 + Objects.equals(this.subject, that.subject) &&
105 + Arrays.equals(this.payload, that.payload);
106 + }
107 +
108 + @Override
109 + public int hashCode() {
110 + return Objects.hash(sender, subject, payload);
111 + }
93 } 112 }
......
...@@ -15,6 +15,10 @@ ...@@ -15,6 +15,10 @@
15 */ 15 */
16 package org.onosproject.store.impl; 16 package org.onosproject.store.impl;
17 17
18 +import com.google.common.base.MoreObjects;
19 +
20 +import java.util.Objects;
21 +
18 /** 22 /**
19 * Event object signalling that the map was modified. 23 * Event object signalling that the map was modified.
20 */ 24 */
...@@ -68,4 +72,30 @@ public class EventuallyConsistentMapEvent<K, V> { ...@@ -68,4 +72,30 @@ public class EventuallyConsistentMapEvent<K, V> {
68 public V value() { 72 public V value() {
69 return value; 73 return value;
70 } 74 }
75 +
76 + @Override
77 + public boolean equals(Object o) {
78 + if (!(o instanceof EventuallyConsistentMapEvent)) {
79 + return false;
80 + }
81 +
82 + EventuallyConsistentMapEvent that = (EventuallyConsistentMapEvent) o;
83 + return Objects.equals(this.type, that.type) &&
84 + Objects.equals(this.key, that.key) &&
85 + Objects.equals(this.value, that.value);
86 + }
87 +
88 + @Override
89 + public int hashCode() {
90 + return Objects.hash(type, key, value);
91 + }
92 +
93 + @Override
94 + public String toString() {
95 + return MoreObjects.toStringHelper(getClass())
96 + .add("type", type)
97 + .add("key", key)
98 + .add("value", value)
99 + .toString();
100 + }
71 } 101 }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.impl; 16 package org.onosproject.store.impl;
17 17
18 import com.google.common.base.MoreObjects; 18 import com.google.common.base.MoreObjects;
19 +import com.google.common.collect.ImmutableList;
19 import org.apache.commons.lang3.RandomUtils; 20 import org.apache.commons.lang3.RandomUtils;
20 import org.onlab.util.KryoNamespace; 21 import org.onlab.util.KryoNamespace;
21 import org.onosproject.cluster.ClusterService; 22 import org.onosproject.cluster.ClusterService;
...@@ -33,11 +34,11 @@ import org.slf4j.LoggerFactory; ...@@ -33,11 +34,11 @@ import org.slf4j.LoggerFactory;
33 import java.io.IOException; 34 import java.io.IOException;
34 import java.util.ArrayList; 35 import java.util.ArrayList;
35 import java.util.Collection; 36 import java.util.Collection;
36 -import java.util.Collections;
37 import java.util.HashMap; 37 import java.util.HashMap;
38 import java.util.LinkedList; 38 import java.util.LinkedList;
39 import java.util.List; 39 import java.util.List;
40 import java.util.Map; 40 import java.util.Map;
41 +import java.util.Objects;
41 import java.util.Set; 42 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap; 43 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.CopyOnWriteArraySet; 44 import java.util.concurrent.CopyOnWriteArraySet;
...@@ -290,13 +291,15 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -290,13 +291,15 @@ public class EventuallyConsistentMapImpl<K, V>
290 } 291 }
291 } 292 }
292 293
293 - notifyPeers(new InternalPutEvent<>(updates)); 294 + if (!updates.isEmpty()) {
295 + notifyPeers(new InternalPutEvent<>(updates));
294 296
295 - for (PutEntry<K, V> entry : updates) { 297 + for (PutEntry<K, V> entry : updates) {
296 - EventuallyConsistentMapEvent<K, V> externalEvent = 298 + EventuallyConsistentMapEvent<K, V> externalEvent = new EventuallyConsistentMapEvent<>(
297 - new EventuallyConsistentMapEvent<>( 299 + EventuallyConsistentMapEvent.Type.PUT, entry.key(),
298 - EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value()); 300 + entry.value());
299 - notifyListeners(externalEvent); 301 + notifyListeners(externalEvent);
302 + }
300 } 303 }
301 } 304 }
302 305
...@@ -314,13 +317,16 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -314,13 +317,16 @@ public class EventuallyConsistentMapImpl<K, V>
314 } 317 }
315 } 318 }
316 319
317 - notifyPeers(new InternalRemoveEvent<>(removed)); 320 + if (!removed.isEmpty()) {
321 + notifyPeers(new InternalRemoveEvent<>(removed));
318 322
319 - for (RemoveEntry<K> entry : removed) { 323 + for (RemoveEntry<K> entry : removed) {
320 - EventuallyConsistentMapEvent<K, V> externalEvent = 324 + EventuallyConsistentMapEvent<K, V> externalEvent
321 - new EventuallyConsistentMapEvent<>( 325 + = new EventuallyConsistentMapEvent<>(
322 - EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null); 326 + EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
323 - notifyListeners(externalEvent); 327 + null);
328 + notifyListeners(externalEvent);
329 + }
324 } 330 }
325 } 331 }
326 332
...@@ -370,8 +376,11 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -370,8 +376,11 @@ public class EventuallyConsistentMapImpl<K, V>
370 executor.shutdown(); 376 executor.shutdown();
371 backgroundExecutor.shutdown(); 377 backgroundExecutor.shutdown();
372 378
379 + listeners.clear();
380 +
373 clusterCommunicator.removeSubscriber(updateMessageSubject); 381 clusterCommunicator.removeSubscriber(updateMessageSubject);
374 clusterCommunicator.removeSubscriber(removeMessageSubject); 382 clusterCommunicator.removeSubscriber(removeMessageSubject);
383 + clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
375 } 384 }
376 385
377 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) { 386 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
...@@ -430,6 +439,23 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -430,6 +439,23 @@ public class EventuallyConsistentMapImpl<K, V>
430 public V setValue(V value) { 439 public V setValue(V value) {
431 throw new UnsupportedOperationException(); 440 throw new UnsupportedOperationException();
432 } 441 }
442 +
443 + @Override
444 + public boolean equals(Object o) {
445 + if (!(o instanceof Map.Entry)) {
446 + return false;
447 + }
448 +
449 + Map.Entry that = (Map.Entry) o;
450 +
451 + return Objects.equals(this.key, that.getKey()) &&
452 + Objects.equals(this.value, that.getValue());
453 + }
454 +
455 + @Override
456 + public int hashCode() {
457 + return Objects.hash(key, value);
458 + }
433 } 459 }
434 460
435 private final class SendAdvertisementTask implements Runnable { 461 private final class SendAdvertisementTask implements Runnable {
...@@ -728,12 +754,11 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -728,12 +754,11 @@ public class EventuallyConsistentMapImpl<K, V>
728 } 754 }
729 } 755 }
730 756
731 - private static final class InternalPutEvent<K, V> { 757 + static final class InternalPutEvent<K, V> {
732 private final List<PutEntry<K, V>> entries; 758 private final List<PutEntry<K, V>> entries;
733 759
734 public InternalPutEvent(K key, V value, Timestamp timestamp) { 760 public InternalPutEvent(K key, V value, Timestamp timestamp) {
735 - entries = Collections 761 + entries = ImmutableList.of(new PutEntry<>(key, value, timestamp));
736 - .singletonList(new PutEntry<>(key, value, timestamp));
737 } 762 }
738 763
739 public InternalPutEvent(List<PutEntry<K, V>> entries) { 764 public InternalPutEvent(List<PutEntry<K, V>> entries) {
...@@ -751,7 +776,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -751,7 +776,7 @@ public class EventuallyConsistentMapImpl<K, V>
751 } 776 }
752 } 777 }
753 778
754 - private static final class PutEntry<K, V> { 779 + static final class PutEntry<K, V> {
755 private final K key; 780 private final K key;
756 private final V value; 781 private final V value;
757 private final Timestamp timestamp; 782 private final Timestamp timestamp;
...@@ -791,12 +816,11 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -791,12 +816,11 @@ public class EventuallyConsistentMapImpl<K, V>
791 } 816 }
792 } 817 }
793 818
794 - private static final class InternalRemoveEvent<K> { 819 + static final class InternalRemoveEvent<K> {
795 private final List<RemoveEntry<K>> entries; 820 private final List<RemoveEntry<K>> entries;
796 821
797 public InternalRemoveEvent(K key, Timestamp timestamp) { 822 public InternalRemoveEvent(K key, Timestamp timestamp) {
798 - entries = Collections.singletonList( 823 + entries = ImmutableList.of(new RemoveEntry<>(key, timestamp));
799 - new RemoveEntry<>(key, timestamp));
800 } 824 }
801 825
802 public InternalRemoveEvent(List<RemoveEntry<K>> entries) { 826 public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
...@@ -814,7 +838,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -814,7 +838,7 @@ public class EventuallyConsistentMapImpl<K, V>
814 } 838 }
815 } 839 }
816 840
817 - private static final class RemoveEntry<K> { 841 + static final class RemoveEntry<K> {
818 private final K key; 842 private final K key;
819 private final Timestamp timestamp; 843 private final Timestamp timestamp;
820 844
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.impl;
17 +
18 +import com.google.common.collect.ComparisonChain;
19 +import com.google.common.util.concurrent.ListenableFuture;
20 +import org.junit.After;
21 +import org.junit.Before;
22 +import org.junit.Test;
23 +import org.onlab.packet.IpAddress;
24 +import org.onlab.util.KryoNamespace;
25 +import org.onosproject.cluster.ClusterService;
26 +import org.onosproject.cluster.ControllerNode;
27 +import org.onosproject.cluster.DefaultControllerNode;
28 +import org.onosproject.cluster.NodeId;
29 +import org.onosproject.store.Timestamp;
30 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
31 +import org.onosproject.store.cluster.messaging.ClusterMessage;
32 +import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
33 +import org.onosproject.store.cluster.messaging.MessageSubject;
34 +import org.onosproject.store.serializers.KryoNamespaces;
35 +import org.onosproject.store.serializers.KryoSerializer;
36 +
37 +import java.io.IOException;
38 +import java.util.ArrayList;
39 +import java.util.HashMap;
40 +import java.util.HashSet;
41 +import java.util.Map;
42 +import java.util.Objects;
43 +import java.util.Set;
44 +import java.util.concurrent.CountDownLatch;
45 +import java.util.concurrent.TimeUnit;
46 +import java.util.concurrent.atomic.AtomicLong;
47 +
48 +import static com.google.common.base.Preconditions.checkArgument;
49 +import static junit.framework.TestCase.assertFalse;
50 +import static org.easymock.EasyMock.*;
51 +import static org.junit.Assert.assertEquals;
52 +import static org.junit.Assert.assertNull;
53 +import static org.junit.Assert.assertTrue;
54 +import static org.junit.Assert.fail;
55 +
56 +/**
57 + * Unit tests for EventuallyConsistentMapImpl.
58 + */
59 +public class EventuallyConsistentMapImplTest {
60 +
61 + private EventuallyConsistentMap<String, String> ecMap;
62 +
63 + private ClusterService clusterService;
64 + private ClusterCommunicationService clusterCommunicator;
65 + private SequentialClockService<String> clockService;
66 +
67 + private static final String MAP_NAME = "test";
68 + private static final MessageSubject PUT_MESSAGE_SUBJECT
69 + = new MessageSubject("ecm-" + MAP_NAME + "-update");
70 + private static final MessageSubject REMOVE_MESSAGE_SUBJECT
71 + = new MessageSubject("ecm-" + MAP_NAME + "-remove");
72 + private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
73 + = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
74 +
75 + private static final String KEY1 = "one";
76 + private static final String KEY2 = "two";
77 + private static final String VALUE1 = "oneValue";
78 + private static final String VALUE2 = "twoValue";
79 +
80 + private final ControllerNode self =
81 + new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
82 +
83 + private ClusterMessageHandler putHandler;
84 + private ClusterMessageHandler removeHandler;
85 + private ClusterMessageHandler antiEntropyHandler;
86 +
87 + /*
88 + * Serialization is a bit tricky here. We need to serialize in the tests
89 + * to set the expectations, which will use this serializer here, but the
90 + * EventuallyConsistentMap will use its own internal serializer. This means
91 + * this serializer must be set up exactly the same as map's internal
92 + * serializer.
93 + */
94 + private static final KryoSerializer SERIALIZER = new KryoSerializer() {
95 + @Override
96 + protected void setupKryoPool() {
97 + serializerPool = KryoNamespace.newBuilder()
98 + // Classes we give to the map
99 + .register(KryoNamespaces.API)
100 + .register(TestTimestamp.class)
101 + // Below is the classes that the map internally registers
102 + .register(WallClockTimestamp.class)
103 + .register(EventuallyConsistentMapImpl.PutEntry.class)
104 + .register(EventuallyConsistentMapImpl.RemoveEntry.class)
105 + .register(ArrayList.class)
106 + .register(EventuallyConsistentMapImpl.InternalPutEvent.class)
107 + .register(EventuallyConsistentMapImpl.InternalRemoveEvent.class)
108 + .register(AntiEntropyAdvertisement.class)
109 + .register(HashMap.class)
110 + .build();
111 + }
112 + };
113 +
114 + @Before
115 + public void setUp() throws Exception {
116 + clusterService = createMock(ClusterService.class);
117 + expect(clusterService.getLocalNode()).andReturn(self)
118 + .anyTimes();
119 + replay(clusterService);
120 +
121 + clusterCommunicator = createMock(ClusterCommunicationService.class);
122 +
123 + // Add expectation for adding cluster message subscribers which
124 + // delegate to our ClusterCommunicationService implementation. This
125 + // allows us to get a reference to the map's internal cluster message
126 + // handlers so we can induce events coming in from a peer.
127 + clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
128 + anyObject(ClusterMessageHandler.class));
129 + expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
130 +
131 + replay(clusterCommunicator);
132 +
133 + clockService = new SequentialClockService<>();
134 +
135 + KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
136 + .register(KryoNamespaces.API)
137 + .register(TestTimestamp.class);
138 +
139 + ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
140 + clusterCommunicator,
141 + serializer, clockService);
142 +
143 + // Reset ready for tests to add their own expectations
144 + reset(clusterCommunicator);
145 + }
146 +
147 + @After
148 + public void tearDown() {
149 + reset(clusterCommunicator);
150 + ecMap.destroy();
151 + }
152 +
153 + @Test
154 + public void testSize() throws Exception {
155 + expectAnyMessage(clusterCommunicator);
156 +
157 + assertEquals(0, ecMap.size());
158 + ecMap.put(KEY1, VALUE1);
159 + assertEquals(1, ecMap.size());
160 + ecMap.put(KEY1, VALUE2);
161 + assertEquals(1, ecMap.size());
162 + ecMap.put(KEY2, VALUE2);
163 + assertEquals(2, ecMap.size());
164 + for (int i = 0; i < 10; i++) {
165 + ecMap.put("" + i, "" + i);
166 + }
167 + assertEquals(12, ecMap.size());
168 + ecMap.remove(KEY1);
169 + assertEquals(11, ecMap.size());
170 + ecMap.remove(KEY1);
171 + assertEquals(11, ecMap.size());
172 + }
173 +
174 + @Test
175 + public void testIsEmpty() throws Exception {
176 + expectAnyMessage(clusterCommunicator);
177 +
178 + assertTrue(ecMap.isEmpty());
179 + ecMap.put(KEY1, VALUE1);
180 + assertFalse(ecMap.isEmpty());
181 + ecMap.remove(KEY1);
182 + assertTrue(ecMap.isEmpty());
183 + }
184 +
185 + @Test
186 + public void testContainsKey() throws Exception {
187 + expectAnyMessage(clusterCommunicator);
188 +
189 + assertFalse(ecMap.containsKey(KEY1));
190 + ecMap.put(KEY1, VALUE1);
191 + assertTrue(ecMap.containsKey(KEY1));
192 + assertFalse(ecMap.containsKey(KEY2));
193 + ecMap.remove(KEY1);
194 + assertFalse(ecMap.containsKey(KEY1));
195 + }
196 +
197 + @Test
198 + public void testContainsValue() throws Exception {
199 + expectAnyMessage(clusterCommunicator);
200 +
201 + assertFalse(ecMap.containsValue(VALUE1));
202 + ecMap.put(KEY1, VALUE1);
203 + assertTrue(ecMap.containsValue(VALUE1));
204 + assertFalse(ecMap.containsValue(VALUE2));
205 + ecMap.put(KEY1, VALUE2);
206 + assertFalse(ecMap.containsValue(VALUE1));
207 + assertTrue(ecMap.containsValue(VALUE2));
208 + ecMap.remove(KEY1);
209 + assertFalse(ecMap.containsValue(VALUE2));
210 + }
211 +
212 + @Test
213 + public void testGet() throws Exception {
214 + expectAnyMessage(clusterCommunicator);
215 +
216 + CountDownLatch latch;
217 +
218 + // Local put
219 + assertNull(ecMap.get(KEY1));
220 + ecMap.put(KEY1, VALUE1);
221 + assertEquals(VALUE1, ecMap.get(KEY1));
222 +
223 + // Remote put
224 + ClusterMessage message
225 + = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2));
226 +
227 + // Create a latch so we know when the put operation has finished
228 + latch = new CountDownLatch(1);
229 + ecMap.addListener(new TestListener(latch));
230 +
231 + assertNull(ecMap.get(KEY2));
232 + putHandler.handle(message);
233 + assertTrue("External listener never got notified of internal event",
234 + latch.await(100, TimeUnit.MILLISECONDS));
235 + assertEquals(VALUE2, ecMap.get(KEY2));
236 +
237 + // Local remove
238 + ecMap.remove(KEY2);
239 + assertNull(ecMap.get(KEY2));
240 +
241 + // Remote remove
242 + ClusterMessage removeMessage
243 + = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1));
244 +
245 + // Create a latch so we know when the remove operation has finished
246 + latch = new CountDownLatch(1);
247 + ecMap.addListener(new TestListener(latch));
248 +
249 + removeHandler.handle(removeMessage);
250 + assertTrue("External listener never got notified of internal event",
251 + latch.await(100, TimeUnit.MILLISECONDS));
252 + assertNull(ecMap.get(KEY1));
253 + }
254 +
255 + @Test
256 + public void testPut() throws Exception {
257 + // Set up expectations of external events to be sent to listeners during
258 + // the test. These don't use timestamps so we can set them all up at once.
259 + EventuallyConsistentMapListener<String, String> listener
260 + = createMock(EventuallyConsistentMapListener.class);
261 + listener.event(new EventuallyConsistentMapEvent<>(
262 + EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
263 + listener.event(new EventuallyConsistentMapEvent<>(
264 + EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
265 + replay(listener);
266 +
267 + ecMap.addListener(listener);
268 +
269 + // Set up expected internal message to be broadcast to peers on first put
270 + expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
271 + .peekAtNextTimestamp()), clusterCommunicator);
272 +
273 + // Put first value
274 + assertNull(ecMap.get(KEY1));
275 + ecMap.put(KEY1, VALUE1);
276 + assertEquals(VALUE1, ecMap.get(KEY1));
277 +
278 + verify(clusterCommunicator);
279 +
280 + // Set up expected internal message to be broadcast to peers on second put
281 + expectSpecificMessage(generatePutMessage(
282 + KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
283 +
284 + // Update same key to a new value
285 + ecMap.put(KEY1, VALUE2);
286 + assertEquals(VALUE2, ecMap.get(KEY1));
287 +
288 + verify(clusterCommunicator);
289 +
290 + // Do a put with a older timestamp than the value already there.
291 + // The map data should not be changed and no notifications should be sent.
292 + reset(clusterCommunicator);
293 + replay(clusterCommunicator);
294 +
295 + clockService.turnBackTime();
296 + ecMap.put(KEY1, VALUE1);
297 + // Value should not have changed.
298 + assertEquals(VALUE2, ecMap.get(KEY1));
299 +
300 + verify(clusterCommunicator);
301 +
302 + // Check that our listener received the correct events during the test
303 + verify(listener);
304 + }
305 +
306 + @Test
307 + public void testRemove() throws Exception {
308 + // Set up expectations of external events to be sent to listeners during
309 + // the test. These don't use timestamps so we can set them all up at once.
310 + EventuallyConsistentMapListener<String, String> listener
311 + = createMock(EventuallyConsistentMapListener.class);
312 + listener.event(new EventuallyConsistentMapEvent<>(
313 + EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
314 + expectLastCall().times(2);
315 + listener.event(new EventuallyConsistentMapEvent<>(
316 + EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
317 + listener.event(new EventuallyConsistentMapEvent<>(
318 + EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
319 + replay(listener);
320 +
321 + ecMap.addListener(listener);
322 +
323 + // Put in an initial value
324 + expectAnyMessage(clusterCommunicator);
325 + ecMap.put(KEY1, VALUE1);
326 + assertEquals(VALUE1, ecMap.get(KEY1));
327 +
328 + // Remove the value and check the correct internal cluster messages
329 + // are sent
330 + expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
331 + clusterCommunicator);
332 +
333 + ecMap.remove(KEY1);
334 + assertNull(ecMap.get(KEY1));
335 +
336 + verify(clusterCommunicator);
337 +
338 + // Remove the same value again. Even though the value is no longer in
339 + // the map, we expect that the tombstone is updated and another remove
340 + // event is sent to the cluster and external listeners.
341 + expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
342 + clusterCommunicator);
343 +
344 + ecMap.remove(KEY1);
345 + assertNull(ecMap.get(KEY1));
346 +
347 + verify(clusterCommunicator);
348 +
349 +
350 + // Put in a new value for us to try and remove
351 + expectAnyMessage(clusterCommunicator);
352 +
353 + ecMap.put(KEY2, VALUE2);
354 +
355 + clockService.turnBackTime();
356 +
357 + // Remove should have no effect, since it has an older timestamp than
358 + // the put. Expect no notifications to be sent out
359 + reset(clusterCommunicator);
360 + replay(clusterCommunicator);
361 +
362 + ecMap.remove(KEY2);
363 +
364 + verify(clusterCommunicator);
365 +
366 + // Check that our listener received the correct events during the test
367 + verify(listener);
368 + }
369 +
370 + @Test
371 + public void testPutAll() throws Exception {
372 + // putAll() with an empty map is a no-op - no messages will be sent
373 + reset(clusterCommunicator);
374 + replay(clusterCommunicator);
375 +
376 + ecMap.putAll(new HashMap<>());
377 +
378 + verify(clusterCommunicator);
379 +
380 + // Set up the listener with our expected events
381 + EventuallyConsistentMapListener<String, String> listener
382 + = createMock(EventuallyConsistentMapListener.class);
383 + listener.event(new EventuallyConsistentMapEvent<>(
384 + EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
385 + listener.event(new EventuallyConsistentMapEvent<>(
386 + EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
387 + replay(listener);
388 +
389 + ecMap.addListener(listener);
390 +
391 + // Expect a multi-update inter-instance message
392 + expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
393 + clusterCommunicator);
394 +
395 + Map<String, String> putAllValues = new HashMap<>();
396 + putAllValues.put(KEY1, VALUE1);
397 + putAllValues.put(KEY2, VALUE2);
398 +
399 + // Put the values in the map
400 + ecMap.putAll(putAllValues);
401 +
402 + // Check the correct messages and events were sent
403 + verify(clusterCommunicator);
404 + verify(listener);
405 + }
406 +
407 + @Test
408 + public void testClear() throws Exception {
409 + EventuallyConsistentMapListener<String, String> listener
410 + = createMock(EventuallyConsistentMapListener.class);
411 + listener.event(new EventuallyConsistentMapEvent<>(
412 + EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
413 + listener.event(new EventuallyConsistentMapEvent<>(
414 + EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
415 + replay(listener);
416 +
417 + // clear() on an empty map is a no-op - no messages will be sent
418 + reset(clusterCommunicator);
419 + replay(clusterCommunicator);
420 +
421 + assertTrue(ecMap.isEmpty());
422 + ecMap.clear();
423 + verify(clusterCommunicator);
424 +
425 + // Put some items in the map
426 + expectAnyMessage(clusterCommunicator);
427 + ecMap.put(KEY1, VALUE1);
428 + ecMap.put(KEY2, VALUE2);
429 +
430 + ecMap.addListener(listener);
431 + expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
432 +
433 + ecMap.clear();
434 +
435 + verify(clusterCommunicator);
436 + verify(listener);
437 + }
438 +
439 + @Test
440 + public void testKeySet() throws Exception {
441 + expectAnyMessage(clusterCommunicator);
442 +
443 + assertTrue(ecMap.keySet().isEmpty());
444 +
445 + // Generate some keys
446 + Set<String> keys = new HashSet<>();
447 + for (int i = 1; i <= 10; i++) {
448 + keys.add("" + i);
449 + }
450 +
451 + // Put each key in the map
452 + keys.forEach(k -> ecMap.put(k, "value" + k));
453 +
454 + // Check keySet() returns the correct value
455 + assertEquals(keys, ecMap.keySet());
456 +
457 + // Update the value for one of the keys
458 + ecMap.put(keys.iterator().next(), "new-value");
459 +
460 + // Check the key set is still the same
461 + assertEquals(keys, ecMap.keySet());
462 +
463 + // Remove a key
464 + String removeKey = keys.iterator().next();
465 + keys.remove(removeKey);
466 + ecMap.remove(removeKey);
467 +
468 + // Check the key set is still correct
469 + assertEquals(keys, ecMap.keySet());
470 + }
471 +
472 + @Test
473 + public void testValues() throws Exception {
474 + expectAnyMessage(clusterCommunicator);
475 +
476 + assertTrue(ecMap.values().isEmpty());
477 +
478 + // Generate some values
479 + Map<String, String> expectedValues = new HashMap<>();
480 + for (int i = 1; i <= 10; i++) {
481 + expectedValues.put("" + i, "value" + i);
482 + }
483 +
484 + // Add them into the map
485 + expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
486 +
487 + // Check the values collection is correct
488 + assertEquals(expectedValues.values().size(), ecMap.values().size());
489 + expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
490 +
491 + // Update the value for one of the keys
492 + Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
493 + expectedValues.put(first.getKey(), "new-value");
494 + ecMap.put(first.getKey(), "new-value");
495 +
496 + // Check the values collection is still correct
497 + assertEquals(expectedValues.values().size(), ecMap.values().size());
498 + expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
499 +
500 + // Remove a key
501 + String removeKey = expectedValues.keySet().iterator().next();
502 + expectedValues.remove(removeKey);
503 + ecMap.remove(removeKey);
504 +
505 + // Check the values collection is still correct
506 + assertEquals(expectedValues.values().size(), ecMap.values().size());
507 + expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
508 + }
509 +
510 + @Test
511 + public void testEntrySet() throws Exception {
512 + expectAnyMessage(clusterCommunicator);
513 +
514 + assertTrue(ecMap.entrySet().isEmpty());
515 +
516 + // Generate some values
517 + Map<String, String> expectedValues = new HashMap<>();
518 + for (int i = 1; i <= 10; i++) {
519 + expectedValues.put("" + i, "value" + i);
520 + }
521 +
522 + // Add them into the map
523 + expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
524 +
525 + // Check the entry set is correct
526 + assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
527 +
528 + // Update the value for one of the keys
529 + Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
530 + expectedValues.put(first.getKey(), "new-value");
531 + ecMap.put(first.getKey(), "new-value");
532 +
533 + // Check the entry set is still correct
534 + assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
535 +
536 + // Remove a key
537 + String removeKey = expectedValues.keySet().iterator().next();
538 + expectedValues.remove(removeKey);
539 + ecMap.remove(removeKey);
540 +
541 + // Check the entry set is still correct
542 + assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
543 + }
544 +
545 + private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
546 + if (expectedMap.entrySet().size() != actual.size()) {
547 + return false;
548 + }
549 +
550 + for (Map.Entry<String, String> e : actual) {
551 + if (!expectedMap.containsKey(e.getKey())) {
552 + return false;
553 + }
554 + if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
555 + return false;
556 + }
557 + }
558 + return true;
559 + }
560 +
561 + @Test
562 + public void testDestroy() throws Exception {
563 + clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
564 + clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
565 + clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
566 +
567 + replay(clusterCommunicator);
568 +
569 + ecMap.destroy();
570 +
571 + verify(clusterCommunicator);
572 +
573 + try {
574 + ecMap.get(KEY1);
575 + fail("get after destroy should throw exception");
576 + } catch (IllegalStateException e) {
577 + assertTrue(true);
578 + }
579 +
580 + try {
581 + ecMap.put(KEY1, VALUE1);
582 + fail("put after destroy should throw exception");
583 + } catch (IllegalStateException e) {
584 + assertTrue(true);
585 + }
586 + }
587 +
588 + private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
589 + EventuallyConsistentMapImpl.InternalPutEvent<String, String> event =
590 + new EventuallyConsistentMapImpl.InternalPutEvent<>(
591 + key, value, timestamp);
592 +
593 + return new ClusterMessage(
594 + clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
595 + SERIALIZER.encode(event));
596 + }
597 +
598 + private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
599 + ArrayList<EventuallyConsistentMapImpl.PutEntry<String, String>> list = new ArrayList<>();
600 +
601 + Timestamp timestamp1 = clockService.peek(1);
602 + Timestamp timestamp2 = clockService.peek(2);
603 +
604 + EventuallyConsistentMapImpl.PutEntry<String, String> pe1
605 + = new EventuallyConsistentMapImpl.PutEntry<>(key1, value1, timestamp1);
606 + EventuallyConsistentMapImpl.PutEntry<String, String> pe2
607 + = new EventuallyConsistentMapImpl.PutEntry<>(key2, value2, timestamp2);
608 +
609 + list.add(pe1);
610 + list.add(pe2);
611 +
612 + EventuallyConsistentMapImpl.InternalPutEvent<String, String> event
613 + = new EventuallyConsistentMapImpl.InternalPutEvent<>(list);
614 +
615 + return new ClusterMessage(
616 + clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
617 + SERIALIZER.encode(event));
618 + }
619 +
620 + private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
621 + EventuallyConsistentMapImpl.InternalRemoveEvent<String> event =
622 + new EventuallyConsistentMapImpl.InternalRemoveEvent<>(
623 + key, timestamp);
624 +
625 + return new ClusterMessage(
626 + clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
627 + SERIALIZER.encode(event));
628 + }
629 +
630 + private ClusterMessage generateRemoveMessage(String key1, String key2) {
631 + ArrayList<EventuallyConsistentMapImpl.RemoveEntry<String>> list = new ArrayList<>();
632 +
633 + Timestamp timestamp1 = clockService.peek(1);
634 + Timestamp timestamp2 = clockService.peek(2);
635 +
636 + EventuallyConsistentMapImpl.RemoveEntry<String> re1
637 + = new EventuallyConsistentMapImpl.RemoveEntry<>(key1, timestamp1);
638 + EventuallyConsistentMapImpl.RemoveEntry<String> re2
639 + = new EventuallyConsistentMapImpl.RemoveEntry<>(key2, timestamp2);
640 +
641 + list.add(re1);
642 + list.add(re2);
643 +
644 + EventuallyConsistentMapImpl.InternalRemoveEvent<String> event
645 + = new EventuallyConsistentMapImpl.InternalRemoveEvent<>(list);
646 +
647 + return new ClusterMessage(
648 + clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
649 + SERIALIZER.encode(event));
650 + }
651 +
652 + /**
653 + * Sets up a mock ClusterCommunicationService to expect a specific cluster
654 + * message to be broadcast to the cluster.
655 + *
656 + * @param m message we expect to be sent
657 + * @param clusterCommunicator a mock ClusterCommunicationService to set up
658 + */
659 + private static void expectSpecificMessage(ClusterMessage m,
660 + ClusterCommunicationService clusterCommunicator) {
661 + reset(clusterCommunicator);
662 + expect(clusterCommunicator.broadcast(m)).andReturn(true);
663 + replay(clusterCommunicator);
664 + }
665 +
666 + /**
667 + * Sets up a mock ClusterCommunicationService to expect any cluster message
668 + * that is sent to it. This is useful for unit tests where we aren't
669 + * interested in testing the messaging component.
670 + *
671 + * @param clusterCommunicator a mock ClusterCommunicationService to set up
672 + */
673 + private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
674 + reset(clusterCommunicator);
675 + expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
676 + .andReturn(true)
677 + .anyTimes();
678 + replay(clusterCommunicator);
679 + }
680 +
681 + /**
682 + * ClusterCommunicationService implementation that the map's addSubscriber
683 + * call will delegate to. This means we can get a reference to the
684 + * internal cluster message handler used by the map, so that we can simulate
685 + * events coming in from other instances.
686 + */
687 + private final class TestClusterCommunicationService
688 + implements ClusterCommunicationService {
689 +
690 + @Override
691 + public boolean broadcast(ClusterMessage message) {
692 + return false;
693 + }
694 +
695 + @Override
696 + public boolean broadcastIncludeSelf(ClusterMessage message) {
697 + return false;
698 + }
699 +
700 + @Override
701 + public boolean unicast(ClusterMessage message, NodeId toNodeId)
702 + throws IOException {
703 + return false;
704 + }
705 +
706 + @Override
707 + public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
708 + return false;
709 + }
710 +
711 + @Override
712 + public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
713 + NodeId toNodeId)
714 + throws IOException {
715 + return null;
716 + }
717 +
718 + @Override
719 + public void addSubscriber(MessageSubject subject,
720 + ClusterMessageHandler subscriber) {
721 + if (subject.equals(PUT_MESSAGE_SUBJECT)) {
722 + putHandler = subscriber;
723 + } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
724 + removeHandler = subscriber;
725 + } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
726 + antiEntropyHandler = subscriber;
727 + } else {
728 + throw new RuntimeException("Unexpected message subject " + subject.toString());
729 + }
730 + }
731 +
732 + @Override
733 + public void removeSubscriber(MessageSubject subject) {}
734 + }
735 +
736 + /**
737 + * ClockService implementation that gives out timestamps based on a
738 + * sequential counter. This clock service enables more control over the
739 + * timestamps that are given out, including being able to "turn back time"
740 + * to give out timestamps from the past.
741 + *
742 + * @param <T> Type that the clock service will give out timestamps for
743 + */
744 + private class SequentialClockService<T> implements ClockService<T> {
745 +
746 + private static final long INITIAL_VALUE = 1;
747 + private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
748 +
749 + @Override
750 + public Timestamp getTimestamp(T object) {
751 + return new TestTimestamp(counter.getAndIncrement());
752 + }
753 +
754 + /**
755 + * Returns what the next timestamp will be without consuming the
756 + * timestamp. This allows test code to set expectations correctly while
757 + * still allowing the CUT to get the same timestamp.
758 + *
759 + * @return timestamp equal to the timestamp that will be returned by the
760 + * next call to {@link #getTimestamp(T)}.
761 + */
762 + public Timestamp peekAtNextTimestamp() {
763 + return peek(1);
764 + }
765 +
766 + /**
767 + * Returns the ith timestamp to be given out in the future without
768 + * consuming the timestamp. For example, i=1 returns the next timestamp,
769 + * i=2 returns the timestamp after that, and so on.
770 + *
771 + * @param i number of the timestamp to peek at
772 + * @return the ith timestamp that will be given out
773 + */
774 + public Timestamp peek(int i) {
775 + checkArgument(i > 0, "i must be a positive integer");
776 +
777 + return new TestTimestamp(counter.get() + i - 1);
778 + }
779 +
780 + /**
781 + * Turns the clock back two ticks, so the next call to getTimestamp will
782 + * return an older timestamp than the previous call to getTimestamp.
783 + */
784 + public void turnBackTime() {
785 + // Not atomic, but should be OK for these tests.
786 + counter.decrementAndGet();
787 + counter.decrementAndGet();
788 + }
789 +
790 + }
791 +
792 + /**
793 + * Timestamp implementation where the value of the timestamp can be
794 + * specified explicitly at creation time.
795 + */
796 + private class TestTimestamp implements Timestamp {
797 +
798 + private final long timestamp;
799 +
800 + /**
801 + * Creates a new timestamp that has the specified value.
802 + *
803 + * @param timestamp value of the timestamp
804 + */
805 + public TestTimestamp(long timestamp) {
806 + this.timestamp = timestamp;
807 + }
808 +
809 + @Override
810 + public int compareTo(Timestamp o) {
811 + checkArgument(o instanceof TestTimestamp);
812 + TestTimestamp otherTimestamp = (TestTimestamp) o;
813 + return ComparisonChain.start()
814 + .compare(this.timestamp, otherTimestamp.timestamp)
815 + .result();
816 + }
817 + }
818 +
819 + /**
820 + * EventuallyConsistentMapListener implementation which triggers a latch
821 + * when it receives an event.
822 + */
823 + private class TestListener implements EventuallyConsistentMapListener<String, String> {
824 + private CountDownLatch latch;
825 +
826 + /**
827 + * Creates a new listener that will trigger the specified latch when it
828 + * receives and event.
829 + *
830 + * @param latch the latch to trigger on events
831 + */
832 + public TestListener(CountDownLatch latch) {
833 + this.latch = latch;
834 + }
835 +
836 + @Override
837 + public void event(EventuallyConsistentMapEvent<String, String> event) {
838 + latch.countDown();
839 + }
840 + }
841 +}