SimpleFlowRuleStore to support FlowId collision
Change-Id: I750a733146e9dfd6984cb701bdcc21d0fd61a14d
Showing
1 changed file
with
94 additions
and
47 deletions
... | @@ -3,13 +3,13 @@ package org.onlab.onos.store.trivial.impl; | ... | @@ -3,13 +3,13 @@ package org.onlab.onos.store.trivial.impl; |
3 | import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; | 3 | import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; |
4 | import static org.slf4j.LoggerFactory.getLogger; | 4 | import static org.slf4j.LoggerFactory.getLogger; |
5 | import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; | 5 | import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; |
6 | -import static java.util.Collections.unmodifiableCollection; | 6 | +import java.util.Collections; |
7 | - | ||
8 | -import java.util.Collection; | ||
9 | import java.util.HashSet; | 7 | import java.util.HashSet; |
8 | +import java.util.List; | ||
10 | import java.util.Set; | 9 | import java.util.Set; |
11 | import java.util.concurrent.ConcurrentHashMap; | 10 | import java.util.concurrent.ConcurrentHashMap; |
12 | import java.util.concurrent.ConcurrentMap; | 11 | import java.util.concurrent.ConcurrentMap; |
12 | +import java.util.concurrent.CopyOnWriteArrayList; | ||
13 | 13 | ||
14 | import org.apache.felix.scr.annotations.Activate; | 14 | import org.apache.felix.scr.annotations.Activate; |
15 | import org.apache.felix.scr.annotations.Component; | 15 | import org.apache.felix.scr.annotations.Component; |
... | @@ -31,6 +31,9 @@ import org.onlab.onos.store.AbstractStore; | ... | @@ -31,6 +31,9 @@ import org.onlab.onos.store.AbstractStore; |
31 | import org.onlab.util.NewConcurrentHashMap; | 31 | import org.onlab.util.NewConcurrentHashMap; |
32 | import org.slf4j.Logger; | 32 | import org.slf4j.Logger; |
33 | 33 | ||
34 | +import com.google.common.base.Function; | ||
35 | +import com.google.common.collect.FluentIterable; | ||
36 | + | ||
34 | /** | 37 | /** |
35 | * Manages inventory of flow rules using trivial in-memory implementation. | 38 | * Manages inventory of flow rules using trivial in-memory implementation. |
36 | */ | 39 | */ |
... | @@ -44,8 +47,8 @@ public class SimpleFlowRuleStore | ... | @@ -44,8 +47,8 @@ public class SimpleFlowRuleStore |
44 | 47 | ||
45 | 48 | ||
46 | // inner Map is Device flow table | 49 | // inner Map is Device flow table |
47 | - // Assumption: FlowId cannot have synonyms | 50 | + // inner Map value (FlowId synonym list) must be synchronized before modifying |
48 | - private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, StoredFlowEntry>> | 51 | + private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>> |
49 | flowEntries = new ConcurrentHashMap<>(); | 52 | flowEntries = new ConcurrentHashMap<>(); |
50 | 53 | ||
51 | @Activate | 54 | @Activate |
... | @@ -63,14 +66,16 @@ public class SimpleFlowRuleStore | ... | @@ -63,14 +66,16 @@ public class SimpleFlowRuleStore |
63 | @Override | 66 | @Override |
64 | public int getFlowRuleCount() { | 67 | public int getFlowRuleCount() { |
65 | int sum = 0; | 68 | int sum = 0; |
66 | - for (ConcurrentMap<FlowId, StoredFlowEntry> ft : flowEntries.values()) { | 69 | + for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft : flowEntries.values()) { |
67 | - sum += ft.size(); | 70 | + for (List<StoredFlowEntry> fes : ft.values()) { |
71 | + sum += fes.size(); | ||
72 | + } | ||
68 | } | 73 | } |
69 | return sum; | 74 | return sum; |
70 | } | 75 | } |
71 | 76 | ||
72 | - private static NewConcurrentHashMap<FlowId, StoredFlowEntry> lazyEmptyFlowTable() { | 77 | + private static NewConcurrentHashMap<FlowId, List<StoredFlowEntry>> lazyEmptyFlowTable() { |
73 | - return NewConcurrentHashMap.<FlowId, StoredFlowEntry>ifNeeded(); | 78 | + return NewConcurrentHashMap.<FlowId, List<StoredFlowEntry>>ifNeeded(); |
74 | } | 79 | } |
75 | 80 | ||
76 | /** | 81 | /** |
... | @@ -79,24 +84,53 @@ public class SimpleFlowRuleStore | ... | @@ -79,24 +84,53 @@ public class SimpleFlowRuleStore |
79 | * @param deviceId identifier of the device | 84 | * @param deviceId identifier of the device |
80 | * @return Map representing Flow Table of given device. | 85 | * @return Map representing Flow Table of given device. |
81 | */ | 86 | */ |
82 | - private ConcurrentMap<FlowId, StoredFlowEntry> getFlowTable(DeviceId deviceId) { | 87 | + private ConcurrentMap<FlowId, List<StoredFlowEntry>> getFlowTable(DeviceId deviceId) { |
83 | return createIfAbsentUnchecked(flowEntries, | 88 | return createIfAbsentUnchecked(flowEntries, |
84 | deviceId, lazyEmptyFlowTable()); | 89 | deviceId, lazyEmptyFlowTable()); |
85 | } | 90 | } |
86 | 91 | ||
87 | - private StoredFlowEntry getFlowEntry(DeviceId deviceId, FlowId flowId) { | 92 | + private List<StoredFlowEntry> getFlowEntries(DeviceId deviceId, FlowId flowId) { |
88 | - return getFlowTable(deviceId).get(flowId); | 93 | + final ConcurrentMap<FlowId, List<StoredFlowEntry>> flowTable = getFlowTable(deviceId); |
94 | + List<StoredFlowEntry> r = flowTable.get(flowId); | ||
95 | + if (r == null) { | ||
96 | + final List<StoredFlowEntry> concurrentlyAdded; | ||
97 | + r = new CopyOnWriteArrayList<>(); | ||
98 | + concurrentlyAdded = flowTable.putIfAbsent(flowId, r); | ||
99 | + if (concurrentlyAdded != null) { | ||
100 | + return concurrentlyAdded; | ||
101 | + } | ||
102 | + } | ||
103 | + return r; | ||
104 | + } | ||
105 | + | ||
106 | + private FlowEntry getFlowEntryInternal(DeviceId deviceId, FlowRule rule) { | ||
107 | + List<StoredFlowEntry> fes = getFlowEntries(deviceId, rule.id()); | ||
108 | + for (StoredFlowEntry fe : fes) { | ||
109 | + if (fe.equals(rule)) { | ||
110 | + return fe; | ||
111 | + } | ||
112 | + } | ||
113 | + return null; | ||
89 | } | 114 | } |
90 | 115 | ||
91 | @Override | 116 | @Override |
92 | public FlowEntry getFlowEntry(FlowRule rule) { | 117 | public FlowEntry getFlowEntry(FlowRule rule) { |
93 | - return getFlowEntry(rule.deviceId(), rule.id()); | 118 | + return getFlowEntryInternal(rule.deviceId(), rule); |
94 | } | 119 | } |
95 | 120 | ||
96 | @Override | 121 | @Override |
97 | public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) { | 122 | public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) { |
98 | - return unmodifiableCollection((Collection<? extends FlowEntry>) | 123 | + // flatten and make iterator unmodifiable |
99 | - getFlowTable(deviceId).values()); | 124 | + return FluentIterable.from(getFlowTable(deviceId).values()) |
125 | + .transformAndConcat( | ||
126 | + new Function<List<StoredFlowEntry>, Iterable<? extends FlowEntry>>() { | ||
127 | + | ||
128 | + @Override | ||
129 | + public Iterable<? extends FlowEntry> apply( | ||
130 | + List<StoredFlowEntry> input) { | ||
131 | + return Collections.unmodifiableList(input); | ||
132 | + } | ||
133 | + }); | ||
100 | } | 134 | } |
101 | 135 | ||
102 | @Override | 136 | @Override |
... | @@ -104,8 +138,7 @@ public class SimpleFlowRuleStore | ... | @@ -104,8 +138,7 @@ public class SimpleFlowRuleStore |
104 | 138 | ||
105 | Set<FlowRule> rules = new HashSet<>(); | 139 | Set<FlowRule> rules = new HashSet<>(); |
106 | for (DeviceId did : flowEntries.keySet()) { | 140 | for (DeviceId did : flowEntries.keySet()) { |
107 | - ConcurrentMap<FlowId, StoredFlowEntry> ft = getFlowTable(did); | 141 | + for (FlowEntry fe : getFlowEntries(did)) { |
108 | - for (FlowEntry fe : ft.values()) { | ||
109 | if (fe.appId() == appId.id()) { | 142 | if (fe.appId() == appId.id()) { |
110 | rules.add(fe); | 143 | rules.add(fe); |
111 | } | 144 | } |
... | @@ -123,44 +156,57 @@ public class SimpleFlowRuleStore | ... | @@ -123,44 +156,57 @@ public class SimpleFlowRuleStore |
123 | StoredFlowEntry f = new DefaultFlowEntry(rule); | 156 | StoredFlowEntry f = new DefaultFlowEntry(rule); |
124 | final DeviceId did = f.deviceId(); | 157 | final DeviceId did = f.deviceId(); |
125 | final FlowId fid = f.id(); | 158 | final FlowId fid = f.id(); |
126 | - FlowEntry existing = getFlowTable(did).putIfAbsent(fid, f); | 159 | + List<StoredFlowEntry> existing = getFlowEntries(did, fid); |
127 | - if (existing != null) { | 160 | + synchronized (existing) { |
128 | - // was already there? ignore | 161 | + for (StoredFlowEntry fe : existing) { |
129 | - return false; | 162 | + if (fe.equals(rule)) { |
163 | + // was already there? ignore | ||
164 | + return false; | ||
165 | + } | ||
166 | + } | ||
167 | + // new flow rule added | ||
168 | + existing.add(f); | ||
169 | + // TODO: notify through delegate about remote event? | ||
170 | + return true; | ||
130 | } | 171 | } |
131 | - // new flow rule added | ||
132 | - // TODO: notify through delegate about remote event? | ||
133 | - return true; | ||
134 | } | 172 | } |
135 | 173 | ||
136 | @Override | 174 | @Override |
137 | public void deleteFlowRule(FlowRule rule) { | 175 | public void deleteFlowRule(FlowRule rule) { |
138 | 176 | ||
139 | - StoredFlowEntry entry = getFlowEntry(rule.deviceId(), rule.id()); | 177 | + List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id()); |
140 | - if (entry == null) { | 178 | + synchronized (entries) { |
141 | - //log.warn("Cannot find rule {}", rule); | 179 | + for (StoredFlowEntry entry : entries) { |
142 | - return; | 180 | + if (entry.equals(rule)) { |
143 | - } | 181 | + synchronized (entry) { |
144 | - synchronized (entry) { | 182 | + entry.setState(FlowEntryState.PENDING_REMOVE); |
145 | - entry.setState(FlowEntryState.PENDING_REMOVE); | 183 | + return; |
184 | + } | ||
185 | + } | ||
186 | + } | ||
146 | } | 187 | } |
188 | + //log.warn("Cannot find rule {}", rule); | ||
147 | } | 189 | } |
148 | 190 | ||
149 | @Override | 191 | @Override |
150 | public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) { | 192 | public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) { |
151 | // check if this new rule is an update to an existing entry | 193 | // check if this new rule is an update to an existing entry |
152 | - StoredFlowEntry stored = getFlowEntry(rule.deviceId(), rule.id()); | 194 | + List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id()); |
153 | - if (stored != null) { | 195 | + synchronized (entries) { |
154 | - synchronized (stored) { | 196 | + for (StoredFlowEntry stored : entries) { |
155 | - stored.setBytes(rule.bytes()); | 197 | + if (stored.equals(rule)) { |
156 | - stored.setLife(rule.life()); | 198 | + synchronized (stored) { |
157 | - stored.setPackets(rule.packets()); | 199 | + stored.setBytes(rule.bytes()); |
158 | - if (stored.state() == FlowEntryState.PENDING_ADD) { | 200 | + stored.setLife(rule.life()); |
159 | - stored.setState(FlowEntryState.ADDED); | 201 | + stored.setPackets(rule.packets()); |
160 | - // TODO: Do we need to change `rule` state? | 202 | + if (stored.state() == FlowEntryState.PENDING_ADD) { |
161 | - return new FlowRuleEvent(Type.RULE_ADDED, rule); | 203 | + stored.setState(FlowEntryState.ADDED); |
204 | + // TODO: Do we need to change `rule` state? | ||
205 | + return new FlowRuleEvent(Type.RULE_ADDED, rule); | ||
206 | + } | ||
207 | + return new FlowRuleEvent(Type.RULE_UPDATED, rule); | ||
208 | + } | ||
162 | } | 209 | } |
163 | - return new FlowRuleEvent(Type.RULE_UPDATED, rule); | ||
164 | } | 210 | } |
165 | } | 211 | } |
166 | 212 | ||
... | @@ -177,11 +223,12 @@ public class SimpleFlowRuleStore | ... | @@ -177,11 +223,12 @@ public class SimpleFlowRuleStore |
177 | // This is where one could mark a rule as removed and still keep it in the store. | 223 | // This is where one could mark a rule as removed and still keep it in the store. |
178 | final DeviceId did = rule.deviceId(); | 224 | final DeviceId did = rule.deviceId(); |
179 | 225 | ||
180 | - ConcurrentMap<FlowId, StoredFlowEntry> ft = getFlowTable(did); | 226 | + List<StoredFlowEntry> entries = getFlowEntries(did, rule.id()); |
181 | - if (ft.remove(rule.id(), rule)) { | 227 | + synchronized (entries) { |
182 | - return new FlowRuleEvent(RULE_REMOVED, rule); | 228 | + if (entries.remove(rule)) { |
183 | - } else { | 229 | + return new FlowRuleEvent(RULE_REMOVED, rule); |
184 | - return null; | 230 | + } |
185 | } | 231 | } |
232 | + return null; | ||
186 | } | 233 | } |
187 | } | 234 | } | ... | ... |
-
Please register or login to post a comment