Jon Hall

[ONOS-4460] Relinquish device role when partitioned away from cluster

Change-Id: I578029614cced96a2d4503e4fe3052c927f051ab
......@@ -41,13 +41,23 @@ public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leaders
* Signifies that the leader for a topic has changed.
*/
// TODO: We may not need this. We currently do not support a way for a current leader to step down
// while still reamining a candidate
// while still remaining a candidate
LEADER_CHANGED,
/**
* Signifies a change in the list of candidates for a topic.
*/
CANDIDATES_CHANGED
CANDIDATES_CHANGED,
/**
* Signifies the Leadership Elector is unavailable.
*/
SERVICE_DISRUPTED,
/**
* Signifies the Leadership Elector is available again.
*/
SERVICE_RESTORED
}
/**
......
......@@ -24,7 +24,7 @@ import org.onosproject.store.Store;
public interface LeadershipStore extends Store<LeadershipEvent, LeadershipStoreDelegate> {
/**
* Adds registration for the local instance to be leader for topic.
* Adds registration for the local instance to be part of the leadership contest for topic.
*
* @param topic leadership topic
* @return Updated leadership after operation is completed
......
......@@ -44,7 +44,13 @@ public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceI
* the change in the backups list is accompanied by a change in
* master, the event is subsumed by MASTER_CHANGED.
*/
BACKUPS_CHANGED
BACKUPS_CHANGED,
/**
* Signifies that the underlying storage for the Mastership state
* of this device is unavailable.
*/
SUSPENDED
}
/**
......
......@@ -729,16 +729,17 @@ public class DeviceManager
}
private void handleMastershipEvent(MastershipEvent event) {
if (event.type() != MastershipEvent.Type.MASTER_CHANGED) {
if (event.type() == MastershipEvent.Type.BACKUPS_CHANGED) {
// Don't care if backup list changed.
return;
}
final DeviceId did = event.subject();
// myRole suggested by MastershipService
MastershipRole myNextRole;
if (localNodeId.equals(event.roleInfo().master())) {
if (event.type() == MastershipEvent.Type.SUSPENDED) {
myNextRole = NONE; // FIXME STANDBY OR NONE?
} else if (localNodeId.equals(event.roleInfo().master())) {
// confirm latest info
MastershipTerm term = termService.getMastershipTerm(did);
final boolean iHaveControl = term != null && localNodeId.equals(term.master());
......
......@@ -15,12 +15,16 @@
*/
package org.onosproject.store.cluster.impl;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -35,6 +39,7 @@ import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
......@@ -57,8 +62,10 @@ public class DistributedLeadershipStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private ExecutorService statusChangeHandler;
private NodeId localNodeId;
private LeaderElector leaderElector;
private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
......@@ -77,22 +84,54 @@ public class DistributedLeadershipStore
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
// Update local cache of currently held leaderships
if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
localLeaderCache.put(newValue.topic(), newValue);
} else {
localLeaderCache.remove(newValue.topic());
}
};
private final Consumer<Status> clientStatusListener = status ->
statusChangeHandler.execute(() -> handleStatusChange(status));
private void handleStatusChange(Status status) {
// Notify mastership Service of disconnect and reconnect
if (status == Status.ACTIVE) {
// Service Restored
localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
leaderElector.getLeaderships().forEach((topic, leadership) ->
notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
} else if (status == Status.SUSPENDED) {
// Service Suspended
localLeaderCache.forEach((topic, leadership) ->
notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
} else {
// Should be only inactive state
return;
}
}
@Activate
public void activate() {
statusChangeHandler = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
localNodeId = clusterService.getLocalNode().id();
leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
leaderElector.addStatusChangeListener(clientStatusListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
leaderElector.removeChangeListener(leadershipChangeListener);
leaderElector.removeStatusChangeListener(clientStatusListener);
statusChangeHandler.shutdown();
log.info("Stopped");
}
......
......@@ -18,6 +18,7 @@ package org.onosproject.store.mastership.impl;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Preconditions.checkArgument;
......@@ -319,7 +320,8 @@ public class ConsistentDeviceMastershipStore
private void handleEvent(LeadershipEvent event) {
Leadership leadership = event.subject();
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
RoleInfo roleInfo = getNodes(deviceId);
RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
getNodes(deviceId) : new RoleInfo();
switch (event.type()) {
case LEADER_AND_CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
......@@ -331,6 +333,12 @@ public class ConsistentDeviceMastershipStore
case CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
break;
case SERVICE_DISRUPTED:
notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
break;
case SERVICE_RESTORED:
// Do nothing, wait for updates from peers
break;
default:
return;
}
......
......@@ -26,6 +26,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
......@@ -63,6 +64,19 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
private final LoadingCache<String, CompletableFuture<Leadership>> cache;
Function<CopycatClient.State, Status> mapper = state -> {
switch (state) {
case CONNECTED:
return Status.ACTIVE;
case SUSPENDED:
return Status.SUSPENDED;
case CLOSED:
return Status.INACTIVE;
default:
throw new IllegalStateException("Unknown state " + state);
}
};
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
cache = CacheBuilder.newBuilder()
......@@ -79,6 +93,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
}
};
addStatusChangeListener(statusListener);
client.onStateChange(this::handleStateChange);
}
@Override
......@@ -193,4 +208,8 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
private boolean isListening() {
return !leadershipChangeListeners.isEmpty();
}
private void handleStateChange(CopycatClient.State state) {
statusChangeListeners().forEach(listener -> listener.accept(mapper.apply(state)));
}
}
......
......@@ -542,22 +542,20 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
private class InternalRoleListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) {
if (MastershipEvent.Type.MASTER_CHANGED.equals(event.type())) {
// only need new master events
return;
eventExecutor.execute(() -> {
DeviceId deviceId = event.subject();
Device device = deviceService.getDevice(deviceId);
if (device == null) {
log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
return;
}
if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
}
});
}
eventExecutor.execute(() -> {
DeviceId deviceId = event.subject();
Device device = deviceService.getDevice(deviceId);
if (device == null) {
log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
return;
}
if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
}
});
}
}
......