Madan Jampani
Committed by Gerrit Code Review

ONOS-1326: Added support for observing when node liveness status was last updated.

Useful for detecting/debugging stability issues.

Change-Id: I8ffebcf3a09a51c6e3e7526986a0f05530ed757f
......@@ -15,17 +15,19 @@
*/
package org.onosproject.cli;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import static com.google.common.collect.Lists.newArrayList;
import java.util.Collections;
import java.util.List;
import static com.google.common.collect.Lists.newArrayList;
import org.apache.karaf.shell.commands.Command;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
/**
* Lists all controller cluster nodes.
......@@ -35,7 +37,7 @@ import static com.google.common.collect.Lists.newArrayList;
public class NodesListCommand extends AbstractShellCommand {
private static final String FMT =
"id=%s, address=%s:%s, state=%s %s";
"id=%s, address=%s:%s, state=%s, updated=%s %s";
@Override
protected void execute() {
......@@ -49,6 +51,7 @@ public class NodesListCommand extends AbstractShellCommand {
for (ControllerNode node : nodes) {
print(FMT, node.id(), node.ip(), node.tcpPort(),
service.getState(node.id()),
Tools.timeAgo(service.getLastUpdated(node.id()).getMillis()),
node.equals(self) ? "*" : "");
}
}
......
......@@ -17,6 +17,8 @@ package org.onosproject.cluster;
import java.util.Set;
import org.joda.time.DateTime;
/**
* Service for obtaining information about the individual nodes within
* the controller cluster.
......@@ -54,6 +56,14 @@ public interface ClusterService {
ControllerNode.State getState(NodeId nodeId);
/**
* Returns the system time when the availability state of the specified
* controller node was last updated.
*
* @param nodeId controller node identifier
* @return system time when the availability state was last updated
*         (NOTE(review): some implementations appear to return null when
*         nothing has been recorded for the node; confirm and document)
*/
DateTime getLastUpdated(NodeId nodeId);
/**
* Adds the specified cluster event listener.
*
* @param listener the cluster listener
......
......@@ -15,11 +15,12 @@
*/
package org.onosproject.cluster;
import org.onosproject.store.Store;
import org.onlab.packet.IpAddress;
import java.util.Set;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.store.Store;
/**
* Manages inventory of controller cluster nodes; not intended for direct use.
*/
......@@ -56,6 +57,14 @@ public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate>
ControllerNode.State getState(NodeId nodeId);
/**
* Returns the system time when the availability state was last updated.
*
* @param nodeId controller node identifier
* @return system time when the availability state was last updated
*/
DateTime getLastUpdated(NodeId nodeId);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
......
......@@ -17,6 +17,8 @@ package org.onosproject.cluster;
import java.util.Set;
import org.joda.time.DateTime;
/**
* Test adapter for the cluster service.
*/
......@@ -42,6 +44,11 @@ public class ClusterServiceAdapter implements ClusterService {
}
/**
* Adapter stub: ignores the node identifier and always returns null,
* i.e. no last-updated time is reported.
*/
@Override
public DateTime getLastUpdated(NodeId nodeId) {
return null;
}
/**
* Adapter stub: does nothing with the supplied listener.
*/
@Override
public void addListener(ClusterEventListener listener) {
}
......
......@@ -15,12 +15,20 @@
*/
package org.onosproject.cluster.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
......@@ -31,15 +39,8 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the cluster service.
*/
......@@ -97,6 +98,12 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
return store.getState(nodeId);
}
/**
 * Returns the time at which the availability state of the given node was
 * last updated, as reported by the cluster store.
 *
 * @param nodeId controller node identifier; must not be null
 * @return value the store reports for the node
 * @throws NullPointerException if nodeId is null
 */
@Override
public DateTime getLastUpdated(NodeId nodeId) {
    // Validate the identifier up front, as addNode does, so a null id fails
    // here with a descriptive message instead of somewhere inside the store.
    checkNotNull(nodeId, INSTANCE_ID_NULL);
    return store.getLastUpdated(nodeId);
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
......
......@@ -38,6 +38,7 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
......@@ -99,6 +100,7 @@ public class DistributedClusterStore
private Set<ControllerNode> seedNodes;
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private NettyMessagingService messagingService = new NettyMessagingService();
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
......@@ -131,7 +133,7 @@ public class DistributedClusterStore
seedNodes.forEach(node -> {
allNodes.put(node.id(), node);
nodeStates.put(node.id(), State.INACTIVE);
updateState(node.id(), State.INACTIVE);
});
establishSelfIdentity();
......@@ -216,7 +218,7 @@ public class DistributedClusterStore
checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
allNodes.put(node.id(), node);
nodeStates.put(nodeId, State.INACTIVE);
updateState(nodeId, State.INACTIVE);
delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
return node;
}
......@@ -231,12 +233,17 @@ public class DistributedClusterStore
}
}
/**
 * Stores the given state for the node and stamps the change with the
 * current time.
 *
 * @param nodeId   controller node identifier
 * @param newState state to record for the node
 */
private void updateState(NodeId nodeId, State newState) {
    nodeStates.put(nodeId, newState);
    final DateTime updatedAt = DateTime.now();
    nodeStateLastUpdatedTimes.put(nodeId, updatedAt);
}
private void establishSelfIdentity() {
try {
IpAddress ip = findLocalIp();
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
allNodes.put(localNode.id(), localNode);
nodeStates.put(localNode.id(), State.ACTIVE);
updateState(localNode.id(), State.ACTIVE);
log.info("Local Node: {}", localNode);
} catch (SocketException e) {
throw new IllegalStateException("Cannot determine local IP", e);
......@@ -256,12 +263,12 @@ public class DistributedClusterStore
double phi = failureDetector.phi(node.id());
if (phi >= PHI_FAILURE_THRESHOLD) {
if (currentState == State.ACTIVE) {
nodeStates.put(node.id(), State.INACTIVE);
updateState(node.id(), State.INACTIVE);
notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
}
} else {
if (currentState == State.INACTIVE) {
nodeStates.put(node.id(), State.ACTIVE);
updateState(node.id(), State.ACTIVE);
notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
}
}
......@@ -334,4 +341,8 @@ public class DistributedClusterStore
}
}
}
/**
 * Looks up the time most recently recorded for the node's state.
 *
 * @param nodeId controller node identifier
 * @return recorded time, or null when the map holds no entry for the node
 */
@Override
public DateTime getLastUpdated(NodeId nodeId) {
    final DateTime lastUpdated = nodeStateLastUpdatedTimes.get(nodeId);
    return lastUpdated;
}
}
\ No newline at end of file
......
......@@ -18,6 +18,7 @@ package org.onosproject.store.cluster.impl;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
......@@ -28,6 +29,7 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
......@@ -63,6 +65,7 @@ public class HazelcastClusterStore
private String listenerId;
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Map<NodeId, DateTime> lastUpdatedTimes = Maps.newConcurrentMap();
private String nodesListenerId;
......@@ -123,6 +126,11 @@ public class HazelcastClusterStore
}
/**
 * Looks up the time most recently recorded for the node's state.
 *
 * @param nodeId controller node identifier
 * @return recorded time, or null when the map holds no entry for the node
 */
@Override
public DateTime getLastUpdated(NodeId nodeId) {
    final DateTime lastUpdated = lastUpdatedTimes.get(nodeId);
    return lastUpdated;
}
/**
 * Builds a controller node from the given identity and delegates to the
 * private overload that registers it.
 *
 * @param nodeId  controller node identifier
 * @param ip      node IP address
 * @param tcpPort node TCP port
 * @return the node returned by the private overload
 */
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
    DefaultControllerNode newNode = new DefaultControllerNode(nodeId, ip, tcpPort);
    return addNode(newNode);
}
......@@ -139,7 +147,7 @@ public class HazelcastClusterStore
// Registers the node: stores its serialized id/value in rawNodes, puts it
// into the nodes map wrapped in an Optional, marks it ACTIVE and returns it.
// NOTE(review): both the direct states.put(...) and updateState(...) appear
// here; the former looks like the pre-change line kept by the diff view.
// Confirm that only the updateState(...) call is meant to remain.
private synchronized ControllerNode addNode(DefaultControllerNode node) {
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
states.put(node.id(), State.ACTIVE);
updateState(node.id(), State.ACTIVE);
return node;
}
......@@ -153,6 +161,11 @@ public class HazelcastClusterStore
return IpAddress.valueOf(member.getSocketAddress().getAddress());
}
/**
 * Records the new availability state of a node together with the time at
 * which the change was made.
 *
 * @param nodeId   controller node identifier
 * @param newState state to record for the node
 */
private void updateState(NodeId nodeId, State newState) {
    // Write to the state map directly. The previous body called
    // updateState(...) on itself, which recursed without bound and never
    // stored either the state or the timestamp.
    states.put(nodeId, newState);
    lastUpdatedTimes.put(nodeId, DateTime.now());
}
// Interceptor for membership events.
private class InternalMembershipListener implements MembershipListener {
@Override
......@@ -166,7 +179,7 @@ public class HazelcastClusterStore
// Handles removal of a cluster member: logs the event, derives the node id
// from the member's address, marks that node INACTIVE and notifies the
// delegate with an INSTANCE_DEACTIVATED event for the node.
// NOTE(review): both the direct states.put(...) and updateState(...) appear
// here; the former looks like the pre-change line kept by the diff view.
// Confirm that only the updateState(...) call is meant to remain.
public void memberRemoved(MembershipEvent membershipEvent) {
log.info("Member {} removed", membershipEvent.getMember());
NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
states.put(nodeId, State.INACTIVE);
updateState(nodeId, State.INACTIVE);
notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
}
......@@ -178,4 +191,4 @@ public class HazelcastClusterStore
memberAttributeEvent.getValue());
}
}
}
}
\ No newline at end of file
......
......@@ -15,11 +15,16 @@
*/
package org.onosproject.store.trivial.impl;
import com.google.common.collect.ImmutableSet;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
......@@ -29,12 +34,9 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.ImmutableSet;
/**
* Manages inventory of infrastructure devices using trivial in-memory
......@@ -52,6 +54,8 @@ public class SimpleClusterStore
private ControllerNode instance;
private final DateTime creationTime = DateTime.now();
@Activate
public void activate() {
instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
......@@ -85,6 +89,11 @@ public class SimpleClusterStore
}
/**
* Returns the creationTime field, captured with DateTime.now() when this
* store object was constructed; the node identifier is not consulted.
*/
@Override
public DateTime getLastUpdated(NodeId nodeId) {
return creationTime;
}
/**
* Stub: ignores its arguments and returns null.
*/
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return null;
}
......
......@@ -15,6 +15,8 @@
*/
package org.onosproject.store.trivial.impl;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
......@@ -33,6 +35,8 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
......@@ -47,14 +51,11 @@ import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.store.AbstractStore;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import static org.onosproject.mastership.MastershipEvent.Type.*;
/**
* Manages inventory of controller mastership over devices using
* trivial, non-distributed in-memory structures implementation.
......@@ -90,6 +91,8 @@ public class SimpleMastershipStore
clusterService = new ClusterService() {
private final DateTime creationTime = DateTime.now();
@Override
public ControllerNode getLocalNode() {
return instance;
......@@ -118,6 +121,11 @@ public class SimpleMastershipStore
}
/**
* Returns the creationTime field, captured with DateTime.now() when this
* anonymous service object was constructed; the node identifier is not
* consulted.
*/
@Override
public DateTime getLastUpdated(NodeId nodeId) {
return creationTime;
}
/**
* Stub: does nothing with the supplied listener.
*/
@Override
public void addListener(ClusterEventListener listener) {
}
......