HIGUCHI Yuta

Fix for ONOS-4315

- Additional log on error
- Allow count=0 using CountDownCompleter
- test case to detect the issue (@Ignored by default right now)
- other bug fixes found along the way

Based on patch by Madan@China

Change-Id: I7d6cb8c214052859900ef7ee0337a7e1c8a9d295
......@@ -18,6 +18,7 @@ package org.onosproject.store.primitives.resources.impl;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
......@@ -60,7 +61,9 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapComman
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -72,6 +75,7 @@ import static com.google.common.base.Preconditions.checkState;
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
private final Logger log = getLogger(getClass());
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
......@@ -384,7 +388,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
}
MapEntryValue existingValue = mapEntries.get(key);
if (existingValue == null) {
if (update.currentValue() != null) {
if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) {
return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
}
} else {
......@@ -399,6 +403,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
} catch (Exception e) {
log.warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
} finally {
if (!ok) {
commit.close();
......@@ -416,6 +423,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
TransactionId transactionId = commit.operation().transactionId();
try {
return commitInternal(transactionId);
} catch (Exception e) {
log.warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -438,12 +448,11 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
checkState(preparedKeys.remove(key), "key is not prepared");
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
checkState(preparedKeys.remove(key), "key is not prepared");
if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
newValue = new TransactionalCommit(key,
versionCounter.incrementAndGet(), completer);
newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
}
eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
if (newValue != null) {
......
......@@ -16,6 +16,8 @@
package org.onosproject.store.primitives.resources.impl;
import io.atomix.resource.ResourceType;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.Arrays;
......@@ -348,6 +350,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
map.addListener(listener).join();
// PUT_IF_ABSENT
MapUpdate<String, byte[]> update1 =
MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey("foo")
......@@ -359,6 +362,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
}).join();
// verify changes in Tx is not visible yet until commit
assertFalse(listener.eventReceived());
map.size().thenAccept(result -> {
......@@ -371,7 +375,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
try {
map.put("foo", value2).join();
assertTrue(false);
fail("update to map entry in open tx should fail with Exception");
} catch (CompletionException e) {
assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
}
......@@ -384,6 +388,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
assertEquals(MapEvent.Type.INSERT, event.type());
assertTrue(Arrays.equals(value1, event.newValue().value()));
// map should be update-able after commit
map.put("foo", value2).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
......@@ -391,6 +396,43 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
assertNotNull(event);
assertEquals(MapEvent.Type.UPDATE, event.type());
assertTrue(Arrays.equals(value2, event.newValue().value()));
// REMOVE_IF_VERSION_MATCH
byte[] currFoo = map.get("foo").get().value();
long currFooVersion = map.get("foo").get().version();
MapUpdate<String, byte[]> remove1 =
MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey("foo")
.withCurrentVersion(currFooVersion)
.build();
tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
map.prepare(tx).thenAccept(result -> {
assertTrue("prepare should succeed", result);
}).join();
// verify changes in Tx is not visible yet until commit
assertFalse(listener.eventReceived());
map.size().thenAccept(size -> {
assertThat(size, is(1));
}).join();
map.get("foo").thenAccept(result -> {
assertThat(result.value(), is(currFoo));
}).join();
map.commit(tx.transactionId()).join();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.REMOVE, event.type());
assertArrayEquals(currFoo, event.oldValue().value());
map.size().thenAccept(size -> {
assertThat(size, is(0));
}).join();
}
protected void transactionRollbackTests(int clusterSize) throws Throwable {
......
......@@ -45,10 +45,13 @@ public final class CountDownCompleter<T> {
* @param onCompleteCallback callback to invoke when completer is completed
*/
public CountDownCompleter(T object, long count, Consumer<T> onCompleteCallback) {
checkState(count > 0, "count must be positive");
checkState(count >= 0, "count must be non-negative");
this.counter = new AtomicLong(count);
this.object = checkNotNull(object);
this.onCompleteCallback = checkNotNull(onCompleteCallback);
if (count == 0) {
onCompleteCallback.accept(object);
}
}
/**
......