Yuta HIGUCHI

modified SimpleFlowRuleStore not to use MultiMap

Change-Id: Ie9adb127f1acb4d919951c75513e689fbd80596d
...@@ -8,7 +8,9 @@ import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED; ...@@ -8,7 +8,9 @@ import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
8 8
9 import java.util.ArrayList; 9 import java.util.ArrayList;
10 import java.util.Collections; 10 import java.util.Collections;
11 +import java.util.HashMap;
11 import java.util.List; 12 import java.util.List;
13 +import java.util.Map;
12 import java.util.Set; 14 import java.util.Set;
13 import java.util.concurrent.ExecutionException; 15 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.Future; 16 import java.util.concurrent.Future;
...@@ -54,6 +56,7 @@ import org.onlab.onos.net.provider.ProviderId; ...@@ -54,6 +56,7 @@ import org.onlab.onos.net.provider.ProviderId;
54 import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore; 56 import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
55 57
56 import com.google.common.collect.ImmutableList; 58 import com.google.common.collect.ImmutableList;
59 +import com.google.common.collect.ImmutableMap;
57 import com.google.common.collect.Lists; 60 import com.google.common.collect.Lists;
58 import com.google.common.collect.Sets; 61 import com.google.common.collect.Sets;
59 62
...@@ -166,16 +169,17 @@ public class FlowRuleManagerTest { ...@@ -166,16 +169,17 @@ public class FlowRuleManagerTest {
166 } 169 }
167 170
168 171
172 + // TODO: If preserving iteration order is a requirement, redo FlowRuleStore.
169 //backing store is sensitive to the order of additions/removals 173 //backing store is sensitive to the order of additions/removals
170 - private boolean validateState(FlowEntryState... state) { 174 + private boolean validateState(Map<FlowRule, FlowEntryState> expected) {
175 + Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected);
171 Iterable<FlowEntry> rules = service.getFlowEntries(DID); 176 Iterable<FlowEntry> rules = service.getFlowEntries(DID);
172 - int i = 0;
173 for (FlowEntry f : rules) { 177 for (FlowEntry f : rules) {
174 - if (f.state() != state[i]) { 178 + assertTrue("Unexpected FlowRule " + f, expectedToCheck.containsKey(f));
175 - return false; 179 + assertEquals("FlowEntry" + f, expectedToCheck.get(f), f.state());
176 - } 180 + expectedToCheck.remove(f);
177 - i++;
178 } 181 }
182 + assertEquals(Collections.emptySet(), expectedToCheck.entrySet());
179 return true; 183 return true;
180 } 184 }
181 185
...@@ -191,8 +195,10 @@ public class FlowRuleManagerTest { ...@@ -191,8 +195,10 @@ public class FlowRuleManagerTest {
191 mgr.applyFlowRules(r1, r2, r3); 195 mgr.applyFlowRules(r1, r2, r3);
192 assertEquals("3 rules should exist", 3, flowCount()); 196 assertEquals("3 rules should exist", 3, flowCount());
193 assertTrue("Entries should be pending add.", 197 assertTrue("Entries should be pending add.",
194 - validateState(FlowEntryState.PENDING_ADD, FlowEntryState.PENDING_ADD, 198 + validateState(ImmutableMap.of(
195 - FlowEntryState.PENDING_ADD)); 199 + r1, FlowEntryState.PENDING_ADD,
200 + r2, FlowEntryState.PENDING_ADD,
201 + r3, FlowEntryState.PENDING_ADD)));
196 } 202 }
197 203
198 @Test 204 @Test
...@@ -213,8 +219,10 @@ public class FlowRuleManagerTest { ...@@ -213,8 +219,10 @@ public class FlowRuleManagerTest {
213 validateEvents(); 219 validateEvents();
214 assertEquals("3 rule should exist", 3, flowCount()); 220 assertEquals("3 rule should exist", 3, flowCount());
215 assertTrue("Entries should be pending remove.", 221 assertTrue("Entries should be pending remove.",
216 - validateState(FlowEntryState.PENDING_REMOVE, FlowEntryState.PENDING_REMOVE, 222 + validateState(ImmutableMap.of(
217 - FlowEntryState.ADDED)); 223 + f1, FlowEntryState.PENDING_REMOVE,
224 + f2, FlowEntryState.PENDING_REMOVE,
225 + f3, FlowEntryState.ADDED)));
218 226
219 mgr.removeFlowRules(f1); 227 mgr.removeFlowRules(f1);
220 assertEquals("3 rule should still exist", 3, flowCount()); 228 assertEquals("3 rule should still exist", 3, flowCount());
...@@ -263,8 +271,10 @@ public class FlowRuleManagerTest { ...@@ -263,8 +271,10 @@ public class FlowRuleManagerTest {
263 providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2)); 271 providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2));
264 272
265 assertTrue("Entries should be added.", 273 assertTrue("Entries should be added.",
266 - validateState(FlowEntryState.ADDED, FlowEntryState.ADDED, 274 + validateState(ImmutableMap.of(
267 - FlowEntryState.PENDING_ADD)); 275 + f1, FlowEntryState.ADDED,
276 + f2, FlowEntryState.ADDED,
277 + f3, FlowEntryState.PENDING_ADD)));
268 278
269 validateEvents(RULE_ADDED, RULE_ADDED); 279 validateEvents(RULE_ADDED, RULE_ADDED);
270 } 280 }
...@@ -336,7 +346,9 @@ public class FlowRuleManagerTest { ...@@ -336,7 +346,9 @@ public class FlowRuleManagerTest {
336 346
337 //only check that we are in pending remove. Events and actual remove state will 347 //only check that we are in pending remove. Events and actual remove state will
338 // be set by flowRemoved call. 348 // be set by flowRemoved call.
339 - validateState(FlowEntryState.PENDING_REMOVE, FlowEntryState.PENDING_REMOVE); 349 + validateState(ImmutableMap.of(
350 + f1, FlowEntryState.PENDING_REMOVE,
351 + f2, FlowEntryState.PENDING_REMOVE));
340 } 352 }
341 353
342 @Test 354 @Test
...@@ -360,7 +372,9 @@ public class FlowRuleManagerTest { ...@@ -360,7 +372,9 @@ public class FlowRuleManagerTest {
360 Lists.newArrayList(fbe1, fbe2)); 372 Lists.newArrayList(fbe1, fbe2));
361 Future<CompletedBatchOperation> future = mgr.applyBatch(fbo); 373 Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
362 assertTrue("Entries in wrong state", 374 assertTrue("Entries in wrong state",
363 - validateState(FlowEntryState.PENDING_REMOVE, FlowEntryState.PENDING_ADD)); 375 + validateState(ImmutableMap.of(
376 + f1, FlowEntryState.PENDING_REMOVE,
377 + f2, FlowEntryState.PENDING_ADD)));
364 CompletedBatchOperation completed = null; 378 CompletedBatchOperation completed = null;
365 try { 379 try {
366 completed = future.get(); 380 completed = future.get();
...@@ -381,9 +395,18 @@ public class FlowRuleManagerTest { ...@@ -381,9 +395,18 @@ public class FlowRuleManagerTest {
381 395
382 mgr.applyFlowRules(f1); 396 mgr.applyFlowRules(f1);
383 397
398 + assertTrue("Entries in wrong state",
399 + validateState(ImmutableMap.of(
400 + f1, FlowEntryState.PENDING_ADD)));
401 +
384 FlowEntry fe1 = new DefaultFlowEntry(f1); 402 FlowEntry fe1 = new DefaultFlowEntry(f1);
385 providerService.pushFlowMetrics(DID, Collections.<FlowEntry>singletonList(fe1)); 403 providerService.pushFlowMetrics(DID, Collections.<FlowEntry>singletonList(fe1));
386 404
405 + assertTrue("Entries in wrong state",
406 + validateState(ImmutableMap.of(
407 + f1, FlowEntryState.ADDED)));
408 +
409 +
387 FlowRuleBatchEntry fbe1 = new FlowRuleBatchEntry( 410 FlowRuleBatchEntry fbe1 = new FlowRuleBatchEntry(
388 FlowRuleBatchEntry.FlowRuleOperation.REMOVE, f1); 411 FlowRuleBatchEntry.FlowRuleOperation.REMOVE, f1);
389 412
...@@ -403,8 +426,9 @@ public class FlowRuleManagerTest { ...@@ -403,8 +426,9 @@ public class FlowRuleManagerTest {
403 * state. 426 * state.
404 */ 427 */
405 assertTrue("Entries in wrong state", 428 assertTrue("Entries in wrong state",
406 - validateState(FlowEntryState.PENDING_REMOVE, 429 + validateState(ImmutableMap.of(
407 - FlowEntryState.PENDING_ADD)); 430 + f2, FlowEntryState.PENDING_REMOVE,
431 + f1, FlowEntryState.PENDING_ADD)));
408 432
409 433
410 } 434 }
......
...@@ -2,9 +2,13 @@ package org.onlab.onos.store.trivial.impl; ...@@ -2,9 +2,13 @@ package org.onlab.onos.store.trivial.impl;
2 2
3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4 import static org.slf4j.LoggerFactory.getLogger; 4 import static org.slf4j.LoggerFactory.getLogger;
5 +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
6 +import static java.util.Collections.unmodifiableCollection;
5 7
6 -import java.util.Collection; 8 +import java.util.HashSet;
7 -import java.util.Collections; 9 +import java.util.Set;
10 +import java.util.concurrent.ConcurrentHashMap;
11 +import java.util.concurrent.ConcurrentMap;
8 12
9 import org.apache.felix.scr.annotations.Activate; 13 import org.apache.felix.scr.annotations.Activate;
10 import org.apache.felix.scr.annotations.Component; 14 import org.apache.felix.scr.annotations.Component;
...@@ -15,18 +19,16 @@ import org.onlab.onos.net.DeviceId; ...@@ -15,18 +19,16 @@ import org.onlab.onos.net.DeviceId;
15 import org.onlab.onos.net.flow.DefaultFlowEntry; 19 import org.onlab.onos.net.flow.DefaultFlowEntry;
16 import org.onlab.onos.net.flow.FlowEntry; 20 import org.onlab.onos.net.flow.FlowEntry;
17 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; 21 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
22 +import org.onlab.onos.net.flow.FlowId;
18 import org.onlab.onos.net.flow.FlowRule; 23 import org.onlab.onos.net.flow.FlowRule;
19 import org.onlab.onos.net.flow.FlowRuleEvent; 24 import org.onlab.onos.net.flow.FlowRuleEvent;
20 import org.onlab.onos.net.flow.FlowRuleEvent.Type; 25 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
21 import org.onlab.onos.net.flow.FlowRuleStore; 26 import org.onlab.onos.net.flow.FlowRuleStore;
22 import org.onlab.onos.net.flow.FlowRuleStoreDelegate; 27 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
23 import org.onlab.onos.store.AbstractStore; 28 import org.onlab.onos.store.AbstractStore;
29 +import org.onlab.util.NewConcurrentHashMap;
24 import org.slf4j.Logger; 30 import org.slf4j.Logger;
25 31
26 -import com.google.common.collect.ArrayListMultimap;
27 -import com.google.common.collect.ImmutableSet;
28 -import com.google.common.collect.Multimap;
29 -
30 /** 32 /**
31 * Manages inventory of flow rules using trivial in-memory implementation. 33 * Manages inventory of flow rules using trivial in-memory implementation.
32 */ 34 */
...@@ -38,12 +40,11 @@ public class SimpleFlowRuleStore ...@@ -38,12 +40,11 @@ public class SimpleFlowRuleStore
38 40
39 private final Logger log = getLogger(getClass()); 41 private final Logger log = getLogger(getClass());
40 42
41 - // store entries as a pile of rules, no info about device tables
42 - private final Multimap<DeviceId, FlowEntry> flowEntries =
43 - ArrayListMultimap.<DeviceId, FlowEntry>create();
44 43
45 - private final Multimap<Short, FlowRule> flowEntriesById = 44 + // inner Map is Device flow table
46 - ArrayListMultimap.<Short, FlowRule>create(); 45 + // Assumption: FlowId cannot have synonyms
46 + private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, FlowEntry>>
47 + flowEntries = new ConcurrentHashMap<>();
47 48
48 @Activate 49 @Activate
49 public void activate() { 50 public void activate() {
...@@ -52,88 +53,130 @@ public class SimpleFlowRuleStore ...@@ -52,88 +53,130 @@ public class SimpleFlowRuleStore
52 53
53 @Deactivate 54 @Deactivate
54 public void deactivate() { 55 public void deactivate() {
56 + flowEntries.clear();
55 log.info("Stopped"); 57 log.info("Stopped");
56 } 58 }
57 59
58 60
59 @Override 61 @Override
60 public int getFlowRuleCount() { 62 public int getFlowRuleCount() {
61 - return flowEntries.size(); 63 + int sum = 0;
64 + for (ConcurrentMap<FlowId, FlowEntry> ft : flowEntries.values()) {
65 + sum += ft.size();
66 + }
67 + return sum;
62 } 68 }
63 69
64 - @Override 70 + private static NewConcurrentHashMap<FlowId, FlowEntry> lazyEmptyFlowTable() {
65 - public synchronized FlowEntry getFlowEntry(FlowRule rule) { 71 + return NewConcurrentHashMap.<FlowId, FlowEntry>ifNeeded();
66 - for (FlowEntry f : flowEntries.get(rule.deviceId())) {
67 - if (f.equals(rule)) {
68 - return f;
69 } 72 }
73 +
74 + /**
75 + * Returns the flow table for specified device.
76 + *
77 + * @param deviceId identifier of the device
78 + * @return Map representing Flow Table of given device.
79 + */
80 + private ConcurrentMap<FlowId, FlowEntry> getFlowTable(DeviceId deviceId) {
81 + return createIfAbsentUnchecked(flowEntries,
82 + deviceId, lazyEmptyFlowTable());
70 } 83 }
71 - return null; 84 +
85 + private FlowEntry getFlowEntry(DeviceId deviceId, FlowId flowId) {
86 + return getFlowTable(deviceId).get(flowId);
72 } 87 }
73 88
74 @Override 89 @Override
75 - public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) { 90 + public FlowEntry getFlowEntry(FlowRule rule) {
76 - Collection<FlowEntry> rules = flowEntries.get(deviceId); 91 + return getFlowEntry(rule.deviceId(), rule.id());
77 - if (rules == null) {
78 - return Collections.emptyList();
79 } 92 }
80 - return ImmutableSet.copyOf(rules); 93 +
94 + @Override
95 + public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
96 + return unmodifiableCollection(getFlowTable(deviceId).values());
81 } 97 }
82 98
83 @Override 99 @Override
84 - public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) { 100 + public Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
85 - Collection<FlowRule> rules = flowEntriesById.get(appId.id()); 101 +
86 - if (rules == null) { 102 + Set<FlowRule> rules = new HashSet<>();
87 - return Collections.emptyList(); 103 + for (DeviceId did : flowEntries.keySet()) {
104 + ConcurrentMap<FlowId, FlowEntry> ft = getFlowTable(did);
105 + for (FlowEntry fe : ft.values()) {
106 + if (fe.appId() == appId.id()) {
107 + rules.add(fe);
88 } 108 }
89 - return ImmutableSet.copyOf(rules); 109 + }
110 + }
111 + return rules;
90 } 112 }
91 113
92 @Override 114 @Override
93 - public synchronized void storeFlowRule(FlowRule rule) { 115 + public void storeFlowRule(FlowRule rule) {
116 + final boolean added = storeFlowRuleInternal(rule);
117 + }
118 +
119 + private boolean storeFlowRuleInternal(FlowRule rule) {
94 FlowEntry f = new DefaultFlowEntry(rule); 120 FlowEntry f = new DefaultFlowEntry(rule);
95 - DeviceId did = f.deviceId(); 121 + final DeviceId did = f.deviceId();
96 - if (!flowEntries.containsEntry(did, f)) { 122 + final FlowId fid = f.id();
97 - flowEntries.put(did, f); 123 + FlowEntry existing = getFlowTable(did).putIfAbsent(fid, f);
98 - flowEntriesById.put(rule.appId(), f); 124 + if (existing != null) {
125 + // was already there? ignore
126 + return false;
99 } 127 }
128 + // new flow rule added
129 + // TODO: notify through delegate about remote event?
130 + return true;
100 } 131 }
101 132
102 @Override 133 @Override
103 - public synchronized void deleteFlowRule(FlowRule rule) { 134 + public void deleteFlowRule(FlowRule rule) {
104 - FlowEntry entry = getFlowEntry(rule); 135 +
136 + FlowEntry entry = getFlowEntry(rule.deviceId(), rule.id());
105 if (entry == null) { 137 if (entry == null) {
106 - //log.warn("Cannot find rule {}", rule); 138 + log.warn("Cannot find rule {}", rule);
139 + System.err.println("Cannot find rule " + rule);
107 return; 140 return;
108 } 141 }
142 + synchronized (entry) {
109 entry.setState(FlowEntryState.PENDING_REMOVE); 143 entry.setState(FlowEntryState.PENDING_REMOVE);
110 } 144 }
145 + }
111 146
112 @Override 147 @Override
113 - public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) { 148 + public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
114 - DeviceId did = rule.deviceId();
115 -
116 // check if this new rule is an update to an existing entry 149 // check if this new rule is an update to an existing entry
117 - FlowEntry stored = getFlowEntry(rule); 150 + FlowEntry stored = getFlowEntry(rule.deviceId(), rule.id());
118 if (stored != null) { 151 if (stored != null) {
152 + synchronized (stored) {
119 stored.setBytes(rule.bytes()); 153 stored.setBytes(rule.bytes());
120 stored.setLife(rule.life()); 154 stored.setLife(rule.life());
121 stored.setPackets(rule.packets()); 155 stored.setPackets(rule.packets());
122 if (stored.state() == FlowEntryState.PENDING_ADD) { 156 if (stored.state() == FlowEntryState.PENDING_ADD) {
123 stored.setState(FlowEntryState.ADDED); 157 stored.setState(FlowEntryState.ADDED);
158 + // TODO: Do we need to change `rule` state?
124 return new FlowRuleEvent(Type.RULE_ADDED, rule); 159 return new FlowRuleEvent(Type.RULE_ADDED, rule);
125 } 160 }
126 return new FlowRuleEvent(Type.RULE_UPDATED, rule); 161 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
127 } 162 }
163 + }
164 +
165 + // should not reach here
166 + // storeFlowRule was expected to be called
167 + log.error("FlowRule was not found in store {} to update", rule);
128 168
129 //flowEntries.put(did, rule); 169 //flowEntries.put(did, rule);
130 return null; 170 return null;
131 } 171 }
132 172
133 @Override 173 @Override
134 - public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) { 174 + public FlowRuleEvent removeFlowRule(FlowEntry rule) {
135 // This is where one could mark a rule as removed and still keep it in the store. 175 // This is where one could mark a rule as removed and still keep it in the store.
136 - if (flowEntries.remove(rule.deviceId(), rule)) { 176 + final DeviceId did = rule.deviceId();
177 +
178 + ConcurrentMap<FlowId, FlowEntry> ft = getFlowTable(did);
179 + if (ft.remove(rule.id(), rule)) {
137 return new FlowRuleEvent(RULE_REMOVED, rule); 180 return new FlowRuleEvent(RULE_REMOVED, rule);
138 } else { 181 } else {
139 return null; 182 return null;
......