alshabib
Committed by Gerrit Code Review

fix for reactive forwarding failing in a

distributed setting.

Change-Id: I992d62bbbd3d873bc8715419592951704903c49d

making the ECHostStore respect sequentiality of events.

Change-Id: I14fa65fc78742c3ea7d417cddefef9f171472246
......@@ -27,10 +27,10 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.event.Event;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.edge.EdgePortEvent;
import org.onosproject.net.edge.EdgePortListener;
......@@ -38,17 +38,16 @@ import org.onosproject.net.edge.EdgePortService;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.topology.Topology;
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyService;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
......@@ -73,7 +72,9 @@ public class EdgeManager
private final Map<DeviceId, Set<ConnectPoint>> connectionPoints = Maps.newConcurrentMap();
private final TopologyListener topologyListener = new InnerTopologyListener();
private final LinkListener linkListener = new InnerLinkListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
......@@ -84,17 +85,23 @@ public class EdgeManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
@Activate
public void activate() {
eventDispatcher.addSink(EdgePortEvent.class, listenerRegistry);
topologyService.addListener(topologyListener);
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
loadAllEdgePorts();
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(EdgePortEvent.class);
topologyService.removeListener(topologyListener);
deviceService.removeListener(deviceListener);
linkService.removeListener(linkListener);
log.info("Stopped");
}
......@@ -142,31 +149,27 @@ public class EdgeManager
return new DefaultOutboundPacket(point.deviceId(), builder.build(), data);
}
// Internal listener for topo events used to keep our edge-port cache
// up to date.
private class InnerTopologyListener implements TopologyListener {
private class InnerLinkListener implements LinkListener {
@Override
public void event(TopologyEvent event) {
topology = event.subject();
List<Event> triggers = event.reasons();
if (triggers != null) {
triggers.forEach(reason -> {
if (reason instanceof DeviceEvent) {
processDeviceEvent((DeviceEvent) reason);
} else if (reason instanceof LinkEvent) {
processLinkEvent((LinkEvent) reason);
}
});
} else {
//FIXME special case of preexisting edgeport & no triggerless events could cause this to never hit and
//never discover an edgeport that should have been discovered.
loadAllEdgePorts();
}
public void event(LinkEvent event) {
topology = topologyService.currentTopology();
processLinkEvent(event);
}
}
private class InnerDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
topology = topologyService.currentTopology();
processDeviceEvent(event);
}
}
// Initial loading of the edge port cache.
private void loadAllEdgePorts() {
topology = topologyService.currentTopology();
deviceService.getAvailableDevices().forEach(d -> deviceService.getPorts(d.id())
.forEach(p -> addEdgePort(new ConnectPoint(d.id(), p.number()))));
}
......
......@@ -15,26 +15,8 @@
*/
package org.onosproject.store.host.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -60,22 +42,34 @@ import org.onosproject.net.host.HostStoreDelegate;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
*/
@Component(immediate = true)
@Service
public class ECHostStore
public class DistributedHostStore
extends AbstractStore<HostEvent, HostStoreDelegate>
implements HostStore {
......@@ -84,15 +78,13 @@ public class ECHostStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LogicalClockService clockService;
private EventuallyConsistentMap<HostId, DefaultHost> hosts;
private ConsistentMap<HostId, DefaultHost> host;
private Map<HostId, DefaultHost> hosts;
private final ConcurrentHashMap<HostId, DefaultHost> prevHosts =
new ConcurrentHashMap<>();
private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
private MapEventListener<HostId, DefaultHost> hostLocationTracker =
new HostLocationTracker();
@Activate
......@@ -100,21 +92,22 @@ public class ECHostStore
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder()
host = storageService.<HostId, DefaultHost>consistentMapBuilder()
.withName("onos-hosts")
.withSerializer(hostSerializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(hostSerializer.build()))
.build();
hosts.addListener(hostLocationTracker);
hosts = host.asJavaMap();
host.addListener(hostLocationTracker);
log.info("Started");
}
@Deactivate
public void deactivate() {
hosts.removeListener(hostLocationTracker);
hosts.destroy();
host.removeListener(hostLocationTracker);
prevHosts.clear();
log.info("Stopped");
......@@ -249,11 +242,11 @@ public class ECHostStore
return collection.stream().filter(predicate).collect(Collectors.toSet());
}
private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
@Override
public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
DefaultHost host = checkNotNull(event.value());
if (event.type() == PUT) {
public void event(MapEvent<HostId, DefaultHost> event) {
DefaultHost host = checkNotNull(event.value().value());
if (event.type() == MapEvent.Type.INSERT) {
Host prevHost = prevHosts.put(host.id(), host);
if (prevHost == null) {
notifyDelegate(new HostEvent(HOST_ADDED, host));
......@@ -262,7 +255,7 @@ public class ECHostStore
} else if (!Objects.equals(prevHost, host)) {
notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost));
}
} else if (event.type() == REMOVE) {
} else if (event.type() == MapEvent.Type.REMOVE) {
if (prevHosts.remove(host.id()) != null) {
notifyDelegate(new HostEvent(HOST_REMOVED, host));
}
......
......@@ -113,7 +113,7 @@ public class DistributedProxyArpStore implements ProxyArpStore {
@Override
public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) {
NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
/*NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
if (nodeId.equals(localNodeId)) {
if (delegate != null) {
delegate.emitResponse(outPort, packet);
......@@ -122,7 +122,10 @@ public class DistributedProxyArpStore implements ProxyArpStore {
log.info("Forwarding ARP response from {} to {}", subject.id(), outPort);
commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()),
ARP_RESPONSE_MESSAGE, serializer::encode, nodeId);
}
}*/
//FIXME: Code above may be unnecessary and therefore cluster messaging
// and pendingMessages could be pruned as well.
delegate.emitResponse(outPort, packet);
}
@Override
......
......@@ -27,8 +27,6 @@ import org.onosproject.net.HostLocation;
import org.onosproject.net.host.DefaultHostDescription;
import org.onosproject.net.host.HostDescription;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.TestStorageService;
import java.util.HashSet;
......@@ -37,9 +35,9 @@ import java.util.Set;
/**
* Tests for the ECHostStore.
*/
public class ECHostStoreTest extends TestCase {
public class DistributedHostStoreTest extends TestCase {
private ECHostStore ecXHostStore;
private DistributedHostStore ecXHostStore;
private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a"));
......@@ -50,10 +48,9 @@ public class ECHostStoreTest extends TestCase {
@Before
public void setUp() {
ecXHostStore = new ECHostStore();
ecXHostStore = new DistributedHostStore();
ecXHostStore.storageService = new TestStorageService();
ecXHostStore.clockService = new TestLogicalClockService();
ecXHostStore.activate();
}
......@@ -83,13 +80,4 @@ public class ECHostStoreTest extends TestCase {
assertTrue(host.ipAddresses().contains(IP2));
}
/**
* Mocks the LogicalClockService class.
*/
class TestLogicalClockService implements LogicalClockService {
@Override
public Timestamp getTimestamp() {
return null;
}
}
}
\ No newline at end of file
......
......@@ -15,19 +15,23 @@ from mininet.util import dumpNodeConnections
class AttMplsTopo( Topo ):
"Internet Topology Zoo Specimen."
def build( self ):
def __init__( self ):
"Create a topology."
# Initialize Topology
Topo.__init__( self )
# add nodes, switches first...
NY54 = self.addSwitch( 's25' ) # 40.728270, -73.994483
CMBR = self.addSwitch( 's1' ) # 42.373730, -71.109734
CHCG = self.addSwitch( 's2' ) # 41.877461, -87.642892
CHCG = self.addSwitch( 's2', protocols='OpenFlow13' ) # 41.877461, -87.642892
CLEV = self.addSwitch( 's3' ) # 41.498928, -81.695217
RLGH = self.addSwitch( 's4' ) # 35.780150, -78.644026
ATLN = self.addSwitch( 's5' ) # 33.749017, -84.394168
PHLA = self.addSwitch( 's6' ) # 39.952906, -75.172278
WASH = self.addSwitch( 's7' ) # 38.906696, -77.035509
NSVL = self.addSwitch( 's8' ) # 36.166410, -86.787305
STLS = self.addSwitch( 's9' ) # 38.626418, -90.198143
STLS = self.addSwitch( 's9', protocols='OpenFlow13' ) # 38.626418, -90.198143
NWOR = self.addSwitch( 's10' ) # 29.951475, -90.078434
HSTN = self.addSwitch( 's11' ) # 29.763249, -95.368332
SNAN = self.addSwitch( 's12' ) # 29.424331, -98.491745
......@@ -40,12 +44,12 @@ class AttMplsTopo( Topo ):
PTLD = self.addSwitch( 's19' ) # 45.523317, -122.677768
STTL = self.addSwitch( 's20' ) # 47.607326, -122.331786
SLKC = self.addSwitch( 's21' ) # 40.759577, -111.895079
LA03 = self.addSwitch( 's22' ) # 34.056346, -118.235951
LA03 = self.addSwitch( 's22', protocols='OpenFlow13' ) # 34.056346, -118.235951
SNDG = self.addSwitch( 's23' ) # 32.714564, -117.153528
PHNX = self.addSwitch( 's24' ) # 33.448289, -112.076299
NY54 = self.addSwitch( 's25' ) # 40.728270, -73.994483
# ... and now hosts
NY54_host = self.addHost( 'h25' )
CMBR_host = self.addHost( 'h1' )
CHCG_host = self.addHost( 'h2' )
CLEV_host = self.addHost( 'h3' )
......@@ -70,7 +74,6 @@ class AttMplsTopo( Topo ):
LA03_host = self.addHost( 'h22' )
SNDG_host = self.addHost( 'h23' )
PHNX_host = self.addHost( 'h24' )
NY54_host = self.addHost( 'h25' )
# add edges between switch and corresponding host
self.addLink( NY54 , NY54_host )
......