Madan Jampani
Committed by Gerrit Code Review

Default ConsistentMap consistency level to SEQUENTIAL and reenable all Atomix unit tests

Change-Id: Ic04ff81fbaaa7c007f20077391a72fdfa9fd382a
......@@ -236,12 +236,9 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
if (!mapEventListeners.isEmpty()) {
if (mapEventListeners.add(listener)) {
return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
} else {
mapEventListeners.add(listener);
return CompletableFuture.completedFuture(null);
}
}
mapEventListeners.add(listener);
return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
}
......
......@@ -52,7 +52,7 @@ public final class AtomixConsistentMapCommands {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.LINEARIZABLE;
return ConsistencyLevel.SEQUENTIAL;
}
@Override
......@@ -78,7 +78,7 @@ public final class AtomixConsistentMapCommands {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.BOUNDED_LINEARIZABLE;
return ConsistencyLevel.SEQUENTIAL;
}
@Override
......
......@@ -337,10 +337,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
*
* @param commit unlisten commit
*/
protected void unlisten(
Commit<? extends Unlisten> commit) {
protected void unlisten(Commit<? extends Unlisten> commit) {
try {
Commit<? extends Listen> listener = listeners.remove(commit.session());
Commit<? extends Listen> listener = listeners.remove(commit.session().id());
if (listener != null) {
listener.close();
}
......
......@@ -21,10 +21,11 @@ import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
......@@ -34,12 +35,12 @@ import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
/**
* Unit tests for {@link AtomixConsistentMap}.
*/
@Ignore
public class AtomixConsistentMapTest extends AtomixTestBase {
@Override
......@@ -285,55 +286,52 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
TestMapEventListener listener = new TestMapEventListener();
// add listener; insert new value into map and verify an INSERT event is received.
map.addListener(listener).join();
map.put("foo", value1).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
listener.clearEvent();
map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
MapEvent<String, byte[]> event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.INSERT, event.type());
assertTrue(Arrays.equals(value1, event.newValue().value()));
// remove listener and verify listener is not notified.
map.removeListener(listener).join();
map.put("foo", value2).join();
assertNull(listener.event());
map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
assertFalse(listener.eventReceived());
// add the listener back and verify UPDATE events are received correctly
map.addListener(listener).join();
map.put("foo", value3).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.UPDATE, listener.event().type());
assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
listener.clearEvent();
map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.UPDATE, event.type());
assertTrue(Arrays.equals(value3, event.newValue().value()));
// perform a non-state changing operation and verify no events are received.
map.putIfAbsent("foo", value1).join();
assertNull(listener.event());
assertFalse(listener.eventReceived());
// verify REMOVE events are received correctly.
map.remove("foo").join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.REMOVE, listener.event().type());
assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
listener.clearEvent();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.REMOVE, event.type());
assertTrue(Arrays.equals(value3, event.oldValue().value()));
// verify compute methods also generate events.
map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
listener.clearEvent();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.INSERT, event.type());
assertTrue(Arrays.equals(value1, event.newValue().value()));
map.compute("foo", (k, v) -> value2).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.UPDATE, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
listener.clearEvent();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.UPDATE, event.type());
assertTrue(Arrays.equals(value2, event.newValue().value()));
map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.REMOVE, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
listener.clearEvent();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.REMOVE, event.type());
assertTrue(Arrays.equals(value2, event.oldValue().value()));
map.removeListener(listener).join();
}
......@@ -359,7 +357,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
}).join();
assertNull(listener.event());
assertFalse(listener.eventReceived());
map.size().thenAccept(result -> {
assertTrue(result == 0);
......@@ -376,21 +374,21 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
}
assertNull(listener.event());
assertFalse(listener.eventReceived());
map.commit(tx.transactionId()).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
listener.clearEvent();
MapEvent<String, byte[]> event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.INSERT, event.type());
assertTrue(Arrays.equals(value1, event.newValue().value()));
map.put("foo", value2).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.UPDATE, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
listener.clearEvent();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.UPDATE, event.type());
assertTrue(Arrays.equals(value2, event.newValue().value()));
}
protected void transactionRollbackTests(int clusterSize) throws Throwable {
......@@ -412,10 +410,10 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
}).join();
assertNull(listener.event());
assertFalse(listener.eventReceived());
map.rollback(tx.transactionId()).join();
assertNull(listener.event());
assertFalse(listener.eventReceived());
map.get("foo").thenAccept(result -> {
assertNull(result);
......@@ -424,27 +422,31 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
map.put("foo", value2).thenAccept(result -> {
assertNull(result);
}).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
listener.clearEvent();
MapEvent<String, byte[]> event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.INSERT, event.type());
assertTrue(Arrays.equals(value2, event.newValue().value()));
}
private static class TestMapEventListener implements MapEventListener<String, byte[]> {
MapEvent<String, byte[]> event;
private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
@Override
public void event(MapEvent<String, byte[]> event) {
this.event = event;
try {
queue.put(event);
} catch (InterruptedException e) {
Throwables.propagate(e);
}
}
public MapEvent<String, byte[]> event() {
return event;
public boolean eventReceived() {
return !queue.isEmpty();
}
public void clearEvent() {
event = null;
public MapEvent<String, byte[]> event() throws InterruptedException {
return queue.take();
}
}
}
......
......@@ -20,7 +20,6 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
......@@ -35,7 +34,6 @@ import io.atomix.resource.ResourceType;
/**
* Unit tests for {@link AtomixLeaderElector}.
*/
@Ignore
public class AtomixLeaderElectorTest extends AtomixTestBase {
NodeId node1 = new NodeId("node1");
......
......@@ -17,7 +17,6 @@ package org.onosproject.store.primitives.resources.impl;
import static org.junit.Assert.*;
import org.junit.Ignore;
import org.junit.Test;
import io.atomix.Atomix;
......@@ -27,7 +26,6 @@ import io.atomix.variables.DistributedLong;
/**
* Unit tests for {@link AtomixCounter}.
*/
@Ignore
public class AtomixLongTest extends AtomixTestBase {
@Override
......