HIGUCHI Yuta
Committed by Gerrit Code Review

Bug fixes for ONOS-3509

- Forwarding behavior added to {Device,Link}Store by ONOS-490
  caused false update information, sent from an ONOS node that had been detached from the cluster,
  to be accepted by the rest of the cluster after the detached node rejoined the cluster.

- Fix for periodic mastership check was left out
  when MastershipService#requestRoleFor(..) return value was changed to Future.

- Fix for triggerProbe() related messages getting dropped,
  right after STANDBY -> MASTER role change.

- Local state (connectedDevices) was preventing
  vertical (Core -> switch) Mastership state synchronization.

- Various debug logs and comments added during investigation.

Change-Id: I777beadf04db8a879830a07bfdc7ab0e2279f190
......@@ -31,20 +31,23 @@ public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leaders
*/
public enum Type {
/**
* Signifies that the leader has been elected. The event subject is the
* new leader.
* Signifies that the leader has been elected.
* The event subject is the new leader.
* This event does not guarantee accurate candidate information.
*/
LEADER_ELECTED,
/**
* Signifies that the leader has been re-elected. The event subject is the
* leader.
* Signifies that the leader has been re-elected.
* The event subject is the leader.
* This event does not guarantee accurate candidate information.
*/
LEADER_REELECTED,
/**
* Signifies that the leader has been booted and lost leadership. The
* event subject is the former leader.
* Signifies that the leader has been booted and lost leadership.
* The event subject is the former leader.
* This event does not guarantee accurate candidate information.
*/
LEADER_BOOTED,
......
......@@ -491,9 +491,12 @@ public class DeviceManager
if (Objects.equals(requested, mastershipService.getLocalRole(deviceId))) {
return;
} else {
return;
// FIXME roleManager got the device to comply, but doesn't agree with
log.warn("Role mismatch on {}. set to {}, but store demands {}",
deviceId, response, mastershipService.getLocalRole(deviceId));
// roleManager got the device to comply, but doesn't agree with
// the store; use the store's view, then try to reassert.
backgroundService.submit(() -> reassertRole(deviceId, mastershipService.getLocalRole(deviceId)));
return;
}
} else {
// we didn't get back what we asked for. Reelect someone else.
......@@ -547,6 +550,7 @@ public class DeviceManager
provider.roleChanged(deviceId, newRole);
if (newRole.equals(MastershipRole.MASTER)) {
log.debug("sent TriggerProbe({})", deviceId);
// only trigger event when request was sent to provider
provider.triggerProbe(deviceId);
}
......@@ -565,12 +569,19 @@ public class DeviceManager
MastershipRole myNextRole = nextRole;
if (myNextRole == NONE) {
mastershipService.requestRoleFor(did);
MastershipTerm term = termService.getMastershipTerm(did);
if (term != null && localNodeId.equals(term.master())) {
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
try {
mastershipService.requestRoleFor(did).get();
MastershipTerm term = termService.getMastershipTerm(did);
if (term != null && localNodeId.equals(term.master())) {
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted waiting for Mastership", e);
} catch (ExecutionException e) {
log.error("Encountered an error waiting for Mastership", e);
}
}
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.store.cluster.impl;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
......@@ -163,7 +164,7 @@ public class DistributedClusterStore
@Override
public State getState(NodeId nodeId) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
return nodeStates.get(nodeId);
return MoreObjects.firstNonNull(nodeStates.get(nodeId), State.INACTIVE);
}
@Override
......
......@@ -96,17 +96,26 @@ public class DistributedLeadershipManager implements LeadershipService {
protected EventDeliveryService eventDispatcher;
private final Logger log = getLogger(getClass());
private ScheduledExecutorService electionRunner;
private ScheduledExecutorService lockExecutor;
private ScheduledExecutorService staleLeadershipPurgeExecutor;
private ScheduledExecutorService leadershipRefresher;
// leader for each topic
private ConsistentMap<String, NodeId> leaderMap;
// list of candidates (includes chosen leader) for each topic
private ConsistentMap<String, List<NodeId>> candidateMap;
private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
// cached copy of leaderMap
// Note: Map value, Leadership, does not contain proper candidates info
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
// cached copy of candidateMap
// Note: Map value, Leadership, does not contain proper leader info
private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
private NodeId localNodeId;
......
......@@ -286,6 +286,11 @@ public class ECDeviceStore
deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
return refreshDeviceCache(providerId, deviceId);
} else {
// Only forward for ConfigProvider
// Forwarding was added as a workaround for ONOS-490
if (!providerId.equals("cfg")) {
return null;
}
DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
return Futures.getUnchecked(
clusterCommunicator.sendAndReceive(deviceInjectedEvent,
......@@ -413,6 +418,11 @@ public class ECDeviceStore
});
deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
} else {
// Only forward for ConfigProvider
// Forwarding was added as a workaround for ONOS-490
if (!providerId.equals("cfg")) {
return null;
}
if (master == null) {
return Collections.emptyList();
}
......
......@@ -330,6 +330,11 @@ public class GossipDeviceStore
}
} else {
// Only forward for ConfigProvider
// Forwarding was added as a workaround for ONOS-490
if (!providerId.equals("cfg")) {
return null;
}
// FIXME Temporary hack for NPE (ONOS-1171).
// Proper fix is to implement forwarding to master on ConfigProvider
// redo ONOS-490
......@@ -579,6 +584,11 @@ public class GossipDeviceStore
}
} else {
// Only forward for ConfigProvider
// Forwarding was added as a workaround for ONOS-490
if (!providerId.equals("cfg")) {
return null;
}
// FIXME Temporary hack for NPE (ONOS-1171).
// Proper fix is to implement forwarding to master on ConfigProvider
// redo ONOS-490
......
......@@ -218,6 +218,13 @@ public class ECLinkStore
linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v , linkDescription));
return refreshLinkCache(linkKey);
} else {
// Only forward for ConfigProvider
// Forwarding was added as a workaround for ONOS-490
if (!providerId.equals("cfg")) {
return null;
}
// Temporary hack for NPE (ONOS-1171).
// Proper fix is to implement forwarding to master on ConfigProvider
if (dstNodeId == null) {
return null;
}
......
......@@ -323,6 +323,11 @@ public class GossipLinkStore
}
} else {
// Only forward for ConfigProvider
// Forwarding was added as a workaround for ONOS-490
if (!providerId.equals("cfg")) {
return null;
}
// FIXME Temporary hack for NPE (ONOS-1171).
// Proper fix is to implement forwarding to master on ConfigProvider
// redo ONOS-490
......
......@@ -159,20 +159,12 @@ public class ConsistentDeviceMastershipStore
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
if (connectedDevices.add(deviceId)) {
return leadershipService.runForLeadership(leadershipTopic)
.thenApply(leadership -> {
return Objects.equal(localNodeId, leadership.leader())
? MastershipRole.MASTER : MastershipRole.STANDBY;
});
} else {
NodeId leader = leadershipService.getLeader(leadershipTopic);
if (Objects.equal(localNodeId, leader)) {
return CompletableFuture.completedFuture(MastershipRole.MASTER);
} else {
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
}
}
connectedDevices.add(deviceId);
return leadershipService.runForLeadership(leadershipTopic)
.thenApply(leadership -> {
return Objects.equal(localNodeId, leadership.leader())
? MastershipRole.MASTER : MastershipRole.STANDBY;
});
}
@Override
......
......@@ -97,6 +97,9 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
protected ExecutorService executorMsgs =
Executors.newFixedThreadPool(2, groupedThreads("onos/of", "ctrl-msg-stats-%d"));
// messagesPendingMastership is used as synchronization variable for
// all mastership related changes. In this block, mastership (including
// role update) will have either occurred or not.
private final AtomicReference<List<OFMessage>> messagesPendingMastership
= new AtomicReference<>();
......@@ -275,6 +278,8 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
/**
 * Dispatches a message received from the switch.
 *
 * The message is handed to the agent only when this switch's role is
 * MASTER or the message is an OFPortStatus; any other message is logged
 * at trace level and dropped.
 *
 * @param m the received OpenFlow message
 */
public final void handleMessage(OFMessage m) {
    final boolean deliverable = this.role == RoleState.MASTER || m instanceof OFPortStatus;
    if (!deliverable) {
        // Neither MASTER nor a port-status message: drop it, leaving a trace.
        log.trace("Dropping received message {}, was not MASTER", m);
        return;
    }
    this.agent.processMessage(dpid, m);
}
......@@ -309,7 +314,8 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
synchronized (messagesPendingMastership) {
List<OFMessage> messages = messagesPendingMastership.get();
if (messages != null) {
this.sendMsg(messages);
// Cannot use sendMsg here. It will only append to pending list.
sendMsgsOnChannel(messages);
log.debug("Sending {} pending messages to switch {}",
messages.size(), dpid);
messagesPendingMastership.set(null);
......
......@@ -73,12 +73,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.onlab.util.Tools.groupedThreads;
@Component(immediate = true)
......@@ -118,11 +118,11 @@ public class OpenFlowControllerImpl implements OpenFlowController {
private final ExecutorService executorBarrier =
Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches =
new ConcurrentHashMap<>();
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
new ConcurrentHashMap<>();
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
protected ConcurrentMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
new ConcurrentHashMap<>();
protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
......@@ -280,6 +280,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
executorMsgs.submit(new OFMessageHandler(dpid, msg));
break;
case ERROR:
log.debug("Received error message from {}: {}", dpid, msg);
executorMsgs.submit(new OFMessageHandler(dpid, msg));
break;
case STATS_REPLY:
......
......@@ -64,6 +64,7 @@ import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceEvent.Type;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
......@@ -568,6 +569,9 @@ public class LldpLinkProvider extends AbstractProvider implements LinkProvider {
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
if (event.type() == Type.PORT_STATS_UPDATED) {
return;
}
Device device = event.subject();
Port port = event.port();
if (device == null) {
......
......@@ -268,7 +268,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
LOG.error("Unknown Mastership state : {}", newRole);
}
LOG.debug("Accepting mastership role change for device {}", deviceId);
LOG.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
}
......@@ -297,7 +297,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
}
private void pushPortMetrics(Dpid dpid, List<OFPortStatsEntry> portStatsEntries) {
DeviceId deviceId = DeviceId.deviceId(dpid.uri(dpid));
DeviceId deviceId = DeviceId.deviceId(Dpid.uri(dpid));
Collection<PortStatistics> stats = buildPortStatistics(deviceId, portStatsEntries);
providerService.updatePortStatistics(deviceId, stats);
}
......@@ -434,6 +434,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
@Override
public void switchChanged(Dpid dpid) {
LOG.debug("switchChanged({})", dpid);
if (providerService == null) {
return;
}
......@@ -442,17 +443,21 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
if (sw == null) {
return;
}
providerService.updatePorts(did, buildPortDescriptions(sw));
final List<PortDescription> ports = buildPortDescriptions(sw);
LOG.debug("switchChanged({}) {}", did, ports);
providerService.updatePorts(did, ports);
}
@Override
public void portChanged(Dpid dpid, OFPortStatus status) {
    // Record the raw notification before translating it.
    LOG.debug("portChanged({},{})", dpid, status);

    // Translate the OpenFlow port status into a port description first,
    // then report it against the device derived from the dpid.
    final PortDescription description = buildPortDescription(status);
    providerService.portStatusChanged(deviceId(uri(dpid)), description);
}
@Override
public void receivedRoleReply(Dpid dpid, RoleState requested, RoleState response) {
LOG.debug("receivedRoleReply({},{},{})", dpid, requested, response);
MastershipRole request = roleOf(requested);
MastershipRole reply = roleOf(response);
providerService.receivedRoleReply(deviceId(uri(dpid)), request, reply);
......@@ -503,7 +508,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
LOG.debug("Ports Of{}", portsOf);
portsOf.forEach(
op -> {
portDescs.add(buildPortDescription(type, (OFObject) op));
portDescs.add(buildPortDescription(type, op));
}
);
});
......
......@@ -353,10 +353,12 @@ public class OpenFlowGroupProvider extends AbstractProvider implements GroupProv
return;
}
if (isGroupSupported(sw)) {
GroupStatsCollector gsc = new GroupStatsCollector(
controller.getSwitch(dpid), POLL_INTERVAL);
GroupStatsCollector gsc = new GroupStatsCollector(sw, POLL_INTERVAL);
gsc.start();
collectors.put(dpid, gsc);
GroupStatsCollector prevGsc = collectors.put(dpid, gsc);
if (prevGsc != null) {
prevGsc.stop();
}
}
//figure out race condition
......