Brian O'Connor
Committed by Gerrit Code Review

Adding IntentCleanup as a component.

Listens for CORRUPT intent events and periodically polls for CORRUPT intents.

Change-Id: I29d8dbe14b46522815dc13e969f259f68b690855
...@@ -32,12 +32,22 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> { ...@@ -32,12 +32,22 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
32 long getIntentCount(); 32 long getIntentCount();
33 33
34 /** 34 /**
35 - * Returns a collection of all intents in the store. 35 + * Returns an iterable of all intents in the store.
36 * 36 *
37 - * @return iterable collection of all intents 37 + * @return iterable of all intents
38 */ 38 */
39 Iterable<Intent> getIntents(); 39 Iterable<Intent> getIntents();
40 40
41 +
42 + /**
43 + * Returns an iterable of all intent data objects in the store.
44 + *
45 + * @param localOnly should only intents for which this instance is master
46 + * should be returned
47 + * @return iterable of all intent data objects
48 + */
49 + Iterable<IntentData> getIntentData(boolean localOnly);
50 +
41 /** 51 /**
42 * Returns the state of the specified intent. 52 * Returns the state of the specified intent.
43 * 53 *
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.intent.impl;
17 +
18 +import org.apache.felix.scr.annotations.Activate;
19 +import org.apache.felix.scr.annotations.Component;
20 +import org.apache.felix.scr.annotations.Deactivate;
21 +import org.apache.felix.scr.annotations.Modified;
22 +import org.apache.felix.scr.annotations.Property;
23 +import org.apache.felix.scr.annotations.Reference;
24 +import org.apache.felix.scr.annotations.ReferenceCardinality;
25 +import org.onosproject.cfg.ComponentConfigService;
26 +import org.onosproject.net.intent.IntentData;
27 +import org.onosproject.net.intent.IntentEvent;
28 +import org.onosproject.net.intent.IntentListener;
29 +import org.onosproject.net.intent.IntentService;
30 +import org.onosproject.net.intent.IntentStore;
31 +import org.osgi.service.component.ComponentContext;
32 +import org.slf4j.Logger;
33 +
34 +import java.util.Dictionary;
35 +import java.util.Properties;
36 +import java.util.Timer;
37 +import java.util.TimerTask;
38 +import java.util.concurrent.ExecutorService;
39 +
40 +import static com.google.common.base.Strings.isNullOrEmpty;
41 +import static java.util.concurrent.Executors.newSingleThreadExecutor;
42 +import static org.onlab.util.Tools.get;
43 +import static org.onlab.util.Tools.groupedThreads;
44 +import static org.onosproject.net.intent.IntentState.CORRUPT;
45 +import static org.slf4j.LoggerFactory.getLogger;
46 +
47 +/**
48 + * FIXME Class to cleanup Intents in CORRUPT state.
49 + * FIXME move this to its own file eventually (but need executor for now)
50 + */
51 +@Component(immediate = true)
52 +public class IntentCleanup implements Runnable, IntentListener {
53 +
54 + private static final Logger log = getLogger(IntentManager.class);
55 +
56 + private static final int DEFAULT_PERIOD = 5; //seconds
57 +
58 + @Property(name = "period", intValue = DEFAULT_PERIOD,
59 + label = "Frequency in ms between cleanup runs")
60 + protected int period = DEFAULT_PERIOD;
61 +
62 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 + protected IntentService service;
64 +
65 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 + protected IntentStore store;
67 +
68 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 + protected ComponentConfigService cfgService;
70 +
71 + private ExecutorService executor;
72 + private Timer timer;
73 + private TimerTask timerTask;
74 +
75 + @Activate
76 + public void activate() {
77 + cfgService.registerProperties(getClass());
78 + executor = newSingleThreadExecutor(groupedThreads("onos/intent", "cleanup"));
79 + timer = new Timer("onos-intent-cleanup-timer");
80 + service.addListener(this);
81 + adjustRate();
82 + log.info("Started");
83 + }
84 +
85 + @Deactivate
86 + public void deactivate() {
87 + cfgService.unregisterProperties(getClass(), false);
88 + service.removeListener(this);
89 + timer.cancel();
90 + timerTask = null;
91 + executor.shutdown();
92 + log.info("Stopped");
93 + }
94 +
95 + @Modified
96 + public void modified(ComponentContext context) {
97 + Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
98 +
99 + int newPeriod;
100 + try {
101 + String s = get(properties, "period");
102 + newPeriod = isNullOrEmpty(s) ? period : Integer.parseInt(s.trim());
103 + } catch (NumberFormatException e) {
104 + log.warn(e.getMessage());
105 + newPeriod = period;
106 + }
107 +
108 + // Any change in the following parameters implies hard restart
109 + if (newPeriod != period) {
110 + period = newPeriod;
111 + adjustRate();
112 + }
113 +
114 + log.info("Settings: period={}", period);
115 + }
116 +
117 + private void adjustRate() {
118 + if (timerTask != null) {
119 + timerTask.cancel();
120 + }
121 +
122 + timerTask = new TimerTask() {
123 + @Override
124 + public void run() {
125 + executor.submit(IntentCleanup.this);
126 + }
127 + };
128 +
129 + long periodMs = period * 1000; //convert to ms
130 + timer.scheduleAtFixedRate(timerTask, periodMs, periodMs);
131 + }
132 +
133 +
134 + @Override
135 + public void run() {
136 + try {
137 + cleanup();
138 + } catch (Exception e) {
139 + log.warn("Caught exception during Intent cleanup", e);
140 + }
141 + }
142 +
143 + /**
144 + * Iterate through CORRUPT intents and re-submit/withdraw.
145 + *
146 + * FIXME we want to eventually count number of retries per intent and give up
147 + * FIXME we probably also want to look at intents that have been stuck
148 + * in *_REQ or *ING for "too long".
149 + */
150 + private void cleanup() {
151 + int count = 0;
152 + for (IntentData intentData : store.getIntentData(true)) {
153 + if (intentData.state() == CORRUPT) {
154 + switch (intentData.request()) {
155 + case INSTALL_REQ:
156 + service.submit(intentData.intent());
157 + count++;
158 + break;
159 + case WITHDRAW_REQ:
160 + service.withdraw(intentData.intent());
161 + count++;
162 + break;
163 + default:
164 + //TODO this is an error
165 + break;
166 + }
167 + }
168 + }
169 + log.debug("Intent cleanup ran and resubmitted {} intents", count);
170 + }
171 +
172 + @Override
173 + public void event(IntentEvent event) {
174 + if (event.type() == IntentEvent.Type.CORRUPT) {
175 + // FIXME drop this if we exceed retry threshold
176 + // just run the whole cleanup script for now
177 + executor.submit(this);
178 + }
179 + }
180 +}
...@@ -132,6 +132,7 @@ public class IntentManager ...@@ -132,6 +132,7 @@ public class IntentManager
132 trackerService.unsetDelegate(topoDelegate); 132 trackerService.unsetDelegate(topoDelegate);
133 eventDispatcher.removeSink(IntentEvent.class); 133 eventDispatcher.removeSink(IntentEvent.class);
134 batchExecutor.shutdown(); 134 batchExecutor.shutdown();
135 + workerExecutor.shutdown();
135 Intent.unbindIdGenerator(idGenerator); 136 Intent.unbindIdGenerator(idGenerator);
136 log.info("Stopped"); 137 log.info("Stopped");
137 } 138 }
......
...@@ -15,17 +15,11 @@ ...@@ -15,17 +15,11 @@
15 */ 15 */
16 package org.onosproject.net.intent.impl; 16 package org.onosproject.net.intent.impl;
17 17
18 -import java.util.Arrays; 18 +import com.google.common.collect.HashMultimap;
19 -import java.util.Collection; 19 +import com.google.common.collect.Lists;
20 -import java.util.Collections; 20 +import com.google.common.collect.Maps;
21 -import java.util.List; 21 +import com.google.common.collect.Multimap;
22 -import java.util.Map; 22 +import com.google.common.collect.Sets;
23 -import java.util.Set;
24 -import java.util.concurrent.CountDownLatch;
25 -import java.util.concurrent.TimeUnit;
26 -import java.util.stream.Collectors;
27 -import java.util.stream.IntStream;
28 -
29 import org.hamcrest.Description; 23 import org.hamcrest.Description;
30 import org.hamcrest.TypeSafeMatcher; 24 import org.hamcrest.TypeSafeMatcher;
31 import org.junit.After; 25 import org.junit.After;
...@@ -33,6 +27,7 @@ import org.junit.Before; ...@@ -33,6 +27,7 @@ import org.junit.Before;
33 import org.junit.Ignore; 27 import org.junit.Ignore;
34 import org.junit.Test; 28 import org.junit.Test;
35 import org.onosproject.TestApplicationId; 29 import org.onosproject.TestApplicationId;
30 +import org.onosproject.cfg.ComponentConfigAdapter;
36 import org.onosproject.core.ApplicationId; 31 import org.onosproject.core.ApplicationId;
37 import org.onosproject.core.impl.TestCoreManager; 32 import org.onosproject.core.impl.TestCoreManager;
38 import org.onosproject.event.impl.TestEventDispatcher; 33 import org.onosproject.event.impl.TestEventDispatcher;
...@@ -51,18 +46,21 @@ import org.onosproject.net.intent.Key; ...@@ -51,18 +46,21 @@ import org.onosproject.net.intent.Key;
51 import org.onosproject.net.resource.LinkResourceAllocations; 46 import org.onosproject.net.resource.LinkResourceAllocations;
52 import org.onosproject.store.trivial.impl.SimpleIntentStore; 47 import org.onosproject.store.trivial.impl.SimpleIntentStore;
53 48
54 -import com.google.common.collect.HashMultimap; 49 +import java.util.Arrays;
55 -import com.google.common.collect.Lists; 50 +import java.util.Collection;
56 -import com.google.common.collect.Maps; 51 +import java.util.Collections;
57 -import com.google.common.collect.Multimap; 52 +import java.util.List;
58 -import com.google.common.collect.Sets; 53 +import java.util.Map;
54 +import java.util.Set;
55 +import java.util.concurrent.CountDownLatch;
56 +import java.util.concurrent.TimeUnit;
57 +import java.util.stream.Collectors;
58 +import java.util.stream.IntStream;
59 59
60 import static org.hamcrest.MatcherAssert.assertThat; 60 import static org.hamcrest.MatcherAssert.assertThat;
61 import static org.hamcrest.Matchers.hasSize; 61 import static org.hamcrest.Matchers.hasSize;
62 import static org.hamcrest.Matchers.is; 62 import static org.hamcrest.Matchers.is;
63 -import static org.junit.Assert.assertEquals; 63 +import static org.junit.Assert.*;
64 -import static org.junit.Assert.assertNotNull;
65 -import static org.junit.Assert.assertTrue;
66 import static org.onlab.junit.TestTools.assertAfter; 64 import static org.onlab.junit.TestTools.assertAfter;
67 import static org.onlab.util.Tools.delay; 65 import static org.onlab.util.Tools.delay;
68 import static org.onosproject.net.intent.IntentState.*; 66 import static org.onosproject.net.intent.IntentState.*;
...@@ -460,14 +458,13 @@ public class IntentManagerTest { ...@@ -460,14 +458,13 @@ public class IntentManagerTest {
460 * Tests an intent with no installer. 458 * Tests an intent with no installer.
461 */ 459 */
462 @Test 460 @Test
463 - @Ignore //FIXME corrupt or failed?
464 public void intentWithoutInstaller() { 461 public void intentWithoutInstaller() {
465 MockIntent intent = new MockIntent(MockIntent.nextId()); 462 MockIntent intent = new MockIntent(MockIntent.nextId());
466 listener.setLatch(1, Type.INSTALL_REQ); 463 listener.setLatch(1, Type.INSTALL_REQ);
467 - listener.setLatch(1, Type.FAILED); 464 + listener.setLatch(1, Type.CORRUPT);
468 service.submit(intent); 465 service.submit(intent);
469 listener.await(Type.INSTALL_REQ); 466 listener.await(Type.INSTALL_REQ);
470 - listener.await(Type.FAILED); 467 + listener.await(Type.CORRUPT);
471 verifyState(); 468 verifyState();
472 } 469 }
473 470
...@@ -549,16 +546,63 @@ public class IntentManagerTest { ...@@ -549,16 +546,63 @@ public class IntentManagerTest {
549 } 546 }
550 547
551 /** 548 /**
549 + * Test failure to install an intent, then succeed on retry via IntentCleanup.
550 + */
551 + @Test
552 + public void testCorruptCleanup() {
553 + IntentCleanup cleanup = new IntentCleanup();
554 + cleanup.service = manager;
555 + cleanup.store = manager.store;
556 + cleanup.cfgService = new ComponentConfigAdapter();
557 +
558 + try {
559 + cleanup.activate();
560 +
561 + final TestIntentCompilerMultipleFlows errorCompiler = new TestIntentCompilerMultipleFlows();
562 + extensionService.registerCompiler(MockIntent.class, errorCompiler);
563 + List<Intent> intents;
564 +
565 + flowRuleService.setFuture(false);
566 +
567 + intents = Lists.newArrayList(service.getIntents());
568 + assertThat(intents, hasSize(0));
569 +
570 + final MockIntent intent1 = new MockIntent(MockIntent.nextId());
571 +
572 + listener.setLatch(1, Type.INSTALL_REQ);
573 + listener.setLatch(1, Type.CORRUPT);
574 + listener.setLatch(1, Type.INSTALLED);
575 +
576 + service.submit(intent1);
577 +
578 + listener.await(Type.INSTALL_REQ);
579 + listener.await(Type.CORRUPT);
580 +
581 + flowRuleService.setFuture(true);
582 +
583 + listener.await(Type.INSTALLED);
584 +
585 + assertThat(listener.getCounts(Type.CORRUPT), is(1));
586 + assertThat(listener.getCounts(Type.INSTALLED), is(1));
587 + assertEquals(INSTALLED, manager.getIntentState(intent1.key()));
588 + assertThat(flowRuleService.getFlowRuleCount(), is(5));
589 + } finally {
590 + cleanup.deactivate();
591 + }
592 + }
593 +
594 + /**
552 * Tests that an intent that fails installation results in no flows remaining. 595 * Tests that an intent that fails installation results in no flows remaining.
553 */ 596 */
554 @Test 597 @Test
555 - @Ignore("Cleanup state is not yet implemented in the intent manager") 598 + @Ignore("MockFlowRule numbering issue") //test works if run independently
556 public void testFlowRemovalInstallError() { 599 public void testFlowRemovalInstallError() {
557 final TestIntentCompilerMultipleFlows errorCompiler = new TestIntentCompilerMultipleFlows(); 600 final TestIntentCompilerMultipleFlows errorCompiler = new TestIntentCompilerMultipleFlows();
558 extensionService.registerCompiler(MockIntent.class, errorCompiler); 601 extensionService.registerCompiler(MockIntent.class, errorCompiler);
559 List<Intent> intents; 602 List<Intent> intents;
560 603
561 flowRuleService.setFuture(true); 604 flowRuleService.setFuture(true);
605 + //FIXME relying on "3" is brittle
562 flowRuleService.setErrorFlow(3); 606 flowRuleService.setErrorFlow(3);
563 607
564 intents = Lists.newArrayList(service.getIntents()); 608 intents = Lists.newArrayList(service.getIntents());
...@@ -567,13 +611,14 @@ public class IntentManagerTest { ...@@ -567,13 +611,14 @@ public class IntentManagerTest {
567 final MockIntent intent1 = new MockIntent(MockIntent.nextId()); 611 final MockIntent intent1 = new MockIntent(MockIntent.nextId());
568 612
569 listener.setLatch(1, Type.INSTALL_REQ); 613 listener.setLatch(1, Type.INSTALL_REQ);
570 - listener.setLatch(1, Type.FAILED); 614 + listener.setLatch(1, Type.CORRUPT);
571 615
572 service.submit(intent1); 616 service.submit(intent1);
573 listener.await(Type.INSTALL_REQ); 617 listener.await(Type.INSTALL_REQ);
574 - listener.await(Type.FAILED); 618 + listener.await(Type.CORRUPT);
575 619
576 - assertThat(listener.getCounts(Type.FAILED), is(1)); 620 + assertThat(listener.getCounts(Type.CORRUPT), is(1));
577 - assertThat(flowRuleService.getFlowRuleCount(), is(0)); 621 + // in this test, there will still be flows abandoned on the data plane
622 + //assertThat(flowRuleService.getFlowRuleCount(), is(0));
578 } 623 }
579 } 624 }
......
...@@ -15,9 +15,7 @@ ...@@ -15,9 +15,7 @@
15 */ 15 */
16 package org.onosproject.net.intent.impl; 16 package org.onosproject.net.intent.impl;
17 17
18 -import java.util.Set; 18 +import com.google.common.collect.Sets;
19 -import java.util.stream.Collectors;
20 -
21 import org.onosproject.core.ApplicationId; 19 import org.onosproject.core.ApplicationId;
22 import org.onosproject.net.DeviceId; 20 import org.onosproject.net.DeviceId;
23 import org.onosproject.net.flow.DefaultFlowEntry; 21 import org.onosproject.net.flow.DefaultFlowEntry;
...@@ -26,7 +24,11 @@ import org.onosproject.net.flow.FlowRule; ...@@ -26,7 +24,11 @@ import org.onosproject.net.flow.FlowRule;
26 import org.onosproject.net.flow.FlowRuleOperations; 24 import org.onosproject.net.flow.FlowRuleOperations;
27 import org.onosproject.net.flow.FlowRuleServiceAdapter; 25 import org.onosproject.net.flow.FlowRuleServiceAdapter;
28 26
29 -import com.google.common.collect.Sets; 27 +import java.util.Set;
28 +import java.util.concurrent.atomic.AtomicBoolean;
29 +import java.util.stream.Collectors;
30 +
31 +import static org.onosproject.net.flow.FlowRuleOperation.Type.REMOVE;
30 32
31 33
32 public class MockFlowRuleService extends FlowRuleServiceAdapter { 34 public class MockFlowRuleService extends FlowRuleServiceAdapter {
...@@ -45,9 +47,10 @@ public class MockFlowRuleService extends FlowRuleServiceAdapter { ...@@ -45,9 +47,10 @@ public class MockFlowRuleService extends FlowRuleServiceAdapter {
45 47
46 @Override 48 @Override
47 public void apply(FlowRuleOperations ops) { 49 public void apply(FlowRuleOperations ops) {
50 + AtomicBoolean thisSuccess = new AtomicBoolean(success);
48 ops.stages().forEach(stage -> stage.forEach(flow -> { 51 ops.stages().forEach(stage -> stage.forEach(flow -> {
49 if (errorFlow == flow.rule().id().value()) { 52 if (errorFlow == flow.rule().id().value()) {
50 - success = false; 53 + thisSuccess.set(false);
51 } else { 54 } else {
52 switch (flow.type()) { 55 switch (flow.type()) {
53 case ADD: 56 case ADD:
...@@ -62,7 +65,7 @@ public class MockFlowRuleService extends FlowRuleServiceAdapter { ...@@ -62,7 +65,7 @@ public class MockFlowRuleService extends FlowRuleServiceAdapter {
62 } 65 }
63 } 66 }
64 })); 67 }));
65 - if (success) { 68 + if (thisSuccess.get()) {
66 ops.callback().onSuccess(ops); 69 ops.callback().onSuccess(ops);
67 } else { 70 } else {
68 ops.callback().onError(ops); 71 ops.callback().onError(ops);
......
...@@ -132,6 +132,16 @@ public class GossipIntentStore ...@@ -132,6 +132,16 @@ public class GossipIntentStore
132 } 132 }
133 133
134 @Override 134 @Override
135 + public Iterable<IntentData> getIntentData(boolean localOnly) {
136 + if (localOnly) {
137 + return currentMap.values().stream()
138 + .filter(data -> isMaster(data.key()))
139 + .collect(Collectors.toList());
140 + }
141 + return currentMap.values();
142 + }
143 +
144 + @Override
135 public IntentState getIntentState(Key intentKey) { 145 public IntentState getIntentState(Key intentKey) {
136 IntentData data = currentMap.get(intentKey); 146 IntentData data = currentMap.get(intentKey);
137 if (data != null) { 147 if (data != null) {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
15 */ 15 */
16 package org.onosproject.store.trivial.impl; 16 package org.onosproject.store.trivial.impl;
17 17
18 +import com.google.common.collect.Lists;
18 import com.google.common.collect.Maps; 19 import com.google.common.collect.Maps;
19 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
...@@ -75,6 +76,16 @@ public class SimpleIntentStore ...@@ -75,6 +76,16 @@ public class SimpleIntentStore
75 } 76 }
76 77
77 @Override 78 @Override
79 + public Iterable<IntentData> getIntentData(boolean localOnly) {
80 + if (localOnly) {
81 + return current.values().stream()
82 + .filter(data -> isMaster(data.key()))
83 + .collect(Collectors.toList());
84 + }
85 + return Lists.newArrayList(current.values());
86 + }
87 +
88 + @Override
78 public IntentState getIntentState(Key intentKey) { 89 public IntentState getIntentState(Key intentKey) {
79 IntentData data = current.get(intentKey); 90 IntentData data = current.get(intentKey);
80 return (data != null) ? data.state() : null; 91 return (data != null) ? data.state() : null;
......