Madan Jampani
Committed by Gerrit Code Review

Refactored IntentPartitionService as WorkPartitionService

Change-Id: Ic5cf1978b7fce55b34f84eae9b03c8f9ddcfb9c1
Showing 14 changed files with 113 additions and 121 deletions
......@@ -51,7 +51,7 @@ import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
......@@ -139,7 +139,7 @@ public class IntentPerfInstaller {
protected MastershipService mastershipService;
@Reference(cardinality = MANDATORY_UNARY)
protected IntentPartitionService partitionService;
protected WorkPartitionService partitionService;
@Reference(cardinality = MANDATORY_UNARY)
protected ComponentConfigService configService;
......@@ -371,7 +371,7 @@ public class IntentPerfInstaller {
for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
Key key = Key.of(keyPrefix + k, appId);
NodeId leader = partitionService.getLeader(key);
NodeId leader = partitionService.getLeader(key, Key::hash);
if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
// Bail if we are not sending to this node or we have enough for this node
continue;
......
......@@ -23,13 +23,13 @@ import org.onosproject.event.AbstractEvent;
*/
//TODO change String into a proper object type
@Beta
public class IntentPartitionEvent extends AbstractEvent<IntentPartitionEvent.Type, String> {
public class WorkPartitionEvent extends AbstractEvent<WorkPartitionEvent.Type, String> {
public enum Type {
LEADER_CHANGED
}
public IntentPartitionEvent(Type type, String partition) {
public WorkPartitionEvent(Type type, String partition) {
super(type, partition);
}
}
......
......@@ -22,5 +22,5 @@ import org.onosproject.event.EventListener;
* Entity capable of receiving device partition-related events.
*/
@Beta
public interface IntentPartitionEventListener extends EventListener<IntentPartitionEvent> {
public interface WorkPartitionEventListener extends EventListener<WorkPartitionEvent> {
}
......
......@@ -15,34 +15,36 @@
*/
package org.onosproject.net.intent;
import com.google.common.annotations.Beta;
import java.util.function.Function;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.ListenerService;
import com.google.common.annotations.Beta;
/**
* Service for interacting with the intent partition-to-instance assignments.
* Service for partitioning work, represented via a unique identifier, onto cluster nodes.
*/
@Beta
public interface IntentPartitionService
extends ListenerService<IntentPartitionEvent, IntentPartitionEventListener> {
public interface WorkPartitionService
extends ListenerService<WorkPartitionEvent, WorkPartitionEventListener> {
/**
* Returns whether the given intent key is in a partition owned by this
* instance or not.
* Returns whether a given id maps to a partition owned by this
* instance.
*
* @param intentKey intent key to query
* @return true if the key is owned by this instance, otherwise false
* @param id id
* @param hasher function that maps id to a long value
* @return {@code true} if the id maps to a partition owned by this instance, otherwise {@code false}
*/
boolean isMine(Key intentKey);
<K> boolean isMine(K id, Function<K, Long> hasher);
/**
* Returns the leader for a particular key.
* Returns the owner for a given id.
*
* @param intentKey intent key to query
* @return the leader node
* @param id id to query
* @param hasher function that maps id to a long value
* @return the leader node identifier
*/
NodeId getLeader(Key intentKey);
// TODO add API for rebalancing partitions
<K> NodeId getLeader(K id, Function<K, Long> hasher);
}
......
......@@ -15,29 +15,31 @@
*/
package org.onosproject.net.intent;
import java.util.function.Function;
import org.onosproject.cluster.NodeId;
/**
* Testing adapter for the IntentPartitionService.
* Testing adapter for the WorkPartitionService.
*/
public class IntentPartitionServiceAdapter implements IntentPartitionService {
public class WorkPartitionServiceAdapter implements WorkPartitionService {
@Override
public boolean isMine(Key intentKey) {
public <K> boolean isMine(K id, Function<K, Long> hasher) {
return true;
}
@Override
public NodeId getLeader(Key intentKey) {
public <K> NodeId getLeader(K id, Function<K, Long> hasher) {
return null;
}
@Override
public void addListener(IntentPartitionEventListener listener) {
public void addListener(WorkPartitionEventListener listener) {
}
@Override
public void removeListener(IntentPartitionEventListener listener) {
public void removeListener(WorkPartitionEventListener listener) {
}
}
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.trivial;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -32,14 +33,14 @@ import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.IntentPartitionEvent;
import org.onosproject.net.intent.IntentPartitionEventListener;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.WorkPartitionEvent;
import org.onosproject.net.intent.WorkPartitionEventListener;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
import java.util.Set;
import java.util.function.Function;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.*;
......@@ -53,7 +54,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Service
public class SimpleClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore, IntentPartitionService {
implements ClusterStore, WorkPartitionService {
public static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
......@@ -66,7 +67,7 @@ public class SimpleClusterStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private ListenerRegistry<WorkPartitionEvent, WorkPartitionEventListener> listenerRegistry;
private boolean started = false;
@Activate
......@@ -74,14 +75,14 @@ public class SimpleClusterStore
instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
eventDispatcher.addSink(WorkPartitionEvent.class, listenerRegistry);
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(IntentPartitionEvent.class);
eventDispatcher.removeSink(WorkPartitionEvent.class);
log.info("Stopped");
}
......@@ -126,25 +127,25 @@ public class SimpleClusterStore
}
@Override
public boolean isMine(Key intentKey) {
public <K> boolean isMine(K key, Function<K, Long> hasher) {
checkPermission(INTENT_READ);
return true;
}
@Override
public NodeId getLeader(Key intentKey) {
public <K> NodeId getLeader(K key, Function<K, Long> hasher) {
checkPermission(INTENT_READ);
return instance.id();
}
@Override
public void addListener(IntentPartitionEventListener listener) {
public void addListener(WorkPartitionEventListener listener) {
checkPermission(INTENT_EVENT);
listenerRegistry.addListener(listener);
}
@Override
public void removeListener(IntentPartitionEventListener listener) {
public void removeListener(WorkPartitionEventListener listener) {
checkPermission(INTENT_EVENT);
listenerRegistry.removeListener(listener);
}
......
......@@ -43,9 +43,9 @@ import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.IntentPartitionEvent;
import org.onosproject.net.intent.IntentPartitionEventListener;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.WorkPartitionEvent;
import org.onosproject.net.intent.WorkPartitionEventListener;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.resource.ResourceEvent;
import org.onosproject.net.resource.ResourceListener;
......@@ -113,7 +113,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
protected IntentService intentService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentPartitionService partitionService;
protected WorkPartitionService partitionService;
private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log));
......@@ -124,7 +124,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
private ResourceListener resourceListener = new InternalResourceListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private HostListener hostListener = new InternalHostListener();
private IntentPartitionEventListener partitionListener = new InternalPartitionListener();
private WorkPartitionEventListener partitionListener = new InternalPartitionListener();
private TopologyChangeDelegate delegate;
protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
......@@ -417,9 +417,9 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
}
}
private final class InternalPartitionListener implements IntentPartitionEventListener {
private final class InternalPartitionListener implements WorkPartitionEventListener {
@Override
public void event(IntentPartitionEvent event) {
public void event(WorkPartitionEvent event) {
log.debug("got message {}", event.subject());
scheduleIntentUpdate(1);
}
......
......@@ -59,7 +59,7 @@ import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentExtensionService;
import org.onosproject.net.intent.IntentClockService;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.link.LinkAdminService;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.PacketService;
......@@ -227,7 +227,7 @@ public final class DefaultPolicyBuilder {
permSet.add(new ServicePermission(IntentService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(IntentClockService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(IntentExtensionService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(IntentPartitionService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(WorkPartitionService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(DeviceKeyService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(LinkService.class.getName(), ServicePermission.GET));
// permSet.add(new ServicePermission(MulticastRouteService.class.getName(), ServicePermission.GET));
......@@ -314,12 +314,12 @@ public final class DefaultPolicyBuilder {
serviceDirectory.put(HOST_EVENT, ImmutableSet.of(
HostService.class.getName()));
serviceDirectory.put(INTENT_READ, ImmutableSet.of(
IntentService.class.getName(), IntentPartitionService.class.getName(),
IntentService.class.getName(), WorkPartitionService.class.getName(),
IntentClockService.class.getName(), IntentExtensionService.class.getName()));
serviceDirectory.put(INTENT_WRITE, ImmutableSet.of(
IntentService.class.getName(), IntentExtensionService.class.getName()));
serviceDirectory.put(INTENT_EVENT, ImmutableSet.of(
IntentService.class.getName(), IntentPartitionService.class.getName()));
IntentService.class.getName(), WorkPartitionService.class.getName()));
// serviceDirectory.put(LINK_READ, ImmutableSet.of(
// LinkService.class.getName(), LinkResourceService.class.getName(),
// LabelResourceService.class.getName()));
......
......@@ -32,7 +32,7 @@ import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
......@@ -84,7 +84,7 @@ public class GossipIntentStore
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentPartitionService partitionService;
protected WorkPartitionService partitionService;
private final AtomicLong sequenceNumber = new AtomicLong(0);
......@@ -211,7 +211,7 @@ public class GossipIntentStore
}
private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
NodeId master = partitionService.getLeader(key);
NodeId master = partitionService.getLeader(key, Key::hash);
NodeId origin = (data != null) ? data.origin() : null;
if (data != null && (master == null || origin == null)) {
log.debug("Intent {} missing master and/or origin; master = {}, origin = {}",
......@@ -283,7 +283,7 @@ public class GossipIntentStore
@Override
public boolean isMaster(Key intentKey) {
return partitionService.isMine(intentKey);
return partitionService.isMine(intentKey, Key::hash);
}
@Override
......
......@@ -29,10 +29,9 @@ import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.IntentPartitionEvent;
import org.onosproject.net.intent.IntentPartitionEventListener;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.WorkPartitionEvent;
import org.onosproject.net.intent.WorkPartitionEventListener;
import org.onosproject.net.intent.WorkPartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,17 +43,18 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Manages the assignment of intent keyspace partitions to instances.
* Manages the assignment of work partitions to instances.
*/
@Component(immediate = true)
@Service
public class IntentPartitionManager implements IntentPartitionService {
public class WorkPartitionManager implements WorkPartitionService {
private static final Logger log = LoggerFactory.getLogger(IntentPartitionManager.class);
private static final Logger log = LoggerFactory.getLogger(WorkPartitionManager.class);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
......@@ -72,14 +72,14 @@ public class IntentPartitionManager implements IntentPartitionService {
private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
private static final int RETRY_AFTER_DELAY_SEC = 5;
private static final String ELECTION_PREFIX = "intent-partition-";
private static final String ELECTION_PREFIX = "work-partition-";
protected NodeId localNodeId;
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private ListenerRegistry<WorkPartitionEvent, WorkPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1, groupedThreads("IntentPartition", "balancer-%d", log));
.newScheduledThreadPool(1, groupedThreads("work-parition", "balancer-%d", log));
@Activate
public void activate() {
......@@ -87,7 +87,7 @@ public class IntentPartitionManager implements IntentPartitionService {
leadershipService.addListener(leaderListener);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
eventDispatcher.addSink(WorkPartitionEvent.class, listenerRegistry);
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
......@@ -103,7 +103,7 @@ public class IntentPartitionManager implements IntentPartitionService {
public void deactivate() {
executor.shutdownNow();
eventDispatcher.removeSink(IntentPartitionEvent.class);
eventDispatcher.removeSink(WorkPartitionEvent.class);
leadershipService.removeListener(leaderListener);
log.info("Stopped");
}
......@@ -112,9 +112,9 @@ public class IntentPartitionManager implements IntentPartitionService {
* Sets the specified executor to be used for scheduling background tasks.
*
* @param executor scheduled executor service for background tasks
* @return this PartitionManager
* @return this WorkPartitionManager
*/
IntentPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
WorkPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
}
......@@ -123,38 +123,25 @@ public class IntentPartitionManager implements IntentPartitionService {
return ELECTION_PREFIX + i;
}
private String getPartitionPath(PartitionId id) {
return getPartitionPath(id.value());
}
private PartitionId getPartitionForKey(Key intentKey) {
int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS;
//TODO investigate Guava consistent hash method
// ... does it add significant computational complexity? is it worth it?
//int partition = consistentHash(intentKey.hash(), NUM_PARTITIONS);
PartitionId id = new PartitionId(partition);
return id;
}
@Override
public boolean isMine(Key intentKey) {
return Objects.equals(leadershipService.getLeadership(getPartitionPath(getPartitionForKey(intentKey)))
.leaderNodeId(),
localNodeId);
public <K> boolean isMine(K id, Function<K, Long> hasher) {
return Objects.equals(localNodeId, getLeader(id, hasher));
}
@Override
public NodeId getLeader(Key intentKey) {
return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
public <K> NodeId getLeader(K id, Function<K, Long> hasher) {
int partition = Math.abs(hasher.apply(id).intValue()) % NUM_PARTITIONS;
PartitionId partitionId = new PartitionId(partition);
return leadershipService.getLeadership(getPartitionPath(partitionId.value())).leaderNodeId();
}
@Override
public void addListener(IntentPartitionEventListener listener) {
public void addListener(WorkPartitionEventListener listener) {
listenerRegistry.addListener(listener);
}
@Override
public void removeListener(IntentPartitionEventListener listener) {
public void removeListener(WorkPartitionEventListener listener) {
listenerRegistry.removeListener(listener);
}
......@@ -235,7 +222,7 @@ public class IntentPartitionManager implements IntentPartitionService {
if (Objects.equals(leadership.leaderNodeId(), localNodeId) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
eventDispatcher.post(new WorkPartitionEvent(WorkPartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}
......
......@@ -30,7 +30,7 @@ import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.net.intent.MockIdGenerator;
import org.onosproject.net.intent.IntentPartitionServiceAdapter;
import org.onosproject.net.intent.WorkPartitionServiceAdapter;
import org.onosproject.store.service.TestStorageService;
import static org.hamcrest.Matchers.is;
......@@ -52,7 +52,7 @@ public class GossipIntentStoreTest {
public void setUp() {
intentStore = new GossipIntentStore();
intentStore.storageService = new TestStorageService();
intentStore.partitionService = new IntentPartitionServiceAdapter();
intentStore.partitionService = new WorkPartitionServiceAdapter();
intentStore.clusterService = new ClusterServiceAdapter();
idGenerator = new MockIdGenerator();
Intent.bindIdGenerator(idGenerator);
......
......@@ -50,9 +50,9 @@ import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for the IntentPartitionManager class.
* Unit tests for the WorkPartitionManager class.
*/
public class IntentPartitionManagerTest {
public class WorkPartitionManagerTest {
private final LeadershipEvent event
= new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
......@@ -64,12 +64,12 @@ public class IntentPartitionManagerTest {
private static final NodeId OTHER_NODE_ID = new NodeId("other");
private static final NodeId INACTIVE_NODE_ID = new NodeId("inactive");
private static final String ELECTION_PREFIX = "intent-partition-";
private static final String ELECTION_PREFIX = "work-partition-";
private LeadershipService leadershipService;
private LeadershipEventListener leaderListener;
private IntentPartitionManager partitionManager;
private WorkPartitionManager partitionManager;
@Before
public void setUp() {
......@@ -77,13 +77,13 @@ public class IntentPartitionManagerTest {
leadershipService.addListener(anyObject(LeadershipEventListener.class));
expectLastCall().andDelegateTo(new TestLeadershipService());
for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
for (int i = 0; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
.andReturn(null)
.times(1);
}
partitionManager = new IntentPartitionManager()
partitionManager = new WorkPartitionManager()
.withScheduledExecutor(new NullScheduledExecutor());
partitionManager.clusterService = new TestClusterService();
......@@ -109,14 +109,14 @@ public class IntentPartitionManagerTest {
.anyTimes();
}
for (int i = numMine; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
for (int i = numMine; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.getLeadership(ELECTION_PREFIX + i))
.andReturn(new Leadership(ELECTION_PREFIX + i,
new Leader(OTHER_NODE_ID, 1, 1000),
allNodes))
.anyTimes();
}
for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
for (int i = 0; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.getCandidates(ELECTION_PREFIX + i))
.andReturn(Arrays.asList(MY_NODE_ID, OTHER_NODE_ID))
.anyTimes();
......@@ -133,7 +133,7 @@ public class IntentPartitionManagerTest {
leadershipService.addListener(anyObject(LeadershipEventListener.class));
for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
for (int i = 0; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
.andReturn(null)
.times(1);
......@@ -159,20 +159,20 @@ public class IntentPartitionManagerTest {
Key myKey = new ControllableHashKey(0);
Key notMyKey = new ControllableHashKey(1);
assertTrue(partitionManager.isMine(myKey));
assertFalse(partitionManager.isMine(notMyKey));
assertTrue(partitionManager.isMine(myKey, Key::hash));
assertFalse(partitionManager.isMine(notMyKey, Key::hash));
// Make us the owner of 4 partitions now
reset(leadershipService);
setUpLeadershipService(4);
replay(leadershipService);
assertTrue(partitionManager.isMine(myKey));
assertTrue(partitionManager.isMine(myKey, Key::hash));
// notMyKey is now my key because because we're in control of that
// partition now
assertTrue(partitionManager.isMine(notMyKey));
assertTrue(partitionManager.isMine(notMyKey, Key::hash));
assertFalse(partitionManager.isMine(new ControllableHashKey(4)));
assertFalse(partitionManager.isMine(new ControllableHashKey(4), Key::hash));
}
/**
......@@ -183,7 +183,7 @@ public class IntentPartitionManagerTest {
@Test
public void testRebalanceScheduling() {
// We have all the partitions so we'll need to relinquish some
setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS);
setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS);
replay(leadershipService);
......@@ -202,7 +202,7 @@ public class IntentPartitionManagerTest {
@Test
public void testRebalance() {
// We have all the partitions so we'll need to relinquish some
setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS);
setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS);
leadershipService.withdraw(anyString());
expectLastCall().times(7);
......@@ -224,7 +224,7 @@ public class IntentPartitionManagerTest {
@Test
public void testNoRebalance() {
// Partitions are already perfectly balanced among the two active instances
setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS / 2);
setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS / 2);
replay(leadershipService);
partitionManager.activate();
......@@ -236,7 +236,7 @@ public class IntentPartitionManagerTest {
reset(leadershipService);
// We have a smaller share than we should
setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS / 2 - 1);
setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS / 2 - 1);
replay(leadershipService);
// trigger rebalance
......
......@@ -32,7 +32,7 @@ import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.Key;
......@@ -64,7 +64,7 @@ public class VirtualNetworkIntentService extends AbstractListenerManager<IntentE
protected IntentService intentService;
protected VirtualNetworkStore store;
protected IntentPartitionService partitionService;
protected WorkPartitionService partitionService;
private final VirtualNetwork network;
private final VirtualNetworkService manager;
......@@ -83,7 +83,7 @@ public class VirtualNetworkIntentService extends AbstractListenerManager<IntentE
this.manager = virtualNetworkManager;
this.store = serviceDirectory.get(VirtualNetworkStore.class);
this.intentService = serviceDirectory.get(IntentService.class);
this.partitionService = serviceDirectory.get(IntentPartitionService.class);
this.partitionService = serviceDirectory.get(WorkPartitionService.class);
}
@Override
......@@ -225,7 +225,7 @@ public class VirtualNetworkIntentService extends AbstractListenerManager<IntentE
checkNotNull(intentKey, INTENT_KEY_NULL);
Intent intent = getIntent(intentKey);
checkNotNull(intent, INTENT_NULL);
return partitionService.isMine(intentKey);
return partitionService.isMine(intentKey, Key::hash);
}
@Override
......
......@@ -54,8 +54,8 @@ import org.onosproject.net.intent.IntentCompiler;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentExtensionService;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.IntentPartitionServiceAdapter;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.WorkPartitionServiceAdapter;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentTestsMocks;
......@@ -103,7 +103,7 @@ public class VirtualNetworkIntentServiceTest extends TestDeviceParams {
private VirtualNetworkIntentService vnetIntentService;
private TestIntentCompiler compiler = new TestIntentCompiler();
private IntentExtensionService intentExtensionService;
private IntentPartitionService intentPartitionService;
private WorkPartitionService workPartitionService;
private ServiceDirectory testDirectory;
private TestListener listener = new TestListener();
private IdGenerator idGenerator = new MockIdGenerator();
......@@ -142,11 +142,11 @@ public class VirtualNetworkIntentServiceTest extends TestDeviceParams {
withdrawn = new Semaphore(0, true);
purged = new Semaphore(0, true);
intentPartitionService = new IntentPartitionServiceAdapter();
workPartitionService = new WorkPartitionServiceAdapter();
testDirectory = new TestServiceDirectory()
.add(VirtualNetworkStore.class, virtualNetworkManagerStore)
.add(IntentService.class, intentService)
.add(IntentPartitionService.class, intentPartitionService);
.add(WorkPartitionService.class, workPartitionService);
BaseResource.setServiceDirectory(testDirectory);
}
......@@ -215,7 +215,7 @@ public class VirtualNetworkIntentServiceTest extends TestDeviceParams {
vnetIntentService = new VirtualNetworkIntentService(manager, virtualNetwork, testDirectory);
vnetIntentService.intentService = intentService;
vnetIntentService.store = virtualNetworkManagerStore;
vnetIntentService.partitionService = intentPartitionService;
vnetIntentService.partitionService = workPartitionService;
return virtualNetwork;
}
......