Madan Jampani
Committed by Gerrit Code Review

ONOS-3565: Retry host store updates that fail due to concurrent modification

Change-Id: Id2af2795b0c9f9b1c8d0c4985781ff24e576c7e3
......@@ -24,6 +24,10 @@ public class ConsistentMapException extends StorageException {
public ConsistentMapException() {
}
public ConsistentMapException(String message) {
super(message);
}
public ConsistentMapException(Throwable t) {
super(t);
}
......@@ -38,6 +42,13 @@ public class ConsistentMapException extends StorageException {
* ConsistentMap update conflicts with an in flight transaction.
*/
public static class ConcurrentModification extends ConsistentMapException {
public ConcurrentModification() {
super();
}
public ConcurrentModification(String message) {
super(message);
}
}
/**
......
......@@ -24,6 +24,10 @@ public class StorageException extends RuntimeException {
public StorageException() {
}
public StorageException(String message) {
super(message);
}
public StorageException(Throwable t) {
super(t);
}
......
......@@ -27,6 +27,7 @@ import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
......@@ -293,7 +294,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
if (v.updated()) {
return v.newValue();
} else {
throw new ConsistentMapException.ConcurrentModification();
throw new ConcurrentModification("Concurrent update to " + name + " detected");
}
});
});
......
......@@ -17,6 +17,7 @@ package org.onosproject.store.host.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -27,6 +28,7 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.net.Annotations;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
......@@ -43,10 +45,12 @@ import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Collection;
......@@ -56,6 +60,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -155,37 +160,42 @@ public class DistributedHostStore
HostId hostId,
HostDescription hostDescription,
boolean replaceIPs) {
// TODO: We need a way to detect conflicting changes and abort update.
host.computeIf(hostId,
Supplier<Versioned<DefaultHost>> supplier =
() -> host.computeIf(hostId,
existingHost -> shouldUpdate(existingHost, providerId, hostId,
hostDescription, replaceIPs),
(id, existingHost) -> {
HostLocation location = hostDescription.location();
final Set<IpAddress> addresses;
if (existingHost == null || replaceIPs) {
addresses = ImmutableSet.copyOf(hostDescription.ipAddress());
} else {
addresses = Sets.newHashSet(existingHost.ipAddresses());
addresses.addAll(hostDescription.ipAddress());
}
final Annotations annotations;
if (existingHost != null) {
annotations = merge((DefaultAnnotations) existingHost.annotations(),
hostDescription.annotations());
} else {
annotations = hostDescription.annotations();
}
return new DefaultHost(providerId,
hostId,
hostDescription.hwAddress(),
hostDescription.vlan(),
location,
addresses,
annotations);
});
HostLocation location = hostDescription.location();
final Set<IpAddress> addresses;
if (existingHost == null || replaceIPs) {
addresses = ImmutableSet.copyOf(hostDescription.ipAddress());
} else {
addresses = Sets.newHashSet(existingHost.ipAddresses());
addresses.addAll(hostDescription.ipAddress());
}
final Annotations annotations;
if (existingHost != null) {
annotations = merge((DefaultAnnotations) existingHost.annotations(),
hostDescription.annotations());
} else {
annotations = hostDescription.annotations();
}
return new DefaultHost(providerId,
hostId,
hostDescription.hwAddress(),
hostDescription.vlan(),
location,
addresses,
annotations);
});
Tools.retryable(supplier,
ConsistentMapException.ConcurrentModification.class,
Integer.MAX_VALUE,
50).get();
return null;
}
......
......@@ -86,6 +86,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
.build();
childMap.put(ResourcePath.ROOT, ImmutableList.of());
log.info("Started");
}
@Override
......