Yuta HIGUCHI
Committed by Brian O'Connor

IntentManager: use IntentStore batch APIs

Change-Id: Ie60f3e53f48fa6acbcaf5cf6837bdef12b36a98d
...@@ -159,6 +159,12 @@ public class SdnIpLeadershipService implements LeadershipService { ...@@ -159,6 +159,12 @@ public class SdnIpLeadershipService implements LeadershipService {
159 } 159 }
160 160
161 @Override 161 @Override
162 + public Map<String, Leadership> getLeaderBoard() {
163 + throw new UnsupportedOperationException("I don't know what to do." +
164 + " I wish you luck.");
165 + }
166 +
167 + @Override
162 public void addListener(LeadershipEventListener listener) { 168 public void addListener(LeadershipEventListener listener) {
163 listenerRegistry.addListener(listener); 169 listenerRegistry.addListener(listener);
164 } 170 }
......
...@@ -15,10 +15,13 @@ ...@@ -15,10 +15,13 @@
15 */ 15 */
16 package org.onlab.onos.cli.net; 16 package org.onlab.onos.cli.net;
17 17
18 +import com.google.common.collect.ArrayListMultimap;
18 import com.google.common.collect.Lists; 19 import com.google.common.collect.Lists;
19 import org.apache.karaf.shell.commands.Argument; 20 import org.apache.karaf.shell.commands.Argument;
20 import org.apache.karaf.shell.commands.Command; 21 import org.apache.karaf.shell.commands.Command;
21 import org.onlab.onos.cli.AbstractShellCommand; 22 import org.onlab.onos.cli.AbstractShellCommand;
23 +import org.onlab.onos.core.ApplicationId;
24 +import org.onlab.onos.core.CoreService;
22 import org.onlab.onos.net.ConnectPoint; 25 import org.onlab.onos.net.ConnectPoint;
23 import org.onlab.onos.net.DeviceId; 26 import org.onlab.onos.net.DeviceId;
24 import org.onlab.onos.net.PortNumber; 27 import org.onlab.onos.net.PortNumber;
...@@ -61,17 +64,27 @@ public class IntentPushTestCommand extends AbstractShellCommand ...@@ -61,17 +64,27 @@ public class IntentPushTestCommand extends AbstractShellCommand
61 required = true, multiValued = false) 64 required = true, multiValued = false)
62 String egressDeviceString = null; 65 String egressDeviceString = null;
63 66
64 - @Argument(index = 2, name = "count", 67 +
65 - description = "Number of intents to push", 68 + @Argument(index = 2, name = "Intents per appId",
69 + description = "Number of intents per appId",
66 required = true, multiValued = false) 70 required = true, multiValued = false)
67 - String countString = null; 71 + String intentsPerAppId = null;
72 +
73 + @Argument(index = 3, name = "apps",
74 + description = "Number of appIds",
75 + required = false, multiValued = false)
76 + String appIds = null;
77 +
68 78
69 private IntentService service; 79 private IntentService service;
70 private CountDownLatch latch; 80 private CountDownLatch latch;
71 private long start, end; 81 private long start, end;
82 + private int apps;
83 + private int intentsPerApp;
72 private int count; 84 private int count;
73 private boolean add; 85 private boolean add;
74 86
87 +
75 @Override 88 @Override
76 protected void execute() { 89 protected void execute() {
77 service = get(IntentService.class); 90 service = get(IntentService.class);
...@@ -85,13 +98,18 @@ public class IntentPushTestCommand extends AbstractShellCommand ...@@ -85,13 +98,18 @@ public class IntentPushTestCommand extends AbstractShellCommand
85 PortNumber egressPortNumber = portNumber(getPortNumber(egressDeviceString)); 98 PortNumber egressPortNumber = portNumber(getPortNumber(egressDeviceString));
86 ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber); 99 ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
87 100
88 - count = Integer.parseInt(countString); 101 + apps = appIds != null ? Integer.parseInt(appIds) : 1;
102 + intentsPerApp = Integer.parseInt(intentsPerAppId);
103 +
104 + count = intentsPerApp * apps;
105 +
89 106
90 service.addListener(this); 107 service.addListener(this);
91 108
109 + ArrayListMultimap<Integer, Intent> operations = generateIntents(ingress, egress);
110 +
92 add = true; 111 add = true;
93 latch = new CountDownLatch(count); 112 latch = new CountDownLatch(count);
94 - List<Intent> operations = generateIntents(ingress, egress);
95 submitIntents(operations); 113 submitIntents(operations);
96 114
97 add = false; 115 add = false;
...@@ -101,36 +119,41 @@ public class IntentPushTestCommand extends AbstractShellCommand ...@@ -101,36 +119,41 @@ public class IntentPushTestCommand extends AbstractShellCommand
101 service.removeListener(this); 119 service.removeListener(this);
102 } 120 }
103 121
104 - private List<Intent> generateIntents(ConnectPoint ingress, ConnectPoint egress) { 122 + private ArrayListMultimap<Integer, Intent> generateIntents(ConnectPoint ingress, ConnectPoint egress) {
105 TrafficSelector.Builder selector = DefaultTrafficSelector.builder() 123 TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
106 .matchEthType(Ethernet.TYPE_IPV4); 124 .matchEthType(Ethernet.TYPE_IPV4);
107 TrafficTreatment treatment = DefaultTrafficTreatment.builder().build(); 125 TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
108 126
109 - List<Intent> intents = Lists.newArrayList(); 127 + ArrayListMultimap<Integer, Intent> intents = ArrayListMultimap.create();
110 - for (int i = 1; i <= count; i++) { 128 + for (int app = 1; app <= apps; app++) {
129 + for (int i = 1; i <= intentsPerApp; i++) {
111 TrafficSelector s = selector 130 TrafficSelector s = selector
112 .matchEthSrc(MacAddress.valueOf(i)) 131 .matchEthSrc(MacAddress.valueOf(i))
113 .build(); 132 .build();
114 - intents.add(new PointToPointIntent(appId(), s, treatment, 133 + intents.put(app, new PointToPointIntent(appId(), s, treatment,
115 ingress, egress)); 134 ingress, egress));
116 135
117 } 136 }
137 + }
118 return intents; 138 return intents;
119 } 139 }
120 140
121 - private void submitIntents(List<Intent> intents) { 141 + private void submitIntents(ArrayListMultimap<Integer, Intent> intents) {
122 - IntentOperations.Builder builder = IntentOperations.builder(appId()); 142 + List<IntentOperations> opList = Lists.newArrayList();
123 - for (Intent intent : intents) { 143 + for (Integer app : intents.keySet()) {
144 + IntentOperations.Builder builder = IntentOperations.builder(appId(app));
145 + for (Intent intent : intents.get(app)) {
124 if (add) { 146 if (add) {
125 builder.addSubmitOperation(intent); 147 builder.addSubmitOperation(intent);
126 } else { 148 } else {
127 builder.addWithdrawOperation(intent.id()); 149 builder.addWithdrawOperation(intent.id());
128 } 150 }
129 } 151 }
130 - IntentOperations ops = builder.build(); 152 + opList.add(builder.build());
153 + }
131 154
132 start = System.currentTimeMillis(); 155 start = System.currentTimeMillis();
133 - service.execute(ops); 156 + opList.forEach(ops -> service.execute(ops));
134 try { 157 try {
135 if (latch.await(100 + count * 200, TimeUnit.MILLISECONDS)) { 158 if (latch.await(100 + count * 200, TimeUnit.MILLISECONDS)) {
136 printResults(count); 159 printResults(count);
...@@ -148,6 +171,16 @@ public class IntentPushTestCommand extends AbstractShellCommand ...@@ -148,6 +171,16 @@ public class IntentPushTestCommand extends AbstractShellCommand
148 print("Time to %s %d intents: %d ms", text, count, delta); 171 print("Time to %s %d intents: %d ms", text, count, delta);
149 } 172 }
150 173
174 +
175 + /**
176 + * Returns application ID for the CLI.
177 + *
178 + * @return command-line application identifier
179 + */
180 + protected ApplicationId appId(Integer id) {
181 + return get(CoreService.class).registerApplication("org.onlab.onos.cli-" + id);
182 + }
183 +
151 /** 184 /**
152 * Extracts the port number portion of the ConnectPoint. 185 * Extracts the port number portion of the ConnectPoint.
153 * 186 *
......
1 +/*
2 + * Copyright 2014 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onlab.onos.cli.net;
17 +
18 +import org.apache.karaf.shell.commands.Command;
19 +import org.onlab.onos.cli.AbstractShellCommand;
20 +import org.onlab.onos.cluster.Leadership;
21 +import org.onlab.onos.cluster.LeadershipService;
22 +
23 +import java.util.Map;
24 +
25 +/**
26 + * Prints the leader for every topic.
27 + */
28 +@Command(scope = "onos", name = "leaders",
29 + description = "Finds the leader for particular topic.")
30 +public class LeaderCommand extends AbstractShellCommand {
31 +
32 + @Override
33 + protected void execute() {
34 + LeadershipService leaderService = get(LeadershipService.class);
35 + Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
36 + print("Topic:\t\tLeader");
37 + for (String topic : leaderBoard.keySet()) {
38 + print("%s:\t%s", topic, leaderBoard.get(topic).leader().id());
39 + }
40 + }
41 +
42 +}
...@@ -258,6 +258,9 @@ ...@@ -258,6 +258,9 @@
258 <command> 258 <command>
259 <action class="org.onlab.onos.cli.net.AddFlowsCommand"/> 259 <action class="org.onlab.onos.cli.net.AddFlowsCommand"/>
260 </command> 260 </command>
261 + <command>
262 + <action class="org.onlab.onos.cli.net.LeaderCommand"/>
263 + </command>
261 264
262 <command> 265 <command>
263 <action class="org.onlab.onos.cli.net.WipeOutCommand"/> 266 <action class="org.onlab.onos.cli.net.WipeOutCommand"/>
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
15 */ 15 */
16 package org.onlab.onos.cluster; 16 package org.onlab.onos.cluster;
17 17
18 +import java.util.Map;
19 +
18 /** 20 /**
19 * Service for leader election. 21 * Service for leader election.
20 * Leadership contests are organized around topics. A instance can join the 22 * Leadership contests are organized around topics. A instance can join the
...@@ -43,6 +45,8 @@ public interface LeadershipService { ...@@ -43,6 +45,8 @@ public interface LeadershipService {
43 */ 45 */
44 void withdraw(String path); 46 void withdraw(String path);
45 47
48 + Map<String, Leadership> getLeaderBoard();
49 +
46 /** 50 /**
47 * Registers a event listener to be notified of leadership events. 51 * Registers a event listener to be notified of leadership events.
48 * @param listener listener that will asynchronously notified of leadership events. 52 * @param listener listener that will asynchronously notified of leadership events.
......
...@@ -15,18 +15,10 @@ ...@@ -15,18 +15,10 @@
15 */ 15 */
16 package org.onlab.onos.net.intent.impl; 16 package org.onlab.onos.net.intent.impl;
17 17
18 -import java.util.ArrayList; 18 +import com.google.common.collect.ImmutableList;
19 -import java.util.Collections; 19 +import com.google.common.collect.ImmutableMap;
20 -import java.util.List; 20 +import com.google.common.collect.Lists;
21 -import java.util.Map; 21 +import com.google.common.collect.Maps;
22 -import java.util.concurrent.ConcurrentHashMap;
23 -import java.util.concurrent.ConcurrentMap;
24 -import java.util.concurrent.ExecutionException;
25 -import java.util.concurrent.ExecutorService;
26 -import java.util.concurrent.Future;
27 -import java.util.concurrent.TimeUnit;
28 -import java.util.concurrent.TimeoutException;
29 -
30 import org.apache.felix.scr.annotations.Activate; 22 import org.apache.felix.scr.annotations.Activate;
31 import org.apache.felix.scr.annotations.Component; 23 import org.apache.felix.scr.annotations.Component;
32 import org.apache.felix.scr.annotations.Deactivate; 24 import org.apache.felix.scr.annotations.Deactivate;
...@@ -46,6 +38,7 @@ import org.onlab.onos.net.intent.IntentBatchDelegate; ...@@ -46,6 +38,7 @@ import org.onlab.onos.net.intent.IntentBatchDelegate;
46 import org.onlab.onos.net.intent.IntentBatchService; 38 import org.onlab.onos.net.intent.IntentBatchService;
47 import org.onlab.onos.net.intent.IntentCompiler; 39 import org.onlab.onos.net.intent.IntentCompiler;
48 import org.onlab.onos.net.intent.IntentEvent; 40 import org.onlab.onos.net.intent.IntentEvent;
41 +import org.onlab.onos.net.intent.IntentEvent.Type;
49 import org.onlab.onos.net.intent.IntentException; 42 import org.onlab.onos.net.intent.IntentException;
50 import org.onlab.onos.net.intent.IntentExtensionService; 43 import org.onlab.onos.net.intent.IntentExtensionService;
51 import org.onlab.onos.net.intent.IntentId; 44 import org.onlab.onos.net.intent.IntentId;
...@@ -56,23 +49,27 @@ import org.onlab.onos.net.intent.IntentOperations; ...@@ -56,23 +49,27 @@ import org.onlab.onos.net.intent.IntentOperations;
56 import org.onlab.onos.net.intent.IntentService; 49 import org.onlab.onos.net.intent.IntentService;
57 import org.onlab.onos.net.intent.IntentState; 50 import org.onlab.onos.net.intent.IntentState;
58 import org.onlab.onos.net.intent.IntentStore; 51 import org.onlab.onos.net.intent.IntentStore;
52 +import org.onlab.onos.net.intent.IntentStore.BatchWrite;
59 import org.onlab.onos.net.intent.IntentStoreDelegate; 53 import org.onlab.onos.net.intent.IntentStoreDelegate;
60 import org.slf4j.Logger; 54 import org.slf4j.Logger;
61 55
62 -import com.google.common.collect.ImmutableList; 56 +import java.util.ArrayList;
63 -import com.google.common.collect.ImmutableMap; 57 +import java.util.Collections;
64 -import com.google.common.collect.Lists; 58 +import java.util.EnumSet;
65 -import com.google.common.collect.Maps; 59 +import java.util.List;
60 +import java.util.Map;
61 +import java.util.concurrent.ConcurrentHashMap;
62 +import java.util.concurrent.ConcurrentMap;
63 +import java.util.concurrent.ExecutionException;
64 +import java.util.concurrent.ExecutorService;
65 +import java.util.concurrent.Future;
66 +import java.util.concurrent.TimeUnit;
67 +import java.util.concurrent.TimeoutException;
66 68
67 import static com.google.common.base.Preconditions.checkArgument; 69 import static com.google.common.base.Preconditions.checkArgument;
68 import static com.google.common.base.Preconditions.checkNotNull; 70 import static com.google.common.base.Preconditions.checkNotNull;
69 -import static java.util.concurrent.Executors.newSingleThreadExecutor; 71 +import static java.util.concurrent.Executors.newFixedThreadPool;
70 -import static org.onlab.onos.net.intent.IntentState.COMPILING; 72 +import static org.onlab.onos.net.intent.IntentState.*;
71 -import static org.onlab.onos.net.intent.IntentState.FAILED;
72 -import static org.onlab.onos.net.intent.IntentState.INSTALLED;
73 -import static org.onlab.onos.net.intent.IntentState.INSTALLING;
74 -import static org.onlab.onos.net.intent.IntentState.WITHDRAWING;
75 -import static org.onlab.onos.net.intent.IntentState.WITHDRAWN;
76 import static org.onlab.util.Tools.namedThreads; 73 import static org.onlab.util.Tools.namedThreads;
77 import static org.slf4j.LoggerFactory.getLogger; 74 import static org.slf4j.LoggerFactory.getLogger;
78 75
...@@ -88,6 +85,8 @@ public class IntentManager ...@@ -88,6 +85,8 @@ public class IntentManager
88 public static final String INTENT_NULL = "Intent cannot be null"; 85 public static final String INTENT_NULL = "Intent cannot be null";
89 public static final String INTENT_ID_NULL = "Intent ID cannot be null"; 86 public static final String INTENT_ID_NULL = "Intent ID cannot be null";
90 87
88 + private static final int NUM_THREADS = 12;
89 +
91 // Collections for compiler, installer, and listener are ONOS instance local 90 // Collections for compiler, installer, and listener are ONOS instance local
92 private final ConcurrentMap<Class<? extends Intent>, 91 private final ConcurrentMap<Class<? extends Intent>,
93 IntentCompiler<? extends Intent>> compilers = new ConcurrentHashMap<>(); 92 IntentCompiler<? extends Intent>> compilers = new ConcurrentHashMap<>();
...@@ -117,7 +116,6 @@ public class IntentManager ...@@ -117,7 +116,6 @@ public class IntentManager
117 116
118 117
119 private ExecutorService executor; 118 private ExecutorService executor;
120 - private ExecutorService monitorExecutor;
121 119
122 private final IntentStoreDelegate delegate = new InternalStoreDelegate(); 120 private final IntentStoreDelegate delegate = new InternalStoreDelegate();
123 private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate(); 121 private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
...@@ -130,8 +128,7 @@ public class IntentManager ...@@ -130,8 +128,7 @@ public class IntentManager
130 trackerService.setDelegate(topoDelegate); 128 trackerService.setDelegate(topoDelegate);
131 batchService.setDelegate(batchDelegate); 129 batchService.setDelegate(batchDelegate);
132 eventDispatcher.addSink(IntentEvent.class, listenerRegistry); 130 eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
133 - executor = newSingleThreadExecutor(namedThreads("onos-intents")); 131 + executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-monitor"));
134 - monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
135 idGenerator = coreService.getIdGenerator("intent-ids"); 132 idGenerator = coreService.getIdGenerator("intent-ids");
136 Intent.bindIdGenerator(idGenerator); 133 Intent.bindIdGenerator(idGenerator);
137 log.info("Started"); 134 log.info("Started");
...@@ -144,7 +141,6 @@ public class IntentManager ...@@ -144,7 +141,6 @@ public class IntentManager
144 batchService.unsetDelegate(batchDelegate); 141 batchService.unsetDelegate(batchDelegate);
145 eventDispatcher.removeSink(IntentEvent.class); 142 eventDispatcher.removeSink(IntentEvent.class);
146 executor.shutdown(); 143 executor.shutdown();
147 - monitorExecutor.shutdown();
148 Intent.unbindIdGenerator(idGenerator); 144 Intent.unbindIdGenerator(idGenerator);
149 log.info("Stopped"); 145 log.info("Stopped");
150 } 146 }
...@@ -288,7 +284,7 @@ public class IntentManager ...@@ -288,7 +284,7 @@ public class IntentManager
288 private void executeCompilingPhase(IntentUpdate update) { 284 private void executeCompilingPhase(IntentUpdate update) {
289 Intent intent = update.newIntent(); 285 Intent intent = update.newIntent();
290 // Indicate that the intent is entering the compiling phase. 286 // Indicate that the intent is entering the compiling phase.
291 - update.setState(intent, COMPILING); 287 + update.setInflightState(intent, COMPILING);
292 288
293 try { 289 try {
294 // Compile the intent into installable derivatives. 290 // Compile the intent into installable derivatives.
...@@ -301,7 +297,7 @@ public class IntentManager ...@@ -301,7 +297,7 @@ public class IntentManager
301 log.warn("Unable to compile intent {} due to:", intent.id(), e); 297 log.warn("Unable to compile intent {} due to:", intent.id(), e);
302 298
303 // If compilation failed, mark the intent as failed. 299 // If compilation failed, mark the intent as failed.
304 - update.setState(intent, FAILED); 300 + update.setInflightState(intent, FAILED);
305 } 301 }
306 } 302 }
307 303
...@@ -338,7 +334,7 @@ public class IntentManager ...@@ -338,7 +334,7 @@ public class IntentManager
338 return; 334 return;
339 } 335 }
340 // Indicate that the intent is entering the installing phase. 336 // Indicate that the intent is entering the installing phase.
341 - update.setState(update.newIntent(), INSTALLING); 337 + update.setInflightState(update.newIntent(), INSTALLING);
342 338
343 List<FlowRuleBatchOperation> batches = Lists.newArrayList(); 339 List<FlowRuleBatchOperation> batches = Lists.newArrayList();
344 for (Intent installable : update.newInstallables()) { 340 for (Intent installable : update.newInstallables()) {
...@@ -365,7 +361,7 @@ public class IntentManager ...@@ -365,7 +361,7 @@ public class IntentManager
365 */ 361 */
366 private void executeWithdrawingPhase(IntentUpdate update) { 362 private void executeWithdrawingPhase(IntentUpdate update) {
367 if (!update.oldIntent().equals(update.newIntent())) { 363 if (!update.oldIntent().equals(update.newIntent())) {
368 - update.setState(update.oldIntent(), WITHDRAWING); 364 + update.setInflightState(update.oldIntent(), WITHDRAWING);
369 } // else newIntent is FAILED 365 } // else newIntent is FAILED
370 update.addBatches(uninstallIntent(update.oldIntent(), update.oldInstallables())); 366 update.addBatches(uninstallIntent(update.oldIntent(), update.oldInstallables()));
371 } 367 }
...@@ -405,9 +401,9 @@ public class IntentManager ...@@ -405,9 +401,9 @@ public class IntentManager
405 "Old and New Intent must have equivalent installable intents."); 401 "Old and New Intent must have equivalent installable intents.");
406 if (!update.oldIntent().equals(update.newIntent())) { 402 if (!update.oldIntent().equals(update.newIntent())) {
407 // only set the old intent's state if it is different 403 // only set the old intent's state if it is different
408 - update.setState(update.oldIntent(), WITHDRAWING); 404 + update.setInflightState(update.oldIntent(), WITHDRAWING);
409 } 405 }
410 - update.setState(update.newIntent(), INSTALLING); 406 + update.setInflightState(update.newIntent(), INSTALLING);
411 407
412 List<FlowRuleBatchOperation> batches = Lists.newArrayList(); 408 List<FlowRuleBatchOperation> batches = Lists.newArrayList();
413 for (int i = 0; i < update.oldInstallables().size(); i++) { 409 for (int i = 0; i < update.oldInstallables().size(); i++) {
...@@ -427,7 +423,7 @@ public class IntentManager ...@@ -427,7 +423,7 @@ public class IntentManager
427 log.warn("Unable to update intent {} due to:", update.oldIntent().id(), e); 423 log.warn("Unable to update intent {} due to:", update.oldIntent().id(), e);
428 //FIXME... we failed. need to uninstall (if same) or revert (if different) 424 //FIXME... we failed. need to uninstall (if same) or revert (if different)
429 trackerService.removeTrackedResources(update.newIntent().id(), newInstallable.resources()); 425 trackerService.removeTrackedResources(update.newIntent().id(), newInstallable.resources());
430 - update.setState(update.newIntent(), FAILED); 426 + update.setInflightState(update.newIntent(), FAILED);
431 batches = uninstallIntent(update.oldIntent(), update.oldInstallables()); 427 batches = uninstallIntent(update.oldIntent(), update.oldInstallables());
432 } 428 }
433 } 429 }
...@@ -539,6 +535,7 @@ public class IntentManager ...@@ -539,6 +535,7 @@ public class IntentManager
539 } 535 }
540 } 536 }
541 537
538 + // TODO move this inside IntentUpdate?
542 /** 539 /**
543 * TODO. rename this... 540 * TODO. rename this...
544 * @param update intent update 541 * @param update intent update
...@@ -560,7 +557,7 @@ public class IntentManager ...@@ -560,7 +557,7 @@ public class IntentManager
560 if (update.oldIntent() != null && 557 if (update.oldIntent() != null &&
561 !update.oldIntent().equals(update.newIntent())) { 558 !update.oldIntent().equals(update.newIntent())) {
562 // removing failed intent 559 // removing failed intent
563 - update.setState(update.oldIntent(), WITHDRAWING); 560 + update.setInflightState(update.oldIntent(), WITHDRAWING);
564 } 561 }
565 // if (update.newIntent() != null) { 562 // if (update.newIntent() != null) {
566 // // TODO assert that next state is failed 563 // // TODO assert that next state is failed
...@@ -602,13 +599,6 @@ public class IntentManager ...@@ -602,13 +599,6 @@ public class IntentManager
602 newIntent = null; 599 newIntent = null;
603 break; 600 break;
604 } 601 }
605 - // add new intent to store (if required)
606 - if (newIntent != null) {
607 - IntentEvent event = store.createIntent(newIntent);
608 - if (event != null) {
609 - eventDispatcher.post(event);
610 - }
611 - }
612 // fetch the old intent's installables from the store 602 // fetch the old intent's installables from the store
613 if (oldIntent != null) { 603 if (oldIntent != null) {
614 oldInstallables = store.getInstallableIntents(oldIntent.id()); 604 oldInstallables = store.getInstallableIntents(oldIntent.id());
...@@ -617,6 +607,13 @@ public class IntentManager ...@@ -617,6 +607,13 @@ public class IntentManager
617 } 607 }
618 } 608 }
619 609
610 + void init(BatchWrite batchWrite) {
611 + // add new intent to store (if required)
612 + if (newIntent != null) {
613 + batchWrite.createIntent(newIntent);
614 + }
615 + }
616 +
620 Intent oldIntent() { 617 Intent oldIntent() {
621 return oldIntent; 618 return oldIntent;
622 } 619 }
...@@ -635,7 +632,10 @@ public class IntentManager ...@@ -635,7 +632,10 @@ public class IntentManager
635 632
636 void setInstallables(List<Intent> installables) { 633 void setInstallables(List<Intent> installables) {
637 newInstallables = installables; 634 newInstallables = installables;
638 - store.setInstallableIntents(newIntent.id(), installables); 635 + //FIXME batch this
636 +
637 + //store.setInstallableIntents(newIntent.id(), installables);
638 +
639 } 639 }
640 640
641 boolean isComplete() { 641 boolean isComplete() {
...@@ -646,12 +646,17 @@ public class IntentManager ...@@ -646,12 +646,17 @@ public class IntentManager
646 return !isComplete() ? batches.get(currentBatch) : null; 646 return !isComplete() ? batches.get(currentBatch) : null;
647 } 647 }
648 648
649 - void incrementBatch(boolean success) { 649 + List<IntentEvent> batchSuccess(BatchWrite batchWrite) {
650 - if (success) { // actually increment 650 + // move on to next Batch
651 if (++currentBatch == batches.size()) { 651 if (++currentBatch == batches.size()) {
652 - finalizeStates(); 652 + return finalizeStates(batchWrite);
653 + }
654 + return Collections.emptyList();
653 } 655 }
654 - } else { // the current batch has failed, so recompile 656 +
657 + void batchFailed() {
658 +
659 + // the current batch has failed, so recompile
655 // remove the current batch and all remaining 660 // remove the current batch and all remaining
656 for (int i = currentBatch; i < batches.size(); i++) { 661 for (int i = currentBatch; i < batches.size(); i++) {
657 batches.remove(i); 662 batches.remove(i);
...@@ -660,28 +665,32 @@ public class IntentManager ...@@ -660,28 +665,32 @@ public class IntentManager
660 executeWithdrawingPhase(this); // remove the old intent 665 executeWithdrawingPhase(this); // remove the old intent
661 } 666 }
662 if (newIntent != null) { 667 if (newIntent != null) {
663 - setState(newIntent, FAILED); 668 + setInflightState(newIntent, FAILED);
664 batches.addAll(uninstallIntent(newIntent, newInstallables())); 669 batches.addAll(uninstallIntent(newIntent, newInstallables()));
665 } 670 }
666 671
667 // FIXME: should we try to recompile? 672 // FIXME: should we try to recompile?
668 } 673 }
669 - }
670 674
671 // FIXME make sure this is called!!! 675 // FIXME make sure this is called!!!
672 - private void finalizeStates() { 676 + private List<IntentEvent> finalizeStates(BatchWrite batchWrite) {
677 + // events to be triggered on successful write
678 + List<IntentEvent> events = new ArrayList<>();
673 for (Intent intent : stateMap.keySet()) { 679 for (Intent intent : stateMap.keySet()) {
674 - switch (getState(intent)) { 680 + switch (getInflightState(intent)) {
675 case INSTALLING: 681 case INSTALLING:
676 - setState(intent, INSTALLED); 682 + batchWrite.setState(intent, INSTALLED);
683 + batchWrite.setInstallableIntents(newIntent.id(), newInstallables);
684 + events.add(new IntentEvent(Type.INSTALLED, intent));
677 break; 685 break;
678 case WITHDRAWING: 686 case WITHDRAWING:
679 - setState(intent, WITHDRAWN); 687 + batchWrite.setState(intent, WITHDRAWN);
680 - store.removeInstalledIntents(intent.id()); 688 + events.add(new IntentEvent(Type.WITHDRAWN, intent));
681 - //store.removeIntent(intent.id()); // FIXME we die a horrible death here 689 + batchWrite.removeInstalledIntents(intent.id());
690 + batchWrite.removeIntent(intent.id());
682 break; 691 break;
683 case FAILED: 692 case FAILED:
684 - store.removeInstalledIntents(intent.id()); 693 + batchWrite.removeInstalledIntents(intent.id());
685 break; 694 break;
686 695
687 // FALLTHROUGH to default from here 696 // FALLTHROUGH to default from here
...@@ -692,10 +701,11 @@ public class IntentManager ...@@ -692,10 +701,11 @@ public class IntentManager
692 case INSTALLED: 701 case INSTALLED:
693 default: 702 default:
694 //FIXME clean this up (we shouldn't ever get here) 703 //FIXME clean this up (we shouldn't ever get here)
695 - log.warn("Bad state: {} for {}", getState(intent), intent); 704 + log.warn("Bad state: {} for {}", getInflightState(intent), intent);
696 break; 705 break;
697 } 706 }
698 } 707 }
708 + return events;
699 } 709 }
700 710
701 List<FlowRuleBatchOperation> batches() { 711 List<FlowRuleBatchOperation> batches() {
...@@ -706,11 +716,21 @@ public class IntentManager ...@@ -706,11 +716,21 @@ public class IntentManager
706 this.batches.addAll(batches); 716 this.batches.addAll(batches);
707 } 717 }
708 718
709 - IntentState getState(Intent intent) { 719 + IntentState getInflightState(Intent intent) {
710 return stateMap.get(intent); 720 return stateMap.get(intent);
711 } 721 }
712 722
713 - void setState(Intent intent, IntentState newState) { 723 +
724 + // set transient state during intent update process
725 + void setInflightState(Intent intent, IntentState newState) {
726 + // This method should be called for
727 + // transition to non-parking or Failed only
728 + EnumSet<IntentState> nonParkingOrFailed
729 + = EnumSet.complementOf(EnumSet.of(SUBMITTED, INSTALLED, WITHDRAWN));
730 + if (!nonParkingOrFailed.contains(newState)) {
731 + log.error("Unexpected transition to {}", newState);
732 + }
733 +
714 // TODO: clean this up, or set to debug 734 // TODO: clean this up, or set to debug
715 IntentState oldState = stateMap.get(intent); 735 IntentState oldState = stateMap.get(intent);
716 log.debug("intent id: {}, old state: {}, new state: {}", 736 log.debug("intent id: {}, old state: {}, new state: {}",
...@@ -721,10 +741,6 @@ public class IntentManager ...@@ -721,10 +741,6 @@ public class IntentManager
721 if (event != null) { 741 if (event != null) {
722 eventDispatcher.post(event); 742 eventDispatcher.post(event);
723 } 743 }
724 -
725 - if (newState == WITHDRAWN) {
726 - store.removeIntent(intent.id());
727 - }
728 } 744 }
729 745
730 Map<Intent, IntentState> stateMap() { 746 Map<Intent, IntentState> stateMap() {
...@@ -741,6 +757,7 @@ public class IntentManager ...@@ -741,6 +757,7 @@ public class IntentManager
741 private final IntentOperations ops; 757 private final IntentOperations ops;
742 private final List<IntentUpdate> intentUpdates = Lists.newArrayList(); 758 private final List<IntentUpdate> intentUpdates = Lists.newArrayList();
743 759
760 + // future holding current FlowRuleBatch installation result
744 private Future<CompletedBatchOperation> future; 761 private Future<CompletedBatchOperation> future;
745 private long startTime = System.currentTimeMillis(); 762 private long startTime = System.currentTimeMillis();
746 private long endTime; 763 private long endTime;
...@@ -758,9 +775,27 @@ public class IntentManager ...@@ -758,9 +775,27 @@ public class IntentManager
758 } 775 }
759 776
760 private void buildIntentUpdates() { 777 private void buildIntentUpdates() {
778 + BatchWrite batchWrite = store.newBatchWrite();
779 +
780 + // create context and record new request to store
761 for (IntentOperation op : ops.operations()) { 781 for (IntentOperation op : ops.operations()) {
762 IntentUpdate update = new IntentUpdate(op); 782 IntentUpdate update = new IntentUpdate(op);
783 + update.init(batchWrite);
763 intentUpdates.add(update); 784 intentUpdates.add(update);
785 + }
786 +
787 + if (!batchWrite.isEmpty()) {
788 + store.batchWrite(batchWrite);
789 + }
790 +
791 + // start processing each Intents
792 + for (IntentUpdate update : intentUpdates) {
793 + if (update.newIntent() != null) {
794 + IntentState state = store.getIntentState(update.newIntent().id());
795 + if (state == SUBMITTED) {
796 + eventDispatcher.post(new IntentEvent(Type.SUBMITTED, update.newIntent));
797 + }
798 + }
764 processIntentUpdate(update); 799 processIntentUpdate(update);
765 } 800 }
766 future = applyNextBatch(); 801 future = applyNextBatch();
...@@ -784,8 +819,16 @@ public class IntentManager ...@@ -784,8 +819,16 @@ public class IntentManager
784 819
785 private void updateBatches(CompletedBatchOperation completed) { 820 private void updateBatches(CompletedBatchOperation completed) {
786 if (completed.isSuccess()) { 821 if (completed.isSuccess()) {
822 + BatchWrite batchWrite = store.newBatchWrite();
823 + List<IntentEvent> events = new ArrayList<>();
787 for (IntentUpdate update : intentUpdates) { 824 for (IntentUpdate update : intentUpdates) {
788 - update.incrementBatch(true); 825 + events.addAll(update.batchSuccess(batchWrite));
826 + }
827 + if (!batchWrite.isEmpty()) {
828 + store.batchWrite(batchWrite);
829 + for (IntentEvent event : events) {
830 + eventDispatcher.post(event);
831 + }
789 } 832 }
790 } else { 833 } else {
791 // entire batch has been reverted... 834 // entire batch has been reverted...
...@@ -798,7 +841,7 @@ public class IntentManager ...@@ -798,7 +841,7 @@ public class IntentManager
798 installables.addAll(update.oldInstallables()); 841 installables.addAll(update.oldInstallables());
799 for (Intent intent : installables) { 842 for (Intent intent : installables) {
800 if (intent.id().equals(targetId)) { 843 if (intent.id().equals(targetId)) {
801 - update.incrementBatch(false); 844 + update.batchFailed();
802 break; 845 break;
803 } 846 }
804 } 847 }
...@@ -834,12 +877,13 @@ public class IntentManager ...@@ -834,12 +877,13 @@ public class IntentManager
834 if (installAttempt++ >= MAX_ATTEMPTS) { 877 if (installAttempt++ >= MAX_ATTEMPTS) {
835 log.warn("Install request timed out: {}", ops); 878 log.warn("Install request timed out: {}", ops);
836 for (IntentUpdate update : intentUpdates) { 879 for (IntentUpdate update : intentUpdates) {
837 - update.incrementBatch(false); 880 + update.batchFailed();
838 } 881 }
839 } // else just resubmit the work 882 } // else just resubmit the work
840 future = applyNextBatch(); 883 future = applyNextBatch();
841 - monitorExecutor.submit(this); 884 + executor.submit(this);
842 } else { 885 } else {
886 + log.error("Cancelling FlowRuleBatch failed.");
843 // FIXME 887 // FIXME
844 // cancel failed... batch is broken; shouldn't happen! 888 // cancel failed... batch is broken; shouldn't happen!
845 // we could manually reverse everything 889 // we could manually reverse everything
...@@ -859,20 +903,37 @@ public class IntentManager ...@@ -859,20 +903,37 @@ public class IntentManager
859 if (intentUpdates.isEmpty()) { 903 if (intentUpdates.isEmpty()) {
860 // this should only be called on the first iteration 904 // this should only be called on the first iteration
861 // note: this a "expensive", so it is not done in the constructor 905 // note: this a "expensive", so it is not done in the constructor
906 +
907 + // - creates per Intent installation context (IntentUpdate)
908 + // - write Intents to store
909 + // - process (compile, install, etc.) each Intents
910 + // - generate FlowRuleBatch for this phase
862 buildIntentUpdates(); 911 buildIntentUpdates();
863 } 912 }
913 +
914 + // - peek if current FlowRuleBatch is complete
915 + // -- If complete OK:
916 + // step each IntentUpdate forward
917 + // If phase left: generate next FlowRuleBatch
918 + // If no more phase: write parking states
919 + // -- If complete FAIL:
920 + // Intent which failed: transition Intent to FAILED
921 + // Other Intents: resubmit same FlowRuleBatch for this phase
864 processFutures(); 922 processFutures();
865 if (isComplete()) { 923 if (isComplete()) {
866 // there are no outstanding batches; we are done 924 // there are no outstanding batches; we are done
867 batchService.removeIntentOperations(ops); 925 batchService.removeIntentOperations(ops);
868 } else if (endTime < System.currentTimeMillis()) { 926 } else if (endTime < System.currentTimeMillis()) {
927 + // - cancel current FlowRuleBatch and resubmit again
869 retry(); 928 retry();
870 } else { 929 } else {
871 // we are not done yet, yield the thread by resubmitting ourselves 930 // we are not done yet, yield the thread by resubmitting ourselves
872 - monitorExecutor.submit(this); 931 + executor.submit(this);
873 } 932 }
874 } catch (Exception e) { 933 } catch (Exception e) {
875 log.error("Error submitting batches:", e); 934 log.error("Error submitting batches:", e);
935 + // FIXME incomplete Intents should be cleaned up
936 + // (transition to FAILED, etc.)
876 } 937 }
877 } 938 }
878 } 939 }
...@@ -883,7 +944,7 @@ public class IntentManager ...@@ -883,7 +944,7 @@ public class IntentManager
883 log.info("Execute {} operation(s).", operations.operations().size()); 944 log.info("Execute {} operation(s).", operations.operations().size());
884 log.debug("Execute operations: {}", operations.operations()); 945 log.debug("Execute operations: {}", operations.operations());
885 //FIXME: perhaps we want to track this task so that we can cancel it. 946 //FIXME: perhaps we want to track this task so that we can cancel it.
886 - monitorExecutor.execute(new IntentInstallMonitor(operations)); 947 + executor.execute(new IntentInstallMonitor(operations));
887 } 948 }
888 949
889 @Override 950 @Override
......
...@@ -11,6 +11,7 @@ import java.util.concurrent.Executors; ...@@ -11,6 +11,7 @@ import java.util.concurrent.Executors;
11 import java.util.concurrent.ScheduledExecutorService; 11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.TimeUnit; 12 import java.util.concurrent.TimeUnit;
13 13
14 +import com.google.common.collect.ImmutableMap;
14 import org.apache.felix.scr.annotations.Activate; 15 import org.apache.felix.scr.annotations.Activate;
15 import org.apache.felix.scr.annotations.Component; 16 import org.apache.felix.scr.annotations.Component;
16 import org.apache.felix.scr.annotations.Deactivate; 17 import org.apache.felix.scr.annotations.Deactivate;
...@@ -159,6 +160,11 @@ public class LeadershipManager implements LeadershipService { ...@@ -159,6 +160,11 @@ public class LeadershipManager implements LeadershipService {
159 } 160 }
160 161
161 @Override 162 @Override
163 + public Map<String, Leadership> getLeaderBoard() {
164 + return ImmutableMap.copyOf(leaderBoard);
165 + }
166 +
167 + @Override
162 public void addListener(LeadershipEventListener listener) { 168 public void addListener(LeadershipEventListener listener) {
163 checkArgument(listener != null); 169 checkArgument(listener != null);
164 listeners.add(listener); 170 listeners.add(listener);
......