tom

Merge remote-tracking branch 'origin/master'

Showing 27 changed files with 525 additions and 108 deletions
package org.onlab.onos.net.flow;
public class CompletedBatchOperation {
}
......@@ -2,12 +2,13 @@ package org.onlab.onos.net.flow;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.intent.BatchOperationTarget;
/**
* Represents a generalized match & action pair to be applied to
* an infrastucture device.
*/
public interface FlowRule {
public interface FlowRule extends BatchOperationTarget {
static final int MAX_TIMEOUT = 60;
static final int MIN_PRIORITY = 0;
......
package org.onlab.onos.net.flow;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.intent.BatchOperationEntry;
public class FlowRuleBatchEntry
extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
super(operator, target);
}
public enum FlowRuleOperation {
ADD,
REMOVE,
MODIFY
}
}
package org.onlab.onos.net.flow;
import java.util.Collection;
import org.onlab.onos.net.intent.BatchOperation;
public class FlowRuleBatchOperation
extends BatchOperation<FlowRuleBatchEntry> {
public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
super(operations);
}
}
package org.onlab.onos.net.flow;
import java.util.concurrent.Future;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.Provider;
/**
......@@ -34,4 +37,6 @@ public interface FlowRuleProvider extends Provider {
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
......
package org.onlab.onos.net.flow;
import java.util.concurrent.Future;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
......@@ -66,7 +68,12 @@ public interface FlowRuleService {
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
//Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>)
/**
* Applies a batch operation of FlowRules.
*
* @return future indicating the state of the batch operation
*/
Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
* Adds the specified flow rule listener.
......
package org.onlab.onos.net.intent;
//TODO is this the right package?
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A list of BatchOperationEntry.
*
......@@ -15,7 +16,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
private List<T> ops;
private final List<T> ops;
/**
* Creates new {@link BatchOperation} object.
......@@ -30,7 +31,7 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
*
* @param batchOperations the list of batch operation entries.
*/
public BatchOperation(List<T> batchOperations) {
public BatchOperation(Collection<T> batchOperations) {
ops = new LinkedList<>(checkNotNull(batchOperations));
}
......@@ -61,6 +62,10 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
/**
* Adds an operation.
* FIXME: Brian promises that the Intent Framework
* will not modify the batch operation after it has submitted it.
* Ali would prefer immutablity, but trusts brian for better or
* for worse.
*
* @param entry the operation to be added
* @return this object if succeeded, null otherwise
......
......@@ -15,14 +15,7 @@ public class BatchOperationEntry<T extends Enum<?>, U extends BatchOperationTarg
private final T operator;
private final U target;
/**
* Default constructor for serializer.
*/
@Deprecated
protected BatchOperationEntry() {
this.operator = null;
this.target = null;
}
/**
* Constructs new instance for the entry of the BatchOperation.
......
......@@ -12,6 +12,7 @@ public interface ClockProviderService {
/**
* Updates the mastership term for the specified deviceId.
*
* @param deviceId device identifier.
* @param term mastership term.
*/
......
......@@ -144,6 +144,10 @@ public class DeviceManager
private void applyRole(DeviceId deviceId, MastershipRole newRole) {
if (newRole != MastershipRole.NONE) {
Device device = store.getDevice(deviceId);
// FIXME: Device might not be there yet. (eventual consistent)
if (device == null) {
return;
}
DeviceProvider provider = getProvider(device.providerId());
if (provider != null) {
provider.roleChanged(device, newRole);
......@@ -193,16 +197,38 @@ public class DeviceManager
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
checkValidity();
log.info("Device {} connected", deviceId);
// check my Role
MastershipRole role = mastershipService.requestRoleFor(deviceId);
if (role != MastershipRole.MASTER) {
// TODO: Do we need to tell the Provider that
// I am no longer the MASTER?
return;
}
// Master:
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(deviceId);
if (!term.master().equals(clusterService.getLocalNode().id())) {
// I've lost mastership after I thought I was MASTER.
return;
}
clockProviderService.setMastershipTerm(deviceId, term);
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
// If there was a change of any kind, trigger role selection
// process.
// If there was a change of any kind, tell the provider
// I am the master.
// Note: can be null, if mastership was lost.
if (event != null) {
log.info("Device {} connected", deviceId);
mastershipService.requestRoleFor(deviceId);
provider().roleChanged(event.subject(),
mastershipService.requestRoleFor(deviceId));
// TODO: Check switch reconnected case, is it assured that
// event will not be null?
// FIXME: 1st argument should be deviceId
provider().roleChanged(event.subject(), role);
post(event);
}
}
......@@ -211,6 +237,10 @@ public class DeviceManager
public void deviceDisconnected(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
log.debug("Device {} disconnected, but I am not the master", deviceId);
return;
}
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of mastership.
......@@ -272,9 +302,15 @@ public class DeviceManager
@Override
public void event(MastershipEvent event) {
if (event.master().equals(clusterService.getLocalNode().id())) {
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(event.subject());
clockProviderService.setMastershipTerm(event.subject(), term);
if (term.master().equals(clusterService.getLocalNode().id())) {
// only set if I am the master
clockProviderService.setMastershipTerm(event.subject(), term);
}
applyRole(event.subject(), MastershipRole.MASTER);
} else {
applyRole(event.subject(), MastershipRole.STANDBY);
......
......@@ -5,6 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -18,8 +22,11 @@ import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
......@@ -32,7 +39,9 @@ import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
/**
* Provides implementation of the flow NB &amp; SB APIs.
......@@ -131,6 +140,38 @@ public class FlowRuleManager
}
@Override
public Future<CompletedBatchOperation> applyBatch(
FlowRuleBatchOperation batch) {
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
ArrayListMultimap.create();
List<Future<Void>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId());
batches.put(frp, fbe);
switch (fbe.getOperator()) {
case ADD:
store.storeFlowRule(f);
break;
case REMOVE:
store.deleteFlowRule(f);
break;
case MODIFY:
default:
log.error("Batch operation type {} unsupported.", fbe.getOperator());
}
}
for (FlowRuleProvider provider : batches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(batches.get(provider));
Future<Void> future = provider.executeBatch(b);
futures.add(future);
}
return new FlowRuleBatchFuture(futures);
}
@Override
public void addListener(FlowRuleListener listener) {
listenerRegistry.addListener(listener);
}
......@@ -296,4 +337,63 @@ public class FlowRuleManager
eventDispatcher.post(event);
}
}
private class FlowRuleBatchFuture
implements Future<CompletedBatchOperation> {
private final List<Future<Void>> futures;
public FlowRuleBatchFuture(List<Future<Void>> futures) {
this.futures = futures;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isDone() {
boolean isDone = true;
for (Future<Void> future : futures) {
isDone &= future.isDone();
}
return isDone;
}
@Override
public CompletedBatchOperation get() throws InterruptedException,
ExecutionException {
// TODO Auto-generated method stub
for (Future<Void> future : futures) {
future.get();
}
return new CompletedBatchOperation();
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
// TODO we should decrement the timeout
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
for (Future<Void> future : futures) {
long now = System.nanoTime();
long thisTimeout = end - now;
future.get(thisTimeout, TimeUnit.NANOSECONDS);
}
return new CompletedBatchOperation();
}
}
}
......
......@@ -4,6 +4,8 @@ import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -16,6 +18,9 @@ import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
......@@ -24,6 +29,8 @@ import org.onlab.onos.net.intent.IntentInstaller;
import org.onlab.onos.net.intent.PathIntent;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Installer for {@link PathIntent path connectivity intents}.
*/
......@@ -56,19 +63,27 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
TrafficTreatment treatment = builder()
.setOutput(link.src().port()).build();
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
flowRuleService.applyFlowRules(rule);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
//flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
......@@ -77,6 +92,7 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
......@@ -86,9 +102,16 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
flowRuleService.removeFlowRules(rule);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
//flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
......
......@@ -12,6 +12,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
......@@ -32,6 +33,7 @@ import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
......@@ -42,6 +44,7 @@ import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
......@@ -404,6 +407,13 @@ public class FlowRuleManagerTest {
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
}
@Override
public Future<Void> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
// TODO Auto-generated method stub
return null;
}
}
......
......@@ -45,4 +45,9 @@ public class MessageSubject {
MessageSubject that = (MessageSubject) obj;
return Objects.equals(this.value, that.value);
}
// for serializer
protected MessageSubject() {
this.value = "";
}
}
......
......@@ -3,11 +3,11 @@ package org.onlab.onos.store.cluster.messaging.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -26,6 +26,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
......@@ -34,6 +35,7 @@ import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -50,8 +52,6 @@ public class ClusterCommunicationManager
private ClusterService clusterService;
private ClusterNodesDelegate nodesDelegate;
// FIXME: `members` should go away and should be using ClusterService
private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
......@@ -59,11 +59,14 @@ public class ClusterCommunicationManager
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(ClusterMessage.class)
.register(ClusterMessage.class, new ClusterMessageSerializer())
.register(ClusterMembershipEvent.class)
.register(byte[].class)
.register(MessageSubject.class)
.build()
.populate(1);
}
......@@ -73,7 +76,15 @@ public class ClusterCommunicationManager
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
messagingService = new NettyMessagingService(localNode.tcpPort());
NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
netty.activate();
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("NettyMessagingService#activate", e);
}
messagingService = netty;
log.info("Started");
}
......@@ -86,7 +97,7 @@ public class ClusterCommunicationManager
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
for (ControllerNode node : members.values()) {
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
......@@ -107,13 +118,16 @@ public class ClusterCommunicationManager
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
ControllerNode node = members.get(toNodeId);
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
log.info("sending...");
Response resp = messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
resp.get(1, TimeUnit.SECONDS);
log.info("sent...");
return true;
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
}
......@@ -137,7 +151,7 @@ public class ClusterCommunicationManager
@Override
public void addNode(ControllerNode node) {
members.put(node.id(), node);
//members.put(node.id(), node);
}
@Override
......@@ -146,7 +160,7 @@ public class ClusterCommunicationManager
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
members.remove(node.id());
//members.remove(node.id());
}
// Sends a heart beat to all peers.
......@@ -181,7 +195,8 @@ public class ClusterCommunicationManager
}
}
private static class InternalClusterMessageHandler implements MessageHandler {
// FIXME: revert static
private class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
......@@ -191,8 +206,18 @@ public class ClusterCommunicationManager
@Override
public void handle(Message message) {
ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
handler.handle(clusterMessage);
// FIXME: remove me
log.info("InternalClusterMessageHandler.handle({})", message);
try {
log.info("before decode");
ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
log.info("Subject:({}), Sender:({})", clusterMessage.subject(), clusterMessage.sender());
handler.handle(clusterMessage);
message.respond("ACK".getBytes());
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("failed", e);
}
}
}
}
......
......@@ -113,6 +113,7 @@ public class GossipDeviceStore
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
......
......@@ -35,4 +35,11 @@ public class InternalDeviceEvent {
public Timestamped<DeviceDescription> deviceDescription() {
return deviceDescription;
}
// for serializer
protected InternalDeviceEvent() {
this.providerId = null;
this.deviceId = null;
this.deviceDescription = null;
}
}
......
......@@ -37,4 +37,11 @@ public class InternalPortEvent {
public Timestamped<List<PortDescription>> portDescriptions() {
return portDescriptions;
}
// for serializer
protected InternalPortEvent() {
this.providerId = null;
this.deviceId = null;
this.portDescriptions = null;
}
}
......
......@@ -35,4 +35,11 @@ public class InternalPortStatusEvent {
public Timestamped<PortDescription> portDescription() {
return portDescription;
}
// for serializer
protected InternalPortStatusEvent() {
this.providerId = null;
this.deviceId = null;
this.portDescription = null;
}
}
......
......@@ -119,8 +119,8 @@ implements MastershipStore {
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
// TODO Auto-generated method stub
return null;
// FIXME: implement this
return MastershipTerm.of(getMaster(deviceId), 1);
}
@Override
......
......@@ -68,7 +68,7 @@ public class FlowModBuilder {
this.cookie = flowRule.id();
}
public OFFlowMod buildFlowMod() {
public OFFlowMod buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
......@@ -86,6 +86,24 @@ public class FlowModBuilder {
}
public OFFlowMod buildFlowMod() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowModify()
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
.setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority)
.build();
return fm;
}
public OFFlowMod buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
......
......@@ -2,8 +2,17 @@ package org.onlab.onos.provider.of.flow.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -14,9 +23,11 @@ import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyService;
......@@ -27,6 +38,8 @@ import org.onlab.onos.openflow.controller.OpenFlowSwitch;
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
......@@ -42,9 +55,11 @@ import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
......@@ -70,6 +85,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final InternalFlowProvider listener = new InternalFlowProvider();
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
/**
* Creates an OpenFlow host provider.
*/
......@@ -101,7 +119,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
}
......@@ -154,6 +172,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
......@@ -166,7 +185,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.satisfyRequirement(dpid);
}
break;
case ERROR:
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
......@@ -226,6 +255,144 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws = new HashSet<Dpid>();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sws.add(new Dpid(sw.getId()));
switch (fbe.getOperator()) {
case ADD:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
break;
case REMOVE:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
break;
case MODIFY:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
break;
default:
log.error("Unsupported batch operation {}", fbe.getOperator());
}
}
InstallationFuture installation = new InstallationFuture(sws);
pendingFutures.put(U32.f(batch.hashCode()), installation);
installation.verify(batch.hashCode());
return installation;
}
private class InstallationFuture implements Future<Void> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
private final CountDownLatch countDownLatch;
public InstallationFuture(Set<Dpid> sws) {
this.sws = sws;
countDownLatch = new CountDownLatch(sws.size());
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
//TODO add reason to flowentry
//TODO handle specific error msgs
//offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
switch (msg.getErrType()) {
case BAD_ACTION:
break;
case BAD_INSTRUCTION:
break;
case BAD_MATCH:
break;
case BAD_REQUEST:
break;
case EXPERIMENTER:
break;
case FLOW_MOD_FAILED:
break;
case GROUP_MOD_FAILED:
break;
case HELLO_FAILED:
break;
case METER_MOD_FAILED:
break;
case PORT_MOD_FAILED:
break;
case QUEUE_OP_FAILED:
break;
case ROLE_REQUEST_FAILED:
break;
case SWITCH_CONFIG_FAILED:
break;
case TABLE_FEATURES_FAILED:
break;
case TABLE_MOD_FAILED:
break;
default:
break;
}
}
public void satisfyRequirement(Dpid dpid) {
log.warn("Satisfaction from switch {}", dpid);
sws.remove(controller.getSwitch(dpid));
countDownLatch.countDown();
}
public void verify(Integer id) {
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
.buildBarrierRequest()
.setXid(id);
sw.sendMsg(builder.build());
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isDone() {
return sws.isEmpty();
}
@Override
public Void get() throws InterruptedException, ExecutionException {
countDownLatch.await();
//return offendingFlowMods;
return null;
}
@Override
public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
countDownLatch.await(timeout, unit);
//return offendingFlowMods;
return null;
}
}
}
......
......@@ -8,11 +8,16 @@ import io.netty.handler.codec.ReplayingDecoder;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Decoder for inbound messages.
*/
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final NettyMessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
......@@ -57,4 +62,10 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
checkState(false, "Must not be here");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
......
package org.onlab.netty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
......@@ -11,6 +14,8 @@ import io.netty.handler.codec.MessageToByteEncoder;
@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
private final Logger log = LoggerFactory.getLogger(getClass());
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
public static final int HEADER_VERSION = 1;
......@@ -47,4 +52,10 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write payload.
out.writeBytes(payload);
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
......
......@@ -248,6 +248,7 @@ public class NettyMessagingService implements MessagingService {
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
......
package org.onlab.netty;
import java.util.concurrent.TimeUnit;
import org.onlab.metrics.MetricsComponent;
import org.onlab.metrics.MetricsFeature;
import org.onlab.metrics.MetricsManager;
import com.codahale.metrics.Timer;
// FIXME: Should be move out to test or app
public final class SimpleClient {
private SimpleClient() {
}
public static void main(String... args) throws Exception {
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
messaging.activate();
metrics.activate();
MetricsFeature feature = new MetricsFeature("timers");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
final int warmup = 100;
for (int i = 0; i < warmup; i++) {
Timer.Context context = sendAsyncTimer.time();
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
final int iterations = 1000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8080), "echo",
"Hello World".getBytes());
System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
}
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
}
}
}
package org.onlab.netty;
//FIXME: Should be move out to test or app
public final class SimpleServer {
private SimpleServer() {}
public static void main(String... args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
}