Charles Chan
Committed by Gerrit Code Review

CORD-394 Purge group/flow store when device goes offline

Stage 1: (this commit)
Add a component config purgeOnDisconnection, which is false by default.
When set to true, GroupManager and FlowManager will purge groups/flows
associated with a device when the device goes offline.

Stage 2: (upcoming commit)
Enable these configs in SegmentRoutingManager
Clean up group related information in SegmentRountingManager

Change-Id: I46d047d690d4641e030f6cdd084ce16ac02d8919
......@@ -107,6 +107,13 @@ public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDe
FlowRuleEvent pendingFlowRule(FlowEntry rule);
/**
* Removes all flow entries of given device from store.
*
* @param deviceId device id
*/
void purgeFlowRule(DeviceId deviceId);
/**
* Updates the flow table statistics of the specified device using
* the given statistics.
*
......
......@@ -118,6 +118,13 @@ public interface GroupStore extends Store<GroupEvent, GroupStoreDelegate> {
void removeGroupEntry(Group group);
/**
* Removes all group entries of given device from store.
*
* @param deviceId device id
*/
void purgeGroupEntry(DeviceId deviceId);
/**
* A group entry that is present in switch but not in the store.
*
* @param group group entry
......
......@@ -274,6 +274,10 @@ public class SimpleFlowRuleStore
return null;
}
public void purgeFlowRule(DeviceId deviceId) {
flowEntries.remove(deviceId);
}
@Override
public void storeBatch(
FlowRuleBatchOperation operation) {
......
......@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
......@@ -477,6 +479,19 @@ public class SimpleGroupStore
}
@Override
public void purgeGroupEntry(DeviceId deviceId) {
Set<Map.Entry<GroupId, StoredGroupEntry>> entryPendingRemove =
groupEntriesById.get(deviceId).entrySet();
groupEntriesById.remove(deviceId);
groupEntriesByKey.remove(deviceId);
entryPendingRemove.forEach(entry -> {
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
});
}
@Override
public void deviceInitialAuditCompleted(DeviceId deviceId,
boolean completed) {
synchronized (deviceAuditStatus) {
......
......@@ -16,6 +16,8 @@
package org.onosproject.store.trivial;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.onosproject.net.DeviceId.deviceId;
import java.util.ArrayList;
......@@ -199,6 +201,11 @@ public class SimpleGroupStoreTest {
// Testing removeGroupEntry operation from southbound
testRemoveGroupFromSB(currKey);
// Testing removing all groups on the given device
newKey = new DefaultGroupKey("group1".getBytes());
testStoreAndGetGroup(newKey);
testDeleteGroupOnDevice(newKey);
}
// Testing storeGroup operation
......@@ -376,6 +383,13 @@ public class SimpleGroupStoreTest {
simpleGroupStore.unsetDelegate(deleteGroupDescDelegate);
}
// Testing deleteGroupDescription operation from northbound
private void testDeleteGroupOnDevice(GroupKey currKey) {
assertThat(simpleGroupStore.getGroupCount(D1), is(1));
simpleGroupStore.purgeGroupEntry(D1);
assertThat(simpleGroupStore.getGroupCount(D1), is(0));
}
// Testing removeGroupEntry operation from southbound
private void testRemoveGroupFromSB(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
......
......@@ -15,7 +15,6 @@
*/
package org.onosproject.net.flow.impl;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
......@@ -31,8 +30,9 @@ import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.provider.AbstractListenerProviderRegistry;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
......@@ -75,6 +75,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_ADD_REQUESTED;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVE_REQUESTED;
......@@ -101,9 +102,14 @@ public class FlowRuleManager
label = "Allow flow rules in switch not installed by ONOS")
private boolean allowExtraneousRules = ALLOW_EXTRANEOUS_RULES;
@Property(name = "purgeOnDisconnection", boolValue = false,
label = "Purge entries associated with a device when the device goes offline")
private boolean purgeOnDisconnection = false;
private final Logger log = getLogger(getClass());
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
private final DeviceListener deviceListener = new InternalDeviceListener();
protected ExecutorService deviceInstallers =
Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d"));
......@@ -130,13 +136,12 @@ public class FlowRuleManager
@Activate
public void activate(ComponentContext context) {
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
deviceService.addListener(deviceListener);
cfgService.registerProperties(getClass());
idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
modified(context);
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
}
......@@ -152,18 +157,59 @@ public class FlowRuleManager
@Modified
public void modified(ComponentContext context) {
if (context == null) {
return;
if (context != null) {
readComponentConfiguration(context);
}
}
/**
* Extracts properties from the component configuration context.
*
* @param context the component context
*/
private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
Boolean flag;
flag = isPropertyEnabled(properties, "allowExtraneousRules");
if (flag == null) {
log.info("AllowExtraneousRules is not configured, " +
"using current value of {}", allowExtraneousRules);
} else {
allowExtraneousRules = flag;
log.info("Configured. AllowExtraneousRules is {}",
allowExtraneousRules ? "enabled" : "disabled");
}
String s = Tools.get(properties, "allowExtraneousRules");
allowExtraneousRules = Strings.isNullOrEmpty(s) ? ALLOW_EXTRANEOUS_RULES : Boolean.valueOf(s);
flag = isPropertyEnabled(properties, "purgeOnDisconnection");
if (flag == null) {
log.info("PurgeOnDisconnection is not configured, " +
"using current value of {}", purgeOnDisconnection);
} else {
purgeOnDisconnection = flag;
log.info("Configured. PurgeOnDisconnection is {}",
purgeOnDisconnection ? "enabled" : "disabled");
}
}
if (allowExtraneousRules) {
log.info("Allowing flow rules not installed by ONOS");
/**
* Check property name is defined and set to true.
*
* @param properties properties to be looked up
* @param propertyName the name of the property to look up
* @return value when the propertyName is defined or return null
*/
private static Boolean isPropertyEnabled(Dictionary<?, ?> properties,
String propertyName) {
Boolean value = null;
try {
String s = (String) properties.get(propertyName);
value = isNullOrEmpty(s) ? null : s.trim().equals("true");
} catch (ClassCastException e) {
// No propertyName defined.
value = null;
}
return value;
}
@Override
......@@ -613,4 +659,23 @@ public class FlowRuleManager
checkPermission(FLOWRULE_READ);
return store.getTableStatistics(deviceId);
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_REMOVED:
case DEVICE_AVAILABILITY_CHANGED:
DeviceId deviceId = event.subject().id();
if (!deviceService.isAvailable(deviceId)) {
if (purgeOnDisconnection) {
store.purgeFlowRule(deviceId);
}
}
break;
default:
break;
}
}
}
}
......
......@@ -18,9 +18,12 @@ package org.onosproject.net.group.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.net.provider.AbstractListenerProviderRegistry;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
......@@ -43,11 +46,14 @@ import org.onosproject.net.group.GroupStore;
import org.onosproject.net.group.GroupStore.UpdateType;
import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.net.provider.AbstractProviderService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.security.AppPermission.Type.*;
......@@ -75,21 +81,78 @@ public class GroupManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
@Property(name = "purgeOnDisconnection", boolValue = false,
label = "Purge entries associated with a device when the device goes offline")
private boolean purgeOnDisconnection = false;
@Activate
public void activate() {
public void activate(ComponentContext context) {
store.setDelegate(delegate);
eventDispatcher.addSink(GroupEvent.class, listenerRegistry);
deviceService.addListener(deviceListener);
cfgService.registerProperties(getClass());
modified(context);
log.info("Started");
}
@Deactivate
public void deactivate() {
cfgService.unregisterProperties(getClass(), false);
store.unsetDelegate(delegate);
eventDispatcher.removeSink(GroupEvent.class);
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
if (context != null) {
readComponentConfiguration(context);
}
}
/**
* Extracts properties from the component configuration context.
*
* @param context the component context
*/
private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
Boolean flag;
flag = isPropertyEnabled(properties, "purgeOnDisconnection");
if (flag == null) {
log.info("PurgeOnDisconnection is not configured, " +
"using current value of {}", purgeOnDisconnection);
} else {
purgeOnDisconnection = flag;
log.info("Configured. PurgeOnDisconnection is {}",
purgeOnDisconnection ? "enabled" : "disabled");
}
}
/**
* Check property name is defined and set to true.
*
* @param properties properties to be looked up
* @param propertyName the name of the property to look up
* @return value when the propertyName is defined or return null
*/
private static Boolean isPropertyEnabled(Dictionary<?, ?> properties,
String propertyName) {
Boolean value = null;
try {
String s = (String) properties.get(propertyName);
value = isNullOrEmpty(s) ? null : s.trim().equals("true");
} catch (ClassCastException e) {
// No propertyName defined.
value = null;
}
return value;
}
/**
* Create a group in the specified device with the provided parameters.
*
......@@ -303,13 +366,17 @@ public class GroupManager
switch (event.type()) {
case DEVICE_REMOVED:
case DEVICE_AVAILABILITY_CHANGED:
if (!deviceService.isAvailable(event.subject().id())) {
DeviceId deviceId = event.subject().id();
if (!deviceService.isAvailable(deviceId)) {
log.debug("Device {} became un available; clearing initial audit status",
event.type(), event.subject().id());
store.deviceInitialAuditCompleted(event.subject().id(), false);
if (purgeOnDisconnection) {
store.purgeGroupEntry(deviceId);
}
}
break;
default:
break;
}
......
......@@ -32,6 +32,7 @@ import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
......@@ -89,11 +90,12 @@ public class GroupManagerTest {
mgr = new GroupManager();
groupService = mgr;
mgr.deviceService = new DeviceManager();
mgr.cfgService = new ComponentConfigAdapter();
mgr.store = new SimpleGroupStore();
injectEventDispatcher(mgr, new TestEventDispatcher());
providerRegistry = mgr;
mgr.activate();
mgr.activate(null);
mgr.addListener(listener);
internalProvider = new TestGroupProvider(PID);
......
......@@ -611,6 +611,11 @@ public class NewDistributedFlowRuleStore
}
@Override
public void purgeFlowRule(DeviceId deviceId) {
flowTable.purgeFlowRule(deviceId);
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
//FIXME: need a per device pending response
NodeId nodeId = pendingResponses.remove(event.subject().batchId());
......@@ -827,6 +832,10 @@ public class NewDistributedFlowRuleStore
}
}
public void purgeFlowRule(DeviceId deviceId) {
flowEntries.remove(deviceId);
}
private NodeId getBackupNode(DeviceId deviceId) {
List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
// pick the standby which is most likely to become next master
......
......@@ -67,8 +67,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
......@@ -845,6 +847,21 @@ public class DistributedGroupStore
}
@Override
public void purgeGroupEntry(DeviceId deviceId) {
Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
new HashSet<>();
groupStoreEntriesByKey.entrySet().stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
.forEach(entryPendingRemove::add);
entryPendingRemove.forEach(entry -> {
groupStoreEntriesByKey.remove(entry.getKey());
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
});
}
@Override
public void deviceInitialAuditCompleted(DeviceId deviceId,
boolean completed) {
synchronized (deviceAuditStatus) {
......
......@@ -71,8 +71,10 @@ public class DistributedGroupStoreTest {
DeviceId deviceId2 = did("dev2");
GroupId groupId1 = new DefaultGroupId(1);
GroupId groupId2 = new DefaultGroupId(2);
GroupId groupId3 = new DefaultGroupId(3);
GroupKey groupKey1 = new DefaultGroupKey("abc".getBytes());
GroupKey groupKey2 = new DefaultGroupKey("def".getBytes());
GroupKey groupKey3 = new DefaultGroupKey("ghi".getBytes());
TrafficTreatment treatment =
DefaultTrafficTreatment.emptyTreatment();
......@@ -97,6 +99,13 @@ public class DistributedGroupStoreTest {
groupKey2,
groupId2.id(),
APP_ID);
GroupDescription groupDescription3 = new DefaultGroupDescription(
deviceId2,
GroupDescription.Type.INDIRECT,
buckets,
groupKey3,
groupId3.id(),
APP_ID);
DistributedGroupStore groupStoreImpl;
GroupStore groupStore;
......@@ -202,6 +211,30 @@ public class DistributedGroupStoreTest {
}
/**
* Tests removing all groups on the given device.
*/
@Test
public void testRemoveGroupOnDevice() throws Exception {
groupStore.deviceInitialAuditCompleted(deviceId1, true);
assertThat(groupStore.deviceInitialAuditStatus(deviceId1), is(true));
groupStore.deviceInitialAuditCompleted(deviceId2, true);
assertThat(groupStore.deviceInitialAuditStatus(deviceId2), is(true));
// Make sure the pending list starts out empty
assertThat(auditPendingReqQueue.size(), is(0));
groupStore.storeGroupDescription(groupDescription1);
groupStore.storeGroupDescription(groupDescription2);
groupStore.storeGroupDescription(groupDescription3);
assertThat(groupStore.getGroupCount(deviceId1), is(1));
assertThat(groupStore.getGroupCount(deviceId2), is(2));
groupStore.purgeGroupEntry(deviceId2);
assertThat(groupStore.getGroupCount(deviceId1), is(1));
assertThat(groupStore.getGroupCount(deviceId2), is(0));
}
/**
* Tests adding and removing a group.
*/
@Test
......