Madan Jampani
Committed by Gerrit Code Review

ONOS-4218: Fixes for resource store transaction failures

Change-Id: Ie48bb04d7daf6ed7b63c33a3c3c2703496179aa6
......@@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkState;
import java.util.function.Function;
import org.onlab.util.ByteArraySizeHashPrinter;
import com.google.common.base.MoreObjects;
/**
......@@ -153,7 +155,7 @@ public final class MapUpdate<K, V> {
.add("mapName", mapName)
.add("type", type)
.add("key", key)
.add("value", value)
.add("value", value instanceof byte[] ? new ByteArraySizeHashPrinter((byte[]) value) : value)
.add("currentValue", currentValue)
.add("currentVersion", currentVersion)
.toString();
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
/**
* Completion status of transaction.
*/
public enum CommitStatus {
/**
* Indicates a successfully completed transaction with all the updates committed.
*/
SUCCESS,
/**
* Indicates a aborted transaction i.e. no updates were committed.
*/
FAILURE
}
\ No newline at end of file
......@@ -16,6 +16,8 @@
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.primitives.TransactionId;
/**
......@@ -63,9 +65,9 @@ public interface TransactionContext extends DistributedPrimitive {
* Commits a transaction that was previously started thereby making its changes permanent
* and externally visible.
*
* @return true if this transaction succeeded, otherwise false.
* @return A future that will be completed when the operation completes
*/
boolean commit();
CompletableFuture<CommitStatus> commit();
/**
* Aborts any changes made in this transaction context and discarding all locally cached updates.
......
......@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
......@@ -41,6 +42,7 @@ import org.onosproject.net.resource.ResourceStoreDelegate;
import org.onosproject.net.resource.Resources;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
......@@ -178,18 +180,18 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
}
}
boolean success = tx.commit();
if (success) {
log.trace("Transaction commit succeeded on registration: resources={}", resources);
List<ResourceEvent> events = resources.stream()
.filter(x -> x.parent().isPresent())
.map(x -> new ResourceEvent(RESOURCE_ADDED, x))
.collect(Collectors.toList());
notifyDelegate(events);
} else {
log.debug("Transaction commit failed on registration: resources={}", resources);
}
return success;
return tx.commit().whenComplete((status, error) -> {
if (status == CommitStatus.SUCCESS) {
log.trace("Transaction commit succeeded on registration: resources={}", resources);
List<ResourceEvent> events = resources.stream()
.filter(x -> x.parent().isPresent())
.map(x -> new ResourceEvent(RESOURCE_ADDED, x))
.collect(Collectors.toList());
notifyDelegate(events);
} else {
log.warn("Transaction commit failed on registration", error);
}
}).join() == CommitStatus.SUCCESS;
}
@Override
......@@ -252,17 +254,17 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
}
}
boolean success = tx.commit();
if (success) {
List<ResourceEvent> events = resources.stream()
.filter(x -> x.parent().isPresent())
.map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
.collect(Collectors.toList());
notifyDelegate(events);
} else {
log.warn("Failed to unregister {}: Commit failed.", ids);
}
return success;
return tx.commit().whenComplete((status, error) -> {
if (status == CommitStatus.SUCCESS) {
List<ResourceEvent> events = resources.stream()
.filter(x -> x.parent().isPresent())
.map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
.collect(Collectors.toList());
notifyDelegate(events);
} else {
log.warn("Failed to unregister {}: Commit failed.", ids, error);
}
}).join() == CommitStatus.SUCCESS;
}
@Override
......@@ -308,7 +310,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
}
}
return tx.commit();
return tx.commit().join() == CommitStatus.SUCCESS;
}
@Override
......@@ -348,7 +350,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
}
}
return tx.commit();
return tx.commit().join() == CommitStatus.SUCCESS;
}
// computational complexity: O(1) if the resource is discrete type.
......
......@@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.*;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
......@@ -96,22 +97,23 @@ public class DefaultTransactionContext implements TransactionContext {
@SuppressWarnings("unchecked")
@Override
public boolean commit() {
public CompletableFuture<CommitStatus> commit() {
// TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
CommitResult result = null;
CommitStatus status;
try {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
Transaction transaction = new Transaction(transactionId, updates);
result = Futures.getUnchecked(transactionCommitter.apply(transaction));
return result == CommitResult.OK;
status = Futures.getUnchecked(transactionCommitter.apply(transaction)) == CommitResult.OK
? CommitStatus.SUCCESS : CommitStatus.FAILURE;
} catch (Exception e) {
abort();
return false;
status = CommitStatus.FAILURE;
} finally {
isOpen = false;
}
return CompletableFuture.completedFuture(status);
}
@Override
......
......@@ -16,13 +16,16 @@
package org.onosproject.store.primitives.impl;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
import org.onosproject.utils.MeteringAgent;
import com.google.common.collect.Sets;
......@@ -36,6 +39,7 @@ public class NewDefaultTransactionContext implements TransactionContext {
private final TransactionId transactionId;
private final TransactionCoordinator transactionCoordinator;
private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
private final MeteringAgent monitor;
public NewDefaultTransactionContext(TransactionId transactionId,
DistributedPrimitiveCreator creator,
......@@ -43,6 +47,7 @@ public class NewDefaultTransactionContext implements TransactionContext {
this.transactionId = transactionId;
this.creator = creator;
this.transactionCoordinator = transactionCoordinator;
this.monitor = new MeteringAgent("transactionContext", "*", true);
}
@Override
......@@ -68,9 +73,10 @@ public class NewDefaultTransactionContext implements TransactionContext {
}
@Override
public boolean commit() {
transactionCoordinator.commit(transactionId, txParticipants).getNow(null);
return true;
public CompletableFuture<CommitStatus> commit() {
final MeteringAgent.Context timer = monitor.startTimer("commit");
return transactionCoordinator.commit(transactionId, txParticipants)
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......
......@@ -22,6 +22,7 @@ import java.util.stream.Collectors;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.CommitStatus;
/**
* Coordinator for a two-phase commit protocol.
......@@ -37,45 +38,47 @@ public class TransactionCoordinator {
/**
* Commits a transaction.
*
* @param transactionId transaction
* @param transactionId transaction identifier
* @param transactionParticipants set of transaction participants
* @return future for commit result
*/
CompletableFuture<Void> commit(TransactionId transactionId, Set<TransactionParticipant> transactionParticipants) {
CompletableFuture<CommitStatus> commit(TransactionId transactionId,
Set<TransactionParticipant> transactionParticipants) {
if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
}
return transactions.put(transactionId, Transaction.State.PREPARING)
CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING)
.thenCompose(v -> this.doPrepare(transactionParticipants))
.thenCompose(result -> result
? transactions.put(transactionId, Transaction.State.COMMITTING)
.thenCompose(v -> doCommit(transactionParticipants))
.thenApply(v -> null)
.thenApply(v -> CommitStatus.SUCCESS)
: transactions.put(transactionId, Transaction.State.ROLLINGBACK)
.thenCompose(v -> doRollback(transactionParticipants))
.thenApply(v -> null))
.thenCompose(v -> transactions.remove(transactionId))
.thenApply(v -> null);
.thenApply(v -> CommitStatus.FAILURE));
return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
}
private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
return Tools.allOf(transactionParticipants
.stream()
.map(TransactionParticipant::prepare)
.collect(Collectors.toList()))
return Tools.allOf(transactionParticipants.stream()
.filter(TransactionParticipant::hasPendingUpdates)
.map(TransactionParticipant::prepare)
.collect(Collectors.toList()))
.thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
}
private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
.map(p -> p.commit())
.filter(TransactionParticipant::hasPendingUpdates)
.map(TransactionParticipant::commit)
.toArray(CompletableFuture[]::new));
}
private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
.map(p -> p.rollback())
.filter(TransactionParticipant::hasPendingUpdates)
.map(TransactionParticipant::rollback)
.toArray(CompletableFuture[]::new));
}
}
\ No newline at end of file
......
......@@ -87,13 +87,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
}
private void handleEvent(List<MapEvent<String, byte[]>> events) {
events.forEach(event -> mapEventListeners.forEach(listener -> {
try {
listener.event(event);
} catch (Exception e) {
log.warn("Error processing map event", e);
}
}));
events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
}
@Override
......