Madan Jampani
Committed by Gerrit Code Review

FlowRuleStore: Configurable backup count

Change-Id: Ida4d3669e28e66350f4809539a48a456b6ec43c7
...@@ -16,90 +16,92 @@ ...@@ -16,90 +16,92 @@
16 package org.onosproject.store.flow.impl; 16 package org.onosproject.store.flow.impl;
17 17
18 import com.google.common.collect.ImmutableList; 18 import com.google.common.collect.ImmutableList;
19 - import com.google.common.collect.ImmutableMap; 19 +import com.google.common.collect.ImmutableMap;
20 - import com.google.common.collect.Iterables; 20 +import com.google.common.collect.Iterables;
21 - import com.google.common.collect.Maps; 21 +import com.google.common.collect.Maps;
22 - import com.google.common.collect.Sets; 22 +import com.google.common.collect.Sets;
23 - import com.google.common.util.concurrent.Futures; 23 +import com.google.common.util.concurrent.Futures;
24 - import org.apache.felix.scr.annotations.Activate; 24 +
25 - import org.apache.felix.scr.annotations.Component; 25 +import org.apache.felix.scr.annotations.Activate;
26 - import org.apache.felix.scr.annotations.Deactivate; 26 +import org.apache.felix.scr.annotations.Component;
27 - import org.apache.felix.scr.annotations.Modified; 27 +import org.apache.felix.scr.annotations.Deactivate;
28 - import org.apache.felix.scr.annotations.Property; 28 +import org.apache.felix.scr.annotations.Modified;
29 - import org.apache.felix.scr.annotations.Reference; 29 +import org.apache.felix.scr.annotations.Property;
30 - import org.apache.felix.scr.annotations.ReferenceCardinality; 30 +import org.apache.felix.scr.annotations.Reference;
31 - import org.apache.felix.scr.annotations.Service; 31 +import org.apache.felix.scr.annotations.ReferenceCardinality;
32 - import org.onlab.util.KryoNamespace; 32 +import org.apache.felix.scr.annotations.Service;
33 - import org.onlab.util.Tools; 33 +import org.onlab.util.KryoNamespace;
34 - import org.onosproject.cfg.ComponentConfigService; 34 +import org.onlab.util.Tools;
35 - import org.onosproject.cluster.ClusterService; 35 +import org.onosproject.cfg.ComponentConfigService;
36 - import org.onosproject.cluster.NodeId; 36 +import org.onosproject.cluster.ClusterService;
37 - import org.onosproject.core.CoreService; 37 +import org.onosproject.cluster.NodeId;
38 - import org.onosproject.core.IdGenerator; 38 +import org.onosproject.core.CoreService;
39 - import org.onosproject.mastership.MastershipService; 39 +import org.onosproject.core.IdGenerator;
40 - import org.onosproject.net.DeviceId; 40 +import org.onosproject.mastership.MastershipService;
41 - import org.onosproject.net.device.DeviceService; 41 +import org.onosproject.net.DeviceId;
42 - import org.onosproject.net.flow.CompletedBatchOperation; 42 +import org.onosproject.net.device.DeviceService;
43 - import org.onosproject.net.flow.DefaultFlowEntry; 43 +import org.onosproject.net.flow.CompletedBatchOperation;
44 - import org.onosproject.net.flow.FlowEntry; 44 +import org.onosproject.net.flow.DefaultFlowEntry;
45 - import org.onosproject.net.flow.FlowEntry.FlowEntryState; 45 +import org.onosproject.net.flow.FlowEntry;
46 - import org.onosproject.net.flow.FlowId; 46 +import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47 - import org.onosproject.net.flow.FlowRule; 47 +import org.onosproject.net.flow.FlowId;
48 - import org.onosproject.net.flow.FlowRuleBatchEntry; 48 +import org.onosproject.net.flow.FlowRule;
49 - import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation; 49 +import org.onosproject.net.flow.FlowRuleBatchEntry;
50 - import org.onosproject.net.flow.FlowRuleBatchEvent; 50 +import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51 - import org.onosproject.net.flow.FlowRuleBatchOperation; 51 +import org.onosproject.net.flow.FlowRuleBatchEvent;
52 - import org.onosproject.net.flow.FlowRuleBatchRequest; 52 +import org.onosproject.net.flow.FlowRuleBatchOperation;
53 - import org.onosproject.net.flow.FlowRuleEvent; 53 +import org.onosproject.net.flow.FlowRuleBatchRequest;
54 - import org.onosproject.net.flow.FlowRuleEvent.Type; 54 +import org.onosproject.net.flow.FlowRuleEvent;
55 - import org.onosproject.net.flow.FlowRuleService; 55 +import org.onosproject.net.flow.FlowRuleEvent.Type;
56 - import org.onosproject.net.flow.FlowRuleStore; 56 +import org.onosproject.net.flow.FlowRuleService;
57 - import org.onosproject.net.flow.FlowRuleStoreDelegate; 57 +import org.onosproject.net.flow.FlowRuleStore;
58 - import org.onosproject.net.flow.StoredFlowEntry; 58 +import org.onosproject.net.flow.FlowRuleStoreDelegate;
59 - import org.onosproject.net.flow.TableStatisticsEntry; 59 +import org.onosproject.net.flow.StoredFlowEntry;
60 - import org.onosproject.persistence.PersistenceService; 60 +import org.onosproject.net.flow.TableStatisticsEntry;
61 - import org.onosproject.store.AbstractStore; 61 +import org.onosproject.persistence.PersistenceService;
62 - import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 62 +import org.onosproject.store.AbstractStore;
63 - import org.onosproject.store.cluster.messaging.ClusterMessage; 63 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64 - import org.onosproject.store.cluster.messaging.ClusterMessageHandler; 64 +import org.onosproject.store.cluster.messaging.ClusterMessage;
65 - import org.onosproject.store.flow.ReplicaInfoEvent; 65 +import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
66 - import org.onosproject.store.flow.ReplicaInfoEventListener; 66 +import org.onosproject.store.flow.ReplicaInfoEvent;
67 - import org.onosproject.store.flow.ReplicaInfoService; 67 +import org.onosproject.store.flow.ReplicaInfoEventListener;
68 - import org.onosproject.store.impl.MastershipBasedTimestamp; 68 +import org.onosproject.store.flow.ReplicaInfoService;
69 - import org.onosproject.store.serializers.KryoNamespaces; 69 +import org.onosproject.store.impl.MastershipBasedTimestamp;
70 - import org.onosproject.store.serializers.StoreSerializer; 70 +import org.onosproject.store.serializers.KryoNamespaces;
71 - import org.onosproject.store.serializers.custom.DistributedStoreSerializers; 71 +import org.onosproject.store.serializers.StoreSerializer;
72 - import org.onosproject.store.service.EventuallyConsistentMap; 72 +import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
73 - import org.onosproject.store.service.EventuallyConsistentMapEvent; 73 +import org.onosproject.store.service.EventuallyConsistentMap;
74 - import org.onosproject.store.service.EventuallyConsistentMapListener; 74 +import org.onosproject.store.service.EventuallyConsistentMapEvent;
75 - import org.onosproject.store.service.Serializer; 75 +import org.onosproject.store.service.EventuallyConsistentMapListener;
76 - import org.onosproject.store.service.StorageService; 76 +import org.onosproject.store.service.Serializer;
77 - import org.onosproject.store.service.WallClockTimestamp; 77 +import org.onosproject.store.service.StorageService;
78 - import org.osgi.service.component.ComponentContext; 78 +import org.onosproject.store.service.WallClockTimestamp;
79 - import org.slf4j.Logger; 79 +import org.osgi.service.component.ComponentContext;
80 +import org.slf4j.Logger;
80 81
81 import java.util.Collections; 82 import java.util.Collections;
82 - import java.util.Dictionary; 83 +import java.util.Dictionary;
83 - import java.util.HashSet; 84 +import java.util.HashSet;
84 - import java.util.List; 85 +import java.util.List;
85 - import java.util.Map; 86 +import java.util.Map;
86 - import java.util.Objects; 87 +import java.util.Objects;
87 - import java.util.Set; 88 +import java.util.Set;
88 - import java.util.concurrent.ExecutorService; 89 +import java.util.concurrent.ExecutorService;
89 - import java.util.concurrent.Executors; 90 +import java.util.concurrent.Executors;
90 - import java.util.concurrent.ScheduledExecutorService; 91 +import java.util.concurrent.ScheduledExecutorService;
91 - import java.util.concurrent.ScheduledFuture; 92 +import java.util.concurrent.ScheduledFuture;
92 - import java.util.concurrent.TimeUnit; 93 +import java.util.concurrent.TimeUnit;
93 - import java.util.concurrent.atomic.AtomicInteger; 94 +import java.util.concurrent.atomic.AtomicInteger;
94 - import java.util.concurrent.atomic.AtomicReference; 95 +import java.util.concurrent.atomic.AtomicReference;
95 - import java.util.stream.Collectors; 96 +import java.util.stream.Collectors;
96 97
97 import static com.google.common.base.Strings.isNullOrEmpty; 98 import static com.google.common.base.Strings.isNullOrEmpty;
98 - import static org.onlab.util.Tools.get; 99 +import static org.onlab.util.Tools.get;
99 - import static org.onlab.util.Tools.groupedThreads; 100 +import static org.onlab.util.Tools.groupedThreads;
100 - import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 101 +import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
101 - import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*; 102 +import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
102 - import static org.slf4j.LoggerFactory.getLogger; 103 +import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
104 +import static org.slf4j.LoggerFactory.getLogger;
103 105
104 /** 106 /**
105 * Manages inventory of flow rules using a distributed state management protocol. 107 * Manages inventory of flow rules using a distributed state management protocol.
...@@ -114,6 +116,7 @@ public class DistributedFlowRuleStore ...@@ -114,6 +116,7 @@ public class DistributedFlowRuleStore
114 116
115 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8; 117 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
116 private static final boolean DEFAULT_BACKUP_ENABLED = true; 118 private static final boolean DEFAULT_BACKUP_ENABLED = true;
119 + private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
117 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false; 120 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
118 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000; 121 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
119 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000; 122 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
...@@ -126,7 +129,7 @@ public class DistributedFlowRuleStore ...@@ -126,7 +129,7 @@ public class DistributedFlowRuleStore
126 129
127 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED, 130 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
128 label = "Indicates whether backups are enabled or not") 131 label = "Indicates whether backups are enabled or not")
129 - private boolean backupEnabled = DEFAULT_BACKUP_ENABLED; 132 + private volatile boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
130 133
131 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS, 134 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
132 label = "Delay in ms between successive backup runs") 135 label = "Delay in ms between successive backup runs")
...@@ -135,6 +138,10 @@ public class DistributedFlowRuleStore ...@@ -135,6 +138,10 @@ public class DistributedFlowRuleStore
135 label = "Indicates whether or not changes in the flow table should be persisted to disk.") 138 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
136 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED; 139 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
137 140
141 + @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
142 + label = "Max number of backup copies for each device")
143 + private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
144 +
138 private InternalFlowTable flowTable = new InternalFlowTable(); 145 private InternalFlowTable flowTable = new InternalFlowTable();
139 146
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -255,6 +262,7 @@ public class DistributedFlowRuleStore ...@@ -255,6 +262,7 @@ public class DistributedFlowRuleStore
255 int newPoolSize; 262 int newPoolSize;
256 boolean newBackupEnabled; 263 boolean newBackupEnabled;
257 int newBackupPeriod; 264 int newBackupPeriod;
265 + int newBackupCount;
258 try { 266 try {
259 String s = get(properties, "msgHandlerPoolSize"); 267 String s = get(properties, "msgHandlerPoolSize");
260 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim()); 268 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
...@@ -265,10 +273,13 @@ public class DistributedFlowRuleStore ...@@ -265,10 +273,13 @@ public class DistributedFlowRuleStore
265 s = get(properties, "backupPeriod"); 273 s = get(properties, "backupPeriod");
266 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim()); 274 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
267 275
276 + s = get(properties, "backupCount");
277 + newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
268 } catch (NumberFormatException | ClassCastException e) { 278 } catch (NumberFormatException | ClassCastException e) {
269 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE; 279 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
270 newBackupEnabled = DEFAULT_BACKUP_ENABLED; 280 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
271 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS; 281 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
282 + newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
272 } 283 }
273 284
274 boolean restartBackupTask = false; 285 boolean restartBackupTask = false;
...@@ -310,6 +321,9 @@ public class DistributedFlowRuleStore ...@@ -310,6 +321,9 @@ public class DistributedFlowRuleStore
310 registerMessageHandlers(messageHandlingExecutor); 321 registerMessageHandlers(messageHandlingExecutor);
311 oldMsgHandler.shutdown(); 322 oldMsgHandler.shutdown();
312 } 323 }
324 + if (backupCount != newBackupCount) {
325 + backupCount = newBackupCount;
326 + }
313 logConfig("Reconfigured"); 327 logConfig("Reconfigured");
314 } 328 }
315 329
...@@ -340,8 +354,8 @@ public class DistributedFlowRuleStore ...@@ -340,8 +354,8 @@ public class DistributedFlowRuleStore
340 } 354 }
341 355
342 private void logConfig(String prefix) { 356 private void logConfig(String prefix) {
343 - log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}", 357 + log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}, backupCount = {}",
344 - prefix, msgHandlerPoolSize, backupEnabled, backupPeriod); 358 + prefix, msgHandlerPoolSize, backupEnabled, backupPeriod, backupCount);
345 } 359 }
346 360
347 // This is not a efficient operation on a distributed sharded 361 // This is not a efficient operation on a distributed sharded
...@@ -652,15 +666,40 @@ public class DistributedFlowRuleStore ...@@ -652,15 +666,40 @@ public class DistributedFlowRuleStore
652 } 666 }
653 } 667 }
654 668
669 + private class BackupOperation {
670 + private final NodeId nodeId;
671 + private final DeviceId deviceId;
672 +
673 + public BackupOperation(NodeId nodeId, DeviceId deviceId) {
674 + this.nodeId = nodeId;
675 + this.deviceId = deviceId;
676 + }
677 +
678 + @Override
679 + public int hashCode() {
680 + return Objects.hash(nodeId, deviceId);
681 + }
682 +
683 + @Override
684 + public boolean equals(Object other) {
685 + if (other != null && other instanceof BackupOperation) {
686 + BackupOperation that = (BackupOperation) other;
687 + return this.nodeId.equals(that.nodeId) &&
688 + this.deviceId.equals(that.deviceId);
689 + } else {
690 + return false;
691 + }
692 + }
693 + }
694 +
655 private class InternalFlowTable implements ReplicaInfoEventListener { 695 private class InternalFlowTable implements ReplicaInfoEventListener {
656 696
657 //TODO replace the Map<V,V> with ExtendedSet 697 //TODO replace the Map<V,V> with ExtendedSet
658 private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> 698 private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
659 flowEntries = Maps.newConcurrentMap(); 699 flowEntries = Maps.newConcurrentMap();
660 700
661 - private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap(); 701 + private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
662 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap(); 702 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
663 - private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
664 703
665 @Override 704 @Override
666 public void event(ReplicaInfoEvent event) { 705 public void event(ReplicaInfoEvent event) {
...@@ -668,41 +707,14 @@ public class DistributedFlowRuleStore ...@@ -668,41 +707,14 @@ public class DistributedFlowRuleStore
668 } 707 }
669 708
670 private void handleEvent(ReplicaInfoEvent event) { 709 private void handleEvent(ReplicaInfoEvent event) {
671 - if (!backupEnabled) { 710 + DeviceId deviceId = event.subject();
711 + if (!backupEnabled || !mastershipService.isLocalMaster(deviceId)) {
672 return; 712 return;
673 } 713 }
674 - if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) { 714 + if (event.type() == MASTER_CHANGED) {
675 - DeviceId deviceId = event.subject(); 715 + lastUpdateTimes.put(deviceId, System.currentTimeMillis());
676 - NodeId master = mastershipService.getMasterFor(deviceId);
677 - if (!Objects.equals(local, master)) {
678 - // ignore since this event is for a device this node does not manage.
679 - return;
680 - }
681 - NodeId newBackupNode = getBackupNode(deviceId);
682 - NodeId currentBackupNode = lastBackupNodes.get(deviceId);
683 - if (Objects.equals(newBackupNode, currentBackupNode)) {
684 - // ignore since backup location hasn't changed.
685 - return;
686 - }
687 - if (currentBackupNode != null && newBackupNode == null) {
688 - // Current backup node is most likely down and no alternate backup node
689 - // has been chosen. Clear current backup location so that we can resume
690 - // backups when either current backup comes online or a different backup node
691 - // is chosen.
692 - log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
693 - + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
694 - lastBackupNodes.remove(deviceId);
695 - lastBackupTimes.remove(deviceId);
696 - return;
697 - // TODO: Pick any available node as backup and ensure hand-off occurs when
698 - // a new master is elected.
699 - }
700 - log.debug("Backup location for {} has changed from {} to {}.",
701 - deviceId, currentBackupNode, newBackupNode);
702 - backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
703 - 0,
704 - TimeUnit.SECONDS);
705 } 716 }
717 + backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
706 } 718 }
707 719
708 private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) { 720 private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
...@@ -715,7 +727,7 @@ public class DistributedFlowRuleStore ...@@ -715,7 +727,7 @@ public class DistributedFlowRuleStore
715 if (deviceIds.isEmpty()) { 727 if (deviceIds.isEmpty()) {
716 return; 728 return;
717 } 729 }
718 - log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId); 730 + log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
719 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> 731 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
720 deviceFlowEntries = Maps.newConcurrentMap(); 732 deviceFlowEntries = Maps.newConcurrentMap();
721 deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id)))); 733 deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
...@@ -737,8 +749,7 @@ public class DistributedFlowRuleStore ...@@ -737,8 +749,7 @@ public class DistributedFlowRuleStore
737 } 749 }
738 if (backedupDevices != null) { 750 if (backedupDevices != null) {
739 backedupDevices.forEach(id -> { 751 backedupDevices.forEach(id -> {
740 - lastBackupTimes.put(id, System.currentTimeMillis()); 752 + lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
741 - lastBackupNodes.put(id, nodeId);
742 }); 753 });
743 } 754 }
744 }); 755 });
...@@ -836,10 +847,11 @@ public class DistributedFlowRuleStore ...@@ -836,10 +847,11 @@ public class DistributedFlowRuleStore
836 flowEntries.remove(deviceId); 847 flowEntries.remove(deviceId);
837 } 848 }
838 849
839 - private NodeId getBackupNode(DeviceId deviceId) { 850 + private List<NodeId> getBackupNodes(DeviceId deviceId) {
840 - List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups(); 851 + // The returned backup node list is in the order of preference i.e. next likely master first.
841 - // pick the standby which is most likely to become next master 852 + List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
842 - return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0); 853 + return ImmutableList.copyOf(allPossibleBackupNodes)
854 + .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
843 } 855 }
844 856
845 private void backup() { 857 private void backup() {
...@@ -847,29 +859,17 @@ public class DistributedFlowRuleStore ...@@ -847,29 +859,17 @@ public class DistributedFlowRuleStore
847 return; 859 return;
848 } 860 }
849 try { 861 try {
850 - // determine the set of devices that we need to backup during this run.
851 - Set<DeviceId> devicesToBackup = flowEntries.keySet()
852 - .stream()
853 - .filter(mastershipService::isLocalMaster)
854 - .filter(deviceId -> {
855 - Long lastBackupTime = lastBackupTimes.get(deviceId);
856 - Long lastUpdateTime = lastUpdateTimes.get(deviceId);
857 - NodeId lastBackupNode = lastBackupNodes.get(deviceId);
858 - NodeId newBackupNode = getBackupNode(deviceId);
859 - return lastBackupTime == null
860 - || !Objects.equals(lastBackupNode, newBackupNode)
861 - || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
862 - })
863 - .collect(Collectors.toSet());
864 -
865 // compute a mapping from node to the set of devices whose flow entries it should backup 862 // compute a mapping from node to the set of devices whose flow entries it should backup
866 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap(); 863 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
867 - devicesToBackup.forEach(deviceId -> { 864 + flowEntries.keySet().stream().forEach(deviceId -> {
868 - NodeId backupLocation = getBackupNode(deviceId); 865 + List<NodeId> backupNodes = getBackupNodes(deviceId);
869 - if (backupLocation != null) { 866 + backupNodes.forEach(backupNode -> {
870 - devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet()) 867 + if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
871 - .add(deviceId); 868 + < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
872 - } 869 + devicesToBackupByNode.computeIfAbsent(backupNode,
870 + nodeId -> Sets.newHashSet()).add(deviceId);
871 + }
872 + });
873 }); 873 });
874 // send the device flow entries to their respective backup nodes 874 // send the device flow entries to their respective backup nodes
875 devicesToBackupByNode.forEach(this::sendBackups); 875 devicesToBackupByNode.forEach(this::sendBackups);
......