Madan Jampani

Added creationTime to Versioned object. This enables supporting a electedTime in…

… leadership, which in turn helps us track how stable leadership terms are.

Change-Id: Ib051027625324646152ed85535ba337e95f8a061
......@@ -19,6 +19,7 @@ import java.util.Comparator;
import java.util.Map;
import org.apache.karaf.shell.commands.Command;
import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipService;
......@@ -30,15 +31,16 @@ import org.onosproject.cluster.LeadershipService;
description = "Finds the leader for particular topic.")
public class LeaderCommand extends AbstractShellCommand {
private static final String FMT = "%-20s | %-15s | %-6s |";
private static final String FMT = "%-20s | %-15s | %-6s | %-10s |";
@Override
protected void execute() {
LeadershipService leaderService = get(LeadershipService.class);
Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
print("-------------------------------------------------");
print(FMT, "Topic", "Leader", "Epoch");
print("-------------------------------------------------");
print("--------------------------------------------------------------");
print(FMT, "Topic", "Leader", "Epoch", "Elected");
print("--------------------------------------------------------------");
Comparator<Leadership> leadershipComparator =
(e1, e2) -> {
......@@ -57,8 +59,11 @@ public class LeaderCommand extends AbstractShellCommand {
leaderBoard.values()
.stream()
.sorted(leadershipComparator)
.forEach(l -> print(FMT, l.topic(), l.leader(), l.epoch()));
print("-------------------------------------------------");
.forEach(l -> print(FMT,
l.topic(),
l.leader(),
l.epoch(),
Tools.timeAgo(l.electedTime())));
print("--------------------------------------------------------------");
}
}
\ No newline at end of file
......
......@@ -17,6 +17,8 @@ package org.onosproject.cluster;
import java.util.Objects;
import org.joda.time.DateTime;
import com.google.common.base.MoreObjects;
/**
......@@ -27,11 +29,13 @@ public class Leadership {
private final String topic;
private final NodeId leader;
private final long epoch;
private final long electedTime;
public Leadership(String topic, NodeId leader, long epoch) {
public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
this.topic = topic;
this.leader = leader;
this.epoch = epoch;
this.electedTime = electedTime;
}
/**
......@@ -52,12 +56,31 @@ public class Leadership {
/**
* The epoch when the leadership was assumed.
* <p>
* Comparing epochs is only appropriate for leadership
* events for the same topic. The system guarantees that
* for any given topic the epoch for a new term is higher
* (not necessarily by 1) than the epoch for any previous term.
* @return leadership epoch
*/
public long epoch() {
return epoch;
}
/**
* The system time when the term started.
* <p>
* The elected time is initially set on the node coordinating
* the leader election using its local system time. Due to possible
* clock skew, relying on this value for determining event ordering
* is discouraged. Epoch is more appropriate for determining
* event ordering.
* @return elected time.
*/
public long electedTime() {
return electedTime;
}
@Override
public int hashCode() {
return Objects.hash(topic, leader, epoch);
......@@ -72,7 +95,8 @@ public class Leadership {
final Leadership other = (Leadership) obj;
return Objects.equals(this.topic, other.topic) &&
Objects.equals(this.leader, other.leader) &&
Objects.equals(this.epoch, other.epoch);
Objects.equals(this.epoch, other.epoch) &&
Objects.equals(this.electedTime, other.electedTime);
}
return false;
}
......@@ -83,6 +107,7 @@ public class Leadership {
.add("topic", topic)
.add("leader", leader)
.add("epoch", epoch)
.add("electedTime", new DateTime(electedTime))
.toString();
}
}
......
......@@ -16,6 +16,8 @@
package org.onosproject.store.service;
import org.joda.time.DateTime;
import com.google.common.base.MoreObjects;
/**
......@@ -27,15 +29,28 @@ public class Versioned<V> {
private final V value;
private final long version;
private final long creationTime;
/**
* Constructs a new versioned value.
* @param value value
* @param version version
* @param creationTime milliseconds of the creation event
* from the Java epoch of 1970-01-01T00:00:00Z
*/
public Versioned(V value, long version) {
public Versioned(V value, long version, long creationTime) {
this.value = value;
this.version = version;
this.creationTime = System.currentTimeMillis();
}
/**
* Constructs a new versioned value.
* @param value value
* @param version version
*/
public Versioned(V value, long version) {
this(value, version, System.currentTimeMillis());
}
/**
......@@ -56,11 +71,26 @@ public class Versioned<V> {
return version;
}
/**
* Returns the system time when this version was created.
* <p>
* Care should be taken when relying on creationTime to
* implement any behavior in a distributed setting. Due
* to the possibility of clock skew it is likely that
* even creationTimes of causally related versions can be
* out or order.
* @return creation time
*/
public long creationTime() {
return creationTime;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("value", value)
.add("version", version)
.add("creationTime", new DateTime(creationTime))
.toString();
}
}
......
......@@ -170,7 +170,8 @@ public class HazelcastLeadershipService implements LeadershipService {
if (topic != null) {
return new Leadership(topic.topicName(),
topic.leader(),
topic.term());
topic.term(),
0);
}
return null;
}
......@@ -215,7 +216,8 @@ public class HazelcastLeadershipService implements LeadershipService {
for (Topic topic : topics.values()) {
Leadership leadership = new Leadership(topic.topicName(),
topic.leader(),
topic.term());
topic.term(),
0);
result.put(topic.topicName(), leadership);
}
return result;
......@@ -412,7 +414,7 @@ public class HazelcastLeadershipService implements LeadershipService {
//
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
// Dispatch to all instances
clusterCommunicator.broadcastIncludeSelf(
......@@ -431,7 +433,7 @@ public class HazelcastLeadershipService implements LeadershipService {
topicName, leader);
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, leader, myLastLeaderTerm));
new Leadership(topicName, leader, myLastLeaderTerm, 0));
// Dispatch only to the local listener(s)
eventDispatcher.post(leadershipEvent);
leader = null;
......@@ -487,7 +489,7 @@ public class HazelcastLeadershipService implements LeadershipService {
leader = localNodeId;
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
......@@ -515,7 +517,7 @@ public class HazelcastLeadershipService implements LeadershipService {
}
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
......
......@@ -105,7 +105,13 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
public Versioned<V> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
if (value == null) {
return null;
}
return new Versioned<>(
serializer.decode(value.value()),
value.version(),
value.creationTime());
}
@Override
......@@ -114,16 +120,26 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
checkNotNull(value, ERROR_NULL_VALUE);
Versioned<byte[]> previousValue =
complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
return (previousValue != null) ?
new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
if (previousValue == null) {
return null;
}
return new Versioned<>(
serializer.decode(previousValue.value()),
previousValue.version(),
previousValue.creationTime());
}
@Override
public Versioned<V> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
if (value == null) {
return null;
}
return new Versioned<>(
serializer.decode(value.value()),
value.version(),
value.creationTime());
}
@Override
......@@ -143,7 +159,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
public Collection<Versioned<V>> values() {
return Collections.unmodifiableList(complete(proxy.values(name))
.stream()
.map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
.map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
.collect(Collectors.toList()));
}
......@@ -161,8 +177,13 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
checkNotNull(value, ERROR_NULL_VALUE);
Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
name, keyCache.getUnchecked(key), serializer.encode(value)));
return (existingValue != null) ?
new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
if (existingValue == null) {
return null;
}
return new Versioned<>(
serializer.decode(existingValue.value()),
existingValue.version(),
existingValue.creationTime());
}
@Override
......@@ -212,6 +233,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
dK(e.getKey()),
new Versioned<>(
serializer.decode(e.getValue().value()),
e.getValue().version()));
e.getValue().version(),
e.getValue().creationTime()));
}
}
\ No newline at end of file
......
......@@ -227,7 +227,7 @@ public class DistributedLeadershipManager implements LeadershipService {
if (currentLeader != null) {
if (localNodeId.equals(currentLeader.value())) {
log.info("Already has leadership for {}", path);
notifyNewLeader(path, localNodeId, currentLeader.version());
notifyNewLeader(path, localNodeId, currentLeader.version(), currentLeader.creationTime());
} else {
// someone else has leadership. will retry after sometime.
retry(path);
......@@ -237,7 +237,7 @@ public class DistributedLeadershipManager implements LeadershipService {
log.info("Assumed leadership for {}", path);
// do a get again to get the version (epoch)
Versioned<NodeId> newLeader = lockMap.get(path);
notifyNewLeader(path, localNodeId, newLeader.version());
notifyNewLeader(path, localNodeId, newLeader.version(), newLeader.creationTime());
} else {
// someone beat us to it.
retry(path);
......@@ -249,8 +249,8 @@ public class DistributedLeadershipManager implements LeadershipService {
}
}
private void notifyNewLeader(String path, NodeId leader, long epoch) {
Leadership newLeadership = new Leadership(path, leader, epoch);
private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
boolean updatedLeader = false;
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
......@@ -271,8 +271,8 @@ public class DistributedLeadershipManager implements LeadershipService {
}
}
private void notifyRemovedLeader(String path, NodeId leader, long epoch) {
Leadership oldLeadership = new Leadership(path, leader, epoch);
private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
boolean updatedLeader = false;
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
......@@ -346,12 +346,13 @@ public class DistributedLeadershipManager implements LeadershipService {
String path = entry.getKey();
NodeId nodeId = entry.getValue().value();
long epoch = entry.getValue().version();
long creationTime = entry.getValue().creationTime();
if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
try {
if (lockMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
notifyRemovedLeader(path, nodeId, epoch);
notifyRemovedLeader(path, nodeId, epoch, creationTime);
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
......@@ -362,7 +363,7 @@ public class DistributedLeadershipManager implements LeadershipService {
try {
if (lockMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
notifyRemovedLeader(path, nodeId, epoch);
notifyRemovedLeader(path, nodeId, epoch, creationTime);
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
......
......@@ -61,7 +61,7 @@ public class SimpleLeadershipManager implements LeadershipService {
@Override
public Leadership getLeadership(String path) {
checkArgument(path != null);
return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0) : null;
return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0, 0) : null;
}
@Override
......@@ -79,7 +79,7 @@ public class SimpleLeadershipManager implements LeadershipService {
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
new Leadership(path, clusterService.getLocalNode().id(), 0)));
new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
}
}
......@@ -88,7 +88,7 @@ public class SimpleLeadershipManager implements LeadershipService {
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
new Leadership(path, clusterService.getLocalNode().id(), 0)));
new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
}
}
......
......@@ -18,6 +18,7 @@ package org.onlab.util;
import com.google.common.base.Strings;
import com.google.common.primitives.UnsignedLongs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import java.io.BufferedReader;
......@@ -239,6 +240,29 @@ public abstract class Tools {
}
}
/**
* Returns a human friendly time ago string for a specified system time.
* @param unixTime system time in millis
* @return human friendly time ago
*/
public static String timeAgo(long unixTime) {
long deltaMillis = System.currentTimeMillis() - unixTime;
long secondsSince = (long) (deltaMillis / 1000.0);
long minsSince = (long) (deltaMillis / (1000.0 * 60));
long hoursSince = (long) (deltaMillis / (1000.0 * 60 * 60));
long daysSince = (long) (deltaMillis / (1000.0 * 60 * 60 * 24));
if (daysSince > 0) {
return String.format("%dd ago", daysSince);
} else if (hoursSince > 0) {
return String.format("%dh ago", hoursSince);
} else if (minsSince > 0) {
return String.format("%dm ago", minsSince);
} else if (secondsSince > 0) {
return String.format("%ds ago", secondsSince);
} else {
return "just now";
}
}
/**
* Copies the specified directory path.&nbsp;Use with great caution since
......