Ray Milkey

Fix ONOS-4976 - buffer underflow from flow backup serialization

Change-Id: I924e53cfd436c38a45d1c9a237553a56acf888ef
...@@ -15,91 +15,95 @@ ...@@ -15,91 +15,95 @@
15 */ 15 */
16 package org.onosproject.store.flow.impl; 16 package org.onosproject.store.flow.impl;
17 17
18 - import com.google.common.collect.ImmutableList;
19 -import com.google.common.collect.ImmutableMap;
20 -import com.google.common.collect.Iterables;
21 -import com.google.common.collect.Maps;
22 -import com.google.common.collect.Sets;
23 -import com.google.common.util.concurrent.Futures;
24 -
25 -import org.apache.felix.scr.annotations.Activate;
26 -import org.apache.felix.scr.annotations.Component;
27 -import org.apache.felix.scr.annotations.Deactivate;
28 -import org.apache.felix.scr.annotations.Modified;
29 -import org.apache.felix.scr.annotations.Property;
30 -import org.apache.felix.scr.annotations.Reference;
31 -import org.apache.felix.scr.annotations.ReferenceCardinality;
32 -import org.apache.felix.scr.annotations.Service;
33 -import org.onlab.util.KryoNamespace;
34 -import org.onlab.util.Tools;
35 -import org.onosproject.cfg.ComponentConfigService;
36 -import org.onosproject.cluster.ClusterService;
37 -import org.onosproject.cluster.NodeId;
38 -import org.onosproject.core.CoreService;
39 -import org.onosproject.core.IdGenerator;
40 -import org.onosproject.mastership.MastershipService;
41 -import org.onosproject.net.DeviceId;
42 -import org.onosproject.net.device.DeviceService;
43 -import org.onosproject.net.flow.CompletedBatchOperation;
44 -import org.onosproject.net.flow.DefaultFlowEntry;
45 -import org.onosproject.net.flow.FlowEntry;
46 -import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47 -import org.onosproject.net.flow.FlowId;
48 -import org.onosproject.net.flow.FlowRule;
49 -import org.onosproject.net.flow.FlowRuleBatchEntry;
50 -import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51 -import org.onosproject.net.flow.FlowRuleBatchEvent;
52 -import org.onosproject.net.flow.FlowRuleBatchOperation;
53 -import org.onosproject.net.flow.FlowRuleBatchRequest;
54 -import org.onosproject.net.flow.FlowRuleEvent;
55 -import org.onosproject.net.flow.FlowRuleEvent.Type;
56 -import org.onosproject.net.flow.FlowRuleService;
57 -import org.onosproject.net.flow.FlowRuleStore;
58 -import org.onosproject.net.flow.FlowRuleStoreDelegate;
59 -import org.onosproject.net.flow.StoredFlowEntry;
60 -import org.onosproject.net.flow.TableStatisticsEntry;
61 -import org.onosproject.persistence.PersistenceService;
62 -import org.onosproject.store.AbstractStore;
63 -import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64 -import org.onosproject.store.cluster.messaging.ClusterMessage;
65 -import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
66 -import org.onosproject.store.flow.ReplicaInfoEvent;
67 -import org.onosproject.store.flow.ReplicaInfoEventListener;
68 -import org.onosproject.store.flow.ReplicaInfoService;
69 -import org.onosproject.store.impl.MastershipBasedTimestamp;
70 -import org.onosproject.store.serializers.KryoNamespaces;
71 -import org.onosproject.store.service.EventuallyConsistentMap;
72 -import org.onosproject.store.service.EventuallyConsistentMapEvent;
73 -import org.onosproject.store.service.EventuallyConsistentMapListener;
74 -import org.onosproject.store.service.Serializer;
75 -import org.onosproject.store.service.StorageService;
76 -import org.onosproject.store.service.WallClockTimestamp;
77 -import org.osgi.service.component.ComponentContext;
78 -import org.slf4j.Logger;
79 -
80 import java.util.Collections; 18 import java.util.Collections;
81 -import java.util.Dictionary; 19 + import java.util.Dictionary;
82 -import java.util.HashSet; 20 + import java.util.HashSet;
83 -import java.util.List; 21 + import java.util.List;
84 -import java.util.Map; 22 + import java.util.Map;
85 -import java.util.Objects; 23 + import java.util.Objects;
86 -import java.util.Set; 24 + import java.util.Set;
87 -import java.util.concurrent.ExecutorService; 25 + import java.util.concurrent.ExecutorService;
88 -import java.util.concurrent.Executors; 26 + import java.util.concurrent.Executors;
89 -import java.util.concurrent.ScheduledExecutorService; 27 + import java.util.concurrent.ScheduledExecutorService;
90 -import java.util.concurrent.ScheduledFuture; 28 + import java.util.concurrent.ScheduledFuture;
91 -import java.util.concurrent.TimeUnit; 29 + import java.util.concurrent.TimeUnit;
92 -import java.util.concurrent.atomic.AtomicInteger; 30 + import java.util.concurrent.atomic.AtomicInteger;
93 -import java.util.concurrent.atomic.AtomicReference; 31 + import java.util.concurrent.atomic.AtomicReference;
94 -import java.util.stream.Collectors; 32 + import java.util.stream.Collectors;
33 +
34 + import org.apache.felix.scr.annotations.Activate;
35 + import org.apache.felix.scr.annotations.Component;
36 + import org.apache.felix.scr.annotations.Deactivate;
37 + import org.apache.felix.scr.annotations.Modified;
38 + import org.apache.felix.scr.annotations.Property;
39 + import org.apache.felix.scr.annotations.Reference;
40 + import org.apache.felix.scr.annotations.ReferenceCardinality;
41 + import org.apache.felix.scr.annotations.Service;
42 + import org.onlab.util.KryoNamespace;
43 + import org.onlab.util.Tools;
44 + import org.onosproject.cfg.ComponentConfigService;
45 + import org.onosproject.cluster.ClusterService;
46 + import org.onosproject.cluster.NodeId;
47 + import org.onosproject.core.CoreService;
48 + import org.onosproject.core.IdGenerator;
49 + import org.onosproject.mastership.MastershipService;
50 + import org.onosproject.net.DeviceId;
51 + import org.onosproject.net.device.DeviceService;
52 + import org.onosproject.net.flow.CompletedBatchOperation;
53 + import org.onosproject.net.flow.DefaultFlowEntry;
54 + import org.onosproject.net.flow.FlowEntry;
55 + import org.onosproject.net.flow.FlowEntry.FlowEntryState;
56 + import org.onosproject.net.flow.FlowId;
57 + import org.onosproject.net.flow.FlowRule;
58 + import org.onosproject.net.flow.FlowRuleBatchEntry;
59 + import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
60 + import org.onosproject.net.flow.FlowRuleBatchEvent;
61 + import org.onosproject.net.flow.FlowRuleBatchOperation;
62 + import org.onosproject.net.flow.FlowRuleBatchRequest;
63 + import org.onosproject.net.flow.FlowRuleEvent;
64 + import org.onosproject.net.flow.FlowRuleEvent.Type;
65 + import org.onosproject.net.flow.FlowRuleService;
66 + import org.onosproject.net.flow.FlowRuleStore;
67 + import org.onosproject.net.flow.FlowRuleStoreDelegate;
68 + import org.onosproject.net.flow.StoredFlowEntry;
69 + import org.onosproject.net.flow.TableStatisticsEntry;
70 + import org.onosproject.persistence.PersistenceService;
71 + import org.onosproject.store.AbstractStore;
72 + import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
73 + import org.onosproject.store.cluster.messaging.ClusterMessage;
74 + import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
75 + import org.onosproject.store.flow.ReplicaInfoEvent;
76 + import org.onosproject.store.flow.ReplicaInfoEventListener;
77 + import org.onosproject.store.flow.ReplicaInfoService;
78 + import org.onosproject.store.impl.MastershipBasedTimestamp;
79 + import org.onosproject.store.serializers.KryoNamespaces;
80 + import org.onosproject.store.service.EventuallyConsistentMap;
81 + import org.onosproject.store.service.EventuallyConsistentMapEvent;
82 + import org.onosproject.store.service.EventuallyConsistentMapListener;
83 + import org.onosproject.store.service.Serializer;
84 + import org.onosproject.store.service.StorageService;
85 + import org.onosproject.store.service.WallClockTimestamp;
86 + import org.osgi.service.component.ComponentContext;
87 + import org.slf4j.Logger;
88 +
89 + import com.google.common.collect.ImmutableList;
90 + import com.google.common.collect.Iterables;
91 + import com.google.common.collect.Maps;
92 + import com.google.common.collect.Sets;
93 + import com.google.common.util.concurrent.Futures;
95 94
96 import static com.google.common.base.Strings.isNullOrEmpty; 95 import static com.google.common.base.Strings.isNullOrEmpty;
97 -import static org.onlab.util.Tools.get; 96 + import static org.onlab.util.Tools.get;
98 -import static org.onlab.util.Tools.groupedThreads; 97 + import static org.onlab.util.Tools.groupedThreads;
99 -import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 98 + import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
100 -import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED; 99 + import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
101 -import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*; 100 + import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
102 -import static org.slf4j.LoggerFactory.getLogger; 101 + import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.FLOW_TABLE_BACKUP;
102 + import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
103 + import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
104 + import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
105 + import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
106 + import static org.slf4j.LoggerFactory.getLogger;
103 107
104 /** 108 /**
105 * Manages inventory of flow rules using a distributed state management protocol. 109 * Manages inventory of flow rules using a distributed state management protocol.
...@@ -724,7 +728,7 @@ public class DistributedFlowRuleStore ...@@ -724,7 +728,7 @@ public class DistributedFlowRuleStore
724 log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId); 728 log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
725 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> 729 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
726 deviceFlowEntries = Maps.newConcurrentMap(); 730 deviceFlowEntries = Maps.newConcurrentMap();
727 - deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id)))); 731 + deviceIds.forEach(id -> deviceFlowEntries.put(id, getFlowTableCopy(id)));
728 clusterCommunicator.<Map<DeviceId, 732 clusterCommunicator.<Map<DeviceId,
729 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>, 733 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
730 Set<DeviceId>> 734 Set<DeviceId>>
...@@ -738,8 +742,9 @@ public class DistributedFlowRuleStore ...@@ -738,8 +742,9 @@ public class DistributedFlowRuleStore
738 deviceFlowEntries.keySet() : 742 deviceFlowEntries.keySet() :
739 Sets.difference(deviceFlowEntries.keySet(), backedupDevices); 743 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
740 if (devicesNotBackedup.size() > 0) { 744 if (devicesNotBackedup.size() > 0) {
741 - log.warn("Failed to backup devices: {}. Reason: {}", 745 + log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
742 - devicesNotBackedup, error != null ? error.getMessage() : "none"); 746 + devicesNotBackedup, error != null ? error.getMessage() : "none",
747 + nodeId);
743 } 748 }
744 if (backedupDevices != null) { 749 if (backedupDevices != null) {
745 backedupDevices.forEach(id -> { 750 backedupDevices.forEach(id -> {
...@@ -777,6 +782,32 @@ public class DistributedFlowRuleStore ...@@ -777,6 +782,32 @@ public class DistributedFlowRuleStore
777 } 782 }
778 } 783 }
779 784
785 + private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
786 + Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
787 + if (persistenceEnabled) {
788 + return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
789 + .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
790 + .withName("FlowTable:" + deviceId.toString())
791 + .withSerializer(new Serializer() {
792 + @Override
793 + public <T> byte[] encode(T object) {
794 + return serializer.encode(object);
795 + }
796 +
797 + @Override
798 + public <T> T decode(byte[] bytes) {
799 + return serializer.decode(bytes);
800 + }
801 + })
802 + .build());
803 + } else {
804 + flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
805 + copy.put(k, Maps.newHashMap(v));
806 + });
807 + return copy;
808 + }
809 + }
810 +
780 private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) { 811 private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
781 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap()); 812 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
782 } 813 }
......