Yuta HIGUCHI

GossipHostStore: add AE support

- modified HostDescription family to hold Set of IpAddresses

Change-Id: Id920fdc83817802885e8528af185a5ad590bf999
package org.onlab.onos.net.host;
import java.util.Collections;
import java.util.Set;
import org.onlab.onos.net.AbstractDescription;
import org.onlab.onos.net.HostLocation;
import org.onlab.onos.net.SparseAnnotations;
......@@ -7,6 +10,8 @@ import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import com.google.common.collect.ImmutableSet;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
......@@ -18,7 +23,7 @@ public class DefaultHostDescription extends AbstractDescription
private final MacAddress mac;
private final VlanId vlan;
private final HostLocation location;
private final IpPrefix ip;
private final Set<IpPrefix> ip;
/**
* Creates a host description using the supplied information.
......@@ -31,7 +36,7 @@ public class DefaultHostDescription extends AbstractDescription
public DefaultHostDescription(MacAddress mac, VlanId vlan,
HostLocation location,
SparseAnnotations... annotations) {
this(mac, vlan, location, null, annotations);
this(mac, vlan, location, Collections.<IpPrefix>emptySet(), annotations);
}
/**
......@@ -46,11 +51,26 @@ public class DefaultHostDescription extends AbstractDescription
public DefaultHostDescription(MacAddress mac, VlanId vlan,
HostLocation location, IpPrefix ip,
SparseAnnotations... annotations) {
this(mac, vlan, location, ImmutableSet.of(ip), annotations);
}
/**
* Creates a host description using the supplied information.
*
* @param mac host MAC address
* @param vlan host VLAN identifier
* @param location host location
* @param ip host IP addresses
* @param annotations optional key/value annotations map
*/
public DefaultHostDescription(MacAddress mac, VlanId vlan,
HostLocation location, Set<IpPrefix> ip,
SparseAnnotations... annotations) {
super(annotations);
this.mac = mac;
this.vlan = vlan;
this.location = location;
this.ip = ip;
this.ip = ImmutableSet.copyOf(ip);
}
@Override
......@@ -69,7 +89,7 @@ public class DefaultHostDescription extends AbstractDescription
}
@Override
public IpPrefix ipAddress() {
public Set<IpPrefix> ipAddress() {
return ip;
}
......
package org.onlab.onos.net.host;
import java.util.Set;
import org.onlab.onos.net.Description;
import org.onlab.onos.net.HostLocation;
import org.onlab.packet.IpPrefix;
......@@ -38,6 +40,6 @@ public interface HostDescription extends Description {
* @return host IP address
*/
// FIXME: Switch to IpAddress
IpPrefix ipAddress();
Set<IpPrefix> ipAddress();
}
......
......@@ -8,6 +8,8 @@ import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import com.google.common.collect.ImmutableSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -33,7 +35,7 @@ public class DefualtHostDecriptionTest {
assertEquals("incorrect mac", MAC, host.hwAddress());
assertEquals("incorrect vlan", VLAN, host.vlan());
assertEquals("incorrect location", LOC, host.location());
assertEquals("incorrect ip's", IP, host.ipAddress());
assertEquals("incorrect ip's", ImmutableSet.of(IP), host.ipAddress());
assertTrue("incorrect toString", host.toString().contains("vlan=10"));
}
......
package org.onlab.onos.store.host.impl;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -12,6 +15,8 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultHost;
......@@ -19,6 +24,7 @@ import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.HostLocation;
import org.onlab.onos.net.host.DefaultHostDescription;
import org.onlab.onos.net.host.HostClockService;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.host.HostEvent;
......@@ -42,12 +48,19 @@ import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.host.HostEvent.Type.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
//TODO: multi-provider, annotation not supported.
......@@ -88,24 +101,58 @@ public class GossipHostStore
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(DistributedStoreSerializers.COMMON)
.register(InternalHostEvent.class)
.register(InternalHostRemovedEvent.class)
.register(HostFragmentId.class)
.register(HostAntiEntropyAdvertisement.class)
.build()
.populate(1);
}
};
private ScheduledExecutorService executor;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
GossipHostStoreMessageSubjects.HOST_UPDATED, new InternalHostEventListener());
GossipHostStoreMessageSubjects.HOST_UPDATED,
new InternalHostEventListener());
clusterCommunicator.addSubscriber(
GossipHostStoreMessageSubjects.HOST_REMOVED,
new InternalHostRemovedEventListener());
clusterCommunicator.addSubscriber(
GossipHostStoreMessageSubjects.HOST_REMOVED, new InternalHostRemovedEventListener());
GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
new InternalHostAntiEntropyAdvertisementListener());
executor =
newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
executor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() {
executor.shutdownNow();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
log.error("Timeout during executor shutdown");
}
} catch (InterruptedException e) {
log.error("Error during executor shutdown", e);
}
hosts.clear();
removedHosts.clear();
locations.clear();
portAddresses.clear();
log.info("Stopped");
}
......@@ -153,7 +200,7 @@ public class GossipHostStore
descr.hwAddress(),
descr.vlan(),
new Timestamped<>(descr.location(), timestamp),
ImmutableSet.of(descr.ipAddress()));
ImmutableSet.copyOf(descr.ipAddress()));
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
return new HostEvent(HOST_ADDED, newhost);
......@@ -169,12 +216,12 @@ public class GossipHostStore
return new HostEvent(HOST_MOVED, host);
}
if (host.ipAddresses().contains(descr.ipAddress())) {
if (host.ipAddresses().containsAll(descr.ipAddress())) {
return null;
}
Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
addresses.add(descr.ipAddress());
addresses.addAll(descr.ipAddress());
StoredHost updated = new StoredHost(providerId, host.id(),
host.mac(), host.vlan(),
host.location, addresses);
......@@ -381,6 +428,10 @@ public class GossipHostStore
public HostLocation location() {
return location.value();
}
public Timestamp timestamp() {
return location.timestamp();
}
}
private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
......@@ -399,6 +450,16 @@ public class GossipHostStore
clusterCommunicator.broadcast(message);
}
private void unicastMessage(NodeId peer,
MessageSubject subject,
Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.unicast(message, peer);
}
private void notifyDelegateIfNotNull(HostEvent event) {
if (event != null) {
notifyDelegate(event);
......@@ -434,4 +495,165 @@ public class GossipHostStore
notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
}
}
private final class SendAdvertisementTask implements Runnable {
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
final NodeId self = clusterService.getLocalNode().id();
Set<ControllerNode> nodes = clusterService.getNodes();
ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
.transform(toNodeId())
.toList();
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
log.debug("No other peers in the cluster.");
return;
}
NodeId peer;
do {
int idx = RandomUtils.nextInt(0, nodeIds.size());
peer = nodeIds.get(idx);
} while (peer.equals(self));
HostAntiEntropyAdvertisement ad = createAdvertisement();
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
} catch (IOException e) {
log.debug("Failed to send anti-entropy advertisement", e);
return;
}
} catch (Exception e) {
// catch all Exception to avoid Scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
}
}
}
private HostAntiEntropyAdvertisement createAdvertisement() {
final NodeId self = clusterService.getLocalNode().id();
Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
for (Entry<HostId, StoredHost> e : hosts.entrySet()) {
final HostId hostId = e.getKey();
final StoredHost hostInfo = e.getValue();
final ProviderId providerId = hostInfo.providerId();
timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
}
for (Entry<HostId, Timestamped<Host>> e : removedHosts.entrySet()) {
tombstones.put(e.getKey(), e.getValue().timestamp());
}
return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
}
private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
final NodeId sender = ad.sender();
for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
// for each locally live Hosts...
final HostId hostId = host.getKey();
final StoredHost localHost = host.getValue();
final ProviderId providerId = localHost.providerId();
final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
final Timestamp localLiveTimestamp = localHost.timestamp();
Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
if (remoteTimestamp == null) {
remoteTimestamp = ad.tombstones().get(hostId);
}
if (remoteTimestamp == null ||
localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
// local is more recent, push
// TODO: annotation is lost
final HostDescription desc = new DefaultHostDescription(
localHost.mac(),
localHost.vlan(),
localHost.location(),
localHost.ipAddresses());
try {
unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
} catch (IOException e1) {
log.debug("Failed to send advertisement response", e1);
}
}
final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
if (remoteDeadTimestamp != null &&
remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
// sender has recent remove
notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
}
}
for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
// for each locally dead Hosts
final HostId hostId = dead.getKey();
final Timestamp localDeadTimestamp = dead.getValue().timestamp();
// TODO: pick proper ProviderId, when supporting multi-provider
final ProviderId providerId = dead.getValue().value().providerId();
final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
if (remoteLiveTimestamp != null &&
localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
// sender has zombie, push
try {
unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
new InternalHostRemovedEvent(hostId, localDeadTimestamp));
} catch (IOException e1) {
log.debug("Failed to send advertisement response", e1);
}
}
}
for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
// for each remote tombstone advertisement...
final HostId hostId = e.getKey();
final Timestamp adRemoveTimestamp = e.getValue();
final StoredHost storedHost = hosts.get(hostId);
if (storedHost == null) {
continue;
}
if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
// sender has recent remove info, locally remove
notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
}
}
}
private final class InternalHostAntiEntropyAdvertisementListener implements
ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
handleAntiEntropyAdvertisement(advertisement);
}
}
}
......
......@@ -4,6 +4,11 @@ import org.onlab.onos.store.cluster.messaging.MessageSubject;
public final class GossipHostStoreMessageSubjects {
private GossipHostStoreMessageSubjects() {}
public static final MessageSubject HOST_UPDATED = new MessageSubject("peer-host-updated");
public static final MessageSubject HOST_REMOVED = new MessageSubject("peer-host-removed");
public static final MessageSubject HOST_UPDATED
= new MessageSubject("peer-host-updated");
public static final MessageSubject HOST_REMOVED
= new MessageSubject("peer-host-removed");
public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT
= new MessageSubject("host-enti-entropy-advertisement");;
}
......
package org.onlab.onos.store.host.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.HostId;
import org.onlab.onos.store.Timestamp;
/**
* Host AE Advertisement message.
*/
public final class HostAntiEntropyAdvertisement {
private final NodeId sender;
private final Map<HostFragmentId, Timestamp> timestamps;
private final Map<HostId, Timestamp> tombstones;
public HostAntiEntropyAdvertisement(NodeId sender,
Map<HostFragmentId, Timestamp> timestamps,
Map<HostId, Timestamp> tombstones) {
this.sender = checkNotNull(sender);
this.timestamps = checkNotNull(timestamps);
this.tombstones = checkNotNull(tombstones);
}
public NodeId sender() {
return sender;
}
public Map<HostFragmentId, Timestamp> timestamps() {
return timestamps;
}
public Map<HostId, Timestamp> tombstones() {
return tombstones;
}
// For serializer
@SuppressWarnings("unused")
private HostAntiEntropyAdvertisement() {
this.sender = null;
this.timestamps = null;
this.tombstones = null;
}
}
package org.onlab.onos.store.host.impl;
import java.util.Objects;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.provider.ProviderId;
import com.google.common.base.MoreObjects;
/**
* Identifier for HostDescription from a Provider.
*/
public final class HostFragmentId {
public final ProviderId providerId;
public final HostId hostId;
public HostFragmentId(HostId hostId, ProviderId providerId) {
this.providerId = providerId;
this.hostId = hostId;
}
public HostId hostId() {
return hostId;
}
public ProviderId providerId() {
return providerId;
}
@Override
public int hashCode() {
return Objects.hash(providerId, hostId);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof HostFragmentId)) {
return false;
}
HostFragmentId that = (HostFragmentId) obj;
return Objects.equals(this.hostId, that.hostId) &&
Objects.equals(this.providerId, that.providerId);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("providerId", providerId)
.add("hostId", hostId)
.toString();
}
// for serializer
@SuppressWarnings("unused")
private HostFragmentId() {
this.providerId = null;
this.hostId = null;
}
}
......@@ -84,7 +84,7 @@ public class DistributedHostStore
descr.hwAddress(),
descr.vlan(),
descr.location(),
ImmutableSet.of(descr.ipAddress()));
ImmutableSet.copyOf(descr.ipAddress()));
synchronized (this) {
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
......@@ -101,12 +101,12 @@ public class DistributedHostStore
return new HostEvent(HOST_MOVED, host);
}
if (host.ipAddresses().contains(descr.ipAddress())) {
if (host.ipAddresses().containsAll(descr.ipAddress())) {
return null;
}
Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
addresses.add(descr.ipAddress());
addresses.addAll(descr.ipAddress());
StoredHost updated = new StoredHost(providerId, host.id(),
host.mac(), host.vlan(),
descr.location(), addresses);
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.HostLocation;
import org.onlab.onos.net.PortNumber;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link HostLocation}.
*/
public class HostLocationSerializer extends Serializer<HostLocation> {
/**
* Creates {@link HostLocation} serializer instance.
*/
public HostLocationSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, HostLocation object) {
kryo.writeClassAndObject(output, object.deviceId());
kryo.writeClassAndObject(output, object.port());
output.writeLong(object.time());
}
@Override
public HostLocation read(Kryo kryo, Input input, Class<HostLocation> type) {
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
PortNumber portNumber = (PortNumber) kryo.readClassAndObject(input);
long time = input.readLong();
return new HostLocation(deviceId, portNumber, time);
}
}
......@@ -17,6 +17,8 @@ import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.HostLocation;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
......@@ -24,15 +26,20 @@ import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.host.DefaultHostDescription;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.Timestamp;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public final class KryoPoolUtil {
......@@ -42,6 +49,8 @@ public final class KryoPoolUtil {
public static final KryoPool MISC = KryoPool.newBuilder()
.register(IpPrefix.class, new IpPrefixSerializer())
.register(IpAddress.class, new IpAddressSerializer())
.register(MacAddress.class, new MacAddressSerializer())
.register(VlanId.class)
.build();
// TODO: Populate other classes
......@@ -52,6 +61,7 @@ public final class KryoPoolUtil {
.register(MISC)
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(
//
ArrayList.class,
......@@ -71,8 +81,10 @@ public final class KryoPoolUtil {
DefaultPortDescription.class,
Element.class,
Link.Type.class,
Timestamp.class
Timestamp.class,
HostId.class,
HostDescription.class,
DefaultHostDescription.class
)
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
......@@ -85,6 +97,7 @@ public final class KryoPoolUtil {
.register(DefaultLink.class, new DefaultLinkSerializer())
.register(MastershipTerm.class, new MastershipTermSerializer())
.register(MastershipRole.class, new MastershipRoleSerializer())
.register(HostLocation.class, new HostLocationSerializer())
.build();
......
package org.onlab.onos.store.serializers;
import org.onlab.packet.MacAddress;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link MacAddress}.
*/
public class MacAddressSerializer extends Serializer<MacAddress> {
/**
* Creates {@link MacAddress} serializer instance.
*/
public MacAddressSerializer() {
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, MacAddress object) {
output.writeBytes(object.getAddress());
}
@Override
public MacAddress read(Kryo kryo, Input input, Class<MacAddress> type) {
return MacAddress.valueOf(input.readBytes(MacAddress.MAC_ADDRESS_LENGTH));
}
}
......@@ -84,7 +84,7 @@ public class SimpleHostStore
descr.hwAddress(),
descr.vlan(),
descr.location(),
ImmutableSet.of(descr.ipAddress()));
ImmutableSet.copyOf(descr.ipAddress()));
synchronized (this) {
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
......@@ -101,12 +101,12 @@ public class SimpleHostStore
return new HostEvent(HOST_MOVED, host);
}
if (host.ipAddresses().contains(descr.ipAddress())) {
if (host.ipAddresses().containsAll(descr.ipAddress())) {
return null;
}
Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
addresses.add(descr.ipAddress());
addresses.addAll(descr.ipAddress());
StoredHost updated = new StoredHost(providerId, host.id(),
host.mac(), host.vlan(),
descr.location(), addresses);
......