Madan Jampani
Committed by Gerrit Code Review

Improvement: Ensure configurations options are current and valid in NewDistributedFlowRuleStore

Bug fix: Only accept backups for devices that the local node does not manage.

Change-Id: If7b1e8c3b0339e5d756e250c38fe53dc191084d1
......@@ -81,6 +81,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
......@@ -106,6 +107,7 @@ public class NewDistributedFlowRuleStore
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final boolean DEFAULT_BACKUP_ENABLED = true;
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
......@@ -116,6 +118,10 @@ public class NewDistributedFlowRuleStore
label = "Indicates whether backups are enabled or not")
private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
label = "Delay in ms between successive backup runs")
private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
private InternalFlowTable flowTable = new InternalFlowTable();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -142,6 +148,7 @@ public class NewDistributedFlowRuleStore
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
private ExecutorService messageHandlingExecutor;
private ScheduledFuture<?> backupTask;
private final ScheduledExecutorService backupSenderExecutor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
......@@ -173,7 +180,13 @@ public class NewDistributedFlowRuleStore
registerMessageHandlers(messageHandlingExecutor);
backupSenderExecutor.scheduleWithFixedDelay(() -> flowTable.backup(), 0, 2000, TimeUnit.MILLISECONDS);
if (backupEnabled) {
backupTask = backupSenderExecutor.scheduleWithFixedDelay(
flowTable::backup,
0,
backupPeriod,
TimeUnit.MILLISECONDS);
}
logConfig("Started");
}
......@@ -199,6 +212,7 @@ public class NewDistributedFlowRuleStore
Dictionary properties = context.getProperties();
int newPoolSize;
boolean newBackupEnabled;
int newBackupPeriod;
try {
String s = get(properties, "msgHandlerPoolSize");
newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
......@@ -206,13 +220,38 @@ public class NewDistributedFlowRuleStore
s = get(properties, "backupEnabled");
newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
s = get(properties, "backupPeriod");
newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
newBackupEnabled = DEFAULT_BACKUP_ENABLED;
newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
}
boolean restartBackupTask = false;
if (newBackupEnabled != backupEnabled) {
backupEnabled = newBackupEnabled;
if (!backupEnabled && backupTask != null) {
backupTask.cancel(false);
backupTask = null;
}
restartBackupTask = backupEnabled;
}
if (newBackupPeriod != backupPeriod) {
backupPeriod = newBackupPeriod;
restartBackupTask = backupEnabled;
}
if (restartBackupTask) {
if (backupTask != null) {
// cancel previously running task
backupTask.cancel(false);
}
backupTask = backupSenderExecutor.scheduleWithFixedDelay(
flowTable::backup,
0,
backupPeriod,
TimeUnit.MILLISECONDS);
}
if (newPoolSize != msgHandlerPoolSize) {
msgHandlerPoolSize = newPoolSize;
......@@ -254,8 +293,8 @@ public class NewDistributedFlowRuleStore
}
private void logConfig(String prefix) {
log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
prefix, msgHandlerPoolSize, backupEnabled);
log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
}
// This is not a efficient operation on a distributed sharded
......@@ -620,6 +659,9 @@ public class NewDistributedFlowRuleStore
}
private void backup() {
if (!backupEnabled) {
return;
}
//TODO: Force backup when backups change.
try {
// determine the set of devices that we need to backup during this run.
......@@ -674,7 +716,9 @@ public class NewDistributedFlowRuleStore
private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
Maps.filterKeys(flowTables, managedDevices::contains).forEach((deviceId, flowTable) -> {
// Only process those devices are that not managed by the local node.
Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
.forEach((deviceId, flowTable) -> {
Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
deviceFlowTable.clear();
deviceFlowTable.putAll(flowTable);
......