Madan Jampani
Committed by Gerrit Code Review

ONOS-2077: Limit the number of devices whose flow entries are backed in each communication round

Change-Id: I190a05bb1a123ad49edc6d2d192295c05587e410
...@@ -106,6 +106,8 @@ public class NewDistributedFlowRuleStore ...@@ -106,6 +106,8 @@ public class NewDistributedFlowRuleStore
106 private static final boolean DEFAULT_BACKUP_ENABLED = true; 106 private static final boolean DEFAULT_BACKUP_ENABLED = true;
107 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000; 107 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
108 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000; 108 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
109 + // number of devices whose flow entries will be backed up in one communication round
110 + private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
109 111
110 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE, 112 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
111 label = "Number of threads in the message handler pool") 113 label = "Number of threads in the message handler pool")
...@@ -638,7 +640,16 @@ public class NewDistributedFlowRuleStore ...@@ -638,7 +640,16 @@ public class NewDistributedFlowRuleStore
638 } 640 }
639 } 641 }
640 642
643 + private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
644 + // split up the devices into smaller batches and send them separately.
645 + Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
646 + .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
647 + }
648 +
641 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) { 649 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
650 + if (deviceIds.isEmpty()) {
651 + return;
652 + }
642 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId); 653 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
643 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries = 654 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
644 Maps.newConcurrentMap(); 655 Maps.newConcurrentMap();
...@@ -750,7 +761,7 @@ public class NewDistributedFlowRuleStore ...@@ -750,7 +761,7 @@ public class NewDistributedFlowRuleStore
750 } 761 }
751 }); 762 });
752 // send the device flow entries to their respective backup nodes 763 // send the device flow entries to their respective backup nodes
753 - devicesToBackupByNode.forEach(this::backupFlowEntries); 764 + devicesToBackupByNode.forEach(this::sendBackups);
754 } catch (Exception e) { 765 } catch (Exception e) {
755 log.error("Backup failed.", e); 766 log.error("Backup failed.", e);
756 } 767 }
......