Brian O'Connor
Committed by Gerrit Code Review

ONOS-2890 Deactivating apps removes other apps' packet processors

- Reworking store commands to use compute for updates
- Trigger add/remove to devices on map events

Change-Id: I33391435945c1775a9ef0cdc83920fc1cadfd658
...@@ -37,17 +37,15 @@ public interface PacketStore extends Store<PacketEvent, PacketStoreDelegate> { ...@@ -37,17 +37,15 @@ public interface PacketStore extends Store<PacketEvent, PacketStoreDelegate> {
37 * Requests intercept of packets that match the given selector. 37 * Requests intercept of packets that match the given selector.
38 * 38 *
39 * @param request a packet request 39 * @param request a packet request
40 - * @return true if the first time the given selector was requested
41 */ 40 */
42 - boolean requestPackets(PacketRequest request); 41 + void requestPackets(PacketRequest request);
43 42
44 /** 43 /**
45 * Cancels intercept of packets that match the given selector. 44 * Cancels intercept of packets that match the given selector.
46 * 45 *
47 * @param request a packet request 46 * @param request a packet request
48 - * @return true if there is no other application requesting the given selector
49 */ 47 */
50 - boolean cancelPackets(PacketRequest request); 48 + void cancelPackets(PacketRequest request);
51 49
52 /** 50 /**
53 * Obtains all existing requests in the system. 51 * Obtains all existing requests in the system.
......
...@@ -21,4 +21,20 @@ import org.onosproject.store.StoreDelegate; ...@@ -21,4 +21,20 @@ import org.onosproject.store.StoreDelegate;
21 * Packet store delegate abstraction. 21 * Packet store delegate abstraction.
22 */ 22 */
23 public interface PacketStoreDelegate extends StoreDelegate<PacketEvent> { 23 public interface PacketStoreDelegate extends StoreDelegate<PacketEvent> {
24 +
25 + /**
26 + * Requests that packets matching to following request be collected
27 + * from all switches.
28 + *
29 + * @param request packet request
30 + */
31 + void requestPackets(PacketRequest request);
32 +
33 + /**
34 + * Requests that packets matching to following request no longer be
35 + * collected from any switches.
36 + *
37 + * @param request packet request
38 + */
39 + void cancelPackets(PacketRequest request);
24 } 40 }
......
...@@ -15,10 +15,13 @@ ...@@ -15,10 +15,13 @@
15 */ 15 */
16 package org.onosproject.store.trivial; 16 package org.onosproject.store.trivial;
17 17
18 -import com.google.common.collect.ImmutableList; 18 +import com.google.common.collect.ImmutableSet;
19 +import com.google.common.collect.Lists;
20 +import com.google.common.collect.Maps;
19 import com.google.common.collect.Sets; 21 import com.google.common.collect.Sets;
20 import org.apache.felix.scr.annotations.Component; 22 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Service; 23 import org.apache.felix.scr.annotations.Service;
24 +import org.onosproject.net.flow.TrafficSelector;
22 import org.onosproject.net.packet.OutboundPacket; 25 import org.onosproject.net.packet.OutboundPacket;
23 import org.onosproject.net.packet.PacketEvent; 26 import org.onosproject.net.packet.PacketEvent;
24 import org.onosproject.net.packet.PacketEvent.Type; 27 import org.onosproject.net.packet.PacketEvent.Type;
...@@ -27,7 +30,9 @@ import org.onosproject.net.packet.PacketStore; ...@@ -27,7 +30,9 @@ import org.onosproject.net.packet.PacketStore;
27 import org.onosproject.net.packet.PacketStoreDelegate; 30 import org.onosproject.net.packet.PacketStoreDelegate;
28 import org.onosproject.store.AbstractStore; 31 import org.onosproject.store.AbstractStore;
29 32
33 +import java.util.HashSet;
30 import java.util.List; 34 import java.util.List;
35 +import java.util.Map;
31 import java.util.Set; 36 import java.util.Set;
32 37
33 /** 38 /**
...@@ -39,7 +44,7 @@ public class SimplePacketStore ...@@ -39,7 +44,7 @@ public class SimplePacketStore
39 extends AbstractStore<PacketEvent, PacketStoreDelegate> 44 extends AbstractStore<PacketEvent, PacketStoreDelegate>
40 implements PacketStore { 45 implements PacketStore {
41 46
42 - private Set<PacketRequest> requests = Sets.newConcurrentHashSet(); 47 + private Map<TrafficSelector, Set<PacketRequest>> requests = Maps.newConcurrentMap();
43 48
44 @Override 49 @Override
45 public void emit(OutboundPacket packet) { 50 public void emit(OutboundPacket packet) {
...@@ -47,18 +52,50 @@ public class SimplePacketStore ...@@ -47,18 +52,50 @@ public class SimplePacketStore
47 } 52 }
48 53
49 @Override 54 @Override
50 - public boolean requestPackets(PacketRequest request) { 55 + public void requestPackets(PacketRequest request) {
51 - return requests.add(request); 56 + requests.compute(request.selector(), (s, existingRequests) -> {
57 + if (existingRequests == null) {
58 + return ImmutableSet.of(request);
59 + } else if (!existingRequests.contains(request)) {
60 + if (delegate != null) {
61 + delegate.requestPackets(request);
62 + }
63 + return ImmutableSet.<PacketRequest>builder()
64 + .addAll(existingRequests)
65 + .add(request)
66 + .build();
67 + } else {
68 + return existingRequests;
69 + }
70 + });
52 } 71 }
53 72
54 @Override 73 @Override
55 - public boolean cancelPackets(PacketRequest request) { 74 + public void cancelPackets(PacketRequest request) {
56 - return requests.remove(request); 75 + requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
76 + if (existingRequests.contains(request)) {
77 + HashSet<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
78 + newRequests.remove(request);
79 + if (newRequests.size() > 0) {
80 + return ImmutableSet.copyOf(newRequests);
81 + } else {
82 + if (delegate != null) {
83 + delegate.cancelPackets(request);
84 + }
85 + return null;
86 + }
87 + } else {
88 + return existingRequests;
89 + }
90 + });
57 } 91 }
58 92
59 @Override 93 @Override
60 public List<PacketRequest> existingRequests() { 94 public List<PacketRequest> existingRequests() {
61 - return ImmutableList.copyOf(requests); 95 + List<PacketRequest> list = Lists.newArrayList();
96 + requests.values().forEach(list::addAll);
97 + list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
98 + return list;
62 } 99 }
63 100
64 } 101 }
......
...@@ -168,9 +168,7 @@ public class PacketManager ...@@ -168,9 +168,7 @@ public class PacketManager
168 checkNotNull(appId, "Application ID cannot be null"); 168 checkNotNull(appId, "Application ID cannot be null");
169 169
170 PacketRequest request = new DefaultPacketRequest(selector, priority, appId); 170 PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
171 - if (store.requestPackets(request)) { 171 + store.requestPackets(request);
172 - pushToAllDevices(request);
173 - }
174 } 172 }
175 173
176 @Override 174 @Override
...@@ -181,9 +179,7 @@ public class PacketManager ...@@ -181,9 +179,7 @@ public class PacketManager
181 checkNotNull(appId, "Application ID cannot be null"); 179 checkNotNull(appId, "Application ID cannot be null");
182 180
183 PacketRequest request = new DefaultPacketRequest(selector, priority, appId); 181 PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
184 - if (store.cancelPackets(request)) { 182 + store.cancelPackets(request);
185 - removeFromAllDevices(request);
186 - }
187 } 183 }
188 184
189 @Override 185 @Override
...@@ -332,6 +328,16 @@ public class PacketManager ...@@ -332,6 +328,16 @@ public class PacketManager
332 public void notify(PacketEvent event) { 328 public void notify(PacketEvent event) {
333 localEmit(event.subject()); 329 localEmit(event.subject());
334 } 330 }
331 +
332 + @Override
333 + public void requestPackets(PacketRequest request) {
334 + pushToAllDevices(request);
335 + }
336 +
337 + @Override
338 + public void cancelPackets(PacketRequest request) {
339 + removeFromAllDevices(request);
340 + }
335 } 341 }
336 342
337 /** 343 /**
......
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
15 */ 15 */
16 package org.onosproject.store.packet.impl; 16 package org.onosproject.store.packet.impl;
17 17
18 +import com.google.common.collect.ImmutableSet;
18 import com.google.common.collect.Lists; 19 import com.google.common.collect.Lists;
20 +import com.google.common.collect.Sets;
19 import org.apache.felix.scr.annotations.Activate; 21 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 22 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 23 import org.apache.felix.scr.annotations.Deactivate;
...@@ -41,14 +43,13 @@ import org.onosproject.store.serializers.KryoSerializer; ...@@ -41,14 +43,13 @@ import org.onosproject.store.serializers.KryoSerializer;
41 import org.onosproject.store.service.ConsistentMap; 43 import org.onosproject.store.service.ConsistentMap;
42 import org.onosproject.store.service.Serializer; 44 import org.onosproject.store.service.Serializer;
43 import org.onosproject.store.service.StorageService; 45 import org.onosproject.store.service.StorageService;
44 -import org.onosproject.store.service.Versioned;
45 import org.slf4j.Logger; 46 import org.slf4j.Logger;
46 47
47 -import java.util.HashSet;
48 import java.util.List; 48 import java.util.List;
49 import java.util.Set; 49 import java.util.Set;
50 import java.util.concurrent.ExecutorService; 50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors; 51 import java.util.concurrent.Executors;
52 +import java.util.concurrent.atomic.AtomicBoolean;
52 53
53 import static org.onlab.util.Tools.groupedThreads; 54 import static org.onlab.util.Tools.groupedThreads;
54 import static org.slf4j.LoggerFactory.getLogger; 55 import static org.slf4j.LoggerFactory.getLogger;
...@@ -117,6 +118,7 @@ public class DistributedPacketStore ...@@ -117,6 +118,7 @@ public class DistributedPacketStore
117 public void deactivate() { 118 public void deactivate() {
118 communicationService.removeSubscriber(PACKET_OUT_SUBJECT); 119 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
119 messageHandlingExecutor.shutdown(); 120 messageHandlingExecutor.shutdown();
121 + tracker = null;
120 log.info("Stopped"); 122 log.info("Stopped");
121 } 123 }
122 124
...@@ -143,13 +145,13 @@ public class DistributedPacketStore ...@@ -143,13 +145,13 @@ public class DistributedPacketStore
143 } 145 }
144 146
145 @Override 147 @Override
146 - public boolean requestPackets(PacketRequest request) { 148 + public void requestPackets(PacketRequest request) {
147 - return tracker.add(request); 149 + tracker.add(request);
148 } 150 }
149 151
150 @Override 152 @Override
151 - public boolean cancelPackets(PacketRequest request) { 153 + public void cancelPackets(PacketRequest request) {
152 - return tracker.remove(request); 154 + tracker.remove(request);
153 } 155 }
154 156
155 @Override 157 @Override
...@@ -169,33 +171,50 @@ public class DistributedPacketStore ...@@ -169,33 +171,50 @@ public class DistributedPacketStore
169 .build(); 171 .build();
170 } 172 }
171 173
172 - public boolean add(PacketRequest request) { 174 + public void add(PacketRequest request) {
173 - Versioned<Set<PacketRequest>> old = requests.get(request.selector()); 175 + AtomicBoolean firstRequest = new AtomicBoolean(false);
174 - if (old != null && old.value().contains(request)) { 176 + requests.compute(request.selector(), (s, existingRequests) -> {
175 - return false; 177 + if (existingRequests == null) {
178 + firstRequest.set(true);
179 + return ImmutableSet.of(request);
180 + } else if (!existingRequests.contains(request)) {
181 + return ImmutableSet.<PacketRequest>builder()
182 + .addAll(existingRequests)
183 + .add(request)
184 + .build();
185 + } else {
186 + return existingRequests;
187 + }
188 + });
189 +
190 + if (firstRequest.get() && delegate != null) {
191 + // The instance that makes the first request will push to all devices
192 + delegate.requestPackets(request);
176 } 193 }
177 - // FIXME: add retry logic using a random delay
178 - Set<PacketRequest> newSet = new HashSet<>();
179 - newSet.add(request);
180 - if (old == null) {
181 - return requests.putIfAbsent(request.selector(), newSet) == null;
182 - }
183 - newSet.addAll(old.value());
184 - return requests.replace(request.selector(), old.version(), newSet);
185 } 194 }
186 195
187 - public boolean remove(PacketRequest request) { 196 + public void remove(PacketRequest request) {
188 - Versioned<Set<PacketRequest>> old = requests.get(request.selector()); 197 + AtomicBoolean removedLast = new AtomicBoolean(false);
189 - if (old == null || !old.value().contains(request)) { 198 + requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
190 - return false; 199 + if (existingRequests.contains(request)) {
191 - } 200 + Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
192 - // FIXME: add retry logic using a random delay 201 + newRequests.remove(request);
193 - Set<PacketRequest> newSet = new HashSet<>(old.value()); 202 + if (newRequests.size() > 0) {
194 - newSet.remove(request); 203 + return ImmutableSet.copyOf(newRequests);
195 - if (newSet.isEmpty()) { 204 + } else {
196 - return requests.remove(request.selector(), old.version()); 205 + removedLast.set(true);
206 + return null;
207 + }
208 + } else {
209 + return existingRequests;
210 + }
211 + });
212 +
213 + if (removedLast.get() && delegate != null) {
214 + // The instance that removes the last request will remove from all devices
215 + delegate.cancelPackets(request);
197 } 216 }
198 - return requests.replace(request.selector(), old.version(), newSet); 217 +
199 } 218 }
200 219
201 public List<PacketRequest> requests() { 220 public List<PacketRequest> requests() {
...@@ -204,6 +223,5 @@ public class DistributedPacketStore ...@@ -204,6 +223,5 @@ public class DistributedPacketStore
204 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); 223 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
205 return list; 224 return list;
206 } 225 }
207 -
208 } 226 }
209 } 227 }
......