Thomas Vachuska

Merge remote-tracking branch 'origin/master'

package org.onlab.onos.cluster;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import com.google.common.collect.ImmutableList;
/**
* A container for detailed role information for a device,
* within the current cluster. Role attributes include current
......@@ -15,7 +16,7 @@ public class RoleInfo {
public RoleInfo(NodeId master, List<NodeId> backups) {
this.master = master;
this.backups = Collections.unmodifiableList(backups);
this.backups = ImmutableList.copyOf(backups);
}
public NodeId master() {
......
......@@ -24,7 +24,18 @@ public class FlowRuleEvent extends AbstractEvent<FlowRuleEvent.Type, FlowRule> {
/**
* Signifies that a rule has been updated.
*/
RULE_UPDATED
RULE_UPDATED,
// internal event between Manager <-> Store
/*
* Signifies that a request to add flow rule has been added to the store.
*/
RULE_ADD_REQUESTED,
/*
* Signifies that a request to remove flow rule has been added to the store.
*/
RULE_REMOVE_REQUESTED,
}
/**
......
......@@ -44,16 +44,18 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
* Stores a new flow rule without generating events.
*
* @param rule the flow rule to add
* @return true if the rule should be handled locally
*/
void storeFlowRule(FlowRule rule);
boolean storeFlowRule(FlowRule rule);
/**
* Marks a flow rule for deletion. Actual deletion will occur
* when the provider indicates that the flow has been removed.
*
* @param rule the flow rule to delete
* @return true if the rule should be handled locally
*/
void deleteFlowRule(FlowRule rule);
boolean deleteFlowRule(FlowRule rule);
/**
* Stores a new flow rule, or updates an existing entry.
......
......@@ -104,24 +104,52 @@ public class FlowRuleManager
public void applyFlowRules(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
FlowRule f = flowRules[i];
final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId());
store.storeFlowRule(f);
boolean local = store.storeFlowRule(f);
if (local) {
// TODO: aggregate all local rules and push down once?
applyFlowRulesToProviders(f);
}
}
}
private void applyFlowRulesToProviders(FlowRule... flowRules) {
DeviceId did = null;
FlowRuleProvider frp = null;
for (FlowRule f : flowRules) {
if (!f.deviceId().equals(did)) {
did = f.deviceId();
final Device device = deviceService.getDevice(did);
frp = getProvider(device.providerId());
}
if (frp != null) {
frp.applyFlowRule(f);
}
}
}
@Override
public void removeFlowRules(FlowRule... flowRules) {
FlowRule f;
FlowRuleProvider frp;
Device device;
for (int i = 0; i < flowRules.length; i++) {
f = flowRules[i];
device = deviceService.getDevice(f.deviceId());
store.deleteFlowRule(f);
if (device != null) {
boolean local = store.deleteFlowRule(f);
if (local) {
// TODO: aggregate all local rules and push down once?
removeFlowRulesFromProviders(f);
}
}
}
private void removeFlowRulesFromProviders(FlowRule... flowRules) {
DeviceId did = null;
FlowRuleProvider frp = null;
for (FlowRule f : flowRules) {
if (!f.deviceId().equals(did)) {
did = f.deviceId();
final Device device = deviceService.getDevice(did);
frp = getProvider(device.providerId());
}
if (frp != null) {
frp.removeFlowRule(f);
}
}
......@@ -135,8 +163,11 @@ public class FlowRuleManager
for (FlowRule f : rules) {
store.deleteFlowRule(f);
// FIXME: only accept request and push to provider on internal event
device = deviceService.getDevice(f.deviceId());
frp = getProvider(device.providerId());
// FIXME: flows removed from store and flows removed from might diverge
// get rid of #removeRulesById?
frp.removeRulesById(id, f);
}
}
......@@ -352,7 +383,23 @@ public class FlowRuleManager
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
@Override
public void notify(FlowRuleEvent event) {
switch (event.type()) {
case RULE_ADD_REQUESTED:
applyFlowRulesToProviders(event.subject());
break;
case RULE_REMOVE_REQUESTED:
removeFlowRulesFromProviders(event.subject());
break;
case RULE_ADDED:
case RULE_REMOVED:
case RULE_UPDATED:
// only dispatch events related to switch
eventDispatcher.post(event);
break;
default:
break;
}
}
}
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.flow.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
import java.io.IOException;
import java.util.Collection;
......@@ -30,6 +31,7 @@ import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
......@@ -80,10 +82,44 @@ public class DistributedFlowRuleStore
};
// TODO: make this configurable
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received add request for {}", rule);
storeFlowEntryInternal(rule);
// FIXME what to respond.
try {
// FIXME: #respond() not working. responded message is
// handled by this sender node and never goes back.
message.respond(SERIALIZER.encode("ACK"));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
});
clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received delete request for {}", rule);
deleteFlowRuleInternal(rule);
// FIXME what to respond.
try {
message.respond(SERIALIZER.encode("ACK"));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
});
log.info("Started");
}
......@@ -131,13 +167,14 @@ public class DistributedFlowRuleStore
}
@Override
public void storeFlowRule(FlowRule rule) {
public boolean storeFlowRule(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
storeFlowEntryInternal(rule);
return;
return storeFlowEntryInternal(rule);
}
log.warn("Not my flow forwarding to {}", replicaInfo.master().orNull());
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
FlowStoreMessageSubjects.STORE_FLOW_RULE,
......@@ -150,26 +187,29 @@ public class DistributedFlowRuleStore
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
return false;
}
private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
// write to local copy.
if (!flowEntries.containsEntry(deviceId, flowEntry)) {
flowEntries.put(deviceId, flowEntry);
flowEntriesById.put(flowRule.appId(), flowEntry);
notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
return true;
}
// write to backup.
// TODO: write to a hazelcast map.
return false;
}
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
public synchronized boolean deleteFlowRule(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
deleteFlowRuleInternal(rule);
return;
return deleteFlowRuleInternal(rule);
}
ClusterMessage message = new ClusterMessage(
......@@ -184,15 +224,21 @@ public class DistributedFlowRuleStore
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
return false;
}
private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry == null) {
return;
return false;
}
entry.setState(FlowEntryState.PENDING_REMOVE);
// TODO: also update backup.
notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
return true;
}
@Override
......
......@@ -8,6 +8,7 @@ import java.util.HashMap;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.cluster.RoleInfo;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
......@@ -27,7 +28,11 @@ import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.FlowId;
import org.onlab.onos.net.flow.criteria.Criteria;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instructions;
import org.onlab.onos.net.host.DefaultHostDescription;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.link.DefaultLinkDescription;
......@@ -90,7 +95,21 @@ public final class KryoNamespaces {
DefaultHostDescription.class,
DefaultFlowRule.class,
FlowId.class,
DefaultTrafficSelector.class
DefaultTrafficSelector.class,
Criteria.PortCriterion.class,
Criteria.EthCriterion.class,
Criteria.EthTypeCriterion.class,
Criteria.IPCriterion.class,
Criteria.IPProtocolCriterion.class,
Criteria.VlanIdCriterion.class,
Criteria.VlanPcpCriterion.class,
Criteria.TcpPortCriterion.class,
Criterion.class,
Criterion.Type.class,
DefaultTrafficTreatment.class,
Instructions.DropInstruction.class,
Instructions.OutputInstruction.class,
RoleInfo.class
)
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
......
......@@ -3,6 +3,7 @@ package org.onlab.onos.store.serializers;
import static org.junit.Assert.assertEquals;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.PortNumber.portNumber;
import static java.util.Arrays.asList;
import java.nio.ByteBuffer;
......@@ -11,6 +12,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.cluster.RoleInfo;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.ConnectPoint;
......@@ -198,6 +200,12 @@ public class KryoSerializerTest {
}
@Test
public void testRoleInfo() {
testSerialized(new RoleInfo(new NodeId("master"),
asList(new NodeId("stby1"), new NodeId("stby2"))));
}
@Test
public void testAnnotations() {
// Annotations does not have equals defined, manually test equality
final byte[] a1Bytes = serializer.encode(A1);
......
......@@ -148,8 +148,9 @@ public class SimpleFlowRuleStore
}
@Override
public void storeFlowRule(FlowRule rule) {
public boolean storeFlowRule(FlowRule rule) {
final boolean added = storeFlowRuleInternal(rule);
return added;
}
private boolean storeFlowRuleInternal(FlowRule rule) {
......@@ -166,13 +167,14 @@ public class SimpleFlowRuleStore
}
// new flow rule added
existing.add(f);
// TODO: notify through delegate about remote event?
// TODO: Should we notify only if it's "remote" event?
//notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
return true;
}
}
@Override
public void deleteFlowRule(FlowRule rule) {
public boolean deleteFlowRule(FlowRule rule) {
List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
synchronized (entries) {
......@@ -180,12 +182,15 @@ public class SimpleFlowRuleStore
if (entry.equals(rule)) {
synchronized (entry) {
entry.setState(FlowEntryState.PENDING_REMOVE);
return;
// TODO: Should we notify only if it's "remote" event?
//notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
return true;
}
}
}
}
//log.warn("Cannot find rule {}", rule);
return false;
}
@Override
......