alshabib
Committed by Gerrit Code Review

adding group garbage collection functionality

If a group has a reference count of zero for more than
a configurable timeout, it is garbage collected.

This feature can be deactivated by component config.

Change-Id: I254d62a90ef7ac8d2ce2f406b67957455a5bf4d0
......@@ -35,6 +35,7 @@ public class DefaultGroup extends DefaultGroupDescription
private long bytes;
private long referenceCount;
private GroupId id;
private int age;
/**
* Initializes default values.
......@@ -48,6 +49,7 @@ public class DefaultGroup extends DefaultGroupDescription
packets = 0;
bytes = 0;
referenceCount = 0;
age = 0;
}
/**
......@@ -128,6 +130,11 @@ public class DefaultGroup extends DefaultGroupDescription
return this.bytes;
}
@Override
public int age() {
return age;
}
/**
* Sets the new state for this entry.
*
......@@ -171,6 +178,11 @@ public class DefaultGroup extends DefaultGroupDescription
@Override
public void setReferenceCount(long referenceCount) {
this.referenceCount = referenceCount;
if (referenceCount == 0) {
age++;
} else {
age = 0;
}
}
@Override
......@@ -214,6 +226,7 @@ public class DefaultGroup extends DefaultGroupDescription
.add("description", super.toString())
.add("groupid", id)
.add("state", state)
.add("age", age)
.toString();
}
......
......@@ -96,4 +96,12 @@ public interface Group extends GroupDescription {
* @return number of flow rules or other groups pointing to this group
*/
long referenceCount();
/**
* Obtains the age of a group. The age reflects the number of polling rounds
* the group has had a reference count of zero.
*
* @return the age of the group as an integer
*/
int age();
}
......
......@@ -19,15 +19,17 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.DefaultGroupId;
......@@ -54,19 +56,21 @@ import org.onosproject.net.group.StoredGroupBucketEntry;
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
......@@ -75,6 +79,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -83,7 +88,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -99,6 +106,9 @@ public class DistributedGroupStore
private final Logger log = getLogger(getClass());
private static final boolean GARBAGE_COLLECT = false;
private static final int GC_THRESH = 6;
private final int dummyId = 0xffffffff;
private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
......@@ -114,6 +124,9 @@ public class DistributedGroupStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
// Per device group table with (device id + app cookie) as key
private ConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> groupStoreEntriesByKey = null;
......@@ -135,8 +148,18 @@ public class DistributedGroupStore
private KryoNamespace clusterMsgSerializer;
@Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
label = "Enable group garbage collection")
private boolean garbageCollect = GARBAGE_COLLECT;
@Property(name = "gcThresh", intValue = GC_THRESH,
label = "Number of rounds for group garbage collection")
private int gcThresh = GC_THRESH;
@Activate
public void activate() {
cfgService.registerProperties(getClass());
kryoBuilder = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.register(DefaultGroup.class,
......@@ -193,12 +216,29 @@ public class DistributedGroupStore
@Deactivate
public void deactivate() {
cfgService.unregisterProperties(getClass(), false);
clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
groupStoreEntriesByKey.destroy();
auditPendingReqQueue.destroy();
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
try {
String s = get(properties, "garbageCollect");
garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
s = get(properties, "gcThresh");
gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
} catch (Exception e) {
gcThresh = GC_THRESH;
garbageCollect = GARBAGE_COLLECT;
}
}
private static NewConcurrentHashMap<GroupId, Group>
lazyEmptyExtraneousGroupIdTable() {
return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
......@@ -268,7 +308,6 @@ public class DistributedGroupStore
* Returns the groups associated with a device.
*
* @param deviceId the device ID
*
* @return the group entries
*/
@Override
......@@ -296,7 +335,6 @@ public class DistributedGroupStore
*
* @param deviceId the device ID
* @param appCookie the group key
*
* @return a group associated with the key
*/
@Override
......@@ -377,7 +415,8 @@ public class DistributedGroupStore
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
clusterMsgSerializer::serialize,
mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
mastershipService.getMasterFor(groupDesc.deviceId()))
.whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
groupOp,
......@@ -415,7 +454,7 @@ public class DistributedGroupStore
return null;
}
for (Group extraneousGroup:extraneousMap.values()) {
for (Group extraneousGroup : extraneousMap.values()) {
if (extraneousGroup.buckets().equals(buckets)) {
return extraneousGroup;
}
......@@ -688,7 +727,7 @@ public class DistributedGroupStore
if (type == UpdateType.ADD) {
// Check if the any of the new buckets are part of
// the old bucket list
for (GroupBucket addBucket:buckets.buckets()) {
for (GroupBucket addBucket : buckets.buckets()) {
if (!newBucketList.contains(addBucket)) {
newBucketList.add(addBucket);
groupDescUpdated = true;
......@@ -697,7 +736,7 @@ public class DistributedGroupStore
} else if (type == UpdateType.REMOVE) {
// Check if the to be removed buckets are part of the
// old bucket list
for (GroupBucket removeBucket:buckets.buckets()) {
for (GroupBucket removeBucket : buckets.buckets()) {
if (newBucketList.contains(removeBucket)) {
newBucketList.remove(removeBucket);
groupDescUpdated = true;
......@@ -795,11 +834,11 @@ public class DistributedGroupStore
group.id(),
group.deviceId());
synchronized (existing) {
for (GroupBucket bucket:group.buckets().buckets()) {
for (GroupBucket bucket : group.buckets().buckets()) {
Optional<GroupBucket> matchingBucket =
existing.buckets().buckets()
.stream()
.filter((existingBucket)->(existingBucket.equals(bucket)))
.filter((existingBucket) -> (existingBucket.equals(bucket)))
.findFirst();
if (matchingBucket.isPresent()) {
((StoredGroupBucketEntry) matchingBucket.
......@@ -814,6 +853,7 @@ public class DistributedGroupStore
existing.setLife(group.life());
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
existing.setReferenceCount(group.referenceCount());
if ((existing.state() == GroupState.PENDING_ADD) ||
(existing.state() == GroupState.PENDING_ADD_RETRY)) {
log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
......@@ -901,12 +941,12 @@ public class DistributedGroupStore
List<StoredGroupEntry> pendingGroupRequests =
getPendingGroupKeyTable().values()
.stream()
.filter(g-> g.deviceId().equals(deviceId))
.filter(g -> g.deviceId().equals(deviceId))
.collect(Collectors.toList());
log.debug("processing pending group add requests for device {} and number of pending requests {}",
deviceId,
pendingGroupRequests.size());
for (Group group:pendingGroupRequests) {
for (Group group : pendingGroupRequests) {
GroupDescription tmp = new DefaultGroupDescription(
group.deviceId(),
group.type(),
......@@ -1144,6 +1184,7 @@ public class DistributedGroupStore
protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
private final GroupKey appCookie;
public GroupStoreKeyMapKey(DeviceId deviceId,
GroupKey appCookie) {
super(deviceId);
......@@ -1175,6 +1216,7 @@ public class DistributedGroupStore
protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
private final GroupId groupId;
public GroupStoreIdMapKey(DeviceId deviceId,
GroupId groupId) {
super(deviceId);
......@@ -1233,12 +1275,15 @@ public class DistributedGroupStore
log.trace("Stored Group {} for device {}", group, deviceId);
}
garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
log.trace("Group AUDIT: group {} exists in both planes for device {}",
group.id(), deviceId);
groupAdded(group);
it2.remove();
}
......@@ -1283,6 +1328,29 @@ public class DistributedGroupStore
}
}
private void garbageCollect(DeviceId deviceId,
Set<Group> southboundGroupEntries,
Set<StoredGroupEntry> storedGroupEntries) {
if (!garbageCollect) {
return;
}
Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
while (it.hasNext()) {
StoredGroupEntry group = it.next();
if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
log.debug("Garbage collecting group {} on {}", group, deviceId);
deleteGroupDescription(deviceId, group.appCookie());
southboundGroupEntries.remove(group);
it.remove();
}
}
}
private boolean checkGroupRefCount(Group group) {
return (group.referenceCount() == 0 && group.age() >= gcThresh);
}
private void groupMissing(Group group) {
switch (group.state()) {
case PENDING_DELETE:
......
......@@ -23,6 +23,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
......@@ -129,6 +130,7 @@ public class DistributedGroupStoreTest {
groupStoreImpl.storageService = new TestStorageService();
groupStoreImpl.clusterCommunicator = new ClusterCommunicationServiceAdapter();
groupStoreImpl.mastershipService = new MasterOfAll();
groupStoreImpl.cfgService = new ComponentConfigAdapter();
groupStoreImpl.activate();
groupStore = groupStoreImpl;
auditPendingReqQueue =
......
......@@ -156,6 +156,11 @@ public class GroupsResourceTest extends ResourceTest {
}
@Override
public int age() {
return 0;
}
@Override
public Type type() {
return GroupDescription.Type.ALL;
}
......