Thomas Vachuska
Committed by Gerrit Code Review

ONOS-4604 Fixed flow objective installation

Removed context from objective toString methods.
Removed duplicate flow objective delegate notifications in the store for next objectives.
Synchronized queueing of forwarding objectives for pending next objectives to avoid notifications race.
Changed logging for better readability.

Change-Id: Ic2bd411a891ea035a2c5513b24dea5fbd48f187d
...@@ -160,7 +160,6 @@ public final class DefaultFilteringObjective implements FilteringObjective { ...@@ -160,7 +160,6 @@ public final class DefaultFilteringObjective implements FilteringObjective {
160 .add("appId", appId()) 160 .add("appId", appId())
161 .add("permanent", permanent()) 161 .add("permanent", permanent())
162 .add("timeout", timeout()) 162 .add("timeout", timeout())
163 - .add("context", context())
164 .toString(); 163 .toString();
165 } 164 }
166 165
......
...@@ -169,7 +169,6 @@ public final class DefaultForwardingObjective implements ForwardingObjective { ...@@ -169,7 +169,6 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
169 .add("appId", appId()) 169 .add("appId", appId())
170 .add("permanent", permanent()) 170 .add("permanent", permanent())
171 .add("timeout", timeout()) 171 .add("timeout", timeout())
172 - .add("context", context())
173 .toString(); 172 .toString();
174 } 173 }
175 174
......
...@@ -138,7 +138,6 @@ public final class DefaultNextObjective implements NextObjective { ...@@ -138,7 +138,6 @@ public final class DefaultNextObjective implements NextObjective {
138 .add("appId", appId()) 138 .add("appId", appId())
139 .add("permanent", permanent()) 139 .add("permanent", permanent())
140 .add("timeout", timeout()) 140 .add("timeout", timeout())
141 - .add("context", context())
142 .toString(); 141 .toString();
143 } 142 }
144 143
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.net.flowobjective.impl; 16 package org.onosproject.net.flowobjective.impl;
17 17
18 import com.google.common.collect.Maps; 18 import com.google.common.collect.Maps;
19 +import com.google.common.collect.Sets;
19 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 22 import org.apache.felix.scr.annotations.Deactivate;
...@@ -55,21 +56,17 @@ import org.slf4j.Logger; ...@@ -55,21 +56,17 @@ import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory; 56 import org.slf4j.LoggerFactory;
56 57
57 import java.util.ArrayList; 58 import java.util.ArrayList;
58 -import java.util.Collections;
59 import java.util.List; 59 import java.util.List;
60 import java.util.Map; 60 import java.util.Map;
61 import java.util.Objects; 61 import java.util.Objects;
62 import java.util.Set; 62 import java.util.Set;
63 -import java.util.concurrent.ConcurrentHashMap;
64 import java.util.concurrent.ExecutorService; 63 import java.util.concurrent.ExecutorService;
65 64
66 import static com.google.common.base.Preconditions.checkNotNull; 65 import static com.google.common.base.Preconditions.checkNotNull;
67 import static java.util.concurrent.Executors.newFixedThreadPool; 66 import static java.util.concurrent.Executors.newFixedThreadPool;
68 import static org.onlab.util.Tools.groupedThreads; 67 import static org.onlab.util.Tools.groupedThreads;
69 import static org.onosproject.security.AppGuard.checkPermission; 68 import static org.onosproject.security.AppGuard.checkPermission;
70 -import static org.onosproject.security.AppPermission.Type.*; 69 +import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
71 -
72 -
73 70
74 /** 71 /**
75 * Provides implementation of the flow objective programming service. 72 * Provides implementation of the flow objective programming service.
...@@ -124,7 +121,7 @@ public class FlowObjectiveManager implements FlowObjectiveService { ...@@ -124,7 +121,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
124 121
125 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory(); 122 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
126 123
127 - private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap(); 124 + private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
128 125
129 // local store to track which nextObjectives were sent to which device 126 // local store to track which nextObjectives were sent to which device
130 // for debugging purposes 127 // for debugging purposes
...@@ -237,21 +234,33 @@ public class FlowObjectiveManager implements FlowObjectiveService { ...@@ -237,21 +234,33 @@ public class FlowObjectiveManager implements FlowObjectiveService {
237 public void initPolicy(String policy) {} 234 public void initPolicy(String policy) {}
238 235
239 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) { 236 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
240 - if (fwd.nextId() != null && 237 + if (fwd.nextId() == null ||
241 - flowObjectiveStore.getNextGroup(fwd.nextId()) == null) { 238 + flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
242 - log.debug("Queuing forwarding objective {} for nextId {} meant for device {}", 239 + // fast path
243 - fwd.id(), fwd.nextId(), deviceId); 240 + return false;
244 - // TODO: change to computeIfAbsent 241 + }
245 - Set<PendingNext> newset = Collections.newSetFromMap( 242 + boolean queued = false;
246 - new ConcurrentHashMap<PendingNext, Boolean>()); 243 + synchronized (pendingForwards) {
247 - newset.add(new PendingNext(deviceId, fwd)); 244 + // double check the flow objective store, because this block could run
248 - Set<PendingNext> pnext = pendingForwards.putIfAbsent(fwd.nextId(), newset); 245 + // after a notification arrives
249 - if (pnext != null) { 246 + if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
250 - pnext.add(new PendingNext(deviceId, fwd)); 247 + pendingForwards.compute(fwd.nextId(), (id, pending) -> {
248 + PendingNext next = new PendingNext(deviceId, fwd);
249 + if (pending == null) {
250 + return Sets.newHashSet(next);
251 + } else {
252 + pending.add(next);
253 + return pending;
254 + }
255 + });
256 + queued = true;
251 } 257 }
252 - return true;
253 } 258 }
254 - return false; 259 + if (queued) {
260 + log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
261 + fwd.id(), fwd.nextId(), deviceId);
262 + }
263 + return queued;
255 } 264 }
256 265
257 // Retrieves the device pipeline behaviour from the cache. 266 // Retrieves the device pipeline behaviour from the cache.
...@@ -396,7 +405,11 @@ public class FlowObjectiveManager implements FlowObjectiveService { ...@@ -396,7 +405,11 @@ public class FlowObjectiveManager implements FlowObjectiveService {
396 public void notify(ObjectiveEvent event) { 405 public void notify(ObjectiveEvent event) {
397 if (event.type() == Type.ADD) { 406 if (event.type() == Type.ADD) {
398 log.debug("Received notification of obj event {}", event); 407 log.debug("Received notification of obj event {}", event);
399 - Set<PendingNext> pending = pendingForwards.remove(event.subject()); 408 + Set<PendingNext> pending;
409 + synchronized (pendingForwards) {
410 + // needs to be synchronized for queueObjective lookup
411 + pending = pendingForwards.remove(event.subject());
412 + }
400 413
401 if (pending == null) { 414 if (pending == null) {
402 log.debug("Nothing pending for this obj event {}", event); 415 log.debug("Nothing pending for this obj event {}", event);
......
...@@ -120,7 +120,7 @@ class IntentInstaller { ...@@ -120,7 +120,7 @@ class IntentInstaller {
120 // if toInstall was cause of error, then recompile (manage/increment counter, when exceeded -> CORRUPT) 120 // if toInstall was cause of error, then recompile (manage/increment counter, when exceeded -> CORRUPT)
121 if (toInstall.isPresent()) { 121 if (toInstall.isPresent()) {
122 IntentData installData = toInstall.get(); 122 IntentData installData = toInstall.get();
123 - log.warn("Failed installation: {} {} on {}", 123 + log.warn("Failed installation: {} {} due to {}",
124 installData.key(), installData.intent(), ctx.error()); 124 installData.key(), installData.intent(), ctx.error());
125 installData.setState(CORRUPT); 125 installData.setState(CORRUPT);
126 installData.incrementErrorCount(); 126 installData.incrementErrorCount();
...@@ -129,7 +129,7 @@ class IntentInstaller { ...@@ -129,7 +129,7 @@ class IntentInstaller {
129 // if toUninstall was cause of error, then CORRUPT (another job will clean this up) 129 // if toUninstall was cause of error, then CORRUPT (another job will clean this up)
130 if (toUninstall.isPresent()) { 130 if (toUninstall.isPresent()) {
131 IntentData uninstallData = toUninstall.get(); 131 IntentData uninstallData = toUninstall.get();
132 - log.warn("Failed withdrawal: {} {} on {}", 132 + log.warn("Failed withdrawal: {} {} due to {}",
133 uninstallData.key(), uninstallData.intent(), ctx.error()); 133 uninstallData.key(), uninstallData.intent(), ctx.error());
134 uninstallData.setState(CORRUPT); 134 uninstallData.setState(CORRUPT);
135 uninstallData.incrementErrorCount(); 135 uninstallData.incrementErrorCount();
...@@ -355,6 +355,7 @@ class IntentInstaller { ...@@ -355,6 +355,7 @@ class IntentInstaller {
355 private class FlowObjectiveInstallationContext implements ObjectiveContext { 355 private class FlowObjectiveInstallationContext implements ObjectiveContext {
356 Objective objective; 356 Objective objective;
357 DeviceId deviceId; 357 DeviceId deviceId;
358 + ObjectiveError error;
358 359
359 void setObjective(Objective objective, DeviceId deviceId) { 360 void setObjective(Objective objective, DeviceId deviceId) {
360 this.objective = objective; 361 this.objective = objective;
...@@ -368,6 +369,7 @@ class IntentInstaller { ...@@ -368,6 +369,7 @@ class IntentInstaller {
368 369
369 @Override 370 @Override
370 public void onError(Objective objective, ObjectiveError error) { 371 public void onError(Objective objective, ObjectiveError error) {
372 + this.error = error;
371 errorContexts.add(this); 373 errorContexts.add(this);
372 finish(); 374 finish();
373 } 375 }
...@@ -387,7 +389,7 @@ class IntentInstaller { ...@@ -387,7 +389,7 @@ class IntentInstaller {
387 389
388 @Override 390 @Override
389 public String toString() { 391 public String toString() {
390 - return String.format("(%s, %s)", deviceId, objective); 392 + return String.format("(%s on %s for %s)", error, deviceId, objective);
391 } 393 }
392 } 394 }
393 } 395 }
......
...@@ -97,7 +97,6 @@ public class DistributedFlowObjectiveStore ...@@ -97,7 +97,6 @@ public class DistributedFlowObjectiveStore
97 @Override 97 @Override
98 public void putNextGroup(Integer nextId, NextGroup group) { 98 public void putNextGroup(Integer nextId, NextGroup group) {
99 nextGroups.put(nextId, group.data()); 99 nextGroups.put(nextId, group.data());
100 - notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.ADD, nextId));
101 } 100 }
102 101
103 @Override 102 @Override
...@@ -113,7 +112,6 @@ public class DistributedFlowObjectiveStore ...@@ -113,7 +112,6 @@ public class DistributedFlowObjectiveStore
113 public NextGroup removeNextGroup(Integer nextId) { 112 public NextGroup removeNextGroup(Integer nextId) {
114 Versioned<byte[]> versionGroup = nextGroups.remove(nextId); 113 Versioned<byte[]> versionGroup = nextGroups.remove(nextId);
115 if (versionGroup != null) { 114 if (versionGroup != null) {
116 - notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, nextId));
117 return new DefaultNextGroup(versionGroup.value()); 115 return new DefaultNextGroup(versionGroup.value());
118 } 116 }
119 return null; 117 return null;
......