Brian O'Connor

Adding Leadership Listener to IntentBatchService

ObjectiveTracker uses Leadership Listener to track intents that
it has become the leader of.

Change-Id: I039accb30d27ad718d79a9fec3f546dbdc78e62e
......@@ -266,9 +266,8 @@ public class DemoInstaller implements DemoAPI {
}
count++;
if (count > ITERATIONMAX) {
log.warn("A batch is stuck processing. current : {}" +
", pending : {}",
intentBatchService.getCurrentOperations(),
log.warn("A batch is stuck processing. " +
"pending : {}",
intentBatchService.getPendingOperations());
shutdownAndAwaitTermination(randomWorker);
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.intent;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.event.AbstractEvent;
/**
* A class to represent an intent related event.
*/
public class IntentBatchLeaderEvent extends AbstractEvent<IntentBatchLeaderEvent.Type, ApplicationId> {
public enum Type {
/**
* Signifies that this instance has become the leader for the given application id.
*/
ELECTED,
/**
* Signifies that instance is no longer the leader for a given application id.
*/
BOOTED
}
/**
* Creates an event of a given type and for the specified appId and the
* current time.
*
* @param type event type
* @param appId subject appId
* @param time time the event created in milliseconds since start of epoch
*/
public IntentBatchLeaderEvent(Type type, ApplicationId appId, long time) {
super(type, appId, time);
}
/**
* Creates an event of a given type and for the specified appId and the
* current time.
*
* @param type event type
* @param appId subject appId
*/
public IntentBatchLeaderEvent(Type type, ApplicationId appId) {
super(type, appId);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.intent;
import org.onlab.onos.event.EventListener;
/**
* Listener for {@link org.onlab.onos.net.intent.IntentEvent intent events}.
*/
public interface IntentBatchListener extends EventListener<IntentBatchLeaderEvent> {
}
......@@ -45,14 +45,6 @@ public interface IntentBatchService {
Set<IntentOperations> getPendingOperations();
/**
* Returns the set of intent batches currently being processed.
* @return set of batches
*/
//TODO we may want to get rid of this method
@Deprecated
Set<IntentOperations> getCurrentOperations();
/**
* Return true if this instance is the local leader for batch
* processing a given application id.
*
......@@ -75,4 +67,17 @@ public interface IntentBatchService {
*/
void unsetDelegate(IntentBatchDelegate delegate);
/**
* Adds the specified listener for intent batch leadership events.
*
* @param listener listener to be added
*/
void addListener(IntentBatchListener listener);
/**
* Removes the specified listener for intent batch leadership events.
*
* @param listener listener to be removed
*/
void removeListener(IntentBatchListener listener);
}
......
......@@ -15,22 +15,25 @@
*/
package org.onlab.onos.net.intent.impl;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.event.Event;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.NetworkResource;
import org.onlab.onos.net.intent.IntentBatchLeaderEvent;
import org.onlab.onos.net.intent.IntentBatchListener;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.resource.LinkResourceEvent;
import org.onlab.onos.net.resource.LinkResourceListener;
......@@ -40,8 +43,10 @@ import org.onlab.onos.net.topology.TopologyListener;
import org.onlab.onos.net.topology.TopologyService;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -72,18 +77,26 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkResourceService resourceManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentBatchService batchService;
private ExecutorService executorService =
newSingleThreadExecutor(namedThreads("onos-flowtracker"));
private TopologyListener listener = new InternalTopologyListener();
private LinkResourceListener linkResourceListener =
new InternalLinkResourceListener();
private final LeadershipListener leaderListener = new LeadershipListener();
private TopologyChangeDelegate delegate;
@Activate
public void activate() {
topologyService.addListener(listener);
resourceManager.addListener(linkResourceListener);
batchService.addListener(leaderListener);
log.info("Started");
}
......@@ -91,6 +104,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
public void deactivate() {
topologyService.removeListener(listener);
resourceManager.removeListener(linkResourceListener);
batchService.removeListener(leaderListener);
log.info("Stopped");
}
......@@ -220,6 +234,38 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
//TODO consider adding flow rule event tracking
//FIXME the only intents that will be tracked are events that were
//executed on this instance. Need to have some backup trackers...
private void updateTrackedResources(ApplicationId appId, boolean track) {
intentService.getIntents().forEach(intent -> {
if (intent.appId().equals(appId)) {
IntentId id = intent.id();
Collection<NetworkResource> resources = Lists.newArrayList();
intentService.getInstallableIntents(id).stream()
.map(installable -> installable.resources())
.forEach(resources::addAll);
if (track) {
addTrackedResources(id, resources);
} else {
removeTrackedResources(id, resources);
}
}
});
}
private class LeadershipListener implements IntentBatchListener {
@Override
public void event(IntentBatchLeaderEvent event) {
log.debug("leadership event: {}", event);
ApplicationId appId = event.subject();
switch (event.type()) {
case ELECTED:
updateTrackedResources(appId, true);
break;
case BOOTED:
updateTrackedResources(appId, false);
break;
default:
break;
}
}
}
}
......
......@@ -111,16 +111,13 @@ public class IntentManagerTest {
//the batch has not yet been removed when we receive the last event
// FIXME: this doesn't guarantee to avoid the race
for (int tries = 0; tries < 10; tries++) {
if (manager.batchService.getPendingOperations().isEmpty() &&
manager.batchService.getCurrentOperations().isEmpty()) {
if (manager.batchService.getPendingOperations().isEmpty()) {
break;
}
delay(10);
}
assertTrue("There are still pending batch operations.",
manager.batchService.getPendingOperations().isEmpty());
assertTrue("There are still outstanding batch operations.",
manager.batchService.getCurrentOperations().isEmpty());
extensionService.unregisterCompiler(MockIntent.class);
extensionService.unregisterInstaller(MockInstallableIntent.class);
......
......@@ -35,7 +35,11 @@ import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.intent.IntentBatchDelegate;
import org.onlab.onos.net.intent.IntentBatchLeaderEvent;
import org.onlab.onos.net.intent.IntentBatchListener;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.onlab.onos.store.hz.SQueue;
......@@ -46,7 +50,6 @@ import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
......@@ -74,6 +77,10 @@ public class HazelcastIntentBatchQueue
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
private HazelcastInstance theInstance;
private ControllerNode localControllerNode;
protected StoreSerializer serializer;
......@@ -85,6 +92,9 @@ public class HazelcastIntentBatchQueue
private final Map<ApplicationId, IntentOperations> outstandingOps
= Maps.newHashMap();
private final AbstractListenerRegistry<IntentBatchLeaderEvent, IntentBatchListener>
listenerRegistry = new AbstractListenerRegistry<>();
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
......@@ -103,11 +113,13 @@ public class HazelcastIntentBatchQueue
}
};
eventDispatcher.addSink(IntentBatchLeaderEvent.class, listenerRegistry);
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(IntentBatchLeaderEvent.class);
leadershipService.removeListener(leaderListener);
for (ApplicationId appId: batchQueues.keySet()) {
leadershipService.withdraw(getTopic(appId));
......@@ -277,12 +289,6 @@ public class HazelcastIntentBatchQueue
}
@Override
public Set<IntentOperations> getCurrentOperations() {
//FIXME this is not really implemented
return Collections.emptySet();
}
@Override
public boolean isLocalLeader(ApplicationId applicationId) {
return myTopics.contains(applicationId);
}
......@@ -298,4 +304,14 @@ public class HazelcastIntentBatchQueue
this.delegate = null;
}
}
@Override
public void addListener(IntentBatchListener listener) {
listenerRegistry.addListener(listener);
}
@Override
public void removeListener(IntentBatchListener listener) {
listenerRegistry.removeListener(listener);
}
}
......
......@@ -22,6 +22,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.intent.IntentBatchDelegate;
import org.onlab.onos.net.intent.IntentBatchListener;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
......@@ -83,14 +84,9 @@ public class SimpleIntentBatchQueue implements IntentBatchService {
@Override
public Set<IntentOperations> getPendingOperations() {
synchronized (this) {
return Sets.newHashSet(pendingBatches);
}
}
@Override
public Set<IntentOperations> getCurrentOperations() {
synchronized (this) {
return Sets.newHashSet(currentBatches);
Set<IntentOperations> set = Sets.newHashSet(pendingBatches);
set.addAll(currentBatches); // TODO refactor this current vs. pending
return set;
}
}
......@@ -110,4 +106,16 @@ public class SimpleIntentBatchQueue implements IntentBatchService {
this.delegate = null;
}
}
@Override
public void addListener(IntentBatchListener listener) {
// no-op
//TODO: we are always the master
}
@Override
public void removeListener(IntentBatchListener listener) {
// no-op
//TODO: we are always the master
}
}
......