Brian O'Connor

Adding configurable variables to DistributedFlowRuleStore

Change-Id: I989de6b4ca4caf8ac1e648fbae766d0f88414419
......@@ -74,11 +74,6 @@
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
......
......@@ -52,6 +52,10 @@
<artifactId>onlab-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -23,10 +23,13 @@ import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
......@@ -68,12 +71,14 @@ import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -88,6 +93,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
......@@ -104,8 +111,17 @@ public class DistributedFlowRuleStore
private final Logger log = getLogger(getClass());
// TODO: Make configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final boolean DEFAULT_BACKUP_ENABLED = false;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Number of threads in the message handler pool")
private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
@Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
label = "Indicates whether backups are enabled or not")
private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
private InternalFlowTable flowTable;
......@@ -127,6 +143,9 @@ public class DistributedFlowRuleStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = MANDATORY_UNARY)
protected ComponentConfigService configService;
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
private ExecutorService messageHandlingExecutor;
......@@ -143,24 +162,22 @@ public class DistributedFlowRuleStore
}
};
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
private ReplicaInfoEventListener replicaInfoEventListener;
private IdGenerator idGenerator;
@Activate
public void activate() {
public void activate(ComponentContext context) {
configService.registerProperties(getClass());
flowTable = new InternalFlowTable(); // .withBackupsEnabled(false);
flowTable = new InternalFlowTable().withBackupsEnabled(backupEnabled);
idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
final NodeId local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/flow", "message-handlers"));
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
......@@ -222,11 +239,12 @@ public class DistributedFlowRuleStore
replicaInfoManager.addListener(replicaInfoEventListener);
log.info("Started");
logConfig("Started");
}
@Deactivate
public void deactivate() {
public void deactivate(ComponentContext context) {
configService.unregisterProperties(getClass(), false);
clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
......@@ -237,6 +255,46 @@ public class DistributedFlowRuleStore
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
if (context == null) {
backupEnabled = DEFAULT_BACKUP_ENABLED;
logConfig("Default config");
return;
}
Dictionary properties = context.getProperties();
int newPoolSize;
boolean newBackupEnabled;
try {
String s = (String) properties.get("msgHandlerPoolSize");
newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
s = (String) properties.get("backupEnabled");
newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
} catch (NumberFormatException | ClassCastException e) {
newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
newBackupEnabled = DEFAULT_BACKUP_ENABLED;
}
if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
msgHandlerPoolSize = newPoolSize;
backupEnabled = newBackupEnabled;
// reconfigure the store
flowTable.withBackupsEnabled(backupEnabled);
ExecutorService oldMsgHandler = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
oldMsgHandler.shutdown();
logConfig("Reconfigured");
}
}
private void logConfig(String prefix) {
log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
prefix, msgHandlerPoolSize, backupEnabled);
}
// This is not a efficient operation on a distributed sharded
// flow store. We need to revisit the need for this operation or at least
......@@ -639,14 +697,13 @@ public class DistributedFlowRuleStore
(flowId, flowEntry) ->
(flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId());
private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap = backupsEnabled ?
private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
new EventuallyConsistentMapImpl<>("flow-backup",
clusterService,
clusterCommunicator,
flowSerializer,
clockService,
(key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true)
: null;
(key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true);
private Collection<NodeId> getPeerNodes() {
List<NodeId> nodes = clusterService.getNodes()
......