Madan Jampani
Committed by Gerrit Code Review

Added a LogicalClockService for ordering arbitrary events in the cluster. Update…

…d couple of areas that are currently vulnerable to clock skew

Change-Id: I14548ecb3c783104de8d72cbb5eb21de6ece08ed
......@@ -33,7 +33,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
description = "Lists information about atomic counters in the system")
public class CountersListCommand extends AbstractShellCommand {
private static final String FMT = "name=%s next_value=%d";
private static final String FMT = "name=%s value=%d";
/**
* Displays counters as text.
......@@ -41,7 +41,7 @@ public class CountersListCommand extends AbstractShellCommand {
* @param mapInfo map descriptions
*/
private void displayCounters(Map<String, Long> counters) {
counters.forEach((name, nextValue) -> print(FMT, name, nextValue));
counters.forEach((name, value) -> print(FMT, name, value));
}
/**
......
......@@ -37,7 +37,7 @@ public class MapsListCommand extends AbstractShellCommand {
// TODO: Add support to display different eventually
// consistent maps as well.
private static final String FMT = "%-20s %8s";
private static final String FMT = "name=%s size=%d";
/**
* Displays map info as text.
......@@ -45,17 +45,9 @@ public class MapsListCommand extends AbstractShellCommand {
* @param mapInfo map descriptions
*/
private void displayMaps(List<MapInfo> mapInfo) {
print("------------------------------");
print(FMT, "Name", "Size");
print("------------------------------");
for (MapInfo info : mapInfo) {
print(FMT, info.name(), info.size());
}
if (mapInfo.size() > 0) {
print("------------------------------");
}
}
/**
......
package org.onosproject.store.service;
import org.onosproject.store.Timestamp;
/**
* Service that issues logical timestamps.
* <p>
* The logical timestamps are useful for establishing a total ordering of
* arbitrary cluster wide events without relying on a fully synchronized
* system clock (wall clock)
*/
public interface LogicalClockService {
/**
* Generates a new logical timestamp.
*
* @return timestamp
*/
Timestamp getTimestamp();
}
......@@ -18,6 +18,7 @@ package org.onosproject.store.app;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -44,12 +45,11 @@ import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
......@@ -61,8 +61,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
......@@ -115,9 +113,10 @@ public class GossipApplicationStore extends ApplicationArchive
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
protected LogicalClockService clockService;
private final AtomicLong sequence = new AtomicLong();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
@Activate
public void activate() {
......@@ -135,24 +134,16 @@ public class GossipApplicationStore extends ApplicationArchive
// FIXME: Consider consolidating into a single map.
ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
sequence.incrementAndGet());
apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
.withName("apps")
.withSerializer(serializer)
.withClockService(appsClockService)
.withClockService((k, v) -> clockService.getTimestamp())
.build();
ClockService<Application, InternalState> statesClockService = (app, state) ->
new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
sequence.incrementAndGet());
states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
.withName("app-states")
.withSerializer(serializer)
.withClockService(statesClockService)
.withClockService((k, v) -> clockService.getTimestamp())
.build();
states.addListener(new InternalAppStatesListener());
......@@ -160,7 +151,7 @@ public class GossipApplicationStore extends ApplicationArchive
permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
.withName("app-permissions")
.withSerializer(serializer)
.withClockService(new WallclockClockManager<>())
.withClockService((k, v) -> clockService.getTimestamp())
.build();
log.info("Started");
......
......@@ -26,11 +26,11 @@ import org.onosproject.cfg.ComponentConfigEvent;
import org.onosproject.cfg.ComponentConfigStore;
import org.onosproject.cfg.ComponentConfigStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
......@@ -59,6 +59,9 @@ public class GossipComponentConfigStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LogicalClockService clockService;
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
......@@ -67,7 +70,7 @@ public class GossipComponentConfigStore
properties = storageService.<String, String>eventuallyConsistentMapBuilder()
.withName("cfg")
.withSerializer(serializer)
.withClockService(new WallclockClockManager<>())
.withClockService((k, v) -> clockService.getTimestamp())
.build();
properties.addListener(new InternalPropertiesListener());
......
package org.onosproject.store.core.impl;
import static org.slf4j.LoggerFactory.getLogger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.store.Timestamp;
import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
/**
* LogicalClockService implementation based on a AtomicCounter.
*/
@Component(immediate = true, enabled = true)
@Service
public class LogicalClockManager implements LogicalClockService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private static final String SYSTEM_LOGICAL_CLOCK_COUNTER_NAME = "sys-clock-counter";
private AtomicCounter atomicCounter;
@Activate
public void activate() {
atomicCounter = storageService.atomicCounterBuilder()
.withName(SYSTEM_LOGICAL_CLOCK_COUNTER_NAME)
.withPartitionsDisabled()
.build();
log.info("Started.");
}
@Deactivate
public void deactivate() {
log.info("Stopped.");
}
@Override
public Timestamp getTimestamp() {
return new LogicalTimestamp(atomicCounter.incrementAndGet());
}
}
\ No newline at end of file
......@@ -18,6 +18,7 @@ package org.onosproject.store.ecmap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
......@@ -32,6 +33,7 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
......@@ -223,6 +225,7 @@ public class EventuallyConsistentMapImpl<K, V>
protected void setupKryoPool() {
// Add the map's internal helper classes to the user-supplied serializer
serializerPool = builder
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(PutEntry.class)
.register(RemoveEntry.class)
......
package org.onosproject.store.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Objects;
import org.onosproject.store.Timestamp;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
/**
* Timestamp based on logical sequence value.
* <p>
* LogicalTimestamps are ordered by their sequence values.
*/
public class LogicalTimestamp implements Timestamp {
private final long value;
public LogicalTimestamp(long value) {
this.value = value;
}
@Override
public int compareTo(Timestamp o) {
checkArgument(o instanceof LogicalTimestamp,
"Must be LogicalTimestamp", o);
LogicalTimestamp that = (LogicalTimestamp) o;
return ComparisonChain.start()
.compare(this.value, that.value)
.result();
}
@Override
public int hashCode() {
return Objects.hash(value);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof LogicalTimestamp)) {
return false;
}
LogicalTimestamp that = (LogicalTimestamp) obj;
return Objects.equals(this.value, that.value);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("value", value)
.toString();
}
/**
* Returns the sequence value.
*
* @return sequence value
*/
public long value() {
return this.value;
}
}
......@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -35,6 +36,7 @@ import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
......@@ -102,6 +104,7 @@ public class EventuallyConsistentMapImplTest {
.register(KryoNamespaces.API)
.register(TestTimestamp.class)
// Below is the classes that the map internally registers
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(PutEntry.class)
.register(RemoveEntry.class)
......