Madan Jampani

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 56 changed files with 2014 additions and 361 deletions
......@@ -14,6 +14,6 @@ public class NettyLoggingHandler implements MessageHandler {
@Override
public void handle(Message message) {
log.info("Received message. Payload has {} bytes", message.payload().length);
//log.info("Received message. Payload has {} bytes", message.payload().length);
}
}
......
......@@ -10,12 +10,17 @@ import org.onlab.metrics.MetricsManager;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
// FIXME: Should be move out to test or app
public final class SimpleNettyClient {
private SimpleNettyClient() {
private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
private SimpleNettyClient() {
}
public static void main(String[] args)
......@@ -29,30 +34,33 @@ public final class SimpleNettyClient {
System.exit(0);
}
public static void startStandalone(String... args) throws Exception {
public static void startStandalone(String[] args) throws Exception {
String host = args.length > 0 ? args[0] : "localhost";
int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000;
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
Endpoint endpoint = new Endpoint(host, port);
messaging.activate();
metrics.activate();
MetricsFeature feature = new MetricsFeature("latency");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
log.info("connecting " + host + ":" + port + " warmup:" + warmup + " iterations:" + iterations);
for (int i = 0; i < warmup; i++) {
messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes());
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
Response response = messaging
.sendAndReceive(new Endpoint(host, port), "echo",
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
}
log.info("measuring async sender");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAsyncTimer.time();
messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes());
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
context.stop();
}
......@@ -60,11 +68,12 @@ public final class SimpleNettyClient {
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
.sendAndReceive(new Endpoint(host, port), "echo",
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
metrics.deactivate();
}
public static class TestNettyMessagingService extends NettyMessagingService {
......
......@@ -16,27 +16,26 @@ public class SimpleNettyClientCommand extends AbstractShellCommand {
//FIXME: replace these arguments with proper ones needed for the test.
@Argument(index = 0, name = "hostname", description = "Server Hostname",
required = false, multiValued = false)
String host = "localhost";
String hostname = "localhost";
@Argument(index = 3, name = "port", description = "Port",
@Argument(index = 1, name = "port", description = "Port",
required = false, multiValued = false)
String port = "8081";
@Argument(index = 1, name = "warmupCount", description = "Warm-up count",
@Argument(index = 2, name = "warmupCount", description = "Warm-up count",
required = false, multiValued = false)
String warmup = "1000";
String warmupCount = "1000";
@Argument(index = 2, name = "messageCount", description = "Message count",
@Argument(index = 3, name = "messageCount", description = "Message count",
required = false, multiValued = false)
String messageCount = "5000000";
String messageCount = "100000";
@Override
protected void execute() {
try {
startStandalone(new String[]{host, port, warmup, messageCount});
startStandalone(new String[]{hostname, port, warmupCount, messageCount});
} catch (Exception e) {
error("Unable to start client %s", e);
}
}
}
......
......@@ -10,6 +10,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.CoreService;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.Path;
......@@ -53,13 +54,16 @@ public class ReactiveForwarding {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ReactivePacketProcessor processor = new ReactivePacketProcessor();
private ApplicationId appId;
@Activate
public void activate() {
appId = ApplicationId.getAppId();
appId = coreService.registerApplication("org.onlab.onos.fwd");
packetService.addProcessor(processor, PacketProcessor.ADVISOR_MAX + 2);
log.info("Started with Application ID {}", appId.id());
}
......
......@@ -10,6 +10,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.CoreService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.device.DeviceService;
......@@ -44,11 +45,14 @@ public class HostMobility {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
@Activate
public void activate() {
appId = ApplicationId.getAppId();
appId = coreService.registerApplication("org.onlab.onos.mobility");
hostService.addListener(new InternalHostListener());
log.info("Started with Application ID {}", appId.id());
}
......
......@@ -8,6 +8,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.CoreService;
import org.onlab.onos.net.packet.PacketContext;
import org.onlab.onos.net.packet.PacketProcessor;
import org.onlab.onos.net.packet.PacketService;
......@@ -31,11 +32,14 @@ public class ProxyArp {
private ProxyArpProcessor processor = new ProxyArpProcessor();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
@Activate
public void activate() {
appId = ApplicationId.getAppId();
appId = coreService.registerApplication("org.onlab.onos.proxyarp");
packetService.addProcessor(processor, PacketProcessor.ADVISOR_MAX + 1);
log.info("Started with Application ID {}", appId.id());
}
......
......@@ -27,7 +27,7 @@ public class AddHostToHostIntentCommand extends AbstractShellCommand {
required = true, multiValued = false)
String two = null;
private static long id = 1;
private static long id = 0x7870001;
@Override
protected void execute() {
......
......@@ -14,6 +14,7 @@ import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.PointToPointIntent;
import org.onlab.packet.Ethernet;
/**
* Installs point-to-point connectivity intents.
......@@ -32,7 +33,7 @@ public class AddPointToPointIntentCommand extends AbstractShellCommand {
required = true, multiValued = false)
String egressDeviceString = null;
private static long id = 1;
private static long id = 0x7470001;
@Override
protected void execute() {
......@@ -48,7 +49,9 @@ public class AddPointToPointIntentCommand extends AbstractShellCommand {
PortNumber.portNumber(getPortNumber(egressDeviceString));
ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
Intent intent =
......
package org.onlab.onos.cli.net;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.onos.cli.net.DevicesListCommand.getSortedDevices;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.CoreService;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.cli.Comparators;
import org.onlab.onos.net.Device;
......@@ -18,37 +13,43 @@ import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRuleService;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.onos.cli.net.DevicesListCommand.getSortedDevices;
/**
* Lists all currently-known hosts.
*/
@Command(scope = "onos", name = "flows",
description = "Lists all currently-known flows.")
description = "Lists all currently-known flows.")
public class FlowsListCommand extends AbstractShellCommand {
public static final String ANY = "any";
private static final String FMT =
" id=%s, state=%s, bytes=%s, packets=%s, duration=%s, priority=%s";
" id=%s, state=%s, bytes=%s, packets=%s, duration=%s, priority=%s, appId=%s";
private static final String TFMT = " treatment=%s";
private static final String SFMT = " selector=%s";
@Argument(index = 1, name = "uri", description = "Device ID",
required = false, multiValued = false)
required = false, multiValued = false)
String uri = null;
@Argument(index = 0, name = "state", description = "Flow Rule state",
required = false, multiValued = false)
required = false, multiValued = false)
String state = null;
@Override
protected void execute() {
CoreService coreService = get(CoreService.class);
DeviceService deviceService = get(DeviceService.class);
FlowRuleService service = get(FlowRuleService.class);
Map<Device, List<FlowEntry>> flows = getSortedFlows(deviceService, service);
for (Device d : getSortedDevices(deviceService)) {
printFlows(d, flows.get(d));
printFlows(d, flows.get(d), coreService);
}
}
......@@ -67,7 +68,7 @@ public class FlowsListCommand extends AbstractShellCommand {
s = FlowEntryState.valueOf(state.toUpperCase());
}
Iterable<Device> devices = uri == null ? deviceService.getDevices() :
Collections.singletonList(deviceService.getDevice(DeviceId.deviceId(uri)));
Collections.singletonList(deviceService.getDevice(DeviceId.deviceId(uri)));
for (Device d : devices) {
if (s == null) {
rules = newArrayList(service.getFlowEntries(d.id()));
......@@ -87,16 +88,19 @@ public class FlowsListCommand extends AbstractShellCommand {
/**
* Prints flows.
* @param d the device
*
* @param d the device
* @param flows the set of flows for that device.
*/
protected void printFlows(Device d, List<FlowEntry> flows) {
protected void printFlows(Device d, List<FlowEntry> flows,
CoreService coreService) {
boolean empty = flows == null || flows.isEmpty();
print("deviceId=%s, flowRuleCount=%d", d.id(), empty ? 0 : flows.size());
if (!empty) {
for (FlowEntry f : flows) {
print(FMT, Long.toHexString(f.id().value()), f.state(), f.bytes(),
f.packets(), f.life(), f.priority());
print(FMT, Long.toHexString(f.id().value()), f.state(),
f.bytes(), f.packets(), f.life(), f.priority(),
coreService.getAppId(f.appId()).name());
print(SFMT, f.selector().criteria());
print(TFMT, f.treatment().instructions());
}
......
package org.onlab.onos;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Application id generator class.
* Application identifier.
*/
public final class ApplicationId {
public interface ApplicationId {
private static final AtomicInteger ID_DISPENCER = new AtomicInteger(1);
private final Integer id;
// Ban public construction
private ApplicationId(Integer id) {
this.id = id;
}
public Integer id() {
return id;
}
public static ApplicationId valueOf(Integer id) {
return new ApplicationId(id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof ApplicationId)) {
return false;
}
ApplicationId other = (ApplicationId) obj;
return Objects.equals(this.id, other.id);
}
/**
* Returns the application id.
* @return a short value
*/
short id();
/**
* Returns a new application id.
*
* @return app id
* Returns the applications supplied identifier.
* @return a string identifier
*/
public static ApplicationId getAppId() {
return new ApplicationId(ApplicationId.ID_DISPENCER.getAndIncrement());
}
String name();
}
......
......@@ -12,4 +12,21 @@ public interface CoreService {
*/
Version version();
/**
* Registers a new application by its name, which is expected
* to follow the reverse DNS convention, e.g.
* {@code org.flying.circus.app}
*
* @param identifier string identifier
* @return the application id
*/
ApplicationId registerApplication(String identifier);
/**
* Returns an existing application id from a given id.
* @param id the short value of the id
* @return an application id
*/
ApplicationId getAppId(Short id);
}
......
......@@ -34,7 +34,7 @@ public interface MastershipService {
/**
* Abandons mastership of the specified device on the local node thus
* forcing selection of a new master. If the local node is not a master
* for this device, no action will be taken.
* for this device, no master selection will occur.
*
* @param deviceId the identifier of the device
*/
......
......@@ -66,12 +66,25 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
MastershipTerm getTermFor(DeviceId deviceId);
/**
* Revokes a controller instance's mastership over a device and hands
* over mastership to another controller instance.
* Sets a controller instance's mastership role to STANDBY for a device.
* If the role is MASTER, another controller instance will be selected
* as a candidate master.
*
* @param nodeId the controller instance identifier
* @param deviceId device to revoke mastership for
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId);
/**
* Allows a controller instance to give up its current role for a device.
* If the role is MASTER, another controller instance will be selected
* as a candidate master.
*
* @param nodeId the controller instance identifier
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId);
}
......
......@@ -42,6 +42,7 @@ public interface DeviceService {
* @param deviceId device identifier
* @return designated mastership role
*/
//XXX do we want this method here when MastershipService already does?
MastershipRole getRole(DeviceId deviceId);
......
package org.onlab.onos.net.flow;
import java.util.List;
/**
* Interface capturing the result of a batch operation.
*
*/
public interface BatchOperationResult<T> {
/**
* Returns whether the operation was successful.
* @return true if successful, false otherwise
*/
boolean isSuccess();
/**
* Obtains a list of items which failed.
* @return a list of failures
*/
List<T> failedItems();
}
package org.onlab.onos.net.flow;
public class CompletedBatchOperation {
import java.util.List;
import com.google.common.collect.ImmutableList;
public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
private final boolean success;
private final List<FlowEntry> failures;
public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
this.success = success;
this.failures = ImmutableList.copyOf(failures);
}
@Override
public boolean isSuccess() {
return success;
}
@Override
public List<FlowEntry> failedItems() {
return failures;
}
}
......
......@@ -17,6 +17,10 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
private long lastSeen = -1;
private final int errType;
private final int errCode;
public DefaultFlowEntry(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowEntryState state,
......@@ -27,6 +31,8 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this.life = life;
this.packets = packets;
this.bytes = bytes;
this.errCode = -1;
this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
......@@ -37,6 +43,8 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this.life = life;
this.packets = packets;
this.bytes = bytes;
this.errCode = -1;
this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
......@@ -46,9 +54,18 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this.life = 0;
this.packets = 0;
this.bytes = 0;
this.errCode = -1;
this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
public DefaultFlowEntry(FlowRule rule, int errType, int errCode) {
super(rule);
this.state = FlowEntryState.FAILED;
this.errType = errType;
this.errCode = errCode;
}
@Override
public long life() {
return life;
......@@ -100,6 +117,16 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
}
@Override
public int errType() {
return this.errType;
}
@Override
public int errCode() {
return this.errCode;
}
@Override
public String toString() {
return toStringHelper(this)
.add("rule", super.toString())
......@@ -108,4 +135,6 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
}
}
......
......@@ -21,7 +21,7 @@ public class DefaultFlowRule implements FlowRule {
private final FlowId id;
private final ApplicationId appId;
private final short appId;
private final int timeout;
......@@ -36,7 +36,7 @@ public class DefaultFlowRule implements FlowRule {
this.timeout = timeout;
this.created = System.currentTimeMillis();
this.appId = ApplicationId.valueOf((int) (flowId >> 32));
this.appId = (short) (flowId >>> 48);
this.id = FlowId.valueOf(flowId);
}
......@@ -52,11 +52,11 @@ public class DefaultFlowRule implements FlowRule {
this.priority = priority;
this.selector = selector;
this.treatment = treatement;
this.appId = appId;
this.appId = appId.id();
this.timeout = timeout;
this.created = System.currentTimeMillis();
this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL));
this.id = FlowId.valueOf((((long) this.appId) << 48) | (this.hash() & 0x0000ffffffffL));
}
public DefaultFlowRule(FlowRule rule) {
......@@ -78,7 +78,7 @@ public class DefaultFlowRule implements FlowRule {
}
@Override
public ApplicationId appId() {
public short appId() {
return appId;
}
......
......@@ -140,6 +140,16 @@ public final class DefaultTrafficSelector implements TrafficSelector {
}
@Override
public Builder matchTcpSrc(Short tcpPort) {
return add(Criteria.matchTcpSrc(tcpPort));
}
@Override
public Builder matchTcpDst(Short tcpPort) {
return add(Criteria.matchTcpDst(tcpPort));
}
@Override
public TrafficSelector build() {
return new DefaultTrafficSelector(ImmutableSet.copyOf(selector.values()));
}
......
......@@ -29,7 +29,12 @@ public interface FlowEntry extends FlowRule {
/**
* Flow has been removed from flow table and can be purged.
*/
REMOVED
REMOVED,
/**
* Indicates that the installation of this flow has failed.
*/
FAILED
}
/**
......@@ -95,4 +100,16 @@ public interface FlowEntry extends FlowRule {
*/
void setBytes(long bytes);
/**
* Indicates the error type.
* @return an integer value of the error
*/
int errType();
/**
* Indicates the error code.
* @return an integer value of the error
*/
int errCode();
}
......
package org.onlab.onos.net.flow;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.intent.BatchOperationTarget;
......@@ -26,7 +25,7 @@ public interface FlowRule extends BatchOperationTarget {
*
* @return an applicationId
*/
ApplicationId appId();
short appId();
/**
* Returns the flow rule priority given in natural order; higher numbers
......
......@@ -37,6 +37,12 @@ public interface FlowRuleProvider extends Provider {
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
/**
* Installs a batch of flow rules. Each flowrule is associated to an
* operation which results in either addition, removal or modification.
* @param batch a batch of flow rules
* @return a future indicating the status of this execution
*/
Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
......
......@@ -98,6 +98,20 @@ public interface TrafficSelector {
public Builder matchIPDst(IpPrefix ip);
/**
* Matches a TCP source port number.
* @param tcpPort a TCP source port number
* @return a selection builder
*/
public Builder matchTcpSrc(Short tcpPort);
/**
* Matches a TCP destination port number.
* @param tcpPort a TCP destination port number
* @return a selection builder
*/
public Builder matchTcpDst(Short tcpPort);
/**
* Builds an immutable traffic selector.
*
* @return traffic selector
......
......@@ -113,6 +113,25 @@ public final class Criteria {
return new IPCriterion(ip, Type.IPV4_DST);
}
/**
* Creates a match on TCP source port field using the specified value.
*
* @param tcpPort
* @return match criterion
*/
public static Criterion matchTcpSrc(Short tcpPort) {
return new TcpPortCriterion(tcpPort, Type.TCP_SRC);
}
/**
* Creates a match on TCP destination port field using the specified value.
*
* @param tcpPort
* @return match criterion
*/
public static Criterion matchTcpDst(Short tcpPort) {
return new TcpPortCriterion(tcpPort, Type.TCP_DST);
}
/*
* Implementations of criteria.
......@@ -437,4 +456,49 @@ public final class Criteria {
}
public static final class TcpPortCriterion implements Criterion {
private final Short tcpPort;
private final Type type;
public TcpPortCriterion(Short tcpPort, Type type) {
this.tcpPort = tcpPort;
this.type = type;
}
@Override
public Type type() {
return this.type;
}
public Short tcpPort() {
return this.tcpPort;
}
@Override
public String toString() {
return toStringHelper(type().toString())
.add("tcpPort", tcpPort).toString();
}
@Override
public int hashCode() {
return Objects.hash(tcpPort, type);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof TcpPortCriterion) {
TcpPortCriterion that = (TcpPortCriterion) obj;
return Objects.equals(tcpPort, that.tcpPort) &&
Objects.equals(type, that.type);
}
return false;
}
}
}
......
package org.onlab.onos.net.intent;
import java.util.concurrent.Future;
import org.onlab.onos.net.flow.CompletedBatchOperation;
/**
* Abstraction of entity capable of installing intents to the environment.
*/
......@@ -10,7 +14,7 @@ public interface IntentInstaller<T extends InstallableIntent> {
* @param intent intent to be installed
* @throws IntentException if issues are encountered while installing the intent
*/
void install(T intent);
Future<CompletedBatchOperation> install(T intent);
/**
* Uninstalls the specified intent from the environment.
......@@ -18,5 +22,5 @@ public interface IntentInstaller<T extends InstallableIntent> {
* @param intent intent to be uninstalled
* @throws IntentException if issues are encountered while uninstalling the intent
*/
void uninstall(T intent);
Future<CompletedBatchOperation> uninstall(T intent);
}
......
......@@ -33,6 +33,8 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
/**
* Returns the number of intents in the store.
*
* @return the number of intents in the store
*/
long getIntentCount();
......@@ -44,7 +46,7 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
Iterable<Intent> getIntents();
/**
* Returns the intent with the specified identifer.
* Returns the intent with the specified identifier.
*
* @param intentId intent identification
* @return intent or null if not found
......@@ -94,7 +96,6 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* specified original intent.
*
* @param intentId original intent identifier
* @return compiled state transition event
*/
void removeInstalledIntents(IntentId intentId);
......
......@@ -53,4 +53,4 @@
* while the system determines where to perform the compilation or while it
* performs global recomputation/optimization across all prior intents.
*/
package org.onlab.onos.net.intent;
\ No newline at end of file
package org.onlab.onos.net.intent;
......
package org.onlab.onos.net.intent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.onlab.onos.net.intent.IntentEvent.Type.FAILED;
import static org.onlab.onos.net.intent.IntentEvent.Type.INSTALLED;
import static org.onlab.onos.net.intent.IntentEvent.Type.SUBMITTED;
import static org.onlab.onos.net.intent.IntentEvent.Type.WITHDRAWN;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import static org.junit.Assert.*;
import static org.onlab.onos.net.intent.IntentEvent.Type.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.net.flow.CompletedBatchOperation;
/**
* Suite of tests for the intent service contract.
......@@ -290,17 +298,19 @@ public class IntentServiceTest {
}
@Override
public void install(TestInstallableIntent intent) {
public Future<CompletedBatchOperation> install(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("install failed by design");
}
return null;
}
@Override
public void uninstall(TestInstallableIntent intent) {
public Future<CompletedBatchOperation> uninstall(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("remove failed by design");
}
return null;
}
}
......
......@@ -82,7 +82,7 @@ implements MastershipService, MastershipAdminService {
if (role.equals(MastershipRole.MASTER)) {
event = store.setMaster(nodeId, deviceId);
} else {
event = store.unsetMaster(nodeId, deviceId);
event = store.setStandby(nodeId, deviceId);
}
if (event != null) {
......@@ -98,13 +98,10 @@ implements MastershipService, MastershipAdminService {
@Override
public void relinquishMastership(DeviceId deviceId) {
MastershipRole role = getLocalRole(deviceId);
if (!role.equals(MastershipRole.MASTER)) {
return;
}
MastershipEvent event = store.unsetMaster(
MastershipEvent event = null;
event = store.relinquishRole(
clusterService.getLocalNode().id(), deviceId);
if (event != null) {
post(event);
}
......
package org.onlab.onos.cluster.impl;
package org.onlab.onos.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.CoreService;
import org.onlab.onos.Version;
import org.onlab.util.Tools;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Core service implementation.
......@@ -17,9 +21,13 @@ import java.util.List;
@Service
public class CoreManager implements CoreService {
private static final AtomicInteger ID_DISPENSER = new AtomicInteger(1);
private static final File VERSION_FILE = new File("../VERSION");
private static Version version = Version.version("1.0.0-SNAPSHOT");
private final Map<Short, DefaultApplicationId> appIds = new ConcurrentHashMap<>();
// TODO: work in progress
@Activate
......@@ -35,4 +43,17 @@ public class CoreManager implements CoreService {
return version;
}
@Override
public ApplicationId getAppId(Short id) {
return appIds.get(id);
}
@Override
public ApplicationId registerApplication(String name) {
short id = (short) ID_DISPENSER.getAndIncrement();
DefaultApplicationId appId = new DefaultApplicationId(id, name);
appIds.put(id, appId);
return appId;
}
}
......
package org.onlab.onos.impl;
import org.onlab.onos.ApplicationId;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Application id generator class.
*/
public class DefaultApplicationId implements ApplicationId {
private final short id;
private final String name;
// Ban public construction
protected DefaultApplicationId(Short id, String identifier) {
this.id = id;
this.name = identifier;
}
@Override
public short id() {
return id;
}
@Override
public String name() {
return name;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof DefaultApplicationId) {
DefaultApplicationId other = (DefaultApplicationId) obj;
return Objects.equals(this.id, other.id);
}
return false;
}
@Override
public String toString() {
return toStringHelper(this).add("id", id).add("name", name).toString();
}
}
/**
*
*/
package org.onlab.onos.impl;
\ No newline at end of file
......@@ -143,7 +143,7 @@ public class DeviceManager
// Applies the specified role to the device; ignores NONE
private void applyRole(DeviceId deviceId, MastershipRole newRole) {
if (newRole != MastershipRole.NONE) {
if (newRole.equals(MastershipRole.NONE)) {
Device device = store.getDevice(deviceId);
// FIXME: Device might not be there yet. (eventual consistent)
if (device == null) {
......@@ -257,13 +257,14 @@ public class DeviceManager
// temporarily request for Master Role and mark offline.
if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
log.debug("Device {} disconnected, but I am not the master", deviceId);
//let go of any role anyways
mastershipService.relinquishMastership(deviceId);
return;
}
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of being master or a candidate.
mastershipService.relinquishMastership(deviceId);
//we're no longer capable of mastership.
if (event != null) {
log.info("Device {} disconnected", deviceId);
post(event);
......@@ -319,24 +320,29 @@ public class DeviceManager
}
// Intercepts mastership events
private class InternalMastershipListener
implements MastershipListener {
private class InternalMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
final NodeId myNodeId = clusterService.getLocalNode().id();
if (myNodeId.equals(event.master())) {
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(event.subject());
if (term.master().equals(myNodeId)) {
// only set the new term if I am the master
clockProviderService.setMastershipTerm(event.subject(), term);
final DeviceId did = event.subject();
if (isAvailable(did)) {
final NodeId myNodeId = clusterService.getLocalNode().id();
if (myNodeId.equals(event.master())) {
MastershipTerm term = termService.getMastershipTerm(did);
if (term.master().equals(myNodeId)) {
// only set the new term if I am the master
clockProviderService.setMastershipTerm(did, term);
}
applyRole(did, MastershipRole.MASTER);
} else {
applyRole(did, MastershipRole.STANDBY);
}
applyRole(event.subject(), MastershipRole.MASTER);
} else {
applyRole(event.subject(), MastershipRole.STANDBY);
//device dead to node, give up
mastershipService.relinquishMastership(did);
applyRole(did, MastershipRole.STANDBY);
}
}
}
......
......@@ -5,10 +5,12 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -26,6 +28,7 @@ import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
......@@ -52,6 +55,8 @@ public class FlowRuleManager
extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
implements FlowRuleService, FlowRuleProviderRegistry {
enum BatchState { STARTED, FINISHED, CANCELLED };
public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
private final Logger log = getLogger(getClass());
......@@ -144,7 +149,7 @@ public class FlowRuleManager
FlowRuleBatchOperation batch) {
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
ArrayListMultimap.create();
List<Future<Void>> futures = Lists.newArrayList();
List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
final Device device = deviceService.getDevice(f.deviceId());
......@@ -165,10 +170,10 @@ public class FlowRuleManager
for (FlowRuleProvider provider : batches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(batches.get(provider));
Future<Void> future = provider.executeBatch(b);
Future<CompletedBatchOperation> future = provider.executeBatch(b);
futures.add(future);
}
return new FlowRuleBatchFuture(futures);
return new FlowRuleBatchFuture(futures, batches);
}
@Override
......@@ -341,59 +346,140 @@ public class FlowRuleManager
private class FlowRuleBatchFuture
implements Future<CompletedBatchOperation> {
private final List<Future<Void>> futures;
private final List<Future<CompletedBatchOperation>> futures;
private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
private final AtomicReference<BatchState> state;
private CompletedBatchOperation overall;
public FlowRuleBatchFuture(List<Future<Void>> futures) {
public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
this.futures = futures;
this.batches = batches;
state = new AtomicReference<FlowRuleManager.BatchState>();
state.set(BatchState.STARTED);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
if (state.get() == BatchState.FINISHED) {
return false;
}
if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
return false;
}
cleanUpBatch();
for (Future<CompletedBatchOperation> f : futures) {
f.cancel(mayInterruptIfRunning);
}
return true;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
return state.get() == BatchState.CANCELLED;
}
@Override
public boolean isDone() {
boolean isDone = true;
for (Future<Void> future : futures) {
isDone &= future.isDone();
}
return isDone;
return state.get() == BatchState.FINISHED;
}
@Override
public CompletedBatchOperation get() throws InterruptedException,
ExecutionException {
// TODO Auto-generated method stub
for (Future<Void> future : futures) {
future.get();
ExecutionException {
if (isDone()) {
return overall;
}
boolean success = true;
List<FlowEntry> failed = Lists.newLinkedList();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
success = validateBatchOperation(failed, completed, future);
}
return new CompletedBatchOperation();
return finalizeBatchOperation(success, failed);
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
// TODO we should decrement the timeout
if (isDone()) {
return overall;
}
boolean success = true;
List<FlowEntry> failed = Lists.newLinkedList();
CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
for (Future<Void> future : futures) {
for (Future<CompletedBatchOperation> future : futures) {
long now = System.nanoTime();
long thisTimeout = end - now;
future.get(thisTimeout, TimeUnit.NANOSECONDS);
completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
success = validateBatchOperation(failed, completed, future);
}
return new CompletedBatchOperation();
return finalizeBatchOperation(success, failed);
}
private boolean validateBatchOperation(List<FlowEntry> failed,
CompletedBatchOperation completed,
Future<CompletedBatchOperation> future) {
if (isCancelled()) {
throw new CancellationException();
}
if (!completed.isSuccess()) {
failed.addAll(completed.failedItems());
cleanUpBatch();
cancelAllSubBatches();
return false;
}
return true;
}
private void cancelAllSubBatches() {
for (Future<CompletedBatchOperation> f : futures) {
f.cancel(true);
}
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
List<FlowEntry> failed) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
return overall;
}
throw new CancellationException();
}
overall = new CompletedBatchOperation(success, failed);
return overall;
}
}
private void cleanUpBatch() {
for (FlowRuleBatchEntry fbe : batches.values()) {
if (fbe.getOperator() == FlowRuleOperation.ADD ||
fbe.getOperator() == FlowRuleOperation.MODIFY) {
store.deleteFlowRule(fbe.getTarget());
} else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
store.storeFlowRule(fbe.getTarget());
}
}
}
}
}
......
......@@ -13,12 +13,17 @@ import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -28,6 +33,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
......@@ -44,7 +50,9 @@ import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* An implementation of Intent Manager.
......@@ -67,7 +75,8 @@ public class IntentManager
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
private final ExecutorService executor = newSingleThreadExecutor(namedThreads("onos-intents"));
private ExecutorService executor;
private ExecutorService monitorExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
......@@ -86,6 +95,8 @@ public class IntentManager
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
executor = newSingleThreadExecutor(namedThreads("onos-intents"));
monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
log.info("Started");
}
......@@ -94,6 +105,8 @@ public class IntentManager
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
executor.shutdown();
monitorExecutor.shutdown();
log.info("Stopped");
}
......@@ -240,14 +253,23 @@ public class IntentManager
}
}
// FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
// TODO: implement compilation traversing tree structure
/**
* Compiles an intent recursively.
*
* @param intent intent
* @return result of compilation
*/
private List<InstallableIntent> compileIntent(Intent intent) {
if (intent instanceof InstallableIntent) {
return ImmutableList.of((InstallableIntent) intent);
}
List<InstallableIntent> installable = new ArrayList<>();
// TODO do we need to registerSubclassCompiler?
for (Intent compiled : getCompiler(intent).compile(intent)) {
InstallableIntent installableIntent = (InstallableIntent) compiled;
installable.add(installableIntent);
installable.addAll(compileIntent(compiled));
}
return installable;
}
......@@ -261,6 +283,7 @@ public class IntentManager
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
......@@ -268,17 +291,20 @@ public class IntentManager
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
getInstaller(installable).install(installable);
Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
installFutures.add(future);
}
}
eventDispatcher.post(store.setState(intent, INSTALLED));
// FIXME we have to wait for the installable intents
//eventDispatcher.post(store.setState(intent, INSTALLED));
monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
uninstallIntent(intent);
uninstallIntent(intent, RECOMPILING);
// If compilation failed, kick off the recompiling phase.
executeRecompilingPhase(intent);
// FIXME
//executeRecompilingPhase(intent);
}
}
......@@ -327,12 +353,14 @@ public class IntentManager
private void executeWithdrawingPhase(Intent intent) {
// Indicate that the intent is being withdrawn.
store.setState(intent, WITHDRAWING);
uninstallIntent(intent);
uninstallIntent(intent, WITHDRAWN);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
store.removeInstalledIntents(intent.id());
eventDispatcher.post(store.setState(intent, WITHDRAWN));
// FIXME need to clean up
//store.removeInstalledIntents(intent.id());
// FIXME
//eventDispatcher.post(store.setState(intent, WITHDRAWN));
}
/**
......@@ -340,14 +368,17 @@ public class IntentManager
*
* @param intent intent to be uninstalled
*/
private void uninstallIntent(Intent intent) {
private void uninstallIntent(Intent intent, IntentState nextState) {
List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
getInstaller(installable).uninstall(installable);
Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
uninstallFutures.add(future);
}
}
monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
......@@ -422,9 +453,10 @@ public class IntentManager
// Attempt recompilation of the specified intents first.
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
uninstallIntent(intent);
uninstallIntent(intent, RECOMPILING);
executeRecompilingPhase(intent);
//FIXME
//executeRecompilingPhase(intent);
}
if (compileAllFailed) {
......@@ -460,4 +492,49 @@ public class IntentManager
}
}
private class IntentInstallMonitor implements Runnable {
private final Intent intent;
private final List<Future<CompletedBatchOperation>> futures;
private final IntentState nextState;
public IntentInstallMonitor(Intent intent,
List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
this.intent = intent;
this.futures = futures;
this.nextState = nextState;
}
private void updateIntent(Intent intent) {
if (nextState == RECOMPILING) {
executor.execute(new IntentTask(nextState, intent));
} else if (nextState == INSTALLED || nextState == WITHDRAWN) {
eventDispatcher.post(store.setState(intent, nextState));
} else {
log.warn("Invalid next intent state {} for intent {}", nextState, intent);
}
}
@Override
public void run() {
for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
Future<CompletedBatchOperation> future = i.next();
try {
// TODO: we may want to get the future here and go back to the future.
CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
// TODO check if future succeeded and if not report fail items
i.remove();
} catch (TimeoutException | InterruptedException | ExecutionException te) {
log.debug("Intallations of intent {} is still pending", intent);
}
}
if (futures.isEmpty()) {
updateIntent(intent);
} else {
// resubmit ourselves if we are not done yet
monitorExecutor.submit(this);
}
}
}
}
......
......@@ -5,7 +5,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -13,8 +13,10 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.CoreService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
......@@ -45,10 +47,14 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
private final ApplicationId appId = ApplicationId.getAppId();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
@Activate
public void activate() {
appId = coreService.registerApplication("org.onlab.onos.net.intent");
intentManager.registerInstaller(PathIntent.class, this);
}
......@@ -57,8 +63,26 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
intentManager.unregisterInstaller(PathIntent.class);
}
/**
* Apply a list of FlowRules.
*
* @param rules rules to apply
*/
private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
Future<CompletedBatchOperation> future = flowRuleService.applyBatch(batch);
return future;
// try {
// //FIXME don't do this here
// future.get();
// } catch (InterruptedException | ExecutionException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
}
@Override
public void install(PathIntent intent) {
public Future<CompletedBatchOperation> install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
......@@ -74,20 +98,14 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
//flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return applyBatch(rules);
}
@Override
public void uninstall(PathIntent intent) {
public Future<CompletedBatchOperation> uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
......@@ -103,15 +121,131 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
//flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return applyBatch(rules);
}
// TODO refactor below this line... ----------------------------
/**
* Generates the series of MatchActionOperations from the
* {@link FlowBatchOperation}.
* <p>
* FIXME: Currently supporting PacketPathFlow and SingleDstTreeFlow only.
* <p>
* FIXME: MatchActionOperations should have dependency field to the other
* match action operations, and this method should use this.
*
* @param op the {@link FlowBatchOperation} object
* @return the list of {@link MatchActionOperations} objects
*/
/*
private List<MatchActionOperations>
generateMatchActionOperationsList(FlowBatchOperation op) {
// MatchAction operations at head (ingress) switches.
MatchActionOperations headOps = matchActionService.createOperationsList();
// MatchAction operations at rest of the switches.
MatchActionOperations tailOps = matchActionService.createOperationsList();
MatchActionOperations removeOps = matchActionService.createOperationsList();
for (BatchOperationEntry<Operator, ?> e : op.getOperations()) {
if (e.getOperator() == FlowBatchOperation.Operator.ADD) {
generateInstallMatchActionOperations(e, tailOps, headOps);
} else if (e.getOperator() == FlowBatchOperation.Operator.REMOVE) {
generateRemoveMatchActionOperations(e, removeOps);
} else {
throw new UnsupportedOperationException(
"FlowManager supports ADD and REMOVE operations only.");
}
}
return Arrays.asList(tailOps, headOps, removeOps);
}
*/
/**
* Generates MatchActionOperations for an INSTALL FlowBatchOperation.
* <p/>
* FIXME: Currently only supports flows that generate exactly two match
* action operation sets.
*
* @param e Flow BatchOperationEntry
* @param tailOps MatchActionOperation set that the tail
* MatchActionOperations will be placed in
* @param headOps MatchActionOperation set that the head
* MatchActionOperations will be placed in
*/
/*
private void generateInstallMatchActionOperations(
BatchOperationEntry<Operator, ?> e,
MatchActionOperations tailOps,
MatchActionOperations headOps) {
if (!(e.getTarget() instanceof Flow)) {
throw new IllegalStateException(
"The target is not Flow object: " + e.getTarget());
}
// Compile flows to match-actions
Flow flow = (Flow) e.getTarget();
List<MatchActionOperations> maOps = flow.compile(
e.getOperator(), matchActionService);
verifyNotNull(maOps, "Could not compile the flow: " + flow);
verify(maOps.size() == 2,
"The flow generates unspported match-action operations.");
// Map FlowId to MatchActionIds
for (MatchActionOperations maOp : maOps) {
for (MatchActionOperationEntry entry : maOp.getOperations()) {
flowMatchActionsMap.put(
KryoFactory.serialize(flow.getId()),
KryoFactory.serialize(entry.getTarget()));
}
}
// Merge match-action operations
for (MatchActionOperationEntry mae : maOps.get(0).getOperations()) {
verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
tailOps.addOperation(mae);
}
for (MatchActionOperationEntry mae : maOps.get(1).getOperations()) {
verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
headOps.addOperation(mae);
}
}
*/
/**
* Generates MatchActionOperations for a REMOVE FlowBatchOperation.
*
* @param e Flow BatchOperationEntry
* @param removeOps MatchActionOperation set that the remove
* MatchActionOperations will be placed in
*/
/*
private void generateRemoveMatchActionOperations(
BatchOperationEntry<Operator, ?> e,
MatchActionOperations removeOps) {
if (!(e.getTarget() instanceof FlowId)) {
throw new IllegalStateException(
"The target is not a FlowId object: " + e.getTarget());
}
// Compile flows to match-actions
FlowId flowId = (FlowId) e.getTarget();
for (byte[] matchActionIdBytes :
flowMatchActionsMap.remove(KryoFactory.serialize(flowId))) {
MatchActionId matchActionId = KryoFactory.deserialize(matchActionIdBytes);
removeOps.addOperation(new MatchActionOperationEntry(
MatchActionOperations.Operator.REMOVE, matchActionId));
}
}
*/
}
......
......@@ -55,6 +55,7 @@ public class ProxyArpManager implements ProxyArpService {
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
private static final String NOT_ARP_REPLY = "ARP is not a reply.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
......@@ -141,7 +142,7 @@ public class ProxyArpManager implements ProxyArpService {
checkArgument(eth.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) eth.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REPLY, NOT_ARP_REQUEST);
checkArgument(arp.getOpCode() == ARP.OP_REPLY, NOT_ARP_REPLY);
Host h = hostService.getHost(HostId.hostId(eth.getDestinationMAC(),
VlanId.vlanId(eth.getVlanID())));
......
......@@ -272,7 +272,8 @@ public class DeviceManagerTest {
}
}
private static class TestMastershipService extends MastershipServiceAdapter {
private static class TestMastershipService
extends MastershipServiceAdapter {
@Override
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
......
......@@ -19,6 +19,7 @@ import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.impl.DefaultApplicationId;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Device.Type;
......@@ -28,6 +29,7 @@ import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowEntry;
......@@ -58,6 +60,8 @@ import com.google.common.collect.Sets;
*/
public class FlowRuleManagerTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID = DeviceId.deviceId("of:001");
private static final int TIMEOUT = 10;
......@@ -86,7 +90,7 @@ public class FlowRuleManagerTest {
mgr.addListener(listener);
provider = new TestProvider(PID);
providerService = registry.register(provider);
appId = ApplicationId.getAppId();
appId = new TestApplicationId((short) 0, "FlowRuleManagerTest");
assertTrue("provider should be registered",
registry.getProviders().contains(provider.id()));
}
......@@ -408,7 +412,7 @@ public class FlowRuleManagerTest {
}
@Override
public Future<Void> executeBatch(
public Future<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
// TODO Auto-generated method stub
return null;
......@@ -474,4 +478,11 @@ public class FlowRuleManagerTest {
}
public class TestApplicationId extends DefaultApplicationId {
public TestApplicationId(short id, String name) {
super(id, name);
}
}
}
......
package org.onlab.onos.net.proxyarp.impl;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultHost;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.HostLocation;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions.OutputInstruction;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.link.LinkListener;
import org.onlab.onos.net.link.LinkService;
import org.onlab.onos.net.packet.OutboundPacket;
import org.onlab.onos.net.packet.PacketProcessor;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import com.google.common.collect.Sets;
/**
* Tests for the {@link ProxyArpManager} class.
*/
public class ProxyArpManagerTest {
private static final int NUM_DEVICES = 4;
private static final int NUM_PORTS_PER_DEVICE = 3;
private static final int NUM_FLOOD_PORTS = 4;
private static final IpPrefix IP1 = IpPrefix.valueOf("10.0.0.1/24");
private static final IpPrefix IP2 = IpPrefix.valueOf("10.0.0.2/24");
private static final ProviderId PID = new ProviderId("of", "foo");
private static final VlanId VLAN1 = VlanId.vlanId((short) 1);
private static final VlanId VLAN2 = VlanId.vlanId((short) 2);
private static final MacAddress MAC1 = MacAddress.valueOf("00:00:11:00:00:01");
private static final MacAddress MAC2 = MacAddress.valueOf("00:00:22:00:00:02");
private static final HostId HID1 = HostId.hostId(MAC1, VLAN1);
private static final HostId HID2 = HostId.hostId(MAC2, VLAN1);
private static final DeviceId DID1 = getDeviceId(1);
private static final DeviceId DID2 = getDeviceId(2);
private static final PortNumber P1 = PortNumber.portNumber(1);
private static final HostLocation LOC1 = new HostLocation(DID1, P1, 123L);
private static final HostLocation LOC2 = new HostLocation(DID2, P1, 123L);
private ProxyArpManager proxyArp;
private TestPacketService packetService;
private DeviceService deviceService;
private LinkService linkService;
private HostService hostService;
@Before
public void setUp() throws Exception {
proxyArp = new ProxyArpManager();
packetService = new TestPacketService();
proxyArp.packetService = packetService;
// Create a host service mock here. Must be replayed by tests once the
// expectations have been set up
hostService = createMock(HostService.class);
proxyArp.hostService = hostService;
createTopology();
proxyArp.deviceService = deviceService;
proxyArp.linkService = linkService;
proxyArp.activate();
}
/**
* Creates a fake topology to feed into the ARP module.
* <p/>
* The default topology is a unidirectional ring topology. Each switch has
* 3 ports. Ports 2 and 3 have the links to neighbor switches, and port 1
* is free (edge port).
*/
private void createTopology() {
deviceService = createMock(DeviceService.class);
linkService = createMock(LinkService.class);
deviceService.addListener(anyObject(DeviceListener.class));
linkService.addListener(anyObject(LinkListener.class));
createDevices(NUM_DEVICES, NUM_PORTS_PER_DEVICE);
createLinks(NUM_DEVICES);
}
/**
* Creates the devices for the fake topology.
*/
private void createDevices(int numDevices, int numPorts) {
List<Device> devices = new ArrayList<>();
for (int i = 1; i <= numDevices; i++) {
DeviceId devId = getDeviceId(i);
Device device = createMock(Device.class);
expect(device.id()).andReturn(devId).anyTimes();
replay(device);
devices.add(device);
List<Port> ports = new ArrayList<>();
for (int j = 1; j <= numPorts; j++) {
Port port = createMock(Port.class);
expect(port.number()).andReturn(PortNumber.portNumber(j)).anyTimes();
replay(port);
ports.add(port);
}
expect(deviceService.getPorts(devId)).andReturn(ports);
}
expect(deviceService.getDevices()).andReturn(devices);
replay(deviceService);
}
/**
* Creates the links for the fake topology.
* NB: Only unidirectional links are created, as for this purpose all we
* need is to occupy the ports with some link.
*/
private void createLinks(int numDevices) {
List<Link> links = new ArrayList<Link>();
for (int i = 1; i <= numDevices; i++) {
ConnectPoint src = new ConnectPoint(
getDeviceId(i),
PortNumber.portNumber(2));
ConnectPoint dst = new ConnectPoint(
getDeviceId((i + 1 > numDevices) ? 1 : i + 1),
PortNumber.portNumber(3));
Link link = createMock(Link.class);
expect(link.src()).andReturn(src).anyTimes();
expect(link.dst()).andReturn(dst).anyTimes();
replay(link);
links.add(link);
}
expect(linkService.getLinks()).andReturn(links).anyTimes();
replay(linkService);
}
/**
* Tests {@link ProxyArpManager#known(IpPrefix)} in the case where the
* IP address is not known.
* Verifies the method returns false.
*/
@Test
public void testNotKnown() {
expect(hostService.getHostsByIp(IP1)).andReturn(Collections.<Host>emptySet());
replay(hostService);
assertFalse(proxyArp.known(IP1));
}
/**
* Tests {@link ProxyArpManager#known(IpPrefix)} in the case where the
* IP address is known.
* Verifies the method returns true.
*/
@Test
public void testKnown() {
Host host1 = createMock(Host.class);
Host host2 = createMock(Host.class);
expect(hostService.getHostsByIp(IP1))
.andReturn(Sets.newHashSet(host1, host2));
replay(hostService);
assertTrue(proxyArp.known(IP1));
}
/**
* Tests {@link ProxyArpManager#reply(Ethernet)} in the case where the
* destination host is known.
* Verifies the correct ARP reply is sent out the correct port.
*/
@Test
public void testReplyKnown() {
Host replyer = new DefaultHost(PID, HID1, MAC1, VLAN1, LOC2,
Collections.singleton(IP1));
Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC1,
Collections.singleton(IP2));
expect(hostService.getHostsByIp(IpPrefix.valueOf(IP1.toOctets())))
.andReturn(Collections.singleton(replyer));
expect(hostService.getHost(HID2)).andReturn(requestor);
replay(hostService);
Ethernet arpRequest = buildArp(ARP.OP_REQUEST, MAC2, null, IP2, IP1);
proxyArp.reply(arpRequest);
assertEquals(1, packetService.packets.size());
Ethernet arpReply = buildArp(ARP.OP_REPLY, MAC1, MAC2, IP1, IP2);
verifyPacketOut(arpReply, LOC1, packetService.packets.get(0));
}
/**
* Tests {@link ProxyArpManager#reply(Ethernet)} in the case where the
* destination host is not known.
* Verifies the ARP request is flooded out the correct edge ports.
*/
@Test
public void testReplyUnknown() {
Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC1,
Collections.singleton(IP2));
expect(hostService.getHostsByIp(IpPrefix.valueOf(IP1.toOctets())))
.andReturn(Collections.<Host>emptySet());
expect(hostService.getHost(HID2)).andReturn(requestor);
replay(hostService);
Ethernet arpRequest = buildArp(ARP.OP_REQUEST, MAC2, null, IP2, IP1);
proxyArp.reply(arpRequest);
verifyFlood(arpRequest);
}
/**
* Tests {@link ProxyArpManager#reply(Ethernet)} in the case where the
* destination host is known for that IP address, but is not on the same
* VLAN as the source host.
* Verifies the ARP request is flooded out the correct edge ports.
*/
@Test
public void testReplyDifferentVlan() {
Host replyer = new DefaultHost(PID, HID1, MAC1, VLAN2, LOC2,
Collections.singleton(IP1));
Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC1,
Collections.singleton(IP2));
expect(hostService.getHostsByIp(IpPrefix.valueOf(IP1.toOctets())))
.andReturn(Collections.singleton(replyer));
expect(hostService.getHost(HID2)).andReturn(requestor);
replay(hostService);
Ethernet arpRequest = buildArp(ARP.OP_REQUEST, MAC2, null, IP2, IP1);
proxyArp.reply(arpRequest);
verifyFlood(arpRequest);
}
/**
* Tests {@link ProxyArpManager#forward(Ethernet)} in the case where the
* destination host is known.
* Verifies the correct ARP request is sent out the correct port.
*/
@Test
public void testForwardToHost() {
Host host1 = new DefaultHost(PID, HID1, MAC1, VLAN1, LOC1,
Collections.singleton(IP1));
expect(hostService.getHost(HID1)).andReturn(host1);
replay(hostService);
Ethernet arpRequest = buildArp(ARP.OP_REPLY, MAC2, MAC1, IP2, IP1);
proxyArp.forward(arpRequest);
assertEquals(1, packetService.packets.size());
OutboundPacket packet = packetService.packets.get(0);
verifyPacketOut(arpRequest, LOC1, packet);
}
/**
* Tests {@link ProxyArpManager#forward(Ethernet)} in the case where the
* destination host is not known.
* Verifies the correct ARP request is flooded out the correct edge ports.
*/
@Test
public void testForwardFlood() {
expect(hostService.getHost(HID1)).andReturn(null);
replay(hostService);
Ethernet arpRequest = buildArp(ARP.OP_REPLY, MAC2, MAC1, IP2, IP1);
proxyArp.forward(arpRequest);
verifyFlood(arpRequest);
}
/**
* Verifies that the given packet was flooded out all available edge ports.
*
* @param packet the packet that was expected to be flooded
*/
private void verifyFlood(Ethernet packet) {
assertEquals(NUM_FLOOD_PORTS, packetService.packets.size());
Collections.sort(packetService.packets,
new Comparator<OutboundPacket>() {
@Override
public int compare(OutboundPacket o1, OutboundPacket o2) {
return o1.sendThrough().uri().compareTo(o2.sendThrough().uri());
}
});
for (int i = 0; i < NUM_FLOOD_PORTS; i++) {
ConnectPoint cp = new ConnectPoint(getDeviceId(i + 1), PortNumber.portNumber(1));
OutboundPacket outboundPacket = packetService.packets.get(i);
verifyPacketOut(packet, cp, outboundPacket);
}
}
/**
* Verifies the given packet was sent out the given port.
*
* @param expected the packet that was expected to be sent
* @param outPort the port the packet was expected to be sent out
* @param actual the actual OutboundPacket to verify
*/
private void verifyPacketOut(Ethernet expected, ConnectPoint outPort,
OutboundPacket actual) {
assertTrue(Arrays.equals(expected.serialize(), actual.data().array()));
assertEquals(1, actual.treatment().instructions().size());
assertEquals(outPort.deviceId(), actual.sendThrough());
Instruction instruction = actual.treatment().instructions().get(0);
assertTrue(instruction instanceof OutputInstruction);
assertEquals(outPort.port(), ((OutputInstruction) instruction).port());
}
/**
* Returns the device ID of the ith device.
*
* @param i device to get the ID of
* @return the device ID
*/
private static DeviceId getDeviceId(int i) {
return DeviceId.deviceId("" + i);
}
/**
* Builds an ARP packet with the given parameters.
*
* @param opcode opcode of the ARP packet
* @param srcMac source MAC address
* @param dstMac destination MAC address, or null if this is a request
* @param srcIp source IP address
* @param dstIp destination IP address
* @return the ARP packet
*/
private Ethernet buildArp(short opcode, MacAddress srcMac, MacAddress dstMac,
IpPrefix srcIp, IpPrefix dstIp) {
Ethernet eth = new Ethernet();
if (dstMac == null) {
eth.setDestinationMACAddress(MacAddress.BROADCAST_MAC);
} else {
eth.setDestinationMACAddress(dstMac.getAddress());
}
eth.setSourceMACAddress(srcMac.getAddress());
eth.setEtherType(Ethernet.TYPE_ARP);
eth.setVlanID(VLAN1.toShort());
ARP arp = new ARP();
arp.setOpCode(opcode);
arp.setProtocolType(ARP.PROTO_TYPE_IP);
arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
arp.setProtocolAddressLength((byte) IpPrefix.INET_LEN);
arp.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH);
arp.setSenderHardwareAddress(srcMac.getAddress());
if (dstMac == null) {
arp.setTargetHardwareAddress(MacAddress.ZERO_MAC_ADDRESS);
} else {
arp.setTargetHardwareAddress(dstMac.getAddress());
}
arp.setSenderProtocolAddress(srcIp.toOctets());
arp.setTargetProtocolAddress(dstIp.toOctets());
eth.setPayload(arp);
return eth;
}
/**
* Test PacketService implementation that simply stores OutboundPackets
* passed to {@link #emit(OutboundPacket)} for later verification.
*/
class TestPacketService implements PacketService {
List<OutboundPacket> packets = new ArrayList<>();
@Override
public void addProcessor(PacketProcessor processor, int priority) {
}
@Override
public void removeProcessor(PacketProcessor processor) {
}
@Override
public void emit(OutboundPacket packet) {
packets.add(packet);
}
}
}
......@@ -43,8 +43,8 @@ public class DistributedFlowRuleStore
private final Multimap<DeviceId, FlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, FlowEntry>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
private final Multimap<Short, FlowRule> flowEntriesById =
ArrayListMultimap.<Short, FlowRule>create();
@Activate
public void activate() {
......@@ -83,7 +83,7 @@ public class DistributedFlowRuleStore
@Override
public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
Collection<FlowRule> rules = flowEntriesById.get(appId.id());
if (rules == null) {
return Collections.emptyList();
}
......
package org.onlab.onos.store.cluster.impl;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
......@@ -21,17 +19,16 @@ import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.OptionalCacheLoader;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MultiMap;
/**
* Distributed implementation of the cluster nodes store.
* Distributed implementation of the mastership store. The store is
* responsible for the master selection process.
*/
@Component(immediate = true)
@Service
......@@ -39,8 +36,21 @@ public class DistributedMastershipStore
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
private IMap<byte[], byte[]> rawMasters;
private LoadingCache<DeviceId, Optional<NodeId>> masters;
//arbitrary lock name
private static final String LOCK = "lock";
//initial term/TTL value
private static final Integer INIT = 0;
//devices to masters
protected IMap<byte[], byte[]> masters;
//devices to terms
protected IMap<byte[], Integer> terms;
//re-election related, disjoint-set structures:
//device-nodes multiset of available nodes
protected MultiMap<byte[], byte[]> standbys;
//device-nodes multiset for nodes that have given up on device
protected MultiMap<byte[], byte[]> unusable;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -50,99 +60,263 @@ implements MastershipStore {
public void activate() {
super.activate();
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(serializer, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
masters = theInstance.getMap("masters");
terms = theInstance.getMap("terms");
standbys = theInstance.getMultiMap("backups");
unusable = theInstance.getMultiMap("unusable");
loadMasters();
masters.addEntryListener(new RemoteMasterShipEventHandler(), true);
log.info("Started");
}
private void loadMasters() {
for (byte[] keyBytes : rawMasters.keySet()) {
final DeviceId id = deserialize(keyBytes);
masters.refresh(id);
}
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
synchronized (this) {
NodeId currentMaster = getMaster(deviceId);
if (Objects.equals(currentMaster, nodeId)) {
return null;
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
byte[] did = serialize(deviceId);
byte[] nid = serialize(nodeId);
NodeId current = deserialize(masters.get(did));
if (current == null) {
if (standbys.containsEntry(did, nid)) {
//was previously standby, or set to standby from master
return MastershipRole.STANDBY;
} else {
return MastershipRole.NONE;
}
} else {
if (current.equals(nodeId)) {
//*should* be in unusable, not always
return MastershipRole.MASTER;
} else {
//may be in backups or unusable from earlier retirement
return MastershipRole.STANDBY;
}
}
}
// FIXME: for now implementing semantics of setMaster
rawMasters.put(serialize(deviceId), serialize(nodeId));
masters.put(deviceId, Optional.of(nodeId));
return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
byte [] did = serialize(deviceId);
byte [] nid = serialize(nodeId);
ILock lock = theInstance.getLock(LOCK);
lock.lock();
try {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
//reinforce mastership
evict(nid, did);
return null;
case STANDBY:
//make current master standby
byte [] current = masters.get(did);
if (current != null) {
backup(current, did);
}
//assign specified node as new master
masters.put(did, nid);
evict(nid, did);
updateTerm(did);
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
case NONE:
masters.put(did, nid);
evict(nid, did);
updateTerm(did);
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
default:
log.warn("unknown Mastership Role {}", role);
return null;
}
} finally {
lock.unlock();
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
return masters.getUnchecked(deviceId).orNull();
return deserialize(masters.get(serialize(deviceId)));
}
@Override
public Set<DeviceId> getDevices(NodeId nodeId) {
ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
if (nodeId.equals(entry.getValue().get())) {
builder.add(entry.getKey());
for (Map.Entry<byte[], byte[]> entry : masters.entrySet()) {
if (nodeId.equals(deserialize(entry.getValue()))) {
builder.add((DeviceId) deserialize(entry.getKey()));
}
}
return builder.build();
}
@Override
public MastershipRole requestRole(DeviceId deviceId) {
// FIXME: for now we are 'selecting' as master whoever asks
setMaster(clusterService.getLocalNode().id(), deviceId);
return MastershipRole.MASTER;
NodeId local = clusterService.getLocalNode().id();
byte [] did = serialize(deviceId);
byte [] lnid = serialize(local);
ILock lock = theInstance.getLock(LOCK);
lock.lock();
try {
MastershipRole role = getRole(local, deviceId);
switch (role) {
case MASTER:
evict(lnid, did);
break;
case STANDBY:
backup(lnid, did);
terms.putIfAbsent(did, INIT);
break;
case NONE:
//claim mastership
masters.put(did, lnid);
evict(lnid, did);
updateTerm(did);
role = MastershipRole.MASTER;
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return role;
} finally {
lock.unlock();
}
}
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
NodeId master = masters.getUnchecked(deviceId).orNull();
return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
public MastershipTerm getTermFor(DeviceId deviceId) {
byte[] did = serialize(deviceId);
if ((masters.get(did) == null) ||
(terms.get(did) == null)) {
return null;
}
return MastershipTerm.of(
(NodeId) deserialize(masters.get(did)), terms.get(did));
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
// FIXME: implement this properly
return MastershipTerm.of(getMaster(deviceId), 1);
public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
byte [] did = serialize(deviceId);
byte [] nid = serialize(nodeId);
MastershipEvent event = null;
ILock lock = theInstance.getLock(LOCK);
lock.lock();
try {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
event = reelect(nodeId, deviceId);
backup(nid, did);
break;
case STANDBY:
//fall through to reinforce role
case NONE:
backup(nid, did);
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return event;
} finally {
lock.unlock();
}
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
boolean removed = rawMasters.remove(serialize(deviceId), serialize(nodeId));
masters.invalidate(deviceId);
if (!removed) {
return null;
public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
byte [] did = serialize(deviceId);
byte [] nid = serialize(nodeId);
MastershipEvent event = null;
ILock lock = theInstance.getLock(LOCK);
lock.lock();
try {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
event = reelect(nodeId, deviceId);
evict(nid, did);
break;
case STANDBY:
//fall through to reinforce relinquishment
case NONE:
evict(nid, did);
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return event;
} finally {
lock.unlock();
}
Optional<NodeId> newMaster = masters.getUnchecked(deviceId);
if (newMaster.isPresent()) {
return new MastershipEvent(MASTER_CHANGED, deviceId, newMaster.get());
} else {
// FIXME: probably need to express NO_MASTER somehow.
}
//helper to fetch a new master candidate for a given device.
private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
byte [] did = serialize(deviceId);
byte [] nid = serialize(current);
//if this is an queue it'd be neater.
byte [] backup = null;
for (byte [] n : standbys.get(serialize(deviceId))) {
if (!current.equals(deserialize(n))) {
backup = n;
break;
}
}
if (backup == null) {
masters.remove(did, nid);
return null;
} else {
masters.put(did, backup);
evict(backup, did);
Integer term = terms.get(did);
terms.put(did, ++term);
return new MastershipEvent(
MASTER_CHANGED, deviceId, (NodeId) deserialize(backup));
}
}
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
//adds node to pool(s) of backups and moves them from unusable.
private void backup(byte [] nodeId, byte [] deviceId) {
if (!standbys.containsEntry(deviceId, nodeId)) {
standbys.put(deviceId, nodeId);
}
if (unusable.containsEntry(deviceId, nodeId)) {
unusable.remove(deviceId, nodeId);
}
}
//adds node to unusable and evicts it from backup pool.
private void evict(byte [] nodeId, byte [] deviceId) {
if (!unusable.containsEntry(deviceId, nodeId)) {
unusable.put(deviceId, nodeId);
}
if (standbys.containsEntry(deviceId, nodeId)) {
standbys.remove(deviceId, nodeId);
}
}
//adds or updates term information.
private void updateTerm(byte [] deviceId) {
Integer term = terms.get(deviceId);
if (term == null) {
terms.put(deviceId, INIT);
} else {
terms.put(deviceId, ++term);
}
}
private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
@Override
protected void onAdd(DeviceId deviceId, NodeId nodeId) {
......@@ -151,12 +325,13 @@ implements MastershipStore {
@Override
protected void onRemove(DeviceId deviceId, NodeId nodeId) {
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
//notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
@Override
protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) {
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
//only addition indicates a change in mastership
//notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
}
......
package org.onlab.onos.store.cluster.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.onlab.onos.net.MastershipRole.*;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipEvent.Type;
import org.onlab.onos.cluster.MastershipStoreDelegate;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.packet.IpPrefix;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast-based distributed MastershipStore implementation.
*/
public class DistributedMastershipStoreTest {
private static final DeviceId DID1 = DeviceId.deviceId("of:01");
private static final DeviceId DID2 = DeviceId.deviceId("of:02");
private static final DeviceId DID3 = DeviceId.deviceId("of:03");
private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
private static final NodeId N1 = new NodeId("node1");
private static final NodeId N2 = new NodeId("node2");
private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
private DistributedMastershipStore dms;
private TestDistributedMastershipStore testStore;
private KryoSerializer serializationMgr;
private StoreManager storeMgr;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Before
public void setUp() throws Exception {
// TODO should find a way to clean Hazelcast instance without shutdown.
Config config = TestStoreManager.getTestConfig();
storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeMgr.activate();
serializationMgr = new KryoSerializer();
dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
dms.clusterService = new TestClusterService();
dms.activate();
testStore = (TestDistributedMastershipStore) dms;
}
@After
public void tearDown() throws Exception {
dms.deactivate();
storeMgr.deactivate();
}
@Test
public void getRole() {
assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
testStore.put(DID1, N1, true, false, true);
assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
}
@Test
public void getMaster() {
assertTrue("wrong store state:", dms.masters.isEmpty());
testStore.put(DID1, N1, true, false, false);
assertEquals("wrong master:", N1, dms.getMaster(DID1));
assertNull("wrong master:", dms.getMaster(DID2));
}
@Test
public void getDevices() {
assertTrue("wrong store state:", dms.masters.isEmpty());
testStore.put(DID1, N1, true, false, false);
testStore.put(DID2, N1, true, false, false);
testStore.put(DID3, N2, true, false, false);
assertEquals("wrong devices",
Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
}
@Test
public void requestRoleAndTerm() {
//CN1 is "local"
testStore.setCurrent(CN1);
//if already MASTER, nothing should happen
testStore.put(DID2, N1, true, false, false);
assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
//populate maps with DID1, N1 thru NONE case
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
assertTrue("wrong state for store:", !dms.terms.isEmpty());
assertEquals("wrong term",
MastershipTerm.of(N1, 0), dms.getTermFor(DID1));
//CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
testStore.setCurrent(CN2);
assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
assertEquals("wrong number of entries:", 2, dms.terms.size());
//change term and requestRole() again; should persist
testStore.increment(DID2);
assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
}
@Test
public void setMaster() {
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
assertNull("wrong event:", dms.setMaster(N1, DID1));
//switch over to N2
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
//orphan switch - should be rare case
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
//disconnect and reconnect - sign of failing re-election or single-instance channel
testStore.reset(true, false, false);
dms.setMaster(N2, DID2);
assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
}
@Test
public void relinquishRole() {
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
//no backup, no new MASTER/event
assertNull("wrong event:", dms.relinquishRole(N1, DID1));
dms.requestRole(DID1);
//add backup CN2, get it elected MASTER by relinquishing
testStore.setCurrent(CN2);
assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
assertEquals("wrong master", N2, dms.getMaster(DID1));
//STANDBY - nothing here, either
assertNull("wrong event:", dms.relinquishRole(N1, DID1));
assertEquals("wrong role for node:", STANDBY, dms.getRole(N1, DID1));
//all nodes "give up" on device, which goes back to NONE.
assertNull("wrong event:", dms.relinquishRole(N2, DID1));
assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
assertEquals("wrong number of retired nodes", 2, dms.unusable.size());
//bring nodes back
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
assertEquals("wrong number of backup nodes", 1, dms.standbys.size());
//NONE - nothing happens
assertNull("wrong event:", dms.relinquishRole(N1, DID2));
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
}
@Ignore("Ignore until Delegate spec. is clear.")
@Test
public void testEvents() throws InterruptedException {
//shamelessly copy other distributed store tests
final CountDownLatch addLatch = new CountDownLatch(1);
MastershipStoreDelegate checkAdd = new MastershipStoreDelegate() {
@Override
public void notify(MastershipEvent event) {
assertEquals("wrong event:", Type.MASTER_CHANGED, event.type());
assertEquals("wrong subject", DID1, event.subject());
assertEquals("wrong subject", N1, event.master());
addLatch.countDown();
}
};
dms.setDelegate(checkAdd);
dms.setMaster(N1, DID1);
//this will fail until we do something about single-instance-ness
assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
}
private class TestDistributedMastershipStore extends
DistributedMastershipStore {
public TestDistributedMastershipStore(StoreService storeService,
KryoSerializer kryoSerialization) {
this.storeService = storeService;
this.serializer = kryoSerialization;
}
//helper to populate master/backup structures
public void put(DeviceId dev, NodeId node,
boolean master, boolean backup, boolean term) {
byte [] n = serialize(node);
byte [] d = serialize(dev);
if (master) {
dms.masters.put(d, n);
dms.unusable.put(d, n);
dms.standbys.remove(d, n);
}
if (backup) {
dms.standbys.put(d, n);
dms.masters.remove(d, n);
dms.unusable.remove(d, n);
}
if (term) {
dms.terms.put(d, 0);
}
}
//a dumb utility function.
public void dump() {
System.out.println("standbys");
for (Map.Entry<byte [], byte []> e : standbys.entrySet()) {
System.out.println(deserialize(e.getKey()) + ":" + deserialize(e.getValue()));
}
System.out.println("unusable");
for (Map.Entry<byte [], byte []> e : unusable.entrySet()) {
System.out.println(deserialize(e.getKey()) + ":" + deserialize(e.getValue()));
}
}
//clears structures
public void reset(boolean store, boolean backup, boolean term) {
if (store) {
dms.masters.clear();
dms.unusable.clear();
}
if (backup) {
dms.standbys.clear();
}
if (term) {
dms.terms.clear();
}
}
//increment term for a device
public void increment(DeviceId dev) {
Integer t = dms.terms.get(serialize(dev));
if (t != null) {
dms.terms.put(serialize(dev), ++t);
}
}
//sets the "local" node
public void setCurrent(ControllerNode node) {
((TestClusterService) clusterService).current = node;
}
}
private class TestClusterService implements ClusterService {
protected ControllerNode current;
@Override
public ControllerNode getLocalNode() {
return current;
}
@Override
public Set<ControllerNode> getNodes() {
return Sets.newHashSet(CN1, CN2);
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return null;
}
@Override
public State getState(NodeId nodeId) {
return null;
}
@Override
public void addListener(ClusterEventListener listener) {
}
@Override
public void removeListener(ClusterEventListener listener) {
}
}
}
......@@ -43,8 +43,8 @@ public class DistributedFlowRuleStore
private final Multimap<DeviceId, FlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, FlowEntry>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
private final Multimap<Short, FlowRule> flowEntriesById =
ArrayListMultimap.<Short, FlowRule>create();
@Activate
public void activate() {
......@@ -83,7 +83,7 @@ public class DistributedFlowRuleStore
@Override
public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
Collection<FlowRule> rules = flowEntriesById.get(appId.id());
if (rules == null) {
return Collections.emptyList();
}
......
......@@ -42,8 +42,8 @@ public class SimpleFlowRuleStore
private final Multimap<DeviceId, FlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, FlowEntry>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
private final Multimap<Short, FlowRule> flowEntriesById =
ArrayListMultimap.<Short, FlowRule>create();
@Activate
public void activate() {
......@@ -82,7 +82,7 @@ public class SimpleFlowRuleStore
@Override
public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
Collection<FlowRule> rules = flowEntriesById.get(appId.id());
if (rules == null) {
return Collections.emptyList();
}
......
......@@ -174,7 +174,7 @@ public class SimpleMastershipStore
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
synchronized (this) {
switch (role) {
......@@ -214,4 +214,9 @@ public class SimpleMastershipStore
return backup;
}
@Override
public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
return setStandby(nodeId, deviceId);
}
}
......
......@@ -129,22 +129,22 @@ public class SimpleMastershipStoreTest {
public void unsetMaster() {
//NONE - record backup but take no other action
put(DID1, N1, false, false);
sms.unsetMaster(N1, DID1);
sms.setStandby(N1, DID1);
assertTrue("not backed up", sms.backups.contains(N1));
sms.termMap.clear();
sms.unsetMaster(N1, DID1);
sms.setStandby(N1, DID1);
assertTrue("term not set", sms.termMap.containsKey(DID1));
//no backup, MASTER
put(DID1, N1, true, true);
assertNull("wrong event", sms.unsetMaster(N1, DID1));
assertNull("wrong event", sms.setStandby(N1, DID1));
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
put(DID2, N2, true, true);
assertEquals("wrong event", MASTER_CHANGED, sms.unsetMaster(N1, DID1).type());
assertEquals("wrong event", MASTER_CHANGED, sms.setStandby(N1, DID1).type());
}
//helper to populate master/backup structures
......
......@@ -981,11 +981,13 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
// switch was a duplicate-dpid, calling the method below would clear
// all state for the original switch (with the same dpid),
// which we obviously don't want.
log.info("{}:removal called");
sw.removeConnectedSwitch();
} else {
// A duplicate was disconnected on this ChannelHandler,
// this is the same switch reconnecting, but the original state was
// not cleaned up - XXX check liveness of original ChannelHandler
log.info("{}:duplicate found");
duplicateDpidFound = Boolean.FALSE;
}
} else {
......
......@@ -307,9 +307,11 @@ public class OpenFlowControllerImpl implements OpenFlowController {
connectedSwitches.remove(dpid);
OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
if (sw == null) {
log.warn("sw was null for {}", dpid);
sw = activeEqualSwitches.remove(dpid);
}
for (OpenFlowSwitchListener l : ofSwitchListener) {
log.warn("removal for {}", dpid);
l.switchRemoved(dpid);
}
}
......
......@@ -27,6 +27,8 @@ import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModVlanPcp
import org.onlab.onos.net.flow.instructions.L3ModificationInstruction;
import org.onlab.onos.net.flow.instructions.L3ModificationInstruction.ModIPInstruction;
import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFFlowAdd;
import org.projectfloodlight.openflow.protocol.OFFlowDelete;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowModFlags;
import org.projectfloodlight.openflow.protocol.action.OFAction;
......@@ -68,12 +70,13 @@ public class FlowModBuilder {
this.cookie = flowRule.id();
}
public OFFlowMod buildFlowAdd() {
public OFFlowAdd buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowAdd()
OFFlowAdd fm = factory.buildFlowAdd()
.setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -92,6 +95,7 @@ public class FlowModBuilder {
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowModify()
.setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -104,11 +108,12 @@ public class FlowModBuilder {
}
public OFFlowMod buildFlowDel() {
public OFFlowDelete buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
OFFlowMod fm = factory.buildFlowDelete()
OFFlowDelete fm = factory.buildFlowDelete()
.setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.provider.of.flow.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -21,9 +22,12 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
......@@ -40,6 +44,7 @@ import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
......@@ -52,6 +57,11 @@ import org.projectfloodlight.openflow.protocol.OFStatsType;
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
......@@ -70,6 +80,8 @@ import com.google.common.collect.Multimap;
@Component(immediate = true)
public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
enum BatchState { STARTED, FINISHED, CANCELLED };
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -88,6 +100,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
private final Map<Long, InstallationFuture> pendingFMs =
new ConcurrentHashMap<Long, InstallationFuture>();
/**
* Creates an OpenFlow host provider.
*/
......@@ -143,9 +158,47 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
removeFlowRule(flowRules);
}
@Override
public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws = new HashSet<Dpid>();
final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
OFFlowMod mod = null;
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sws.add(new Dpid(sw.getId()));
FlowModBuilder builder = new FlowModBuilder(flowRule, sw.factory());
switch (fbe.getOperator()) {
case ADD:
mod = builder.buildFlowAdd();
break;
case REMOVE:
mod = builder.buildFlowDel();
break;
case MODIFY:
mod = builder.buildFlowMod();
break;
default:
log.error("Unsupported batch operation {}", fbe.getOperator());
}
if (mod != null) {
sw.sendMsg(mod);
fmXids.put(mod.getXid(), fbe);
} else {
log.error("Conversion of flowrule {} failed.", flowRule);
}
}
InstallationFuture installation = new InstallationFuture(sws, fmXids);
for (Long xid : fmXids.keySet()) {
pendingFMs.put(xid, installation);
}
pendingFutures.put(U32.f(batch.hashCode()), installation);
installation.verify(batch.hashCode());
return installation;
}
//TODO: InternalFlowRuleProvider listening to stats and error and flowremoved.
// possibly barriers as well. May not be internal at all...
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
......@@ -175,7 +228,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
OFFlowRemoved removed = (OFFlowRemoved) msg;
FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
......@@ -191,7 +243,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
break;
case ERROR:
future = pendingFutures.get(msg.getXid());
future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
......@@ -203,10 +255,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public void roleAssertFailed(Dpid dpid, RoleState role) {
// TODO Auto-generated method stub
}
public void roleAssertFailed(Dpid dpid, RoleState role) {}
private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
if (stats.getStatsType() != OFStatsType.FLOW) {
......@@ -230,7 +279,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {
// TODO NEED TO FIND A BETTER WAY TO AVOID DOING THIS
if (reply.getVersion().equals(OFVersion.OF_10) ||
reply.getMatch().getMatchFields().iterator().hasNext()) {
return false;
......@@ -251,104 +299,91 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
return false;
}
}
@Override
public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws = new HashSet<Dpid>();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sws.add(new Dpid(sw.getId()));
switch (fbe.getOperator()) {
case ADD:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
break;
case REMOVE:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
break;
case MODIFY:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
break;
default:
log.error("Unsupported batch operation {}", fbe.getOperator());
}
}
InstallationFuture installation = new InstallationFuture(sws);
pendingFutures.put(U32.f(batch.hashCode()), installation);
installation.verify(batch.hashCode());
return installation;
}
private class InstallationFuture implements Future<Void> {
private class InstallationFuture implements Future<CompletedBatchOperation> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
private final CountDownLatch countDownLatch;
private Integer pendingXid;
private BatchState state;
public InstallationFuture(Set<Dpid> sws) {
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.state = BatchState.STARTED;
this.sws = sws;
this.fms = fmXids;
countDownLatch = new CountDownLatch(sws.size());
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
//TODO add reason to flowentry
FlowEntry fe = null;
FlowRuleBatchEntry fbe = fms.get(msg.getXid());
FlowRule offending = fbe.getTarget();
//TODO handle specific error msgs
//offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
switch (msg.getErrType()) {
case BAD_ACTION:
OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
bad.getCode().ordinal());
break;
case BAD_INSTRUCTION:
OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
badins.getCode().ordinal());
break;
case BAD_MATCH:
OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
badMatch.getCode().ordinal());
break;
case BAD_REQUEST:
break;
case EXPERIMENTER:
OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
badReq.getCode().ordinal());
break;
case FLOW_MOD_FAILED:
OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
fmFail.getCode().ordinal());
break;
case EXPERIMENTER:
case GROUP_MOD_FAILED:
break;
case HELLO_FAILED:
break;
case METER_MOD_FAILED:
break;
case PORT_MOD_FAILED:
break;
case QUEUE_OP_FAILED:
break;
case ROLE_REQUEST_FAILED:
break;
case SWITCH_CONFIG_FAILED:
break;
case TABLE_FEATURES_FAILED:
break;
case TABLE_MOD_FAILED:
fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
break;
default:
break;
log.error("Unknown error type {}", msg.getErrType());
}
offendingFlowMods.add(fe);
}
public void satisfyRequirement(Dpid dpid) {
log.warn("Satisfaction from switch {}", dpid);
sws.remove(controller.getSwitch(dpid));
sws.remove(dpid);
countDownLatch.countDown();
cleanUp();
}
public void verify(Integer id) {
pendingXid = id;
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
......@@ -356,41 +391,59 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
.setXid(id);
sw.sendMsg(builder.build());
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
this.state = BatchState.CANCELLED;
cleanUp();
for (FlowRuleBatchEntry fbe : fms.values()) {
if (fbe.getOperator() == FlowRuleOperation.ADD ||
fbe.getOperator() == FlowRuleOperation.MODIFY) {
removeFlowRule(fbe.getTarget());
} else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
applyRule(fbe.getTarget());
}
}
return isCancelled();
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
return this.state == BatchState.CANCELLED;
}
@Override
public boolean isDone() {
return sws.isEmpty();
return this.state == BatchState.FINISHED;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
//return offendingFlowMods;
return null;
this.state = BatchState.FINISHED;
return new CompletedBatchOperation(ok.get(), offendingFlowMods);
}
@Override
public Void get(long timeout, TimeUnit unit)
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
countDownLatch.await(timeout, unit);
//return offendingFlowMods;
return null;
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
return new CompletedBatchOperation(ok.get(), offendingFlowMods);
}
throw new TimeoutException();
}
private void cleanUp() {
if (sws.isEmpty()) {
pendingFutures.remove(pendingXid);
for (Long xid : fms.keySet()) {
pendingFMs.remove(xid);
}
}
}
}
......
......@@ -33,6 +33,7 @@ alias obs='onos-build-selective'
alias op='onos-package'
alias ot='onos-test'
alias ol='onos-log'
alias ow='onos-watch'
alias go='ob && ot && onos -w'
alias pub='onos-push-update-bundle'
......
#------------------------------------------------------------------------------
# Echoes project-level directory if a Java file within is newer than its
# class file counterpart
# Echoes project-level directory if a Java file within is newer than the
# target directory.
#------------------------------------------------------------------------------
javaFile=${1#*\/src\/*\/java/}
......@@ -10,9 +10,7 @@ basename=${1/*\//}
src=${1/$javaFile/}
project=${src/src*/}
classFile=${javaFile/.java/.class}
target=$project/target
[ ${project}target/classes/$classFile -nt ${src}$javaFile -o \
${project}target/test-classes/$classFile -nt ${src}$javaFile ] \
|| echo ${src/src*/}
[ $target -nt ${src}$javaFile ] || echo ${src/src*/}
......
......@@ -7,7 +7,7 @@
. $ONOS_ROOT/tools/build/envDefaults
cd ~/.m2/repository
jar=$(find org/onlab -type f -name '*.jar' | grep $1 | grep -v -e -tests | head -n 1)
jar=$(find org/onlab -type f -name '*.jar' | grep -e $1 | grep -v -e -tests | head -n 1)
[ -z "$jar" ] && echo "No bundle $1 found for" && exit 1
......
#!/bin/bash
#-------------------------------------------------------------------------------
# Monitors selected set of ONOS commands using the system watch command.
#-------------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. $ONOS_ROOT/tools/build/envDefaults
node=${1:-$OCI}
commands="${2:-summary,intents,flows,hosts}"
aux=/tmp/onos-watch.$$
trap "rm -f $aux" EXIT
echo "$commands" | tr ',' '\n' > $aux
watch $3 "onos $node -b <$aux 2>/dev/null"
......@@ -11,6 +11,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
......@@ -18,7 +19,6 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import com.codahale.metrics.Timer;
/**
......@@ -70,16 +70,20 @@ public final class MetricsManager implements MetricsService {
/**
* Default Reporter for this metrics manager.
*/
private final Slf4jReporter reporter;
//private final Slf4jReporter reporter;
private final ConsoleReporter reporter;
public MetricsManager() {
this.metricsRegistry = new MetricRegistry();
this.reporter = Slf4jReporter.forRegistry(this.metricsRegistry)
.outputTo(log)
// this.reporter = Slf4jReporter.forRegistry(this.metricsRegistry)
// .outputTo(log)
// .convertRatesTo(TimeUnit.SECONDS)
// .convertDurationsTo(TimeUnit.MICROSECONDS)
// .build();
this.reporter = ConsoleReporter.forRegistry(this.metricsRegistry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.NANOSECONDS)
.convertDurationsTo(TimeUnit.MICROSECONDS)
.build();
reporter.start(1, TimeUnit.MINUTES);
}
@Activate
......