Madan Jampani

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 106 changed files with 1588 additions and 196 deletions
......@@ -100,6 +100,7 @@ public class ReactiveForwarding {
context.block();
return;
}
HostId id = HostId.hostId(ethPkt.getDestinationMAC());
// Do we know who this is for? If not, flood and bail.
......@@ -112,7 +113,9 @@ public class ReactiveForwarding {
// Are we on an edge switch that our destination is on? If so,
// simply forward out to the destination and bail.
if (pkt.receivedFrom().deviceId().equals(dst.location().deviceId())) {
installRule(context, dst.location().port());
if (!context.inPacket().receivedFrom().port().equals(dst.location().port())) {
installRule(context, dst.location().port());
}
return;
}
......@@ -175,21 +178,24 @@ public class ReactiveForwarding {
// We don't yet support bufferids in the flowservice so packet out first.
packetOut(context, portNumber);
// Install the flow rule to handle this type of message from now on.
Ethernet inPkt = context.inPacket().parsed();
TrafficSelector.Builder builder = new DefaultTrafficSelector.Builder();
builder.matchEthType(inPkt.getEtherType())
.matchEthSrc(inPkt.getSourceMAC())
.matchEthDst(inPkt.getDestinationMAC())
.matchInport(context.inPacket().receivedFrom().port());
if (context.inPacket().parsed().getEtherType() == Ethernet.TYPE_IPV4) {
TrafficTreatment.Builder treat = new DefaultTrafficTreatment.Builder();
treat.setOutput(portNumber);
// Install the flow rule to handle this type of message from now on.
Ethernet inPkt = context.inPacket().parsed();
TrafficSelector.Builder builder = new DefaultTrafficSelector.Builder();
builder.matchEthType(inPkt.getEtherType())
.matchEthSrc(inPkt.getSourceMAC())
.matchEthDst(inPkt.getDestinationMAC())
.matchInport(context.inPacket().receivedFrom().port());
FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(),
builder.build(), treat.build(), 0, appId);
TrafficTreatment.Builder treat = new DefaultTrafficTreatment.Builder();
treat.setOutput(portNumber);
flowRuleService.applyFlowRules(f);
FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(),
builder.build(), treat.build(), 0, appId);
flowRuleService.applyFlowRules(f);
}
}
}
......
......@@ -56,7 +56,8 @@ public interface MastershipService {
Set<DeviceId> getDevicesOf(NodeId nodeId);
/**
* Returns the mastership term service for getting term information.
* Returns the mastership term service for getting read-only
* term information.
*
* @return the MastershipTermService for this mastership manager
*/
......
......@@ -64,4 +64,14 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
* @return the current master's ID and the term value for device, or null
*/
MastershipTerm getTermFor(DeviceId deviceId);
/**
* Revokes a controller instance's mastership over a device and hands
* over mastership to another controller instance.
*
* @param nodeId the controller instance identifier
* @param deviceId device to revoke mastership for
* @return a mastership event
*/
MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
}
......
package org.onlab.onos.net;
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Base abstraction of an annotated entity.
*/
public class AbstractAnnotated implements Annotated {
private static final Map<String, String> EMPTY = new HashMap<>();
private final Map<String, String> annotations;
// For serialization
protected AbstractAnnotated() {
this.annotations = EMPTY;
}
/**
* Creates a new entity, annotated with the specified annotations.
*
* @param annotations optional key/value annotations map
*/
protected AbstractAnnotated(Map<String, String>[] annotations) {
checkArgument(annotations.length <= 1, "Only one set of annotations is expected");
this.annotations = annotations.length == 1 ? annotations[0] : EMPTY;
}
@Override
public Set<String> annotationKeys() {
return ImmutableSet.copyOf(annotations.keySet());
}
@Override
public String annotation(String key) {
return annotations.get(key);
}
}
......@@ -2,10 +2,12 @@ package org.onlab.onos.net;
import org.onlab.onos.net.provider.ProviderId;
import java.util.Map;
/**
* Base implementation of a network model entity.
*/
public class AbstractModel implements Provided {
public class AbstractModel extends AbstractAnnotated implements Provided {
private final ProviderId providerId;
......@@ -15,11 +17,16 @@ public class AbstractModel implements Provided {
}
/**
* Creates a model entity attributed to the specified provider.
* Creates a model entity attributed to the specified provider and
* optionally annotated.
*
* @param providerId identity of the provider
* @param providerId identity of the provider
* @param annotations optional key/value annotations
*/
protected AbstractModel(ProviderId providerId) {
@SafeVarargs
protected AbstractModel(ProviderId providerId,
Map<String, String>... annotations) {
super(annotations);
this.providerId = providerId;
}
......
package org.onlab.onos.net;
import java.util.Set;
/**
* Represents an entity that carries arbitrary annotations.
*/
public interface Annotated {
/**
* Returns the set of annotation keys currently available.
*
* @return set of annotation keys
*/
Set<String> annotationKeys();
/**
* Returns the annotation value for the specified key.
*
* @param key annotation key
* @return annotation value; null if there is no annotation
*/
String annotation(String key);
}
......@@ -3,5 +3,5 @@ package org.onlab.onos.net;
/**
* Base abstraction of a piece of information about network elements.
*/
public interface Description {
public interface Description extends Annotated {
}
......
package org.onlab.onos.net.device;
import org.onlab.onos.net.AbstractAnnotated;
import java.net.URI;
import java.util.Map;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -9,7 +12,8 @@ import static org.onlab.onos.net.Device.Type;
/**
* Default implementation of immutable device description entity.
*/
public class DefaultDeviceDescription implements DeviceDescription {
public class DefaultDeviceDescription extends AbstractAnnotated
implements DeviceDescription {
private final URI uri;
private final Type type;
private final String manufacturer;
......@@ -26,10 +30,14 @@ public class DefaultDeviceDescription implements DeviceDescription {
* @param hwVersion device HW version
* @param swVersion device SW version
* @param serialNumber device serial number
* @param annotations optional key/value annotations map
*/
@SafeVarargs
public DefaultDeviceDescription(URI uri, Type type, String manufacturer,
String hwVersion, String swVersion,
String serialNumber) {
String serialNumber,
Map<String, String>... annotations) {
super(annotations);
this.uri = checkNotNull(uri, "Device URI cannot be null");
this.type = checkNotNull(type, "Device type cannot be null");
this.manufacturer = manufacturer;
......
......@@ -48,6 +48,7 @@ public interface DeviceStore extends Store<DeviceEvent, DeviceStoreDelegate> {
DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription);
// TODO: We may need to enforce that ancillary cannot interfere this state
/**
* Removes the specified infrastructure device.
*
......@@ -60,22 +61,24 @@ public interface DeviceStore extends Store<DeviceEvent, DeviceStoreDelegate> {
* Updates the ports of the specified infrastructure device using the given
* list of port descriptions. The list is assumed to be comprehensive.
*
* @param providerId provider identifier
* @param deviceId device identifier
* @param portDescriptions list of port descriptions
* @return ready to send events describing what occurred; empty list if no change
*/
List<DeviceEvent> updatePorts(DeviceId deviceId,
List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions);
/**
* Updates the port status of the specified infrastructure device using the
* given port description.
*
* @param providerId provider identifier
* @param deviceId device identifier
* @param portDescription port description
* @return ready to send event describing what occurred; null if no change
*/
DeviceEvent updatePortStatus(DeviceId deviceId,
DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription);
/**
......
......@@ -27,9 +27,11 @@ public class DefaultFlowRule implements FlowRule {
private final ApplicationId appId;
private boolean expired;
public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowRuleState state,
long life, long packets, long bytes, long flowId) {
long life, long packets, long bytes, long flowId, boolean expired) {
this.deviceId = deviceId;
this.priority = priority;
this.selector = selector;
......@@ -37,7 +39,7 @@ public class DefaultFlowRule implements FlowRule {
this.state = state;
this.appId = ApplicationId.valueOf((int) (flowId >> 32));
this.id = FlowId.valueOf(flowId);
this.expired = expired;
this.life = life;
this.packets = packets;
this.bytes = bytes;
......@@ -186,4 +188,9 @@ public class DefaultFlowRule implements FlowRule {
.toString();
}
@Override
public boolean expired() {
return expired;
}
}
......
......@@ -111,4 +111,11 @@ public interface FlowRule {
*/
long bytes();
/**
* Indicates that this flow has expired at the device.
*
* @return true if it has expired, false otherwise
*/
boolean expired();
}
......
package org.onlab.onos.net.host;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.HashSet;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.onlab.onos.net.AbstractAnnotated;
import org.onlab.onos.net.HostLocation;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static com.google.common.base.MoreObjects.toStringHelper;
public class DefaultHostDescription implements HostDescription {
/**
* Default implementation of an immutable host description.
*/
public class DefaultHostDescription extends AbstractAnnotated
implements HostDescription {
private final MacAddress mac;
private final VlanId vlan;
private final HostLocation location;
private final Set<IpPrefix> ips;
/**
* Creates a host description using the supplied information.
*
* @param mac host MAC address
* @param vlan host VLAN identifier
* @param location host location
* @param annotations optional key/value annotations map
*/
@SafeVarargs
public DefaultHostDescription(MacAddress mac, VlanId vlan,
HostLocation loc) {
this.mac = mac;
this.vlan = vlan;
this.location = loc;
this.ips = new HashSet<IpPrefix>();
HostLocation location,
Map<String, String>... annotations) {
this(mac, vlan, location, new HashSet<IpPrefix>(), annotations);
}
/**
* Creates a host description using the supplied information.
*
* @param mac host MAC address
* @param vlan host VLAN identifier
* @param location host location
* @param ips of host IP addresses
* @param annotations optional key/value annotations map
*/
@SafeVarargs
public DefaultHostDescription(MacAddress mac, VlanId vlan,
HostLocation loc, Set<IpPrefix> ips) {
HostLocation location, Set<IpPrefix> ips,
Map<String, String>... annotations) {
super(annotations);
this.mac = mac;
this.vlan = vlan;
this.location = loc;
this.ips = new HashSet<IpPrefix>(ips);
this.location = location;
this.ips = new HashSet<>(ips);
}
@Override
......
......@@ -35,10 +35,22 @@ public abstract class AbstractProviderRegistry<P extends Provider, S extends Pro
public synchronized S register(P provider) {
checkNotNull(provider, "Provider cannot be null");
checkState(!services.containsKey(provider.id()), "Provider %s already registered", provider.id());
// If the provider is a primary one, check for a conflict.
ProviderId pid = provider.id();
checkState(pid.isAncillary() || !providersByScheme.containsKey(pid.scheme()),
"A primary provider with id %s is already registered",
providersByScheme.get(pid.scheme()));
S service = createProviderService(provider);
services.put(provider.id(), service);
providers.put(provider.id(), provider);
// FIXME populate scheme look-up
// Register the provider by URI scheme only if it is not ancillary.
if (!pid.isAncillary()) {
providersByScheme.put(pid.scheme(), provider);
}
return service;
}
......
......@@ -5,12 +5,35 @@ import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Notion of provider identity.
* External identity of a {@link org.onlab.onos.net.provider.Provider} family.
* It also carriers two designations of external characteristics, the URI
* scheme and primary/ancillary indicator.
* <p/>
* The device URI scheme is used to determine applicability of a provider to
* operations on a specific device. The ancillary indicator serves to designate
* a provider as a primary or ancillary.
*
* A {@link org.onlab.onos.net.provider.ProviderRegistry} uses this designation
* to permit only one primary provider per device URI scheme. Multiple
* ancillary providers can register with the same device URI scheme however.
*/
public class ProviderId {
private final String scheme;
private final String id;
private final boolean ancillary;
/**
* Creates a new primary provider identifier from the specified string.
* The providers are expected to follow the reverse DNS convention, e.g.
* {@code org.onlab.onos.provider.of.device}
*
* @param scheme device URI scheme to which this provider is bound, e.g. "of", "snmp"
* @param id string identifier
*/
public ProviderId(String scheme, String id) {
this(scheme, id, false);
}
/**
* Creates a new provider identifier from the specified string.
......@@ -19,10 +42,12 @@ public class ProviderId {
*
* @param scheme device URI scheme to which this provider is bound, e.g. "of", "snmp"
* @param id string identifier
* @param ancillary ancillary provider indicator
*/
public ProviderId(String scheme, String id) {
public ProviderId(String scheme, String id, boolean ancillary) {
this.scheme = scheme;
this.id = id;
this.ancillary = ancillary;
}
/**
......@@ -43,6 +68,15 @@ public class ProviderId {
return id;
}
/**
* Indicates whether this identifier designates an ancillary providers.
*
* @return true if the provider is ancillary; false if primary
*/
public boolean isAncillary() {
return ancillary;
}
@Override
public int hashCode() {
return Objects.hash(scheme, id);
......@@ -56,14 +90,16 @@ public class ProviderId {
if (obj instanceof ProviderId) {
final ProviderId other = (ProviderId) obj;
return Objects.equals(this.scheme, other.scheme) &&
Objects.equals(this.id, other.id);
Objects.equals(this.id, other.id) &&
this.ancillary == other.ancillary;
}
return false;
}
@Override
public String toString() {
return toStringHelper(this).add("scheme", scheme).add("id", id).toString();
return toStringHelper(this).add("scheme", scheme).add("id", id)
.add("ancillary", ancillary).toString();
}
}
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.net.topology;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.onlab.onos.net.AbstractAnnotated;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
......@@ -12,7 +13,8 @@ import java.util.Map;
/**
* Default implementation of an immutable topology graph data carrier.
*/
public class DefaultGraphDescription implements GraphDescription {
public class DefaultGraphDescription extends AbstractAnnotated
implements GraphDescription {
private final long nanos;
private final ImmutableSet<TopologyVertex> vertexes;
......@@ -25,11 +27,16 @@ public class DefaultGraphDescription implements GraphDescription {
* Creates a minimal topology graph description to allow core to construct
* and process the topology graph.
*
* @param nanos time in nanos of when the topology description was created
* @param devices collection of infrastructure devices
* @param links collection of infrastructure links
* @param nanos time in nanos of when the topology description was created
* @param devices collection of infrastructure devices
* @param links collection of infrastructure links
* @param annotations optional key/value annotations map
*/
public DefaultGraphDescription(long nanos, Iterable<Device> devices, Iterable<Link> links) {
@SafeVarargs
public DefaultGraphDescription(long nanos, Iterable<Device> devices,
Iterable<Link> links,
Map<String, String>... annotations) {
super(annotations);
this.nanos = nanos;
this.vertexes = buildVertexes(devices);
this.edges = buildEdges(links);
......
package org.onlab.onos.cluster;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import com.google.common.testing.EqualsTester;
public class MastershipTermTest {
private static final NodeId N1 = new NodeId("foo");
private static final NodeId N2 = new NodeId("bar");
private static final MastershipTerm TERM1 = MastershipTerm.of(N1, 0);
private static final MastershipTerm TERM2 = MastershipTerm.of(N2, 1);
private static final MastershipTerm TERM3 = MastershipTerm.of(N2, 1);
private static final MastershipTerm TERM4 = MastershipTerm.of(N1, 1);
@Test
public void basics() {
assertEquals("incorrect term number", 0, TERM1.termNumber());
assertEquals("incorrect master", new NodeId("foo"), TERM1.master());
}
@Test
public void testEquality() {
new EqualsTester().addEqualityGroup(MastershipTerm.of(N1, 0), TERM1)
.addEqualityGroup(TERM2, TERM3)
.addEqualityGroup(TERM4);
}
}
......@@ -35,7 +35,7 @@ public class AbstractProviderRegistryTest {
assertThat("provider not found", registry.getProviders().contains(fooId));
assertEquals("incorrect provider", psFoo.provider(), pFoo);
ProviderId barId = new ProviderId("of", "bar");
ProviderId barId = new ProviderId("snmp", "bar");
TestProvider pBar = new TestProvider(barId);
TestProviderService psBar = registry.register(pBar);
assertEquals("incorrect provider count", 2, registry.getProviders().size());
......@@ -49,6 +49,16 @@ public class AbstractProviderRegistryTest {
assertThat("provider not found", registry.getProviders().contains(barId));
}
@Test
public void ancillaryProviders() {
TestProviderRegistry registry = new TestProviderRegistry();
TestProvider pFoo = new TestProvider(new ProviderId("of", "foo"));
TestProvider pBar = new TestProvider(new ProviderId("of", "bar", true));
registry.register(pFoo);
registry.register(pBar);
assertEquals("incorrect provider count", 2, registry.getProviders().size());
}
@Test(expected = IllegalStateException.class)
public void duplicateRegistration() {
TestProviderRegistry registry = new TestProviderRegistry();
......@@ -57,6 +67,15 @@ public class AbstractProviderRegistryTest {
registry.register(pFoo);
}
@Test(expected = IllegalStateException.class)
public void duplicateSchemeRegistration() {
TestProviderRegistry registry = new TestProviderRegistry();
TestProvider pFoo = new TestProvider(new ProviderId("of", "foo"));
TestProvider pBar = new TestProvider(new ProviderId("of", "bar"));
registry.register(pFoo);
registry.register(pBar);
}
@Test
public void voidUnregistration() {
TestProviderRegistry registry = new TestProviderRegistry();
......
......@@ -11,6 +11,8 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.MastershipEvent;
......@@ -52,9 +54,12 @@ implements MastershipService, MastershipAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private ClusterEventListener clusterListener = new InternalClusterEventListener();
@Activate
public void activate() {
eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
clusterService.addListener(clusterListener);
store.setDelegate(delegate);
log.info("Started");
}
......@@ -62,6 +67,7 @@ implements MastershipService, MastershipAdminService {
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(MastershipEvent.class);
clusterService.removeListener(clusterListener);
store.unsetDelegate(delegate);
log.info("Stopped");
}
......@@ -71,12 +77,16 @@ implements MastershipService, MastershipAdminService {
checkNotNull(nodeId, NODE_ID_NULL);
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
//TODO figure out appropriate action for non-MASTER roles, if we even set those
MastershipEvent event = null;
if (role.equals(MastershipRole.MASTER)) {
MastershipEvent event = store.setMaster(nodeId, deviceId);
if (event != null) {
post(event);
}
event = store.setMaster(nodeId, deviceId);
} else {
event = store.unsetMaster(nodeId, deviceId);
}
if (event != null) {
post(event);
}
}
......@@ -88,8 +98,16 @@ implements MastershipService, MastershipAdminService {
@Override
public void relinquishMastership(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
// FIXME: add method to store to give up mastership and trigger new master selection process
MastershipRole role = getLocalRole(deviceId);
if (!role.equals(MastershipRole.MASTER)) {
return;
}
MastershipEvent event = store.unsetMaster(
clusterService.getLocalNode().id(), deviceId);
if (event != null) {
post(event);
}
}
@Override
......@@ -146,6 +164,26 @@ implements MastershipService, MastershipAdminService {
}
//callback for reacting to cluster events
private class InternalClusterEventListener implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
switch (event.type()) {
//FIXME: worry about addition when the time comes
case INSTANCE_ADDED:
case INSTANCE_ACTIVATED:
break;
case INSTANCE_REMOVED:
case INSTANCE_DEACTIVATED:
break;
default:
log.warn("unknown cluster event {}", event);
}
}
}
public class InternalDelegate implements MastershipStoreDelegate {
@Override
......
......@@ -16,6 +16,7 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
......@@ -76,6 +77,8 @@ public class DeviceManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
protected MastershipTermService termService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
......@@ -84,6 +87,7 @@ public class DeviceManager
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
mastershipService.addListener(mastershipListener);
termService = mastershipService.requestTermService();
log.info("Started");
}
......@@ -198,7 +202,7 @@ public class DeviceManager
log.info("Device {} connected", deviceId);
mastershipService.requestRoleFor(deviceId);
provider().roleChanged(event.subject(),
mastershipService.getLocalRole(deviceId));
mastershipService.requestRoleFor(deviceId));
post(event);
}
}
......@@ -208,8 +212,11 @@ public class DeviceManager
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of mastership.
if (event != null) {
log.info("Device {} disconnected", deviceId);
mastershipService.relinquishMastership(deviceId);
post(event);
}
}
......@@ -221,8 +228,9 @@ public class DeviceManager
checkNotNull(portDescriptions,
"Port descriptions list cannot be null");
checkValidity();
List<DeviceEvent> events = store.updatePorts(deviceId,
portDescriptions);
this.provider().id();
List<DeviceEvent> events = store.updatePorts(this.provider().id(),
deviceId, portDescriptions);
for (DeviceEvent event : events) {
post(event);
}
......@@ -234,8 +242,8 @@ public class DeviceManager
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(portDescription, PORT_DESCRIPTION_NULL);
checkValidity();
DeviceEvent event = store.updatePortStatus(deviceId,
portDescription);
DeviceEvent event = store.updatePortStatus(this.provider().id(),
deviceId, portDescription);
if (event != null) {
log.info("Device {} port {} status changed", deviceId, event
.port().number());
......
......@@ -161,7 +161,11 @@ implements FlowRuleService, FlowRuleProviderRegistry {
switch (stored.state()) {
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(stored);
if (flowRule.expired()) {
event = store.removeFlowRule(flowRule);
} else {
frp.applyFlowRule(stored);
}
break;
case PENDING_REMOVE:
case REMOVED:
......
......@@ -231,6 +231,7 @@ public class ProxyArpManager implements ProxyArpService {
arp.setOpCode(ARP.OP_REPLY);
arp.setProtocolType(ARP.PROTO_TYPE_IP);
arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
arp.setProtocolAddressLength((byte) IpPrefix.INET_LEN);
arp.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH);
arp.setSenderHardwareAddress(h.mac().getAddress());
......@@ -238,7 +239,7 @@ public class ProxyArpManager implements ProxyArpService {
arp.setTargetProtocolAddress(((ARP) request.getPayload())
.getSenderProtocolAddress());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toInt());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toRealInt());
eth.setPayload(arp);
return eth;
}
......@@ -291,7 +292,6 @@ public class ProxyArpManager implements ProxyArpService {
case DEVICE_MASTERSHIP_CHANGED:
case DEVICE_SUSPENDED:
case DEVICE_UPDATED:
case PORT_UPDATED:
// nothing to do in these cases; handled when links get reported
break;
case DEVICE_REMOVED:
......@@ -301,9 +301,12 @@ public class ProxyArpManager implements ProxyArpService {
}
break;
case PORT_ADDED:
case PORT_UPDATED:
synchronized (externalPorts) {
externalPorts.put(device, event.port().number());
internalPorts.remove(device, event.port().number());
if (event.port().isEnabled()) {
externalPorts.put(device, event.port().number());
internalPorts.remove(device, event.port().number());
}
}
break;
case PORT_REMOVED:
......
......@@ -65,8 +65,8 @@ public class DefaultTopologyProvider extends AbstractProvider
private volatile boolean isStarted = false;
private TopologyProviderService providerService;
private DeviceListener deviceListener = new InnerDeviceListener();
private LinkListener linkListener = new InnerLinkListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
private EventAccumulator accumulator;
private ExecutorService executor;
......@@ -132,7 +132,7 @@ public class DefaultTopologyProvider extends AbstractProvider
}
// Callback for device events
private class InnerDeviceListener implements DeviceListener {
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
......@@ -144,7 +144,7 @@ public class DefaultTopologyProvider extends AbstractProvider
}
// Callback for link events
private class InnerLinkListener implements LinkListener {
private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
accumulator.add(event);
......
......@@ -15,10 +15,11 @@ import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.trivial.impl.SimpleMastershipStore;
import org.onlab.onos.store.trivial.impl.SimpleMastershipStore;
import org.onlab.packet.IpPrefix;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.onlab.onos.net.MastershipRole.*;
/**
......@@ -65,7 +66,24 @@ public class MastershipManagerTest {
@Test
public void relinquishMastership() {
//TODO
//no backups - should turn to standby and no master for device
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
assertEquals("wrong role:", MASTER, mgr.getLocalRole(DEV_MASTER));
mgr.relinquishMastership(DEV_MASTER);
assertNull("wrong master:", mgr.getMasterFor(DEV_OTHER));
assertEquals("wrong role:", STANDBY, mgr.getLocalRole(DEV_MASTER));
//not master, nothing should happen
mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
mgr.relinquishMastership(DEV_OTHER);
assertNull("wrong role:", mgr.getMasterFor(DEV_OTHER));
//provide NID_OTHER as backup and relinquish
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
assertEquals("wrong master:", NID_LOCAL, mgr.getMasterFor(DEV_MASTER));
mgr.setRole(NID_OTHER, DEV_MASTER, STANDBY);
mgr.relinquishMastership(DEV_MASTER);
assertEquals("wrong master:", NID_OTHER, mgr.getMasterFor(DEV_MASTER));
}
@Test
......@@ -95,7 +113,6 @@ public class MastershipManagerTest {
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
assertEquals("should be one device:", 1, mgr.getDevicesOf(NID_LOCAL).size());
//hand both devices to NID_LOCAL
mgr.setRole(NID_LOCAL, DEV_OTHER, MASTER);
assertEquals("should be two devices:", 2, mgr.getDevicesOf(NID_LOCAL).size());
......
......@@ -27,7 +27,7 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.trivial.impl.SimpleDeviceStore;
import org.onlab.onos.store.trivial.impl.SimpleDeviceStore;
import java.util.ArrayList;
import java.util.Iterator;
......
......@@ -40,7 +40,7 @@ import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.trivial.impl.SimpleFlowRuleStore;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
......
......@@ -34,7 +34,7 @@ import org.onlab.onos.net.host.HostProviderService;
import org.onlab.onos.net.host.PortAddresses;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.trivial.impl.SimpleHostStore;
import org.onlab.onos.store.trivial.impl.SimpleHostStore;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
......
......@@ -23,7 +23,7 @@ import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.device.impl.DeviceManager;
import org.onlab.onos.net.trivial.impl.SimpleLinkStore;
import org.onlab.onos.store.trivial.impl.SimpleLinkStore;
import java.util.ArrayList;
import java.util.Iterator;
......
......@@ -24,7 +24,7 @@ import org.onlab.onos.net.topology.TopologyProvider;
import org.onlab.onos.net.topology.TopologyProviderRegistry;
import org.onlab.onos.net.topology.TopologyProviderService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.onos.net.trivial.impl.SimpleTopologyStore;
import org.onlab.onos.store.trivial.impl.SimpleTopologyStore;
import java.util.ArrayList;
import java.util.List;
......
/**
* Distributed cluster store and messaging subsystem implementation.
*/
package org.onlab.onos.store.cluster.impl;
\ No newline at end of file
package org.onlab.onos.store.cluster.impl;
......
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_ADVERTISEMENT;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.Timestamp;
import com.google.common.collect.ImmutableMap;
/**
* Anti-Entropy advertisement message.
* <p>
* Message to advertise the information this node holds.
*
* @param <ID> ID type
*/
public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
private final NodeId sender;
private final ImmutableMap<ID, Timestamp> advertisement;
/**
* Creates anti-entropy advertisement message.
*
* @param sender sender of this message
* @param advertisement timestamp information of the data sender holds
*/
public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
super(AE_ADVERTISEMENT);
this.sender = sender;
this.advertisement = ImmutableMap.copyOf(advertisement);
}
public NodeId sender() {
return sender;
}
public ImmutableMap<ID, Timestamp> advertisement() {
return advertisement;
}
// Default constructor for serializer
protected AntiEntropyAdvertisement() {
super(AE_ADVERTISEMENT);
this.sender = null;
this.advertisement = null;
}
}
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_REPLY;
import java.util.Map;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.device.impl.VersionedValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* Anti-Entropy reply message.
* <p>
* Message to send in reply to advertisement or another reply.
* Suggest to the sender about the more up-to-date data this node has,
* and request for more recent data that the receiver has.
*/
public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
private final NodeId sender;
private final ImmutableMap<ID, V> suggestion;
private final ImmutableSet<ID> request;
/**
* Creates a reply to anti-entropy message.
*
* @param sender sender of this message
* @param suggestion collection of more recent values, sender had
* @param request Collection of identifiers
*/
public AntiEntropyReply(NodeId sender,
Map<ID, V> suggestion,
Set<ID> request) {
super(AE_REPLY);
this.sender = sender;
this.suggestion = ImmutableMap.copyOf(suggestion);
this.request = ImmutableSet.copyOf(request);
}
public NodeId sender() {
return sender;
}
/**
* Returns collection of values, which the recipient of this reply is likely
* to be missing or has outdated version.
*
* @return
*/
public ImmutableMap<ID, V> suggestion() {
return suggestion;
}
/**
* Returns collection of identifier to request.
*
* @return collection of identifier to request
*/
public ImmutableSet<ID> request() {
return request;
}
/**
* Checks if reply contains any suggestion or request.
*
* @return true if nothing is suggested and requested
*/
public boolean isEmpty() {
return suggestion.isEmpty() && request.isEmpty();
}
// Default constructor for serializer
protected AntiEntropyReply() {
super(AE_REPLY);
this.sender = null;
this.suggestion = null;
this.request = null;
}
}
......@@ -15,6 +15,12 @@ public enum MessageSubject {
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO
ECHO,
/** Anti-Entropy advertisement message. */
AE_ADVERTISEMENT,
/** Anti-Entropy reply message. */
AE_REPLY,
}
......
/**
* Cluster messaging APIs for the use by the various distributed stores.
*/
package org.onlab.onos.store.cluster.messaging;
\ No newline at end of file
package org.onlab.onos.store.cluster.messaging;
......
package org.onlab.onos.store.device.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
// TODO: Handle Port as part of these messages, or separate messages for Ports?
public class DeviceAntiEntropyAdvertisement
extends AntiEntropyAdvertisement<DeviceId> {
public DeviceAntiEntropyAdvertisement(NodeId sender,
Map<DeviceId, Timestamp> advertisement) {
super(sender, advertisement);
}
// May need to add ProviderID, etc.
public static DeviceAntiEntropyAdvertisement create(
NodeId self,
Collection<VersionedValue<Device>> localValues) {
Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
for (VersionedValue<Device> e : localValues) {
ads.put(e.entity().id(), e.timestamp());
}
return new DeviceAntiEntropyAdvertisement(self, ads);
}
// For serializer
protected DeviceAntiEntropyAdvertisement() {}
}
package org.onlab.onos.store.device.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class DeviceAntiEntropyReply
extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
public DeviceAntiEntropyReply(NodeId sender,
Map<DeviceId, VersionedValue<Device>> suggestion,
Set<DeviceId> request) {
super(sender, suggestion, request);
}
/**
* Creates a reply to Anti-Entropy advertisement.
*
* @param advertisement to respond to
* @param self node identifier representing local node
* @param localValues local values held on this node
* @return reply message
*/
public static DeviceAntiEntropyReply reply(
DeviceAntiEntropyAdvertisement advertisement,
NodeId self,
Collection<VersionedValue<Device>> localValues
) {
ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
sug = ImmutableMap.builder();
Set<DeviceId> req = new HashSet<>(ads.keySet());
for (VersionedValue<Device> e : localValues) {
final DeviceId id = e.entity().id();
final Timestamp local = e.timestamp();
final Timestamp theirs = ads.get(id);
if (theirs == null) {
// they don't have it, suggest
sug.put(id, e);
// don't need theirs
req.remove(id);
} else if (local.compareTo(theirs) < 0) {
// they got older one, suggest
sug.put(id, e);
// don't need theirs
req.remove(id);
} else if (local.equals(theirs)) {
// same, don't need theirs
req.remove(id);
}
}
return new DeviceAntiEntropyReply(self, sug.build(), req);
}
/**
* Creates a reply to request for values held locally.
*
* @param requests message containing the request
* @param self node identifier representing local node
* @param localValues local valeds held on this node
* @return reply message
*/
public static DeviceAntiEntropyReply reply(
DeviceAntiEntropyReply requests,
NodeId self,
Map<DeviceId, VersionedValue<Device>> localValues
) {
Set<DeviceId> reqs = requests.request();
Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
for (DeviceId id : reqs) {
final VersionedValue<Device> value = localValues.get(id);
if (value != null) {
requested.put(id, value);
}
}
Set<DeviceId> empty = ImmutableSet.of();
return new DeviceAntiEntropyReply(self, requested, empty);
}
// For serializer
protected DeviceAntiEntropyReply() {}
}
......@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
......@@ -59,8 +60,8 @@ public class OnosDistributedDeviceStore
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices;
private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
......@@ -191,7 +192,7 @@ public class OnosDistributedDeviceStore
}
@Override
public List<DeviceEvent> updatePorts(DeviceId deviceId,
public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions) {
List<DeviceEvent> events = new ArrayList<>();
synchronized (this) {
......@@ -295,7 +296,7 @@ public class OnosDistributedDeviceStore
}
@Override
public DeviceEvent updatePortStatus(DeviceId deviceId,
public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
VersionedValue<Device> device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
......
package org.onlab.onos.store.device.impl;
import java.util.Objects;
import org.onlab.onos.store.Timestamp;
/**
......@@ -42,4 +44,35 @@ public class VersionedValue<T> {
public Timestamp timestamp() {
return timestamp;
}
@Override
public int hashCode() {
return Objects.hash(entity, timestamp, isUp);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
@SuppressWarnings("unchecked")
VersionedValue<T> that = (VersionedValue<T>) obj;
return Objects.equals(this.entity, that.entity) &&
Objects.equals(this.timestamp, that.timestamp) &&
Objects.equals(this.isUp, that.isUp);
}
// Default constructor for serializer
protected VersionedValue() {
this.entity = null;
this.isUp = false;
this.timestamp = null;
}
}
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.cluster.impl;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
......@@ -58,6 +59,7 @@ public class ClusterCommunicationManagerTest {
ccm2.deactivate();
}
@Ignore("FIXME: failing randomly?")
@Test
public void connect() throws Exception {
cnd1.latch = new CountDownLatch(1);
......
......@@ -123,6 +123,12 @@ implements MastershipStore {
return null;
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
// TODO Auto-generated method stub
return null;
}
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
......
......@@ -221,7 +221,7 @@ public class DistributedDeviceStore
}
@Override
public List<DeviceEvent> updatePorts(DeviceId deviceId,
public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions) {
List<DeviceEvent> events = new ArrayList<>();
synchronized (this) {
......@@ -319,7 +319,7 @@ public class DistributedDeviceStore
}
@Override
public DeviceEvent updatePortStatus(DeviceId deviceId,
public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
synchronized (this) {
Device device = devices.getUnchecked(deviceId).orNull();
......
......@@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
/**
* Manages inventory of flow rules using trivial in-memory implementation.
* TEMPORARY: Manages inventory of flow rules using distributed store implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
......
/**
* Implementation of flow store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.flow.impl;
......@@ -39,8 +39,8 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* Manages inventory of end-station hosts using trivial in-memory
* implementation.
* TEMPORARY: Manages inventory of end-station hosts using distributed
* structures implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
......
/**
* Implementation of host store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.host.impl;
......@@ -28,7 +28,7 @@ import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
/**
* Manages inventory of topology snapshots using trivial in-memory
* TEMPORARY: Manages inventory of topology snapshots using distributed
* structures implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
......
/**
* Implementation of topology store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.topology.impl;
......@@ -201,7 +201,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P2, true)
);
List<DeviceEvent> events = deviceStore.updatePorts(DID1, pds);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
......@@ -220,7 +220,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P3, true)
);
events = deviceStore.updatePorts(DID1, pds2);
events = deviceStore.updatePorts(PID, DID1, pds2);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -243,7 +243,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
events = deviceStore.updatePorts(DID1, pds3);
events = deviceStore.updatePorts(PID, DID1, pds3);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -268,9 +268,9 @@ public class DistributedDeviceStoreTest {
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(DID1,
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
......@@ -286,7 +286,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
List<Port> ports = deviceStore.getPorts(DID1);
......@@ -309,7 +309,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, false)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Port port1 = deviceStore.getPort(DID1, P1);
assertEquals(P1, port1.number());
......
package org.onlab.onos.store.serializers;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
import com.google.common.collect.ImmutableMap;
/**
* Kryo Serializer for {@link ImmutableMap}.
*/
public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
private final MapSerializer mapSerializer = new MapSerializer();
public ImmutableMapSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableMap<?, ?> object) {
// wrapping with unmodifiableMap proxy
// to avoid Kryo from writing only the reference marker of this instance,
// which will be embedded right before this method call.
kryo.writeObject(output, Collections.unmodifiableMap(object), mapSerializer);
}
@Override
public ImmutableMap<?, ?> read(Kryo kryo, Input input,
Class<ImmutableMap<?, ?>> type) {
Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
return ImmutableMap.copyOf(map);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableMap.of().getClass(), this);
kryo.register(ImmutableMap.of(1, 2).getClass(), this);
kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
// TODO register required ImmutableMap variants
}
}
package org.onlab.onos.store.serializers;
import java.util.ArrayList;
import java.util.List;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.google.common.collect.ImmutableSet;
/**
* Kryo Serializer for {@link ImmutableSet}.
*/
public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
private final CollectionSerializer serializer = new CollectionSerializer();
public ImmutableSetSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableSet<?> object) {
kryo.writeObject(output, object.asList(), serializer);
}
@Override
public ImmutableSet<?> read(Kryo kryo, Input input,
Class<ImmutableSet<?>> type) {
List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
return ImmutableSet.copyOf(elms);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableSet.of().getClass(), this);
kryo.register(ImmutableSet.of(1).getClass(), this);
kryo.register(ImmutableSet.of(1, 2).getClass(), this);
// TODO register required ImmutableSet variants
}
}
package org.onlab.onos.store.serializers;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -100,4 +101,14 @@ public class KryoSerializationManager implements KryoSerializationService {
return serializerPool.deserialize(bytes);
}
@Override
public void serialize(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
@Override
public <T> T deserialize(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
}
......
package org.onlab.onos.store.serializers;
import java.nio.ByteBuffer;
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
......@@ -16,6 +18,15 @@ public interface KryoSerializationService {
public byte[] serialize(final Object obj);
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @param buffer to write serialized bytes
*/
public void serialize(final Object obj, ByteBuffer buffer);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
......@@ -24,4 +35,12 @@ public interface KryoSerializationService {
*/
public <T> T deserialize(final byte[] bytes);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param buffer bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final ByteBuffer buffer);
}
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.net.MastershipRole;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link org.onlab.onos.net.MastershipRole}.
*/
public class MastershipRoleSerializer extends Serializer<MastershipRole> {
@Override
public MastershipRole read(Kryo kryo, Input input, Class<MastershipRole> type) {
final String role = kryo.readObject(input, String.class);
return MastershipRole.valueOf(role);
}
@Override
public void write(Kryo kryo, Output output, MastershipRole object) {
kryo.writeObject(output, object.toString());
}
}
package org.onlab.onos.store.serializers;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link org.onlab.onos.cluster.MastershipTerm}.
*/
public class MastershipTermSerializer extends Serializer<MastershipTerm> {
@Override
public MastershipTerm read(Kryo kryo, Input input, Class<MastershipTerm> type) {
final NodeId node = new NodeId(kryo.readObject(input, String.class));
final int term = input.readInt();
return MastershipTerm.of(node, term);
}
@Override
public void write(Kryo kryo, Output output, MastershipTerm object) {
output.writeString(object.master().toString());
output.writeInt(object.termNumber());
}
}
package org.onlab.onos.store.serializers;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.PortNumber.portNumber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.testing.EqualsTester;
import de.javakaffee.kryoserializers.URISerializer;
public class KryoSerializerTests {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final PortNumber P1 = portNumber(1);
private static final PortNumber P2 = portNumber(2);
private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
private static final String MFR = "whitebox";
private static final String HW = "1.1.x";
private static final String SW1 = "3.8.1";
private static final String SW2 = "3.9.5";
private static final String SN = "43311-12345";
private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
private static KryoPool kryos;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
kryos = KryoPool.newBuilder()
.register(
ArrayList.class,
HashMap.class
)
.register(
Device.Type.class,
Link.Type.class
// ControllerNode.State.class,
// DefaultControllerNode.class,
// MastershipRole.class,
// Port.class,
// Element.class,
)
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(IpPrefix.class, new IpPrefixSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DefaultDevice.class)
.register(URI.class, new URISerializer())
.register(MastershipRole.class, new MastershipRoleSerializer())
.register(MastershipTerm.class, new MastershipTermSerializer())
.build();
}
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
// removing Kryo instance to use fresh Kryo on each tests
kryos.getKryo();
}
private static <T> void testSerialized(T original) {
ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
kryos.serialize(original, buffer);
buffer.flip();
T copy = kryos.deserialize(buffer);
new EqualsTester()
.addEqualityGroup(original, copy)
.testEquals();
}
@Test
public final void test() {
testSerialized(new ConnectPoint(DID1, P1));
testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
testSerialized(new DefaultPort(DEV1, P1, true));
testSerialized(DID1);
testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
testSerialized(ImmutableMap.of(DID1, DEV1));
testSerialized(ImmutableMap.of());
testSerialized(ImmutableSet.of(DID1, DID2));
testSerialized(ImmutableSet.of(DID1));
testSerialized(ImmutableSet.of());
testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
testSerialized(new LinkKey(CP1, CP2));
testSerialized(new NodeId("SomeNodeIdentifier"));
testSerialized(P1);
testSerialized(PID);
}
}
......@@ -25,6 +25,10 @@
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.onlab.graph.AdjacencyListsGraph;
import org.onlab.onos.net.topology.TopologyEdge;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.onlab.onos.net.DeviceId;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -7,8 +7,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
......@@ -48,8 +46,10 @@ public class SimpleMastershipStore
new DefaultControllerNode(new NodeId("local"), LOCALHOST);
//devices mapped to their masters, to emulate multiple nodes
protected final ConcurrentMap<DeviceId, NodeId> masterMap =
new ConcurrentHashMap<>();
protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
//emulate backups with pile of nodes
protected final Set<NodeId> backups = new HashSet<>();
//terms
protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
@Activate
......@@ -64,25 +64,29 @@ public class SimpleMastershipStore
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
NodeId node = masterMap.get(deviceId);
if (node == null) {
synchronized (this) {
masterMap.put(deviceId, nodeId);
termMap.put(deviceId, new AtomicInteger());
MastershipRole role = getRole(nodeId, deviceId);
synchronized (this) {
switch (role) {
case MASTER:
return null;
case STANDBY:
masterMap.put(deviceId, nodeId);
termMap.get(deviceId).incrementAndGet();
backups.add(nodeId);
break;
case NONE:
masterMap.put(deviceId, nodeId);
termMap.put(deviceId, new AtomicInteger());
backups.add(nodeId);
break;
default:
log.warn("unknown Mastership Role {}", role);
return null;
}
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
}
if (node.equals(nodeId)) {
return null;
} else {
synchronized (this) {
masterMap.put(deviceId, nodeId);
termMap.get(deviceId).incrementAndGet();
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
}
}
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
}
@Override
......@@ -103,34 +107,111 @@ public class SimpleMastershipStore
@Override
public MastershipRole requestRole(DeviceId deviceId) {
return getRole(instance.id(), deviceId);
//query+possible reelection
NodeId node = instance.id();
MastershipRole role = getRole(node, deviceId);
switch (role) {
case MASTER:
break;
case STANDBY:
synchronized (this) {
//try to "re-elect", since we're really not distributed
NodeId rel = reelect(node);
if (rel == null) {
masterMap.put(deviceId, node);
termMap.put(deviceId, new AtomicInteger());
role = MastershipRole.MASTER;
}
backups.add(node);
}
break;
case NONE:
//first to get to it, say we are master
synchronized (this) {
masterMap.put(deviceId, node);
termMap.put(deviceId, new AtomicInteger());
backups.add(node);
role = MastershipRole.MASTER;
}
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return role;
}
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
NodeId node = masterMap.get(deviceId);
//just query
NodeId current = masterMap.get(deviceId);
MastershipRole role;
if (node != null) {
if (node.equals(nodeId)) {
if (current == null) {
if (backups.contains(nodeId)) {
role = MastershipRole.STANDBY;
} else {
role = MastershipRole.NONE;
}
} else {
if (current.equals(nodeId)) {
role = MastershipRole.MASTER;
} else {
role = MastershipRole.STANDBY;
}
} else {
//masterMap doesn't contain it.
role = MastershipRole.MASTER;
masterMap.put(deviceId, nodeId);
}
return role;
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
if (masterMap.get(deviceId) == null) {
if ((masterMap.get(deviceId) == null) ||
(termMap.get(deviceId) == null)) {
return null;
}
return MastershipTerm.of(
masterMap.get(deviceId), termMap.get(deviceId).get());
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
synchronized (this) {
switch (role) {
case MASTER:
NodeId backup = reelect(nodeId);
if (backup == null) {
masterMap.remove(deviceId);
} else {
masterMap.put(deviceId, backup);
termMap.get(deviceId).incrementAndGet();
return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
}
case STANDBY:
case NONE:
if (!termMap.containsKey(deviceId)) {
termMap.put(deviceId, new AtomicInteger());
}
backups.add(nodeId);
break;
default:
log.warn("unknown Mastership Role {}", role);
}
}
return null;
}
//dumbly selects next-available node that's not the current one
//emulate leader election
private NodeId reelect(NodeId nodeId) {
NodeId backup = null;
for (NodeId n : backups) {
if (!n.equals(nodeId)) {
backup = n;
break;
}
}
return backup;
}
}
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......
......@@ -2,4 +2,4 @@
* Implementations of in-memory stores suitable for unit testing and
* experimentation; not for production use.
*/
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.junit.Before;
import org.junit.Test;
......
/**
*
*/
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
......@@ -44,6 +44,7 @@ import com.google.common.collect.Sets;
public class SimpleDeviceStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final String MFR = "whitebox";
......@@ -89,6 +90,13 @@ public class SimpleDeviceStoreTest {
deviceStore.createOrUpdateDevice(PID, deviceId, description);
}
private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN);
deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
}
private static void assertDevice(DeviceId id, String swVersion, Device device) {
assertNotNull(device);
assertEquals(id, device.id());
......@@ -160,6 +168,33 @@ public class SimpleDeviceStoreTest {
}
@Test
public final void testCreateOrUpdateDeviceAncillary() {
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN);
DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(PIDA, event.subject().providerId());
assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
assertEquals(PID, event2.subject().providerId());
assertTrue(deviceStore.isAvailable(DID1));
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
// For now, Ancillary is ignored once primary appears
assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
}
@Test
public final void testMarkOffline() {
putDevice(DID1, SW1);
......@@ -182,7 +217,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P2, true)
);
List<DeviceEvent> events = deviceStore.updatePorts(DID1, pds);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
......@@ -201,7 +236,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P3, true)
);
events = deviceStore.updatePorts(DID1, pds2);
events = deviceStore.updatePorts(PID, DID1, pds2);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -224,7 +259,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
events = deviceStore.updatePorts(DID1, pds3);
events = deviceStore.updatePorts(PID, DID1, pds3);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -249,14 +284,42 @@ public class SimpleDeviceStoreTest {
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
}
@Test
public final void testUpdatePortStatusAncillary() {
putDeviceAncillary(DID1, SW1);
putDevice(DID1, SW1);
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(DID1,
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P1, true));
assertNull("Ancillary is ignored if primary exists", event2);
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P2, true));
assertEquals(PORT_ADDED, event3.type());
assertDevice(DID1, SW1, event3.subject());
assertEquals(P2, event3.port().number());
assertFalse("Port is disabled if not given from provider", event3.port().isEnabled());
}
@Test
......@@ -267,7 +330,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
List<Port> ports = deviceStore.getPorts(DID1);
......@@ -290,7 +353,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, false)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Port port1 = deviceStore.getPort(DID1, P1);
assertEquals(P1, port1.number());
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.junit.Assert.*;
import static org.onlab.onos.net.DeviceId.deviceId;
......
package org.onlab.onos.store.trivial.impl;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import com.google.common.collect.Sets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.onlab.onos.net.MastershipRole.*;
import static org.onlab.onos.cluster.MastershipEvent.Type.*;
/**
* Test for the simple MastershipStore implementation.
*/
public class SimpleMastershipStoreTest {
private static final DeviceId DID1 = DeviceId.deviceId("of:01");
private static final DeviceId DID2 = DeviceId.deviceId("of:02");
private static final DeviceId DID3 = DeviceId.deviceId("of:03");
private static final DeviceId DID4 = DeviceId.deviceId("of:04");
private static final NodeId N1 = new NodeId("local");
private static final NodeId N2 = new NodeId("other");
private SimpleMastershipStore sms;
@Before
public void setUp() throws Exception {
sms = new SimpleMastershipStore();
sms.activate();
}
@After
public void tearDown() throws Exception {
sms.deactivate();
}
@Test
public void getRole() {
//special case, no backup or master
put(DID1, N1, false, false);
assertEquals("wrong role", NONE, sms.getRole(N1, DID1));
//backup exists but we aren't mapped
put(DID2, N1, false, true);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2));
//N2 is master
put(DID3, N2, true, true);
assertEquals("wrong role", MASTER, sms.getRole(N2, DID3));
//N2 is master but N1 is only in backups set
put(DID4, N2, true, false);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID4));
}
@Test
public void getMaster() {
put(DID3, N2, true, true);
assertEquals("wrong role", MASTER, sms.getRole(N2, DID3));
assertEquals("wrong device", N2, sms.getMaster(DID3));
}
@Test
public void setMaster() {
put(DID1, N1, false, false);
assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID1).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID1));
//set node that's already master - should be ignored
assertNull("wrong event", sms.setMaster(N1, DID1));
//set STANDBY to MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2));
assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID2).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID2));
}
@Test
public void getDevices() {
Set<DeviceId> d = Sets.newHashSet(DID1, DID2);
put(DID1, N2, true, true);
put(DID2, N2, true, true);
put(DID3, N1, true, true);
assertTrue("wrong devices", d.equals(sms.getDevices(N2)));
}
@Test
public void getTermFor() {
put(DID1, N1, true, true);
assertEquals("wrong term", MastershipTerm.of(N1, 0), sms.getTermFor(DID1));
//switch to N2 and back - 2 term switches
sms.setMaster(N2, DID1);
sms.setMaster(N1, DID1);
assertEquals("wrong term", MastershipTerm.of(N1, 2), sms.getTermFor(DID1));
}
@Test
public void requestRole() {
//NONE - become MASTER
put(DID1, N1, false, false);
assertEquals("wrong role", MASTER, sms.requestRole(DID1));
//STANDBY without backup - become MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", MASTER, sms.requestRole(DID2));
//STANDBY with backup - stay STANDBY
put(DID3, N2, false, true);
assertEquals("wrong role", STANDBY, sms.requestRole(DID3));
//local (N1) is MASTER - stay MASTER
put(DID4, N1, true, true);
assertEquals("wrong role", MASTER, sms.requestRole(DID4));
}
@Test
public void unsetMaster() {
//NONE - record backup but take no other action
put(DID1, N1, false, false);
sms.unsetMaster(N1, DID1);
assertTrue("not backed up", sms.backups.contains(N1));
sms.termMap.clear();
sms.unsetMaster(N1, DID1);
assertTrue("term not set", sms.termMap.containsKey(DID1));
//no backup, MASTER
put(DID1, N1, true, true);
assertNull("wrong event", sms.unsetMaster(N1, DID1));
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
put(DID2, N2, true, true);
assertEquals("wrong event", MASTER_CHANGED, sms.unsetMaster(N1, DID1).type());
}
//helper to populate master/backup structures
private void put(DeviceId dev, NodeId node, boolean store, boolean backup) {
if (store) {
sms.masterMap.put(dev, node);
}
if (backup) {
sms.backups.add(node);
}
sms.termMap.put(dev, new AtomicInteger());
}
}
......@@ -11,6 +11,7 @@
<bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
<bundle>mvn:com.codahale.metrics/metrics-core/3.0.2</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
<bundle>mvn:com.esotericsoftware.kryo/kryo/2.24.0</bundle>
......
......@@ -48,7 +48,7 @@ public final class DefaultOpenFlowPacketContext implements OpenFlowPacketContext
OFPacketOut.Builder builder = sw.factory().buildPacketOut();
OFAction act = buildOutput(outPort.getPortNumber());
pktout = builder.setXid(pktin.getXid())
.setInPort(pktin.getInPort())
.setInPort(inport())
.setBufferId(pktin.getBufferId())
.setActions(Collections.singletonList(act))
.build();
......@@ -63,7 +63,7 @@ public final class DefaultOpenFlowPacketContext implements OpenFlowPacketContext
OFAction act = buildOutput(outPort.getPortNumber());
pktout = builder.setXid(pktin.getXid())
.setBufferId(OFBufferId.NO_BUFFER)
.setInPort(pktin.getInPort())
.setInPort(inport())
.setActions(Collections.singletonList(act))
.setData(ethFrame.serialize())
.build();
......@@ -88,10 +88,16 @@ public final class DefaultOpenFlowPacketContext implements OpenFlowPacketContext
@Override
public Integer inPort() {
return inport().getPortNumber();
}
private OFPort inport() {
//FIXME: this has to change in fucking loxi
try {
return pktin.getInPort().getPortNumber();
return pktin.getInPort();
} catch (UnsupportedOperationException e) {
return pktin.getMatch().get(MatchField.IN_PORT).getPortNumber();
return pktin.getMatch().get(MatchField.IN_PORT);
}
}
......
......@@ -243,6 +243,8 @@ public abstract class AbstractOpenFlowSwitch implements OpenFlowSwitchDriver {
if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
this.role = role;
}
} else {
this.role = role;
}
} catch (IOException e) {
log.error("Unable to write to switch {}.", this.dpid);
......
......@@ -651,7 +651,7 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
* @param error The error message
*/
protected void logError(OFChannelHandler h, OFErrorMsg error) {
log.error("{} from switch {} in state {}",
log.info("{} from switch {} in state {}",
new Object[] {
error,
h.getSwitchInfoString(),
......
......@@ -6,6 +6,7 @@ import java.util.Collections;
import java.util.List;
import org.onlab.onos.openflow.controller.Dpid;
import org.onlab.onos.openflow.controller.RoleState;
import org.onlab.onos.openflow.controller.driver.AbstractOpenFlowSwitch;
import org.onlab.onos.openflow.controller.driver.OpenFlowSwitchDriver;
import org.onlab.onos.openflow.controller.driver.OpenFlowSwitchDriverFactory;
......@@ -61,6 +62,11 @@ public final class DriverManager implements OpenFlowSwitchDriverFactory {
return new AbstractOpenFlowSwitch(dpid, desc) {
@Override
public void setRole(RoleState state) {
this.role = RoleState.MASTER;
}
@Override
public void write(List<OFMessage> msgs) {
channel.write(msgs);
}
......
......@@ -425,7 +425,7 @@
<group>
<title>Core Subsystems</title>
<packages>
org.onlab.onos.cluster.impl:org.onlab.onos.net.device.impl:org.onlab.onos.net.link.impl:org.onlab.onos.net.host.impl:org.onlab.onos.net.topology.impl:org.onlab.onos.net.packet.impl:org.onlab.onos.net.flow.impl:org.onlab.onos.net.trivial.*:org.onlab.onos.net.*.impl:org.onlab.onos.event.impl:org.onlab.onos.store.*
org.onlab.onos.cluster.impl:org.onlab.onos.net.device.impl:org.onlab.onos.net.link.impl:org.onlab.onos.net.host.impl:org.onlab.onos.net.topology.impl:org.onlab.onos.net.packet.impl:org.onlab.onos.net.flow.impl:org.onlab.onos.store.trivial.*:org.onlab.onos.net.*.impl:org.onlab.onos.event.impl:org.onlab.onos.store.*
</packages>
</group>
<group>
......
......@@ -73,10 +73,11 @@ public class FlowModBuilder {
List<OFAction> actions = buildActions();
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowModify()
OFFlowMod fm = factory.buildFlowAdd()
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
.setIdleTimeout(10)
.setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority)
......@@ -93,7 +94,7 @@ public class FlowModBuilder {
OFFlowMod fm = factory.buildFlowDelete()
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
//.setActions(actions)
.setActions(actions)
.setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority)
......
......@@ -18,6 +18,7 @@ import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowRemovedReason;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
......@@ -70,14 +71,15 @@ public class FlowRuleBuilder {
buildSelector(), buildTreatment(), stat.getPriority(),
FlowRuleState.ADDED, stat.getDurationNsec() / 1000000,
stat.getPacketCount().getValue(), stat.getByteCount().getValue(),
stat.getCookie().getValue());
stat.getCookie().getValue(), false);
} else {
// TODO: revisit potentially.
return new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
buildSelector(), null, removed.getPriority(),
FlowRuleState.REMOVED, removed.getDurationNsec() / 1000000,
removed.getPacketCount().getValue(), removed.getByteCount().getValue(),
removed.getCookie().getValue());
removed.getCookie().getValue(),
removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal());
}
}
......
......@@ -158,7 +158,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
case BARRIER_REPLY:
case ERROR:
default:
log.warn("Unhandled message type: {}", msg.getType());
log.debug("Unhandled message type: {}", msg.getType());
}
}
......
......@@ -131,7 +131,7 @@ public class LinkDiscovery implements TimerTask {
}
timeout = Timer.getTimer().newTimeout(this, 0,
TimeUnit.MILLISECONDS);
this.log.debug("Started discovery manager for switch {}",
this.log.info("Started discovery manager for switch {}",
sw.getId());
}
......
package org.onlab.onos.provider.of.packet.impl;
import static org.onlab.onos.openflow.controller.RoleState.SLAVE;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
......@@ -95,9 +94,6 @@ public class OpenFlowPacketProvider extends AbstractProvider implements PacketPr
if (sw == null) {
log.warn("Device {} isn't available?", devId);
return;
} else if (sw.getRole().equals(SLAVE)) {
log.warn("Can't write to Device {} as slave", devId);
return;
}
Ethernet eth = new Ethernet();
......
......@@ -140,12 +140,12 @@ public class OpenFlowPacketProviderTest {
sw.sent.clear();
//wrong Role
sw.setRole(RoleState.SLAVE);
provider.emit(passPkt);
assertEquals("invalid switch", sw, controller.current);
assertEquals("message sent incorrectly", 0, sw.sent.size());
//sw.setRole(RoleState.SLAVE);
//provider.emit(passPkt);
//assertEquals("invalid switch", sw, controller.current);
//assertEquals("message sent incorrectly", 0, sw.sent.size());
sw.setRole(RoleState.MASTER);
//sw.setRole(RoleState.MASTER);
//missing switch
OutboundPacket swFailPkt = outPacket(DID_MISSING, TR, eth);
......
......@@ -34,6 +34,8 @@ cp -r $ONOS_ROOT/tools/package/etc/* $KARAF_DIST/etc
mkdir -p $KARAF_DIST/system/org/onlab
cp -r $M2_REPO/org/onlab $KARAF_DIST/system/org/
export ONOS_FEATURES="${ONOS_FEATURES:-webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo}"
# Cellar Patching --------------------------------------------------------------
# Patch the Apache Karaf distribution file to add Cellar features repository
......@@ -51,7 +53,7 @@ perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-feature
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution file to load ONOS features
perl -pi.old -e 's|^(featuresBoot=.*)|\1,webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo|' \
perl -pi.old -e "s|^(featuresBoot=.*)|\1,$ONOS_FEATURES|" \
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution with ONOS branding bundle
......
......@@ -9,5 +9,10 @@
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
onos-package
for node in $nodes; do (printf "%s: %s\n" "$node" "`onos-install -f $node`")& done
for node in $nodes; do onos-install -f $node 1>/dev/null & done
# Wait for shutdown before waiting for restart
sleep 3
for node in $nodes; do onos-wait-for-start $node; done
for node in $nodes; do onos-check-logs $node; done
......
......@@ -6,22 +6,21 @@
export ONOS_ROOT=${ONOS_ROOT:-~/onos-next}
# Setup some environmental context for developers
export JAVA_HOME=$(/usr/libexec/java_home)
export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
export MAVEN=${MAVEN:-~/Applications/apache-maven-3.2.2}
export KARAF=${KARAF:-~/Applications/apache-karaf-3.0.1}
export KARAF_LOG=$KARAF/data/log/karaf.log
# Setup a path
export PS=":"
export PATH="$PATH:$ONOS_ROOT/tools/dev:$ONOS_ROOT/tools/build"
export PATH="$PATH:$ONOS_ROOT/tools/test/bin"
export PATH="$PATH:$ONOS_ROOT/tools/dev/bin:$ONOS_ROOT/tools/test/bin"
export PATH="$PATH:$ONOS_ROOT/tools/build"
export PATH="$PATH:$MAVEN/bin:$KARAF/bin"
export PATH="$PATH:."
# Convenience utility to warp to various ONOS source projects
# e.g. 'o api', 'o dev', 'o'
function o {
cd $(find $ONOS_ROOT/ -type d | egrep -v '\.git|target|src' | \
cd $(find $ONOS_ROOT/ -type d | egrep -v '\.git|target' | \
egrep "${1:-$ONOS_ROOT}" | head -n 1)
}
......@@ -30,11 +29,12 @@ alias mci='mvn clean install'
# Short-hand for ONOS build, package and test.
alias ob='onos-build'
alias obs='onos-build-selective'
alias op='onos-package'
alias ot='onos-test'
# Short-hand for tailing the ONOS (karaf) log
alias tl='$ONOS_ROOT/tools/dev/watchLog'
alias tl='$ONOS_ROOT/tools/dev/bin/onos-local-log'
alias tlo='tl | grep --colour=always org.onlab'
# Pretty-print JSON output
......@@ -57,6 +57,7 @@ function cell {
if [ -n "$1" ]; then
[ ! -f $ONOS_ROOT/tools/test/cells/$1 ] && \
echo "No such cell: $1" >&2 && return 1
unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN OCI
. $ONOS_ROOT/tools/test/cells/$1
export OCI=$OC1
export ONOS_CELL=$1
......@@ -66,6 +67,7 @@ function cell {
env | egrep "OCI"
env | egrep "OC[0-9]+" | sort
env | egrep "OCN"
env | egrep "ONOS_" | egrep -v 'ONOS_ROOT|ONOS_CELL'
fi
}
......@@ -73,7 +75,11 @@ cell local >/dev/null # Default cell is the local VMs
# Lists available cells
function cells {
ls -1 $ONOS_ROOT/tools/test/cells
for cell in $(ls -1 $ONOS_ROOT/tools/test/cells); do
printf "%-12s %s\n" \
"$([ $cell = $ONOS_CELL ] && echo $cell '*' || echo $cell)" \
"$(grep '^#' $ONOS_ROOT/tools/test/cells/$cell | head -n 1)"
done
}
# Miscellaneous
......
#!/bin/bash
#------------------------------------------------------------------------------
# Selectively builds only those projects that contained modified Java files.
#------------------------------------------------------------------------------
projects=$(find $ONOS_ROOT -name '*.java' \
-not -path '*/openflowj/*' -and -not -path '.git/*' \
-exec $ONOS_ROOT/tools/dev/bin/onos-build-selective-hook {} \; | \
sort -u | sed "s:$ONOS_ROOT::g" | tr '\n' ',' | \
sed 's:/,:,:g;s:,/:,:g;s:^/::g;s:,$::g')
[ -n "$projects" ] && cd $ONOS_ROOT && mvn --projects $projects ${@:-clean install}
\ No newline at end of file
#------------------------------------------------------------------------------
# Echoes project-level directory if a Java file within is newer than its
# class file counterpart
#------------------------------------------------------------------------------
javaFile=${1#*\/src\/*\/java/}
basename=${1/*\//}
[ $basename = "package-info.java" ] && exit 0
src=${1/$javaFile/}
project=${src/src*/}
classFile=${javaFile/.java/.class}
[ ${project}target/classes/$classFile -nt ${src}$javaFile -o \
${project}target/test-classes/$classFile -nt ${src}$javaFile ] \
|| echo ${src/src*/}
unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN ONOS_NIC
# Default virtual box ONOS instances 1,2 & ONOS mininet box
. $ONOS_ROOT/tools/test/cells/.reset
# Local VirtualBox-based ONOS instances 1,2 & ONOS mininet box
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.101"
export OC2="192.168.56.102"
......
# ProxMox-based cell of ONOS instance; no mininet-box
export ONOS_FEATURES="webconsole,onos-api,onos-core-trivial,onos-cli,onos-openflow,onos-app-fwd,onos-app-mobility,onos-app-tvue"
export ONOS_NIC="10.128.4.*"
export OC1="10.128.4.60"
# ProxMox-based cell of ONOS instances 1,2 & ONOS mininet box
. $ONOS_ROOT/tools/test/cells/.reset
export ONOS_NIC="10.1.9.*"
export OC1="10.1.9.94"
export OC2="10.1.9.82"
......
# Local VirtualBox-based single ONOS instance & ONOS mininet box
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.101"
export OCN="192.168.56.103"
# Default virtual box ONOS instances 1,2 & ONOS mininet box
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.11"
export OC2="192.168.56.12"
export OCN="192.168.56.7"
# Local VirtualBox-based ONOS instances 1,2,3 & ONOS mininet box
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.101"
export OC2="192.168.56.102"
export OC3="192.168.56.104"
export OCN="192.168.56.103"
......@@ -55,6 +55,11 @@
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.2</version>
</dependency>
</dependencies>
</project>
......
package org.onlab.metrics;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Components to register for metrics.
*/
public class MetricsComponent implements MetricsComponentRegistry {
private final String name;
/**
* Registry to hold the Features defined in this Component.
*/
private final ConcurrentMap<String, MetricsFeature> featuresRegistry =
new ConcurrentHashMap<>();
/**
* Constructs a component from a name.
*
* @param newName name of the component
*/
MetricsComponent(final String newName) {
name = newName;
}
@Override public String getName() {
return name;
}
@Override public MetricsFeature registerFeature(final String featureName) {
MetricsFeature feature = featuresRegistry.get(featureName);
if (feature == null) {
final MetricsFeature createdFeature = new MetricsFeature(featureName);
feature = featuresRegistry.putIfAbsent(featureName, createdFeature);
if (feature == null) {
feature = createdFeature;
}
}
return feature;
}
}
package org.onlab.metrics;
/**
* Registry Entry for Metrics Components.
*/
public interface MetricsComponentRegistry {
String getName();
MetricsFeature registerFeature(String featureName);
}
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.