Ray Milkey
Committed by Brian O'Connor

Start IntentOperations removal

Change-Id: Ib5fb9c19b37e447a62c61fa33bb98f3d789cbefa
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.intent;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.IdGenerator;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.NetTestTools;
import org.onosproject.net.flow.TrafficSelector;
import com.google.common.testing.EqualsTester;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
/**
* Tests for the IntentOperations class.
*/
public class IntentOperationsTest {
final ConnectPoint egress = NetTestTools.connectPoint("egress", 3);
final ConnectPoint ingress = NetTestTools.connectPoint("ingress", 3);
final TrafficSelector selector = new IntentTestsMocks.MockSelector();
final IntentTestsMocks.MockTreatment treatment = new IntentTestsMocks.MockTreatment();
private final ApplicationId appId = new DefaultApplicationId(1, "IntentOperationsTest");
private Intent intent;
protected IdGenerator idGenerator = new MockIdGenerator();
@Before
public void setUp() {
Intent.bindIdGenerator(idGenerator);
intent = new PointToPointIntent(NetTestTools.APP_ID,
selector,
treatment,
ingress,
egress);
}
@After
public void tearDown() {
Intent.unbindIdGenerator(idGenerator);
}
/**
* Checks that the IntentOperation and IntentOperations classes are immutable.
*/
@Test
public void testImmutability() {
assertThatClassIsImmutable(IntentOperations.class);
assertThatClassIsImmutable(IntentOperations.Builder.class);
assertThatClassIsImmutable(IntentOperation.class);
}
/**
* Tests equals(), hashCode() and toString() methods.
*/
@Test
public void testEquals() {
final IntentOperations operations1 =
IntentOperations.builder(appId)
.addSubmitOperation(intent)
.build();
final IntentOperations sameAsOperations1 =
IntentOperations.builder(appId)
.addSubmitOperation(intent)
.build();
final IntentOperations operations2 =
IntentOperations.builder(appId)
.addReplaceOperation(intent.id(), intent)
.build();
new EqualsTester()
.addEqualityGroup(operations1, sameAsOperations1)
.addEqualityGroup(operations2)
.testEquals();
}
/**
* Checks that objects are created correctly.
*/
@Test
public void testConstruction() {
final IntentOperations operations =
IntentOperations.builder(appId)
.addUpdateOperation(intent.id())
.addWithdrawOperation(intent.id())
.build();
final List<IntentOperation> operationList = operations.operations();
assertThat(operationList, hasSize(2));
for (final IntentOperation operation : operationList) {
assertThat(operation.type(),
isOneOf(IntentOperation.Type.UPDATE,
IntentOperation.Type.WITHDRAW));
assertThat(operation.intent(), is((Intent) null));
assertThat(operation.intentId(), is(intent.id()));
}
}
}
......@@ -50,7 +50,6 @@ import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.net.resource.LinkResourceAllocations;
import org.onosproject.store.trivial.impl.SimpleIntentBatchQueue;
import org.onosproject.store.trivial.impl.SimpleIntentStore;
import com.google.common.collect.HashMultimap;
......@@ -278,7 +277,6 @@ public class IntentManagerTest {
manager = new IntentManager();
flowRuleService = new MockFlowRuleService();
manager.store = new SimpleIntentStore();
manager.batchService = new SimpleIntentBatchQueue();
manager.eventDispatcher = new TestEventDispatcher();
manager.trackerService = new TestIntentTracker();
manager.flowRuleService = flowRuleService;
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.intent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.net.intent.IntentBatchDelegate;
import org.onosproject.net.intent.IntentBatchLeaderEvent;
import org.onosproject.net.intent.IntentBatchListener;
import org.onosproject.net.intent.IntentBatchService;
import org.onosproject.net.intent.IntentOperations;
import org.onosproject.store.hz.SQueue;
import org.onosproject.store.hz.StoreService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
@Component(immediate = true)
@Service
public class HazelcastIntentBatchQueue
implements IntentBatchService {
private final Logger log = getLogger(getClass());
private static final String TOPIC_BASE = "intent-batch-";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
private HazelcastInstance theInstance;
private NodeId localControllerNodeId;
protected StoreSerializer serializer;
private IntentBatchDelegate delegate;
private InternalLeaderListener leaderListener = new InternalLeaderListener();
private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
= Maps.newHashMap();
private final Set<ApplicationId> myTopics = Sets.newHashSet();
private final Map<ApplicationId, IntentOperations> outstandingOps
= Maps.newHashMap();
private final AbstractListenerRegistry<IntentBatchLeaderEvent, IntentBatchListener>
listenerRegistry = new AbstractListenerRegistry<>();
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
localControllerNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leaderListener);
serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
eventDispatcher.addSink(IntentBatchLeaderEvent.class, listenerRegistry);
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(IntentBatchLeaderEvent.class);
leadershipService.removeListener(leaderListener);
for (ApplicationId appId: batchQueues.keySet()) {
leadershipService.withdraw(getTopic(appId));
}
log.info("Stopped");
}
public static String getTopic(ApplicationId appId) {
return TOPIC_BASE + checkNotNull(appId.id());
}
public ApplicationId getAppId(String topic) {
checkState(topic.startsWith(TOPIC_BASE),
"Trying to get app id for invalid topic: {}", topic);
Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
return coreService.getAppId(id);
}
private SQueue<IntentOperations> getQueue(ApplicationId appId) {
SQueue<IntentOperations> queue = batchQueues.get(appId);
if (queue == null) {
synchronized (this) {
String topic = getTopic(appId);
IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
queue = new SQueue<>(rawQueue, serializer);
queue.addItemListener(new InternalItemListener(appId), false);
batchQueues.putIfAbsent(appId, queue);
leadershipService.runForLeadership(topic);
}
}
return queue;
}
@Override
public void addIntentOperations(IntentOperations ops) {
checkNotNull(ops, "Intent operations cannot be null.");
ApplicationId appId = ops.appId();
getQueue(appId).add(ops);
dispatchNextOperation(appId);
}
@Override
public void removeIntentOperations(IntentOperations ops) {
ApplicationId appId = ops.appId();
synchronized (this) {
IntentOperations outstanding = outstandingOps.get(appId);
if (outstanding != null) {
checkState(Objects.equals(ops, outstanding),
"Operation {} does not match outstanding operation {}",
ops, outstanding);
} else {
log.warn("Operation {} not found", ops);
}
SQueue<IntentOperations> queue = batchQueues.get(appId);
checkState(queue.remove().equals(ops),
"Operations are wrong.");
outstandingOps.remove(appId);
dispatchNextOperation(appId);
}
}
/**
* Dispatches the next available operations to the delegate, unless
* we are not the leader for this application id or there is an
* outstanding operations for this application id.
*
* @param appId application id
*/
private void dispatchNextOperation(ApplicationId appId) {
synchronized (this) {
if (!myTopics.contains(appId) ||
outstandingOps.containsKey(appId)) {
return;
}
IntentOperations ops = batchQueues.get(appId).peek();
if (ops != null) {
outstandingOps.put(appId, ops);
delegate.execute(ops);
}
}
}
/**
* Record the leadership change for the given topic. If we have become the
* leader, then dispatch the next operations. If we have lost leadership,
* then cancel the last operations.
*
* @param topic topic based on application id
* @param leader true if we have become the leader, false otherwise
*/
private void leaderChanged(String topic, boolean leader) {
ApplicationId appId = getAppId(topic);
synchronized (this) {
if (leader) {
myTopics.add(appId);
checkState(!outstandingOps.containsKey(appId),
"Existing intent ops for app id: {}", appId);
dispatchNextOperation(appId);
} else {
myTopics.remove(appId);
IntentOperations ops = outstandingOps.get(appId);
if (ops != null) {
delegate.cancel(ops);
}
outstandingOps.remove(appId);
}
}
}
private class InternalItemListener implements ItemListener<IntentOperations> {
private final ApplicationId appId;
public InternalItemListener(ApplicationId appId) {
this.appId = appId;
}
@Override
public void itemAdded(ItemEvent<IntentOperations> item) {
dispatchNextOperation(appId);
}
@Override
public void itemRemoved(ItemEvent<IntentOperations> item) {
// no-op
}
}
private class InternalLeaderListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
log.trace("Leadership Event: time = {} type = {} event = {}",
event.time(), event.type(), event);
String topic = event.subject().topic();
if (!topic.startsWith(TOPIC_BASE)) {
return; // Not our topic: ignore
}
if (!event.subject().leader().equals(localControllerNodeId)) {
// run for leadership
getQueue(getAppId(topic));
return; // The event is not about this instance: ignore
}
switch (event.type()) {
case LEADER_ELECTED:
log.info("Elected leader for app {}", getAppId(topic));
leaderChanged(topic, true);
break;
case LEADER_BOOTED:
log.info("Lost leader election for app {}", getAppId(topic));
leaderChanged(topic, false);
break;
case LEADER_REELECTED:
break;
default:
break;
}
}
}
@Override
public Set<IntentOperations> getPendingOperations() {
Set<IntentOperations> ops = Sets.newHashSet();
synchronized (this) {
for (SQueue<IntentOperations> queue : batchQueues.values()) {
ops.addAll(queue);
}
return ops;
}
}
@Override
public boolean isLocalLeader(ApplicationId applicationId) {
return myTopics.contains(applicationId);
}
@Override
public void setDelegate(IntentBatchDelegate delegate) {
this.delegate = checkNotNull(delegate, "Delegate cannot be null");
}
@Override
public void unsetDelegate(IntentBatchDelegate delegate) {
if (this.delegate != null && this.delegate.equals(delegate)) {
this.delegate = null;
}
}
@Override
public void addListener(IntentBatchListener listener) {
listenerRegistry.addListener(listener);
}
@Override
public void removeListener(IntentBatchListener listener) {
listenerRegistry.removeListener(listener);
}
}
......@@ -84,7 +84,6 @@ import org.onosproject.net.intent.HostToHostIntent;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.intent.IntentOperation;
import org.onosproject.net.intent.IntentOperations;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.LinkCollectionIntent;
import org.onosproject.net.intent.MultiPointToSinglePointIntent;
......@@ -316,8 +315,7 @@ public final class KryoNamespaces {
ObstacleConstraint.class,
AnnotationConstraint.class,
BooleanConstraint.class,
IntentOperation.class,
IntentOperations.class
IntentOperation.class
)
.register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class)
.register(new URISerializer(), URI.class)
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.trivial.impl;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.intent.IntentBatchDelegate;
import org.onosproject.net.intent.IntentBatchListener;
import org.onosproject.net.intent.IntentBatchService;
import org.onosproject.net.intent.IntentOperations;
import org.slf4j.Logger;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class SimpleIntentBatchQueue implements IntentBatchService {
private final Logger log = getLogger(getClass());
private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
private final Set<IntentOperations> currentBatches = Sets.newHashSet();
private IntentBatchDelegate delegate;
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public void addIntentOperations(IntentOperations operations) {
checkState(delegate != null, "No delegate set");
synchronized (this) {
pendingBatches.add(operations);
if (currentBatches.isEmpty()) {
IntentOperations work = pendingBatches.poll();
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public void removeIntentOperations(IntentOperations operations) {
// we allow at most one outstanding batch at a time
synchronized (this) {
checkState(currentBatches.remove(operations), "Operations not found in current ops.");
checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
IntentOperations work = pendingBatches.poll();
if (work != null) {
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public Set<IntentOperations> getPendingOperations() {
synchronized (this) {
Set<IntentOperations> set = Sets.newHashSet(pendingBatches);
set.addAll(currentBatches); // TODO refactor this current vs. pending
return set;
}
}
@Override
public boolean isLocalLeader(ApplicationId applicationId) {
return true;
}
@Override
public void setDelegate(IntentBatchDelegate delegate) {
this.delegate = checkNotNull(delegate, "Delegate cannot be null");
}
@Override
public void unsetDelegate(IntentBatchDelegate delegate) {
if (this.delegate != null && this.delegate.equals(delegate)) {
this.delegate = null;
}
}
@Override
public void addListener(IntentBatchListener listener) {
// no-op
//TODO: we are always the master
}
@Override
public void removeListener(IntentBatchListener listener) {
// no-op
//TODO: we are always the master
}
}