Thomas Vachuska
Committed by Gerrit Code Review

Added ability to track whether or not node has all components running fully.

Change-Id: Ib2b90c7a842976a3b3a9711367fa1eed43103b17
......@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.karaf.shell.commands.Command;
import org.joda.time.DateTime;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.utils.Comparators;
......@@ -36,15 +36,14 @@ import static com.google.common.collect.Lists.newArrayList;
* Lists all controller cluster nodes.
*/
@Command(scope = "onos", name = "nodes",
description = "Lists all controller cluster nodes")
description = "Lists all controller cluster nodes")
public class NodesListCommand extends AbstractShellCommand {
private static final String FMT =
"id=%s, address=%s:%s, state=%s, updated=%s %s";
private static final String FMT = "id=%s, address=%s:%s, state=%s, updated=%s %s";
@Override
protected void execute() {
ClusterService service = get(ClusterService.class);
ClusterAdminService service = get(ClusterAdminService.class);
List<ControllerNode> nodes = newArrayList(service.getNodes());
Collections.sort(nodes, Comparators.NODE_COMPARATOR);
if (outputJson()) {
......@@ -58,26 +57,24 @@ public class NodesListCommand extends AbstractShellCommand {
timeAgo = Tools.timeAgo(lastUpdated.getMillis());
}
print(FMT, node.id(), node.ip(), node.tcpPort(),
service.getState(node.id()),
timeAgo,
service.getState(node.id()), timeAgo,
node.equals(self) ? "*" : "");
}
}
}
// Produces JSON structure.
private JsonNode json(ClusterService service, List<ControllerNode> nodes) {
private JsonNode json(ClusterAdminService service, List<ControllerNode> nodes) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode result = mapper.createArrayNode();
ControllerNode self = service.getLocalNode();
for (ControllerNode node : nodes) {
ControllerNode.State nodeState = service.getState(node.id());
ObjectNode newNode = mapper.createObjectNode()
.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort())
.put("self", node.equals(self));
.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort())
.put("self", node.equals(self));
if (nodeState != null) {
newNode.put("state", nodeState.toString());
}
......
......@@ -49,7 +49,7 @@ public class SummaryCommand extends AbstractShellCommand {
for (final ControllerNode node : nodes) {
final ControllerNode.State nodeState =
get(ClusterService.class).getState(node.id());
if (nodeState == ControllerNode.State.ACTIVE) {
if (nodeState.isActive()) {
nodeCount++;
}
}
......
......@@ -22,7 +22,7 @@ import java.util.Set;
/**
* Service for administering the cluster node membership.
*/
public interface ClusterAdminService {
public interface ClusterAdminService extends ClusterService {
/**
* Forms cluster configuration based on the specified set of node
......@@ -50,4 +50,11 @@ public interface ClusterAdminService {
*/
void removeNode(NodeId nodeId);
/**
* Marks the current node as fully started or not.
*
* @param started true indicates all components have been started
*/
void markFullyStarted(boolean started);
}
......
......@@ -50,7 +50,9 @@ public interface ClusterService
ControllerNode getNode(NodeId nodeId);
/**
* Returns the availability state of the specified controller node.
* Returns the availability state of the specified controller node. Note
* that this does not imply that all the core and application components
* have been fully activated; only that the node has joined the cluster.
*
* @param nodeId controller node identifier
* @return availability state
......
......@@ -57,6 +57,13 @@ public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate>
ControllerNode.State getState(NodeId nodeId);
/**
* Marks the current node as fully started.
*
* @param started true indicates all components have been started
*/
void markFullyStarted(boolean started);
/**
* Returns the system when the availability state was last updated.
*
* @param nodeId controller node identifier
......
......@@ -25,6 +25,12 @@ public interface ControllerNode {
/** Represents the operational state of the instance. */
enum State {
/**
* Signifies that the instance is active and that all components are
* operating normally.
*/
READY,
/**
* Signifies that the instance is active and operating normally.
*/
ACTIVE,
......@@ -33,7 +39,25 @@ public interface ControllerNode {
* Signifies that the instance is inactive, which means either down or
* up, but not operational.
*/
INACTIVE
INACTIVE;
/**
* Indicates whether the state represents node which is active or ready.
*
* @return true if active or ready
*/
public boolean isActive() {
return this == ACTIVE || this == READY;
}
/**
* Indicates whether the state represents a node which is ready.
*
* @return true if active and ready
*/
public boolean isReady() {
return this == READY;
}
}
/**
......
......@@ -67,6 +67,7 @@ public class SimpleClusterStore
protected EventDeliveryService eventDispatcher;
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private boolean started = false;
@Activate
public void activate() {
......@@ -106,6 +107,11 @@ public class SimpleClusterStore
}
@Override
public void markFullyStarted(boolean started) {
this.started = started;
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
return creationTime;
}
......
......@@ -80,6 +80,11 @@
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-api</artifactId>
</dependency>
......@@ -93,6 +98,11 @@
<groupId>org.apache.karaf.system</groupId>
<artifactId>org.apache.karaf.system.core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr</artifactId>
</dependency>
</dependencies>
</project>
......
......@@ -15,6 +15,8 @@
*/
package org.onosproject.cluster.impl;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -44,9 +46,6 @@ import org.onosproject.cluster.PartitionId;
import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
......@@ -58,8 +57,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.security.AppPermission.Type.*;
/**
* Implementation of the cluster service.
......@@ -133,6 +132,10 @@ public class ClusterManager
return store.getState(nodeId);
}
@Override
public void markFullyStarted(boolean started) {
store.markFullyStarted(started);
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster.impl;
import org.apache.felix.scr.Component;
import org.apache.felix.scr.ScrService;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.karaf.features.Feature;
import org.apache.karaf.features.FeaturesService;
import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.cluster.ClusterAdminService;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Monitors the system to make sure that all bundles and their components
* are properly activated and keeps the cluster node service appropriately
* updated.
*/
@org.apache.felix.scr.annotations.Component(immediate = true)
public class ComponentsMonitor {
private Logger log = LoggerFactory.getLogger(getClass());
private static final long PERIOD = 2500;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FeaturesService featuresService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ScrService scrService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterAdminService clusterAdminService;
private BundleContext bundleContext;
private ScheduledFuture<?> poller;
@Activate
protected void activate(ComponentContext context) {
bundleContext = context.getBundleContext();
poller = SharedScheduledExecutors.getSingleThreadExecutor()
.scheduleAtFixedRate(this::checkStartedState, PERIOD,
PERIOD, TimeUnit.MILLISECONDS);
log.info("Started");
}
@Deactivate
protected void deactivate() {
poller.cancel(false);
log.info("Stopped");
}
private void checkStartedState() {
clusterAdminService.markFullyStarted(isFullyStarted());
}
/**
* Scans the system to make sure that all bundles and their components
* are fully started.
*
* @return true if all bundles and their components are active
*/
private boolean isFullyStarted() {
for (Feature feature : featuresService.listInstalledFeatures()) {
if (!isFullyStarted(feature)) {
return false;
}
}
return true;
}
private boolean isFullyStarted(Feature feature) {
return feature.getBundles().stream()
.map(info -> bundleContext.getBundle(info.getLocation()))
.allMatch(this::isFullyStarted);
}
private boolean isFullyStarted(Bundle bundle) {
Component[] components = scrService.getComponents(bundle);
if (components != null) {
for (Component component : components) {
if (!isFullyStarted(component)) {
return false;
}
}
}
return true;
}
private boolean isFullyStarted(Component component) {
int state = component.getState();
return state == Component.STATE_ACTIVE || state == Component.STATE_DISABLED ||
(state == Component.STATE_REGISTERED && !component.isImmediate());
}
}
......@@ -76,7 +76,7 @@ public class LeadershipManager
deadlockDetector.scheduleWithFixedDelay(() -> clusterService.getNodes()
.stream()
.map(ControllerNode::id)
.filter(id -> clusterService.getState(id) != ControllerNode.State.ACTIVE)
.filter(id -> !clusterService.getState(id).isActive())
.forEach(this::unregister), 0, 2, TimeUnit.SECONDS);
log.info("Started");
}
......
......@@ -30,8 +30,8 @@ import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.core.MetricsHelper;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
......@@ -57,11 +57,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.metrics.MetricsUtil.startTimer;
import static org.onlab.metrics.MetricsUtil.stopTimer;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.net.MastershipRole.MASTER;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.security.AppPermission.Type.*;
......@@ -204,7 +204,7 @@ public class MastershipManager
// Create buckets reflecting current ownership.
for (ControllerNode node : nodes) {
if (clusterService.getState(node.id()) == ACTIVE) {
if (clusterService.getState(node.id()).isActive()) {
Set<DeviceId> devicesOf = new HashSet<>(getDevicesOf(node.id()));
deviceCount += devicesOf.size();
controllerDevices.put(node, devicesOf);
......
......@@ -18,7 +18,6 @@ package org.onosproject.store.cluster.impl;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -90,6 +89,7 @@ public class DistributedClusterStore
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
......@@ -168,6 +168,11 @@ public class DistributedClusterStore
}
@Override
public void markFullyStarted(boolean started) {
updateState(localNode.id(), started ? State.READY : State.ACTIVE);
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
addNode(node);
......@@ -201,13 +206,14 @@ public class DistributedClusterStore
.stream()
.filter(node -> !(node.id().equals(localNode.id())))
.collect(Collectors.toSet());
byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
State state = nodeStates.get(localNode.id());
byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, state, peers));
peers.forEach((node) -> {
heartbeatToPeer(hbMessagePayload, node);
State currentState = nodeStates.get(node.id());
double phi = failureDetector.phi(node.id());
if (phi >= PHI_FAILURE_THRESHOLD) {
if (currentState == State.ACTIVE) {
if (currentState.isActive()) {
updateState(node.id(), State.INACTIVE);
notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
}
......@@ -225,7 +231,7 @@ public class DistributedClusterStore
private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
ControllerNode node = allNodes.get(nodeId);
if (newState == State.ACTIVE) {
if (newState.isActive()) {
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
} else {
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
......@@ -246,6 +252,7 @@ public class DistributedClusterStore
public void accept(Endpoint sender, byte[] message) {
HeartbeatMessage hb = SERIALIZER.decode(message);
failureDetector.report(hb.source().id());
updateState(hb.source().id(), hb.state);
hb.knownPeers().forEach(node -> {
allNodes.put(node.id(), node);
});
......@@ -254,10 +261,12 @@ public class DistributedClusterStore
private static class HeartbeatMessage {
private ControllerNode source;
private State state;
private Set<ControllerNode> knownPeers;
public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
public HeartbeatMessage(ControllerNode source, State state, Set<ControllerNode> members) {
this.source = source;
this.state = state != null ? state : State.ACTIVE;
this.knownPeers = ImmutableSet.copyOf(members);
}
......
......@@ -22,7 +22,6 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
......@@ -30,10 +29,10 @@ import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.IntentPartitionEvent;
import org.onosproject.net.intent.IntentPartitionEventListener;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -173,7 +172,7 @@ public class IntentPartitionManager implements IntentPartitionService {
private void rebalance() {
int activeNodes = (int) clusterService.getNodes()
.stream()
.filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id()))
.filter(node -> clusterService.getState(node.id()).isActive())
.count();
int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
......
......@@ -556,7 +556,7 @@ public class EventuallyConsistentMapImpl<K, V>
.stream()
.map(ControllerNode::id)
.filter(id -> !localNodeId.equals(id))
.filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
.filter(id -> clusterService.getState(id).isActive())
.collect(Collectors.toList());
Collections.shuffle(activePeers);
return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
......
......@@ -200,7 +200,7 @@ public class MutexExecutionManager implements MutexExecutionService {
long activeNodes = clusterService.getNodes()
.stream()
.map(node -> clusterService.getState(node.id()))
.filter(State.ACTIVE::equals)
.filter(State::isActive)
.count();
if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
......
......@@ -262,6 +262,12 @@
<version>1.9.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.karaf.features</groupId>
......
......@@ -44,10 +44,11 @@ public class ClusterViewMessageHandler extends UiMessageHandler {
private static final String IP = "ip";
private static final String TCP_PORT = "tcp";
private static final String STATE_IID = "_iconid_state";
private static final String STARTED_IID = "_iconid_started";
private static final String UPDATED = "updated";
private static final String[] COL_IDS = {
ID, IP, TCP_PORT, STATE_IID, UPDATED
ID, IP, TCP_PORT, STATE_IID, STARTED_IID, UPDATED
};
private static final String ICON_ID_ONLINE = "active";
......@@ -95,13 +96,15 @@ public class ClusterViewMessageHandler extends UiMessageHandler {
ClusterService cs) {
NodeId id = node.id();
DateTime lastUpdated = cs.getLastUpdated(id);
String iconId = (cs.getState(id) == ControllerNode.State.ACTIVE) ?
ICON_ID_ONLINE : ICON_ID_OFFLINE;
ControllerNode.State state = cs.getState(id);
String iconId = state.isActive() ? ICON_ID_ONLINE : ICON_ID_OFFLINE;
String startedId = state.isReady() ? ICON_ID_ONLINE : ICON_ID_OFFLINE;
row.cell(ID, id)
.cell(IP, node.ip())
.cell(TCP_PORT, node.tcpPort())
.cell(STATE_IID, iconId)
.cell(STARTED_IID, startedId)
.cell(UPDATED, lastUpdated);
}
}
......
......@@ -81,7 +81,6 @@ import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.net.DefaultEdgeLink.createEdgeLink;
import static org.onosproject.net.PortNumber.portNumber;
import static org.onosproject.ui.topo.TopoConstants.CoreButtons;
......@@ -230,7 +229,7 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
ObjectNode payload = objectNode()
.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("online", clusterService.getState(node.id()) == ACTIVE)
.put("online", clusterService.getState(node.id()).isActive())
.put("uiAttached", node.equals(clusterService.getLocalNode()))
.put("switches", switchCount);
......
......@@ -30,7 +30,8 @@
<div class="table-header" onos-sortable-header>
<table>
<tr>
<td colId="_iconid_state" class="table-icon" sortable></td>
<td colId="_iconid_state" class="table-icon" col-width="60px" sortable>Active </td>
<td colId="_iconid_started" class="table-icon" col-width="60px" sortable>Started </td>
<td colId="id" sortable>ID </td>
<td colId="ip" sortable>IP Address </td>
<td colId="tcp" sortable>TCP Port </td>
......@@ -52,6 +53,9 @@
<td class="table-icon">
<div icon icon-id="{{node._iconid_state}}"></div>
</td>
<td class="table-icon">
<div icon icon-id="{{node._iconid_started}}"></div>
</td>
<td>{{node.id}}</td>
<td>{{node.ip}}</td>
<td>{{node.tcp}}</td>
......