Madan Jampani
Committed by Gerrit Code Review

ONOS-1883: Fix for lost flow rules on CLI directed mastership changes.

- Made all mastership role change operations asynchronous, which they are.
- In flowrule store we now check to see if any new backups need to be made when a device backup location (standby) changes
- In device mastership store we now wait briefly before we step down from mastership after promoting a new candidate as highest standy

Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
......@@ -16,6 +16,7 @@
package org.onosproject.mastership;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
......@@ -80,7 +81,7 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
* @param deviceId device identifier
* @return a mastership event
*/
MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId);
CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId);
/**
* Returns the current master and number of past mastership hand-offs
......@@ -100,7 +101,7 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId);
CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId);
/**
* Allows a controller instance to give up its current role for a device.
......@@ -111,7 +112,7 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId);
CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId);
/**
* Removes all the roles for the specified controller instance.
......
......@@ -17,6 +17,7 @@ package org.onosproject.cluster.impl;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -50,6 +51,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.metrics.MetricsUtil.startTimer;
......@@ -111,26 +114,28 @@ public class MastershipManager
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
MastershipEvent event = null;
CompletableFuture<MastershipEvent> eventFuture = null;
switch (role) {
case MASTER:
event = store.setMaster(nodeId, deviceId);
eventFuture = store.setMaster(nodeId, deviceId);
break;
case STANDBY:
event = store.setStandby(nodeId, deviceId);
eventFuture = store.setStandby(nodeId, deviceId);
break;
case NONE:
event = store.relinquishRole(nodeId, deviceId);
eventFuture = store.relinquishRole(nodeId, deviceId);
break;
default:
log.info("Unknown role; ignoring");
return;
}
if (event != null) {
post(event);
}
eventFuture.whenComplete((event, error) -> {
if (event != null) {
post(event);
}
});
}
@Override
......@@ -141,12 +146,12 @@ public class MastershipManager
@Override
public void relinquishMastership(DeviceId deviceId) {
MastershipEvent event = null;
event = store.relinquishRole(
clusterService.getLocalNode().id(), deviceId);
if (event != null) {
post(event);
}
store.relinquishRole(clusterService.getLocalNode().id(), deviceId)
.whenComplete((event, error) -> {
if (event != null) {
post(event);
}
});
}
@Override
......
......@@ -62,6 +62,8 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
......@@ -181,6 +183,7 @@ public class NewDistributedFlowRuleStore
registerMessageHandlers(messageHandlingExecutor);
if (backupEnabled) {
replicaInfoManager.addListener(flowTable);
backupTask = backupSenderExecutor.scheduleWithFixedDelay(
flowTable::backup,
0,
......@@ -193,6 +196,10 @@ public class NewDistributedFlowRuleStore
@Deactivate
public void deactivate(ComponentContext context) {
if (backupEnabled) {
replicaInfoManager.removeListener(flowTable);
backupTask.cancel(true);
}
configService.unregisterProperties(getClass(), false);
unregisterMessageHandlers();
messageHandlingExecutor.shutdownNow();
......@@ -232,9 +239,14 @@ public class NewDistributedFlowRuleStore
boolean restartBackupTask = false;
if (newBackupEnabled != backupEnabled) {
backupEnabled = newBackupEnabled;
if (!backupEnabled && backupTask != null) {
backupTask.cancel(false);
backupTask = null;
if (!backupEnabled) {
replicaInfoManager.removeListener(flowTable);
if (backupTask != null) {
backupTask.cancel(false);
backupTask = null;
}
} else {
replicaInfoManager.addListener(flowTable);
}
restartBackupTask = backupEnabled;
}
......@@ -590,7 +602,7 @@ public class NewDistributedFlowRuleStore
}
}
private class InternalFlowTable {
private class InternalFlowTable implements ReplicaInfoEventListener {
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();
......@@ -603,6 +615,43 @@ public class NewDistributedFlowRuleStore
return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
}
@Override
public void event(ReplicaInfoEvent event) {
if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
DeviceId deviceId = event.subject();
if (!Objects.equal(local, replicaInfoManager.getReplicaInfoFor(deviceId).master())) {
// ignore since this event is for a device this node does not manage.
return;
}
NodeId latestBackupNode = getBackupNode(deviceId);
NodeId existingBackupNode = lastBackupNodes.get(deviceId);
if (Objects.equal(latestBackupNode, existingBackupNode)) {
// ignore since backup location hasn't changed.
return;
}
backupFlowEntries(latestBackupNode, Sets.newHashSet(deviceId));
}
}
private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Maps.newConcurrentMap();
flowEntries.forEach((key, value) -> {
if (deviceIds.contains(key)) {
deviceFlowEntries.put(key, value);
}
});
clusterCommunicator.unicast(deviceFlowEntries,
FLOW_TABLE_BACKUP,
SERIALIZER::encode,
nodeId);
deviceIds.forEach(id -> {
lastBackupTimes.put(id, System.currentTimeMillis());
lastBackupNodes.put(id, nodeId);
});
}
/**
* Returns the flow table for specified device.
*
......@@ -662,7 +711,6 @@ public class NewDistributedFlowRuleStore
if (!backupEnabled) {
return;
}
//TODO: Force backup when backups change.
try {
// determine the set of devices that we need to backup during this run.
Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
......@@ -686,35 +734,15 @@ public class NewDistributedFlowRuleStore
.add(deviceId);
}
});
// send the device flow entries to their respective backup nodes
devicesToBackupByNode.forEach((nodeId, deviceIds) -> {
Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Maps.newConcurrentMap();
flowEntries.forEach((key, value) -> {
if (deviceIds.contains(key)) {
deviceFlowEntries.put(key, value);
}
});
clusterCommunicator.unicast(deviceFlowEntries,
FLOW_TABLE_BACKUP,
SERIALIZER::encode,
nodeId);
});
// update state for use in subsequent run.
devicesToBackupByNode.forEach((node, devices) -> {
devices.forEach(id -> {
lastBackupTimes.put(id, System.currentTimeMillis());
lastBackupNodes.put(id, node);
});
});
devicesToBackupByNode.forEach(this::backupFlowEntries);
} catch (Exception e) {
log.error("Backup failed.", e);
}
}
private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
log.debug("Received flows for {} to backup", flowTables.keySet());
Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
// Only process those devices are that not managed by the local node.
Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
......
......@@ -25,8 +25,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
......@@ -98,11 +101,13 @@ public class ConsistentDeviceMastershipStore
Pattern.compile("device:(.*)");
private ExecutorService messageHandlingExecutor;
private ScheduledExecutorService transferExecutor;
private final LeadershipEventListener leadershipEventListener =
new InternalDeviceMastershipEventListener();
private static final String NODE_ID_NULL = "Node ID cannot be null";
private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
private static final String DEVICE_ID_NULL = "Device ID cannot be null";
private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
public static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
......@@ -119,7 +124,11 @@ public class ConsistentDeviceMastershipStore
@Activate
public void activate() {
messageHandlingExecutor =
Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
Executors.newSingleThreadExecutor(
groupedThreads("onos/store/device/mastership", "message-handler"));
transferExecutor =
Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
SERIALIZER::decode,
deviceId -> getRole(localNodeId, deviceId),
......@@ -127,7 +136,7 @@ public class ConsistentDeviceMastershipStore
messageHandlingExecutor);
clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
SERIALIZER::decode,
deviceId -> relinquishRole(localNodeId, deviceId),
this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
......@@ -147,6 +156,7 @@ public class ConsistentDeviceMastershipStore
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
log.info("Stoppped.");
......@@ -246,26 +256,36 @@ public class ConsistentDeviceMastershipStore
}
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (nodeId.equals(currentMaster)) {
return null;
return CompletableFuture.completedFuture(null);
} else {
String leadershipTopic = createDeviceMastershipTopic(deviceId);
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
if (candidates.isEmpty()) {
return null;
return CompletableFuture.completedFuture(null);
}
if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
return transitionFromMasterToStandby(deviceId);
CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
// There is brief wait before we step down from mastership.
// This is to ensure any work that happens when standby preference
// order changes can complete. For example: flow entries need to be backed
// to the new top standby (ONOS-1883)
// FIXME: This potentially introduces a race-condition.
// Right now role changes are only forced via CLI.
transferExecutor.schedule(() -> {
result.complete(transitionFromMasterToStandby(deviceId));
}, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
return result;
} else {
log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
}
}
return null;
return CompletableFuture.completedFuture(null);
}
@Override
......@@ -278,13 +298,13 @@ public class ConsistentDeviceMastershipStore
}
@Override
public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (!nodeId.equals(currentMaster)) {
return null;
return CompletableFuture.completedFuture(null);
}
String leadershipTopic = createDeviceMastershipTopic(deviceId);
......@@ -304,20 +324,25 @@ public class ConsistentDeviceMastershipStore
}
@Override
public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
return futureGetOrElse(clusterCommunicator.sendAndReceive(
return clusterCommunicator.sendAndReceive(
deviceId,
ROLE_RELINQUISH_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
nodeId), null);
nodeId);
}
return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
}
private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
// Check if this node is can be managed by this node.
if (!connectedDevices.contains(deviceId)) {
......
......@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -130,7 +131,7 @@ public class DistributedMastershipStore
}
@Override
public MastershipEvent setMaster(NodeId newMaster, DeviceId deviceId) {
public CompletableFuture<MastershipEvent> setMaster(NodeId newMaster, DeviceId deviceId) {
roleMap.lock(deviceId);
try {
......@@ -147,7 +148,7 @@ public class DistributedMastershipStore
log.warn("{} was in both MASTER and STANDBY for {}", newMaster, deviceId);
// trigger BACKUPS_CHANGED?
}
return null;
return CompletableFuture.completedFuture(null);
case STANDBY:
case NONE:
final NodeId currentMaster = rv.get(MASTER);
......@@ -163,10 +164,11 @@ public class DistributedMastershipStore
rv.reassign(newMaster, STANDBY, NONE);
updateTerm(deviceId);
roleMap.put(deviceId, rv);
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
default:
log.warn("unknown Mastership Role {}", currentRole);
return null;
return CompletableFuture.completedFuture(null);
}
} finally {
roleMap.unlock(deviceId);
......@@ -282,7 +284,7 @@ public class DistributedMastershipStore
}
@Override
public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
// if nodeId was MASTER, rotate STANDBY
// if nodeId was STANDBY no-op
// if nodeId was NONE, add to STANDBY
......@@ -298,30 +300,33 @@ public class DistributedMastershipStore
updateTerm(deviceId);
if (newMaster != null) {
roleMap.put(deviceId, rv);
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
} else {
// no master candidate
roleMap.put(deviceId, rv);
// TBD: Should there be new event type for no MASTER?
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
}
case STANDBY:
return null;
return CompletableFuture.completedFuture(null);
case NONE:
rv.reassign(nodeId, NONE, STANDBY);
roleMap.put(deviceId, rv);
return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
default:
log.warn("unknown Mastership Role {}", currentRole);
}
return null;
return CompletableFuture.completedFuture(null);
} finally {
roleMap.unlock(deviceId);
}
}
@Override
public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
// relinquishRole is basically set to None
// If nodeId was master reelect next and remove nodeId
......@@ -337,13 +342,14 @@ public class DistributedMastershipStore
if (newMaster != null) {
updateTerm(deviceId);
roleMap.put(deviceId, rv);
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
} else {
// No master candidate - no more backups, device is likely
// fully disconnected
roleMap.put(deviceId, rv);
// Should there be new event type?
return null;
return CompletableFuture.completedFuture(null);
}
case STANDBY:
//fall through to reinforce relinquishment
......@@ -351,13 +357,14 @@ public class DistributedMastershipStore
boolean modified = rv.reassign(nodeId, STANDBY, NONE);
if (modified) {
roleMap.put(deviceId, rv);
return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
}
return null;
return CompletableFuture.completedFuture(null);
default:
log.warn("unknown Mastership Role {}", currentRole);
}
return null;
return CompletableFuture.completedFuture(null);
} finally {
roleMap.unlock(deviceId);
}
......@@ -374,10 +381,11 @@ public class DistributedMastershipStore
if (roleValue.contains(MASTER, nodeId) ||
roleValue.contains(STANDBY, nodeId)) {
MastershipEvent event = relinquishRole(nodeId, deviceId);
if (event != null) {
events.add(event);
}
relinquishRole(nodeId, deviceId).whenComplete((event, error) -> {
if (event != null) {
events.add(event);
}
});
}
}
notifyDelegate(events);
......
......@@ -44,6 +44,7 @@ import org.onosproject.store.hz.TestStoreManager;
import org.onosproject.store.serializers.KryoSerializer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
......@@ -168,15 +169,15 @@ public class DistributedMastershipStoreTest {
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
assertNull("wrong event:", dms.setMaster(N1, DID1));
assertNull("wrong event:", Futures.getUnchecked(dms.setMaster(N1, DID1)));
//switch over to N2
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.setMaster(N2, DID1)).type());
System.out.println(dms.getTermFor(DID1).master() + ":" + dms.getTermFor(DID1).termNumber());
assertEquals("wrong term", MastershipTerm.of(N2, 2), dms.getTermFor(DID1));
//orphan switch - should be rare case
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.setMaster(N2, DID2)).type());
assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
//disconnect and reconnect - sign of failing re-election or single-instance channel
dms.roleMap.clear();
......@@ -190,18 +191,18 @@ public class DistributedMastershipStoreTest {
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
//no backup, no new MASTER/event
assertNull("wrong event:", dms.relinquishRole(N1, DID1));
assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N1, DID1)));
dms.requestRole(DID1);
//add backup CN2, get it elected MASTER by relinquishing
testStore.setCurrent(CN2);
assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type());
assertEquals("wrong master", N2, dms.getMaster(DID1));
//all nodes "give up" on device, which goes back to NONE.
assertNull("wrong event:", dms.relinquishRole(N2, DID1));
assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N2, DID1)));
assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
assertEquals("wrong number of retired nodes", 2,
......@@ -215,11 +216,11 @@ public class DistributedMastershipStoreTest {
dms.roleMap.get(DID1).nodesOfRole(STANDBY).size());
//If STANDBY, should drop to NONE
assertEquals("wrong event:", Type.BACKUPS_CHANGED, dms.relinquishRole(N1, DID1).type());
assertEquals("wrong event:", Type.BACKUPS_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type());
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
//NONE - nothing happens
assertEquals("wrong event:", Type.BACKUPS_CHANGED, dms.relinquishRole(N1, DID2).type());
assertEquals("wrong event:", Type.BACKUPS_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID2)).type());
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
}
......
......@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
......@@ -143,13 +144,13 @@ public class SimpleMastershipStore
}
@Override
public synchronized MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
public synchronized CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
// no-op
return null;
return CompletableFuture.completedFuture(null);
case STANDBY:
case NONE:
NodeId prevMaster = masterMap.put(deviceId, nodeId);
......@@ -162,8 +163,8 @@ public class SimpleMastershipStore
return null;
}
return new MastershipEvent(MASTER_CHANGED, deviceId,
getNodes(deviceId));
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
}
@Override
......@@ -285,7 +286,7 @@ public class SimpleMastershipStore
}
@Override
public synchronized MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
......@@ -294,22 +295,22 @@ public class SimpleMastershipStore
// no master alternative
masterMap.remove(deviceId);
// TODO: Should there be new event type for no MASTER?
return new MastershipEvent(MASTER_CHANGED, deviceId,
getNodes(deviceId));
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
} else {
NodeId prevMaster = masterMap.put(deviceId, backup);
incrementTerm(deviceId);
addToBackup(deviceId, prevMaster);
return new MastershipEvent(MASTER_CHANGED, deviceId,
getNodes(deviceId));
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
}
case STANDBY:
case NONE:
boolean modified = addToBackup(deviceId, nodeId);
if (modified) {
return new MastershipEvent(BACKUPS_CHANGED, deviceId,
getNodes(deviceId));
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
}
break;
......@@ -335,20 +336,20 @@ public class SimpleMastershipStore
}
@Override
public synchronized MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
public synchronized CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
NodeId backup = reelect(deviceId, nodeId);
masterMap.put(deviceId, backup);
incrementTerm(deviceId);
return new MastershipEvent(MASTER_CHANGED, deviceId,
getNodes(deviceId));
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
case STANDBY:
if (removeFromBackups(deviceId, nodeId)) {
return new MastershipEvent(BACKUPS_CHANGED, deviceId,
getNodes(deviceId));
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
}
break;
......@@ -358,12 +359,12 @@ public class SimpleMastershipStore
default:
log.warn("unknown Mastership Role {}", role);
}
return null;
return CompletableFuture.completedFuture(null);
}
@Override
public synchronized void relinquishAllRole(NodeId nodeId) {
List<MastershipEvent> events = new ArrayList<>();
List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>();
Set<DeviceId> toRelinquish = new HashSet<>();
masterMap.entrySet().stream()
......@@ -375,12 +376,13 @@ public class SimpleMastershipStore
.forEach(entry -> toRelinquish.add(entry.getKey()));
toRelinquish.forEach(deviceId -> {
MastershipEvent event = relinquishRole(nodeId, deviceId);
if (event != null) {
events.add(event);
}
eventFutures.add(relinquishRole(nodeId, deviceId));
});
notifyDelegate(events);
eventFutures.forEach(future -> {
future.whenComplete((event, error) -> {
notifyDelegate(event);
});
});
}
}
......
......@@ -29,6 +29,7 @@ import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
......@@ -92,15 +93,15 @@ public class SimpleMastershipStoreTest {
@Test
public void setMaster() {
put(DID1, N1, false, false);
assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID1).type());
assertEquals("wrong event", MASTER_CHANGED, Futures.getUnchecked(sms.setMaster(N1, DID1)).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID1));
//set node that's already master - should be ignored
assertNull("wrong event", sms.setMaster(N1, DID1));
assertNull("wrong event", Futures.getUnchecked(sms.setMaster(N1, DID1)));
//set STANDBY to MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2));
assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID2).type());
assertEquals("wrong event", MASTER_CHANGED, Futures.getUnchecked(sms.setMaster(N1, DID2)).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID2));
}
......@@ -156,7 +157,7 @@ public class SimpleMastershipStoreTest {
//no backup, MASTER
put(DID1, N1, true, false);
assertNull("expect no MASTER event", sms.setStandby(N1, DID1).roleInfo().master());
assertNull("expect no MASTER event", Futures.getUnchecked(sms.setStandby(N1, DID1)).roleInfo().master());
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
......@@ -164,7 +165,7 @@ public class SimpleMastershipStoreTest {
put(DID1, N1, true, true);
put(DID1, N2, false, true);
put(DID2, N2, true, true);
MastershipEvent event = sms.setStandby(N1, DID1);
MastershipEvent event = Futures.getUnchecked(sms.setStandby(N1, DID1));
assertEquals("wrong event", MASTER_CHANGED, event.type());
assertEquals("wrong master", N2, event.roleInfo().master());
}
......