Charles Chan

Fix duplicate host event

Change-Id: I632a1482e7b1b768ce7a0243cc8b6b398b9825b7
......@@ -88,10 +88,14 @@ public class SimpleHostStore
boolean replaceIps) {
//TODO We need a way to detect conflicting changes and abort update.
StoredHost host = hosts.get(hostId);
HostEvent hostEvent;
if (host == null) {
return createHost(providerId, hostId, hostDescription);
hostEvent = createHost(providerId, hostId, hostDescription);
} else {
hostEvent = updateHost(providerId, host, hostDescription, replaceIps);
}
return updateHost(providerId, host, hostDescription, replaceIps);
notifyDelegate(hostEvent);
return hostEvent;
}
// creates a new host and sends HOST_ADDED
......@@ -153,7 +157,9 @@ public class SimpleHostStore
Host host = hosts.remove(hostId);
if (host != null) {
locations.remove((host.location()), host);
return new HostEvent(HOST_REMOVED, host);
HostEvent hostEvent = new HostEvent(HOST_REMOVED, host);
notifyDelegate(hostEvent);
return hostEvent;
}
return null;
}
......
......@@ -192,10 +192,7 @@ public class HostManager
@Override
public void removeHost(HostId hostId) {
checkNotNull(hostId, HOST_ID_NULL);
HostEvent event = store.removeHost(hostId);
if (event != null) {
post(event);
}
store.removeHost(hostId);
}
// Personalized host provider service issued to the supplied provider.
......@@ -211,11 +208,8 @@ public class HostManager
checkNotNull(hostId, HOST_ID_NULL);
checkValidity();
hostDescription = validateHost(hostDescription, hostId);
HostEvent event = store.createOrUpdateHost(provider().id(), hostId,
store.createOrUpdateHost(provider().id(), hostId,
hostDescription, replaceIps);
if (event != null) {
post(event);
}
}
// returns a HostDescription made from the union of the BasicHostConfig
......@@ -231,20 +225,14 @@ public class HostManager
public void hostVanished(HostId hostId) {
checkNotNull(hostId, HOST_ID_NULL);
checkValidity();
HostEvent event = store.removeHost(hostId);
if (event != null) {
post(event);
}
store.removeHost(hostId);
}
@Override
public void removeIpFromHost(HostId hostId, IpAddress ipAddress) {
checkNotNull(hostId, HOST_ID_NULL);
checkValidity();
HostEvent event = store.removeIp(hostId, ipAddress);
if (event != null) {
post(event);
}
store.removeIp(hostId, ipAddress);
}
}
......
......@@ -20,6 +20,7 @@ import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestTools;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
......@@ -129,13 +130,15 @@ public class HostManagerTest {
}
private void validateEvents(Enum... types) {
int i = 0;
assertEquals("wrong events received", types.length, listener.events.size());
for (Event event : listener.events) {
assertEquals("incorrect event type", types[i], event.type());
i++;
}
listener.events.clear();
TestTools.assertAfter(100, () -> {
int i = 0;
assertEquals("wrong events received", types.length, listener.events.size());
for (Event event : listener.events) {
assertEquals("incorrect event type", types[i], event.type());
i++;
}
listener.events.clear();
});
}
@Test
......
......@@ -28,9 +28,10 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
......@@ -56,7 +57,6 @@ import org.onosproject.net.host.HostDescription;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostStore;
import org.onosproject.net.host.HostStoreDelegate;
import org.onosproject.net.host.HostEvent.Type;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
......@@ -67,10 +67,7 @@ import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
/**
......@@ -90,13 +87,11 @@ public class ECHostStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LogicalClockService clockService;
// Hosts tracked by their location
private final SetMultimap<ConnectPoint, Host> locations =
Multimaps.synchronizedSetMultimap(
HashMultimap.<ConnectPoint, Host>create());
private EventuallyConsistentMap<HostId, DefaultHost> hosts;
private final ConcurrentHashMap<HostId, ConnectPoint> locations =
new ConcurrentHashMap<>();
private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
new HostLocationTracker();
......@@ -125,6 +120,7 @@ public class ECHostStore
log.info("Stopped");
}
// TODO No longer need to return HostEvent
@Override
public HostEvent createOrUpdateHost(ProviderId providerId,
HostId hostId,
......@@ -133,18 +129,7 @@ public class ECHostStore
// TODO: We need a way to detect conflicting changes and abort update.
// (BOC) Compute might do this for us.
final AtomicReference<Type> eventType = new AtomicReference<>();
final AtomicReference<DefaultHost> oldHost = new AtomicReference<>();
DefaultHost host = hosts.compute(hostId, (id, existingHost) -> {
if (existingHost != null) {
oldHost.set(existingHost);
checkState(Objects.equals(hostDescription.hwAddress(), existingHost.mac()),
"Existing and new MAC addresses differ.");
checkState(Objects.equals(hostDescription.vlan(), existingHost.vlan()),
"Existing and new VLANs differ.");
}
// TODO do we ever want the existing location?
hosts.compute(hostId, (id, existingHost) -> {
HostLocation location = hostDescription.location();
final Set<IpAddress> addresses;
......@@ -163,15 +148,6 @@ public class ECHostStore
annotations = hostDescription.annotations();
}
if (existingHost == null) {
eventType.set(HOST_ADDED);
} else if (!Objects.equals(existingHost.location(), hostDescription.location())) {
eventType.set(HOST_MOVED);
} else if (!existingHost.ipAddresses().containsAll(hostDescription.ipAddress()) ||
!hostDescription.annotations().keys().isEmpty()) {
eventType.set(HOST_UPDATED);
} // else, eventType == null; this means we don't send an event
return new DefaultHost(providerId,
hostId,
hostDescription.hwAddress(),
......@@ -181,24 +157,20 @@ public class ECHostStore
annotations);
});
if (oldHost.get() != null) {
DefaultHost old = oldHost.get();
locations.remove(old.location(), old);
}
locations.put(host.location(), host);
return eventType.get() != null ? new HostEvent(eventType.get(), host) : null;
return null;
}
// TODO No longer need to return HostEvent
@Override
public HostEvent removeHost(HostId hostId) {
Host host = hosts.remove(hostId);
return host != null ? new HostEvent(HOST_REMOVED, host) : null;
hosts.remove(hostId);
return null;
}
// TODO No longer need to return HostEvent
@Override
public HostEvent removeIp(HostId hostId, IpAddress ipAddress) {
DefaultHost host = hosts.compute(hostId, (id, existingHost) -> {
hosts.compute(hostId, (id, existingHost) -> {
if (existingHost != null) {
checkState(Objects.equals(hostId.mac(), existingHost.mac()),
"Existing and new MAC addresses differ.");
......@@ -222,7 +194,7 @@ public class ECHostStore
}
return null;
});
return host != null ? new HostEvent(HOST_UPDATED, host) : null;
return null;
}
@Override
......@@ -257,22 +229,19 @@ public class ECHostStore
@Override
public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
synchronized (locations) {
return ImmutableSet.copyOf(locations.get(connectPoint));
}
Set<Host> filtered = hosts.entrySet().stream()
.filter(entry -> entry.getValue().location().equals(connectPoint))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
return ImmutableSet.copyOf(filtered);
}
@Override
public Set<Host> getConnectedHosts(DeviceId deviceId) {
Set<Host> filtered;
synchronized (locations) {
filtered = locations
.entries()
.stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
.map(entry -> entry.getValue())
.collect(Collectors.toSet());
}
Set<Host> filtered = hosts.entrySet().stream()
.filter(entry -> entry.getValue().location().deviceId().equals(deviceId))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
return ImmutableSet.copyOf(filtered);
}
......@@ -285,13 +254,18 @@ public class ECHostStore
public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
DefaultHost host = checkNotNull(event.value());
if (event.type() == PUT) {
boolean isNew = locations.put(host.location(), host);
notifyDelegate(new HostEvent(isNew ? HOST_ADDED : HOST_UPDATED, host));
ConnectPoint prevLocation = locations.put(host.id(), host.location());
if (prevLocation == null) {
notifyDelegate(new HostEvent(HOST_ADDED, host));
} else if (!Objects.equals(prevLocation, host.location())) {
notifyDelegate(new HostEvent(HOST_MOVED, host));
} else {
notifyDelegate(new HostEvent(HOST_UPDATED, host));
}
} else if (event.type() == REMOVE) {
if (locations.remove(host.location(), host)) {
if (locations.remove(host.id()) != null) {
notifyDelegate(new HostEvent(HOST_REMOVED, host));
}
}
}
}
......