Brian O'Connor

ONOS-2003 Fixing intent reroute after cluster change

Objective trackers update when partitions are shuffled to
track "local" intents.

Change-Id: I7cd9e4a935ddbc94813d5067d4febc084a89f508
......@@ -20,5 +20,5 @@ import org.onosproject.event.EventListener;
/**
* Entity capable of receiving device leadership-related events.
*/
public interface LeadershipEventListener extends EventListener<LeadershipEvent> {
public interface LeadershipEventListener extends EventListener<LeadershipEvent> {
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.intent;
import org.onosproject.event.AbstractEvent;
/**
* Partition event.
*/
//TODO change String into a proper object type
public class PartitionEvent extends AbstractEvent<PartitionEvent.Type, String> {
public enum Type {
LEADER_CHANGED
}
public PartitionEvent(Type type, String partition) {
super(type, partition);
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.intent;
import org.onosproject.event.EventListener;
/**
* Entity capable of receiving device partition-related events.
*/
public interface PartitionEventListener extends EventListener<PartitionEvent> {
}
......@@ -40,4 +40,18 @@ public interface PartitionService {
NodeId getLeader(Key intentKey);
// TODO add API for rebalancing partitions
/**
* Registers a event listener to be notified of partition events.
*
* @param listener listener that will asynchronously notified of partition events.
*/
void addListener(PartitionEventListener listener);
/**
* Unregisters a event listener for partition events.
*
* @param listener listener to be removed.
*/
void removeListener(PartitionEventListener listener);
}
......
......@@ -23,6 +23,7 @@ import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.event.Event;
......@@ -38,8 +39,12 @@ import org.onosproject.net.device.DeviceService;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionEvent;
import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.resource.link.LinkResourceEvent;
import org.onosproject.net.resource.link.LinkResourceListener;
......@@ -54,6 +59,10 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -94,25 +103,35 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
policy = ReferencePolicy.DYNAMIC)
protected IntentService intentService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionService partitionService;
private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
private TopologyListener listener = new InternalTopologyListener();
private LinkResourceListener linkResourceListener =
new InternalLinkResourceListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private HostListener hostListener = new InternalHostListener();
private PartitionEventListener partitionListener = new InternalPartitionListener();
private TopologyChangeDelegate delegate;
protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
@Activate
public void activate() {
topologyService.addListener(listener);
resourceManager.addListener(linkResourceListener);
deviceService.addListener(deviceListener);
hostService.addListener(hostListener);
partitionService.addListener(partitionListener);
log.info("Started");
}
......@@ -122,6 +141,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
resourceManager.removeListener(linkResourceListener);
deviceService.removeListener(deviceListener);
hostService.removeListener(hostListener);
partitionService.removeListener(partitionListener);
log.info("Stopped");
}
......@@ -268,7 +288,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
private void updateTrackedResources(ApplicationId appId, boolean track) {
if (intentService == null) {
log.debug("Intent service is not bound yet");
log.warn("Intent service is not bound yet");
return;
}
intentService.getIntents().forEach(intent -> {
......@@ -342,4 +362,52 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
executorService.execute(new DeviceAvailabilityHandler(id, available));
}
}
protected void doIntentUpdate() {
updateScheduled.set(false);
if (intentService == null) {
log.warn("Intent service is not bound yet");
return;
}
try {
//FIXME very inefficient
for (Intent intent : intentService.getIntents()) {
try {
if (intentService.isLocal(intent.key())) {
log.warn("intent {}, old: {}, new: {}",
intent.key(), intentsByDevice.values().contains(intent.key()), true);
addTrackedResources(intent.key(), intent.resources());
intentService.getInstallableIntents(intent.key()).stream()
.forEach(installable ->
addTrackedResources(intent.key(), installable.resources()));
} else {
log.warn("intent {}, old: {}, new: {}",
intent.key(), intentsByDevice.values().contains(intent.key()), false);
removeTrackedResources(intent.key(), intent.resources());
intentService.getInstallableIntents(intent.key()).stream()
.forEach(installable ->
removeTrackedResources(intent.key(), installable.resources()));
}
} catch (NullPointerException npe) {
log.warn("intent error {}", intent.key(), npe);
}
}
} catch (Exception e) {
log.warn("Exception caught during update task", e);
}
}
private void scheduleIntentUpdate(int afterDelaySec) {
if (updateScheduled.compareAndSet(false, true)) {
executor.schedule(this::doIntentUpdate, afterDelaySec, TimeUnit.SECONDS);
}
}
private final class InternalPartitionListener implements PartitionEventListener {
@Override
public void event(PartitionEvent event) {
log.warn("got message {}", event.subject());
scheduleIntentUpdate(1);
}
}
}
......
......@@ -30,7 +30,11 @@ import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionEvent;
import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -58,6 +62,9 @@ public class PartitionManager implements PartitionService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
static final int NUM_PARTITIONS = 14;
......@@ -67,6 +74,7 @@ public class PartitionManager implements PartitionService {
private static final String ELECTION_PREFIX = "intent-partition-";
private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ClusterEventListener clusterListener = new InternalClusterEventListener();
......@@ -78,6 +86,9 @@ public class PartitionManager implements PartitionService {
leadershipService.addListener(leaderListener);
clusterService.addListener(clusterListener);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
}
......@@ -90,6 +101,7 @@ public class PartitionManager implements PartitionService {
public void deactivate() {
executor.shutdownNow();
eventDispatcher.removeSink(PartitionEvent.class);
leadershipService.removeListener(leaderListener);
clusterService.removeListener(clusterListener);
}
......@@ -133,6 +145,16 @@ public class PartitionManager implements PartitionService {
return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
@Override
public void addListener(PartitionEventListener listener) {
listenerRegistry.addListener(listener);
}
@Override
public void removeListener(PartitionEventListener listener) {
listenerRegistry.removeListener(listener);
}
protected void doRebalance() {
rebalanceScheduled.set(false);
try {
......@@ -203,6 +225,9 @@ public class PartitionManager implements PartitionService {
// See if we need to let some partitions go
scheduleRebalance(0);
eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}
}
}
......
......@@ -28,6 +28,7 @@ import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.cluster.NodeId;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.intent.Key;
import java.util.HashMap;
......@@ -86,6 +87,7 @@ public class PartitionManagerTest {
partitionManager.clusterService = new TestClusterService();
partitionManager.leadershipService = leadershipService;
partitionManager.eventDispatcher = new TestEventDispatcher();
}
/**
......
......@@ -19,6 +19,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
......@@ -28,7 +30,11 @@ import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionEvent;
import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
......@@ -55,14 +61,24 @@ public class SimpleClusterStore
private final DateTime creationTime = DateTime.now();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
@Activate
public void activate() {
instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(PartitionEvent.class);
log.info("Stopped");
}
......@@ -110,4 +126,14 @@ public class SimpleClusterStore
public NodeId getLeader(Key intentKey) {
return instance.id();
}
@Override
public void addListener(PartitionEventListener listener) {
listenerRegistry.addListener(listener);
}
@Override
public void removeListener(PartitionEventListener listener) {
listenerRegistry.removeListener(listener);
}
}
......