Madan Jampani
Committed by Brian O'Connor

Logging improvements.

Change-Id: I79b9ff16a0000e4bd72022f02baef5c779ea1b48
......@@ -15,14 +15,14 @@
*/
package org.onosproject.cli.net;
import java.util.Comparator;
import java.util.Map;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipService;
import java.util.Comparator;
import java.util.Map;
/**
* Prints the leader for every topic.
*/
......@@ -30,13 +30,15 @@ import java.util.Map;
description = "Finds the leader for a particular topic.")
public class LeaderCommand extends AbstractShellCommand {
-private static final String FMT = "%-20s: %15s %15s";
+private static final String FMT = "%-20s | %-15s | %-6s |";
@Override
protected void execute() {
LeadershipService leaderService = get(LeadershipService.class);
Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
print("-------------------------------------------------");
print(FMT, "Topic", "Leader", "Epoch");
print("-------------------------------------------------");
Comparator<Leadership> leadershipComparator =
(e1, e2) -> {
......@@ -56,6 +58,7 @@ public class LeaderCommand extends AbstractShellCommand {
.stream()
.sorted(leadershipComparator)
.forEach(l -> print(FMT, l.topic(), l.leader(), l.epoch()));
print("-------------------------------------------------");
}
}
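With the new format string and the separator lines added above, the command's output would look roughly like this (topic names, leader IDs, and epoch values are illustrative only):

    -------------------------------------------------
    Topic                | Leader          | Epoch  |
    -------------------------------------------------
    device-mastership    | 10.0.0.1        | 42     |
    intent-partition-0   | 10.0.0.2        | 17     |
    -------------------------------------------------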
......
package org.onosproject.store.cluster.impl;
import java.util.Set;
import org.onosproject.cluster.DefaultControllerNode;
import com.google.common.collect.ImmutableSet;
/**
* Cluster definition.
*/
public class ClusterDefinition {
private Set<DefaultControllerNode> nodes;
private String ipPrefix;
/**
* Creates a new cluster definition.
* @param nodes cluster nodes.
* @param ipPrefix ip prefix common to all cluster nodes.
* @return cluster definition
*/
public static ClusterDefinition from(Set<DefaultControllerNode> nodes, String ipPrefix) {
ClusterDefinition definition = new ClusterDefinition();
definition.ipPrefix = ipPrefix;
definition.nodes = ImmutableSet.copyOf(nodes);
return definition;
}
/**
* Returns set of cluster nodes.
* @return cluster nodes.
*/
public Set<DefaultControllerNode> nodes() {
return ImmutableSet.copyOf(nodes);
}
/**
* Returns ipPrefix in dotted decimal notation.
* @return ip prefix.
*/
public String ipPrefix() {
return ipPrefix;
}
}
\ No newline at end of file
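For orientation, here is a minimal, hypothetical sketch of how the new ClusterDefinition and ClusterDefinitionStore fit together. The node IDs, addresses, prefix, and file path are invented; the DefaultControllerNode(NodeId, IpAddress, int) constructor and the ClusterDefinitionStore(String) constructor are assumed from their use elsewhere in this change:

    package org.onosproject.store.cluster.impl;

    import java.util.Set;

    import org.onlab.packet.IpAddress;
    import org.onosproject.cluster.DefaultControllerNode;
    import org.onosproject.cluster.NodeId;

    import com.google.common.collect.ImmutableSet;

    public final class ClusterDefinitionExample {
        private ClusterDefinitionExample() {
        }

        public static void main(String[] args) throws Exception {
            // Two seed nodes; all values are made up.
            Set<DefaultControllerNode> nodes = ImmutableSet.of(
                    new DefaultControllerNode(new NodeId("10.0.0.1"), IpAddress.valueOf("10.0.0.1"), 9876),
                    new DefaultControllerNode(new NodeId("10.0.0.2"), IpAddress.valueOf("10.0.0.2"), 9876));

            // All cluster nodes share the 10.0.0.* prefix; ClusterManager later
            // uses this prefix to pick the local interface address.
            ClusterDefinition definition = ClusterDefinition.from(nodes, "10.0.0.*");

            // Round-trip through the store (path is hypothetical).
            ClusterDefinitionStore store = new ClusterDefinitionStore("/tmp/cluster.json");
            store.write(definition);
            ClusterDefinition reloaded = store.read();
            System.out.println(reloaded.nodes().size() + " nodes, ipPrefix " + reloaded.ipPrefix());
        }
    }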
......@@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onlab.packet.IpAddress;
......@@ -53,7 +55,7 @@ public class ClusterDefinitionStore {
*
* @return set of controller nodes
*/
-public Set<DefaultControllerNode> read() throws IOException {
+public ClusterDefinition read() throws IOException {
Set<DefaultControllerNode> nodes = new HashSet<>();
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
......@@ -64,20 +66,23 @@ public class ClusterDefinitionStore {
IpAddress.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(9876)));
}
-return nodes;
+String ipPrefix = clusterNodeDef.get("ipPrefix").asText();
+return ClusterDefinition.from(nodes, ipPrefix);
}
/*
-* Writes the given set of the controller nodes.
+* Writes the given cluster definition.
*
-* @param nodes set of controller nodes
+* @param cluster definition
*/
-public void write(Set<DefaultControllerNode> nodes) throws IOException {
+public void write(ClusterDefinition definition) throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = mapper.createObjectNode();
clusterNodeDef.set("ipPrefix", new TextNode(definition.ipPrefix()));
ArrayNode nodeDefs = mapper.createArrayNode();
clusterNodeDef.set("nodes", nodeDefs);
-for (DefaultControllerNode node : nodes) {
+for (DefaultControllerNode node : definition.nodes()) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static org.onlab.util.Tools.groupedThreads;
......@@ -45,6 +60,7 @@ import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.hazelcast.util.AddressUtil;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
......@@ -60,7 +76,7 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
private final Logger log = getLogger(getClass());
protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
listenerRegistry = new AbstractListenerRegistry<>();
listenerRegistry = new AbstractListenerRegistry<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
......@@ -73,7 +89,7 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
private static final String CONFIG_DIR = "../config";
private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
-private ClusterDefinitionStore clusterDefinition;
+private ClusterDefinition clusterDefinition;
private Set<ControllerNode> seedNodes;
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
......@@ -108,9 +124,10 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
public void activate() {
File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
-clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath());
try {
-seedNodes = ImmutableSet.copyOf(clusterDefinition.read());
+clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath()).read();
+seedNodes = ImmutableSet.copyOf(clusterDefinition.nodes());
} catch (IOException e) {
log.warn("Failed to read cluster definition.", e);
}
......@@ -128,7 +145,7 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
messagingService.activate();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Failed to cleanly initialize membership and"
throw new IllegalStateException("Failed to cleanly initialize membership and"
+ " failure detector communication channel.", e);
}
messagingService.registerHandler(
......@@ -156,8 +173,8 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
log.trace("Failed to cleanly shutdown cluster membership messaging", e);
}
-heartBeatSender.shutdown();
-heartBeatMessageHandler.shutdown();
+heartBeatSender.shutdownNow();
+heartBeatMessageHandler.shutdownNow();
eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
......@@ -287,7 +304,7 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
}
}
-private class HeartbeatMessage {
+private static class HeartbeatMessage {
private ControllerNode source;
private Set<ControllerNode> knownPeers;
......@@ -306,13 +323,16 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
}
private IpAddress findLocalIp() throws SocketException {
-NetworkInterface ni = NetworkInterface.getByName("eth0");
-Enumeration<InetAddress> inetAddresses = ni.getInetAddresses();
-while (inetAddresses.hasMoreElements()) {
-InetAddress ia = inetAddresses.nextElement();
-if (!ia.isLinkLocalAddress()) {
-return IpAddress.valueOf(ia);
+Enumeration<NetworkInterface> interfaces =
+NetworkInterface.getNetworkInterfaces();
+while (interfaces.hasMoreElements()) {
+NetworkInterface iface = interfaces.nextElement();
+Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
+while (inetAddresses.hasMoreElements()) {
+IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
+if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.ipPrefix())) {
+return ip;
+}
}
}
throw new IllegalStateException("Unable to determine local ip");
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static com.google.common.base.Preconditions.checkArgument;
......@@ -21,6 +36,7 @@ public class PhiAccrualFailureDetector {
// TODO: make these configurable.
private static final int WINDOW_SIZE = 250;
private static final int MIN_SAMPLES = 25;
+private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
// If a node does not have any heartbeats, this is the phi
// value to report. Indicates the node is inactive (from the
......@@ -59,11 +75,11 @@ public class PhiAccrualFailureDetector {
* @param nodeId node id
* @return phi value
*/
-public Double phi(NodeId nodeId) {
+public double phi(NodeId nodeId) {
+checkNotNull(nodeId, "NodeId must not be null");
if (!states.containsKey(nodeId)) {
return BOOTSTRAP_PHI_VALUE;
}
-checkNotNull(nodeId, "NodeId must not be null");
History nodeState = states.get(nodeId);
synchronized (nodeState) {
long latestHeartbeat = nodeState.latestHeartbeatTime();
......@@ -79,7 +95,7 @@ public class PhiAccrualFailureDetector {
long size = samples.getN();
long t = tNow - tLast;
return (size > 0)
-? (1.0 / Math.log(10.0)) * t / samples.getMean()
+? PHI_FACTOR * t / samples.getMean()
: BOOTSTRAP_PHI_VALUE;
}
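For intuition about the expression above: with PHI_FACTOR = 1 / ln(10), the returned value matches the usual phi-accrual formulation, i.e. roughly -log10 of the probability (under an exponential inter-arrival assumption) that a heartbeat is merely late rather than lost. A tiny worked sketch with made-up numbers:

    public final class PhiExample {
        private PhiExample() {
        }

        public static void main(String[] args) {
            double phiFactor = 1.0 / Math.log(10.0); // same constant as PHI_FACTOR above
            double meanIntervalMs = 100.0;           // mean gap between heartbeats (hypothetical)
            long sinceLastHeartbeatMs = 300;         // silence observed so far (hypothetical)

            double phi = phiFactor * sinceLastHeartbeatMs / meanIntervalMs;
            // 300 ms of silence against a 100 ms mean gives phi ~= 1.3; the longer
            // the silence, the higher phi climbs, and callers suspect the node once
            // phi crosses their chosen threshold.
            System.out.println("phi = " + phi);
        }
    }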
......
......@@ -150,7 +150,7 @@ public class ClusterCommunicationManager
messagingService.sendAsync(nodeEp, subject.value(), payload);
return true;
} catch (IOException e) {
log.trace("Failed to send cluster message to nodeId: " + toNodeId, e);
log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
throw e;
}
}
......@@ -179,6 +179,7 @@ public class ClusterCommunicationManager
}
@Override
+@Deprecated
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber) {
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
......@@ -210,13 +211,13 @@ public class ClusterCommunicationManager
try {
clusterMessage = SERIALIZER.decode(message.payload());
} catch (Exception e) {
log.error("Failed decoding ClusterMessage {}", message, e);
log.error("Failed decoding {}", message, e);
throw e;
}
try {
handler.handle(new InternalClusterMessage(clusterMessage, message));
} catch (Exception e) {
log.error("Exception caught handling {}", clusterMessage, e);
log.trace("Failed handling {}", clusterMessage, e);
throw e;
}
}
......
......@@ -87,7 +87,7 @@ public class DistributedLeadershipManager implements LeadershipService {
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
-private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL = 2;
+private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
......@@ -134,7 +134,7 @@ public class DistributedLeadershipManager implements LeadershipService {
deadLockDetectionExecutor.scheduleWithFixedDelay(
this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
leadershipStatusBroadcaster.scheduleWithFixedDelay(
-this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL, TimeUnit.SECONDS);
+this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
listenerRegistry = new AbstractListenerRegistry<>();
eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
......@@ -190,7 +190,7 @@ public class DistributedLeadershipManager implements LeadershipService {
@Override
public void runForLeadership(String path) {
log.info("Running for leadership for topic: {}", path);
log.trace("Running for leadership for topic: {}", path);
activeTopics.add(path);
tryLeaderLock(path);
}
......@@ -200,11 +200,11 @@ public class DistributedLeadershipManager implements LeadershipService {
activeTopics.remove(path);
try {
if (lockMap.remove(path, localNodeId)) {
log.info("Sucessfully gave up leadership for {}", path);
log.info("Gave up leadership for {}", path);
}
// else we are not the current owner.
} catch (Exception e) {
log.warn("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
}
}
......@@ -244,7 +244,7 @@ public class DistributedLeadershipManager implements LeadershipService {
}
}
} catch (Exception e) {
log.warn("Attempt to acquire leadership lock for topic {} failed", path, e);
log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
retry(path);
}
}
......@@ -300,7 +300,7 @@ public class DistributedLeadershipManager implements LeadershipService {
LeadershipEvent leadershipEvent =
SERIALIZER.decode(message.payload());
log.trace("Leadership Event: time = {} type = {} event = {}",
log.debug("Leadership Event: time = {} type = {} event = {}",
leadershipEvent.time(), leadershipEvent.type(),
leadershipEvent);
......@@ -350,7 +350,7 @@ public class DistributedLeadershipManager implements LeadershipService {
log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
try {
if (lockMap.remove(path, epoch)) {
log.info("Successfully purged stale lock held by {} for {}", nodeId, path);
log.info("Purged stale lock held by {} for {}", nodeId, path);
notifyRemovedLeader(path, nodeId, epoch);
}
} catch (Exception e) {
......@@ -361,7 +361,7 @@ public class DistributedLeadershipManager implements LeadershipService {
log.info("Lock for {} is held by {} when it not running for leadership.", path, nodeId);
try {
if (lockMap.remove(path, epoch)) {
log.info("Successfully purged stale lock held by {} for {}", nodeId, path);
log.info("Purged stale lock held by {} for {}", nodeId, path);
notifyRemovedLeader(path, nodeId, epoch);
}
} catch (Exception e) {
......@@ -370,20 +370,24 @@ public class DistributedLeadershipManager implements LeadershipService {
}
});
} catch (Exception e) {
log.warn("Failed cleaning up stale locks", e);
log.debug("Failed cleaning up stale locks", e);
}
}
private void sendLeadershipStatus() {
-leaderBoard.forEach((path, leadership) -> {
-if (leadership.leader().equals(localNodeId)) {
-LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
-clusterCommunicator.broadcast(
-new ClusterMessage(
-clusterService.getLocalNode().id(),
-LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-SERIALIZER.encode(event)));
-}
-});
+try {
+leaderBoard.forEach((path, leadership) -> {
+if (leadership.leader().equals(localNodeId)) {
+LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
+clusterCommunicator.broadcast(
+new ClusterMessage(
+clusterService.getLocalNode().id(),
+LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+SERIALIZER.encode(event)));
+}
+});
+} catch (Exception e) {
+log.debug("Failed to send leadership updates", e);
+}
}
-}
\ No newline at end of file
+}
......
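For context on the service whose logging is tuned here, a minimal sketch of how a component might use LeadershipService; the topic name is invented, while runForLeadership(String), getLeaderBoard(), and the Leadership accessors are taken from the diffs above:

    import org.onosproject.cluster.Leadership;
    import org.onosproject.cluster.LeadershipService;

    public final class LeadershipExample {
        private LeadershipExample() {
        }

        // In a real component, leaderService would be injected with @Reference.
        static void electAndInspect(LeadershipService leaderService) {
            // Ask to become leader for a topic; DistributedLeadershipManager keeps
            // retrying the lock in the background (see tryLeaderLock/retry above).
            leaderService.runForLeadership("work-partition-1");

            // Inspect the leader board, as LeaderCommand above does.
            Leadership lead = leaderService.getLeaderBoard().get("work-partition-1");
            if (lead != null) {
                System.out.println(lead.topic() + " led by " + lead.leader()
                        + " (epoch " + lead.epoch() + ")");
            }
        }
    }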
......@@ -10,7 +10,8 @@ remote=$ONOS_USER@${1:-$OCI}
# Generate a cluster.json from the ON* environment variables
CDEF_FILE=/tmp/${remote}.cluster.json
echo "{ \"nodes\":[" > $CDEF_FILE
echo "{ \"ipPrefix\": \"$ONOS_NIC\"," > $CDEF_FILE
echo " \"nodes\":[" >> $CDEF_FILE
for node in $(env | sort | egrep "OC[2-9]+" | cut -d= -f2); do
echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $CDEF_FILE
done
......
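Putting the script and ClusterDefinitionStore together, the generated cluster.json would look something like the following (addresses are hypothetical, and the portion of the script that emits the final node entry and the closing brackets is elided above):

    {
      "ipPrefix": "192.168.56.*",
      "nodes": [
        { "id": "192.168.56.101", "ip": "192.168.56.101", "tcpPort": 9876 },
        { "id": "192.168.56.102", "ip": "192.168.56.102", "tcpPort": 9876 }
      ]
    }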