Brian O'Connor
Committed by Ray Milkey

Adding device and host tracking for intents (ONOS-1356)

Also, this should fix ONOS-1184 (intents submitted before hosts detected).

Change-Id: I47a503c18dc728912132eb2e2fcc160d47e518eb
......@@ -18,5 +18,5 @@ package org.onosproject.net;
/**
* Immutable representation of a network element identity.
*/
public abstract class ElementId {
public abstract class ElementId implements NetworkResource {
}
......
......@@ -15,15 +15,14 @@
*/
package org.onosproject.net.intent;
import java.util.Collections;
import java.util.List;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.HostId;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import com.google.common.base.MoreObjects;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -147,7 +146,7 @@ public final class HostToHostIntent extends ConnectivityIntent {
TrafficTreatment treatment,
List<Constraint> constraints,
int priority) {
super(appId, key, Collections.emptyList(), selector, treatment,
super(appId, key, ImmutableSet.of(one, two), selector, treatment,
constraints, priority);
// TODO: consider whether the case one and two are same is allowed
......
......@@ -337,8 +337,8 @@ public class IntentManager
private void submitUpdates(List<FinalIntentProcessPhase> updates) {
store.batchWrite(updates.stream()
.map(FinalIntentProcessPhase::data)
.collect(Collectors.toList()));
.map(FinalIntentProcessPhase::data)
.collect(Collectors.toList()));
}
}
......@@ -377,7 +377,9 @@ public class IntentManager
throw new IllegalStateException("installable intents must be FlowRuleIntent");
}
installables.forEach(x -> trackerService.addTrackedResources(data.key(), x.resources()));
trackerService.addTrackedResources(data.key(), data.intent().resources());
installables.forEach(installable ->
trackerService.addTrackedResources(data.key(), installable.resources()));
List<Collection<FlowRule>> stages = installables.stream()
.map(x -> (FlowRuleIntent) x)
......@@ -415,7 +417,10 @@ public class IntentManager
throw new IllegalStateException("installable intents must be FlowRuleIntent");
}
installables.forEach(x -> trackerService.removeTrackedResources(data.intent().key(), x.resources()));
trackerService.removeTrackedResources(data.key(), data.intent().resources());
installables.forEach(installable ->
trackerService.removeTrackedResources(data.intent().key(),
installable.resources()));
List<Collection<FlowRule>> stages = installables.stream()
.map(x -> (FlowRuleIntent) x)
......
......@@ -26,9 +26,18 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.event.Event;
import org.onosproject.net.DeviceId;
import org.onosproject.net.ElementId;
import org.onosproject.net.HostId;
import org.onosproject.net.Link;
import org.onosproject.net.LinkKey;
import org.onosproject.net.NetworkResource;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.link.LinkEvent;
......@@ -41,6 +50,7 @@ import org.onosproject.net.topology.TopologyService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
......@@ -69,27 +79,40 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
//TODO this could be slow as a point of synchronization
synchronizedSetMultimap(HashMultimap.<LinkKey, Key>create());
private final SetMultimap<ElementId, Key> intentsByDevice =
synchronizedSetMultimap(HashMultimap.<ElementId, Key>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkResourceService resourceManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
protected IntentService intentService;
private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "flowtracker"));
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
private TopologyListener listener = new InternalTopologyListener();
private LinkResourceListener linkResourceListener =
new InternalLinkResourceListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private HostListener hostListener = new InternalHostListener();
private TopologyChangeDelegate delegate;
@Activate
public void activate() {
topologyService.addListener(listener);
resourceManager.addListener(linkResourceListener);
deviceService.addListener(deviceListener);
hostService.addListener(hostListener);
log.info("Started");
}
......@@ -97,6 +120,8 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
public void deactivate() {
topologyService.removeListener(listener);
resourceManager.removeListener(linkResourceListener);
deviceService.removeListener(deviceListener);
hostService.removeListener(hostListener);
log.info("Stopped");
}
......@@ -132,6 +157,8 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
for (NetworkResource resource : resources) {
if (resource instanceof Link) {
intentsByLink.put(linkKey((Link) resource), intentKey);
} else if (resource instanceof ElementId) {
intentsByDevice.put((ElementId) resource, intentKey);
}
}
}
......@@ -142,6 +169,8 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
for (NetworkResource resource : resources) {
if (resource instanceof Link) {
intentsByLink.remove(linkKey((Link) resource), intentKey);
} else if (resource instanceof ElementId) {
intentsByDevice.remove((ElementId) resource, intentKey);
}
}
}
......@@ -171,7 +200,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
}
if (event.reasons() == null || event.reasons().isEmpty()) {
delegate.triggerCompile(new HashSet<Key>(), true);
delegate.triggerCompile(Collections.emptySet(), true);
} else {
Set<Key> toBeRecompiled = new HashSet<>();
......@@ -231,7 +260,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
return;
}
delegate.triggerCompile(new HashSet<>(), true);
delegate.triggerCompile(Collections.emptySet(), true);
}
}
......@@ -257,4 +286,60 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
}
});
}
/*
* Re-dispatcher of device and host events.
*/
private class DeviceAvailabilityHandler implements Runnable {
private final ElementId id;
private final boolean available;
DeviceAvailabilityHandler(ElementId id, boolean available) {
this.id = checkNotNull(id);
this.available = available;
}
@Override
public void run() {
// If there is no delegate, why bother? Just bail.
if (delegate == null) {
return;
}
// TODO should we recompile on available==true?
delegate.triggerCompile(intentsByDevice.get(id), available);
}
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
if (type == DeviceEvent.Type.PORT_ADDED ||
type == DeviceEvent.Type.PORT_UPDATED ||
type == DeviceEvent.Type.PORT_REMOVED) {
// skip port events for now
return;
}
DeviceId id = event.subject().id();
// TODO we need to check whether AVAILABILITY_CHANGED means up or down
boolean available = (type == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
type == DeviceEvent.Type.DEVICE_ADDED ||
type == DeviceEvent.Type.DEVICE_UPDATED);
executorService.execute(new DeviceAvailabilityHandler(id, available));
}
}
private class InternalHostListener implements HostListener {
@Override
public void event(HostEvent event) {
HostId id = event.subject().id();
HostEvent.Type type = event.type();
boolean available = (type == HostEvent.Type.HOST_ADDED);
executorService.execute(new DeviceAvailabilityHandler(id, available));
}
}
}
......
......@@ -29,8 +29,11 @@ import org.onlab.junit.TestUtils;
import org.onlab.junit.TestUtils.TestUtilsException;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.Event;
import org.onosproject.net.Device;
import org.onosproject.net.Link;
import org.onosproject.net.NetworkResource;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.MockIdGenerator;
......@@ -50,6 +53,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.onosproject.net.NetTestTools.APP_ID;
import static org.onosproject.net.NetTestTools.device;
import static org.onosproject.net.NetTestTools.link;
/**
......@@ -62,6 +66,7 @@ public class ObjectiveTrackerTest {
private TestTopologyChangeDelegate delegate;
private List<Event> reasons;
private TopologyListener listener;
private DeviceListener deviceListener;
private LinkResourceListener linkResourceListener;
private IdGenerator mockGenerator;
......@@ -78,6 +83,7 @@ public class ObjectiveTrackerTest {
tracker.setDelegate(delegate);
reasons = new LinkedList<>();
listener = TestUtils.getField(tracker, "listener");
deviceListener = TestUtils.getField(tracker, "deviceListener");
linkResourceListener = TestUtils.getField(tracker, "linkResourceListener");
mockGenerator = new MockIdGenerator();
Intent.bindIdGenerator(mockGenerator);
......@@ -235,4 +241,86 @@ public class ObjectiveTrackerTest {
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
/**
* Tests an event for a host becoming available that matches an intent.
*
* @throws InterruptedException if the latch wait fails.
*/
@Test
public void testEventHostAvailableMatch() throws Exception {
final Device host = device("host1");
final DeviceEvent deviceEvent =
new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
reasons.add(deviceEvent);
final Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(host.id());
tracker.addTrackedResources(key, resources);
deviceListener.event(deviceEvent);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(true));
assertThat(delegate.intentIdsFromEvent.get(0).toString(),
equalTo("0x333"));
}
/**
* Tests an event for a host becoming unavailable that matches an intent.
*
* @throws InterruptedException if the latch wait fails.
*/
@Test
public void testEventHostUnavailableMatch() throws Exception {
final Device host = device("host1");
final DeviceEvent deviceEvent =
new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, host);
reasons.add(deviceEvent);
final Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(host.id());
tracker.addTrackedResources(key, resources);
deviceListener.event(deviceEvent);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(false));
assertThat(delegate.intentIdsFromEvent.get(0).toString(),
equalTo("0x333"));
}
/**
* Tests an event for a host becoming available that matches an intent.
*
* @throws InterruptedException if the latch wait fails.
*/
@Test
public void testEventHostAvailableNoMatch() throws Exception {
final Device host = device("host1");
final DeviceEvent deviceEvent =
new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
reasons.add(deviceEvent);
deviceListener.event(deviceEvent);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
}
......