Madan Jampani

Removed ClockService<K, V> and replaced its usage with a BiFunction<K, V, Timestamp>

Change-Id: Ide8d979f9361f1aff6727a83733747f4512ef8ff
......@@ -55,7 +55,6 @@ import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.store.service.WallclockClockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -171,7 +170,7 @@ public class SegmentRoutingManager implements SegmentRoutingService {
nsNextObjStore = nsNextObjMapBuilder
.withName("nsnextobjectivestore")
.withSerializer(kryoBuilder)
.withClockService(new WallclockClockManager<>())
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
log.trace("Current size {}", nsNextObjStore.size());
......@@ -181,7 +180,7 @@ public class SegmentRoutingManager implements SegmentRoutingService {
tunnelStore = tunnelMapBuilder
.withName("tunnelstore")
.withSerializer(kryoBuilder)
.withClockService(new WallclockClockManager<>())
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
EventuallyConsistentMapBuilder<String, Policy> policyMapBuilder =
......@@ -190,7 +189,7 @@ public class SegmentRoutingManager implements SegmentRoutingService {
policyStore = policyMapBuilder
.withName("policystore")
.withSerializer(kryoBuilder)
.withClockService(new WallclockClockManager<>())
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
networkConfigService.init();
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import org.onosproject.store.Timestamp;
/**
* Clock service that can generate timestamps based off of two input objects.
* Implementations are free to only take one or none of the objects into account
* when generating timestamps.
*/
public interface ClockService<T, U> {
/**
* Gets a new timestamp for the given objects.
*
* @param object1 First object to use when generating timestamps
* @param object2 Second object to use when generating timestamps
* @return the new timestamp
*/
Timestamp getTimestamp(T object1, U object2);
}
......@@ -114,7 +114,7 @@ public interface EventuallyConsistentMap<K, V> {
* Removes the given key-value mapping from the map, if it exists.
* <p>
* This actually means remove any values up to and including the timestamp
* given by {@link org.onosproject.store.service.ClockService#getTimestamp(Object, Object)}.
* given by the map's timestampProvider.
* Any mappings that produce an earlier timestamp than this given key-value
* pair will be removed, and any mappings that produce a later timestamp
* will supersede this remove.
......
......@@ -18,6 +18,7 @@ package org.onosproject.store.service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
......@@ -66,10 +67,10 @@ public interface EventuallyConsistentMapBuilder<K, V> {
KryoNamespace.Builder serializerBuilder);
/**
* Sets the clock service to use for generating timestamps for map updates.
* Sets the function to use for generating timestamps for map updates.
* <p>
* The client must provide an {@link org.onosproject.store.service.ClockService}
* which can generate timestamps for a given key. The clock service is free
* The client must provide an {@code BiFunction<K, V, Timestamp>}
* which can generate timestamps for a given key. The function is free
* to generate timestamps however it wishes, however these timestamps will
* be used to serialize updates to the map so they must be strict enough
* to ensure updates are properly ordered for the use case (i.e. in some
......@@ -80,11 +81,11 @@ public interface EventuallyConsistentMapBuilder<K, V> {
* Note: This is a mandatory parameter.
* </p>
*
* @param clockService clock service
* @param timestampProvider provides a new timestamp
* @return this EventuallyConsistentMapBuilder
*/
EventuallyConsistentMapBuilder<K, V> withClockService(
ClockService<K, V> clockService);
EventuallyConsistentMapBuilder<K, V> withTimestampProvider(
BiFunction<K, V, Timestamp> timestampProvider);
/**
* Sets the executor to use for processing events coming in from peers.
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import org.onosproject.store.Timestamp;
/**
* A clock service which hands out wallclock-based timestamps.
*/
public class WallclockClockManager<T, U> implements ClockService<T, U> {
@Override
public Timestamp getTimestamp(T object1, U object2) {
return new WallClockTimestamp();
}
}
......@@ -133,13 +133,13 @@ public class GossipApplicationStore extends ApplicationArchive
apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
.withName("apps")
.withSerializer(serializer)
.withClockService((k, v) -> clockService.getTimestamp())
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
.withName("app-states")
.withSerializer(serializer)
.withClockService((k, v) -> clockService.getTimestamp())
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
states.addListener(new InternalAppStatesListener());
......@@ -147,7 +147,7 @@ public class GossipApplicationStore extends ApplicationArchive
permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
.withName("app-permissions")
.withSerializer(serializer)
.withClockService((k, v) -> clockService.getTimestamp())
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
log.info("Started");
......
......@@ -70,7 +70,7 @@ public class GossipComponentConfigStore
properties = storageService.<String, String>eventuallyConsistentMapBuilder()
.withName("cfg")
.withSerializer(serializer)
.withClockService((k, v) -> clockService.getTimestamp())
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
properties.addListener(new InternalPropertiesListener());
......
......@@ -78,7 +78,6 @@ import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.store.service.WallclockClockManager;
import org.slf4j.Logger;
import java.io.IOException;
......@@ -244,7 +243,7 @@ public class GossipDeviceStore
.withName("port-stats")
.withSerializer(deviceDataSerializer)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
.withClockService(new WallclockClockManager<>())
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.withTombstonesDisabled()
.build();
devicePortStats.addListener(portStatsListener);
......
......@@ -18,8 +18,8 @@ package org.onosproject.store.ecmap;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
......@@ -45,7 +45,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
private ExecutorService eventExecutor;
private ExecutorService communicationExecutor;
private ScheduledExecutorService backgroundExecutor;
private ClockService<K, V> clockService;
private BiFunction<K, V, Timestamp> timestampProvider;
private BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
private boolean tombstonesDisabled = false;
private long antiEntropyPeriod = 5;
......@@ -79,9 +79,9 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
}
@Override
public EventuallyConsistentMapBuilder<K, V> withClockService(
ClockService<K, V> clockService) {
this.clockService = checkNotNull(clockService);
public EventuallyConsistentMapBuilder<K, V> withTimestampProvider(
BiFunction<K, V, Timestamp> timestampProvider) {
this.timestampProvider = checkNotNull(timestampProvider);
return this;
}
......@@ -141,13 +141,13 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
public EventuallyConsistentMap<K, V> build() {
checkNotNull(name, "name is a mandatory parameter");
checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
checkNotNull(clockService, "clockService is a mandatory parameter");
checkNotNull(timestampProvider, "timestampProvider is a mandatory parameter");
return new EventuallyConsistentMapImpl<>(name,
clusterService,
clusterCommunicator,
serializerBuilder,
clockService,
timestampProvider,
peerUpdateFunction,
eventExecutor,
communicationExecutor,
......
......@@ -37,7 +37,6 @@ import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
......@@ -84,7 +83,7 @@ public class EventuallyConsistentMapImpl<K, V>
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
private final ClockService<K, V> clockService;
private final BiFunction<K, V, Timestamp> timestampProvider;
private final MessageSubject updateMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
......@@ -130,8 +129,7 @@ public class EventuallyConsistentMapImpl<K, V>
* @param clusterCommunicator the cluster communications service
* @param serializerBuilder a Kryo namespace builder that can serialize
* both K and V
* @param clockService a clock service able to generate timestamps
* for K and V
* @param timestampProvider provider of timestamps for K and V
* @param peerUpdateFunction function that provides a set of nodes to immediately
* update to when there writes to the map
* @param eventExecutor executor to use for processing incoming
......@@ -150,7 +148,7 @@ public class EventuallyConsistentMapImpl<K, V>
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K, V> clockService,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
ExecutorService eventExecutor,
ExecutorService communicationExecutor,
......@@ -170,7 +168,7 @@ public class EventuallyConsistentMapImpl<K, V>
this.serializer = createSerializer(serializerBuilder);
this.clockService = clockService;
this.timestampProvider = timestampProvider;
if (peerUpdateFunction != null) {
this.peerUpdateFunction = peerUpdateFunction;
......@@ -302,7 +300,7 @@ public class EventuallyConsistentMapImpl<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
Timestamp timestamp = clockService.getTimestamp(key, value);
Timestamp timestamp = timestampProvider.apply(key, value);
if (putInternal(key, value, timestamp)) {
notifyPeers(new PutEntry<>(key, value, timestamp),
......@@ -354,7 +352,7 @@ public class EventuallyConsistentMapImpl<K, V>
checkNotNull(key, ERROR_NULL_KEY);
// TODO prevent calls here if value is important for timestamp
Timestamp timestamp = clockService.getTimestamp(key, null);
Timestamp timestamp = timestampProvider.apply(key, null);
if (removeInternal(key, timestamp)) {
notifyPeers(new RemoveEntry<>(key, timestamp),
......@@ -412,7 +410,7 @@ public class EventuallyConsistentMapImpl<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
Timestamp timestamp = clockService.getTimestamp(key, value);
Timestamp timestamp = timestampProvider.apply(key, value);
if (removeInternal(key, timestamp)) {
notifyPeers(new RemoveEntry<>(key, timestamp),
......
......@@ -59,7 +59,6 @@ import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.net.group.StoredGroupBucketEntry;
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
......@@ -67,7 +66,6 @@ import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.DeviceIdSerializer;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.URISerializer;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
......@@ -142,6 +140,8 @@ public class DistributedGroupStore
private KryoNamespace.Builder kryoBuilder = null;
private final AtomicLong sequenceNumber = new AtomicLong(0);
@Activate
public void activate() {
kryoBuilder = new KryoNamespace.Builder()
......@@ -210,7 +210,8 @@ public class DistributedGroupStore
groupStoreEntriesByKey = keyMapBuilder
.withName("groupstorekeymap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
sequenceNumber.getAndIncrement()))
.build();
groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
log.debug("Current size of groupstorekeymap:{}",
......@@ -223,7 +224,8 @@ public class DistributedGroupStore
auditPendingReqQueue = auditMapBuilder
.withName("pendinggroupkeymap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
sequenceNumber.getAndIncrement()))
.build();
log.debug("Current size of pendinggroupkeymap:{}",
auditPendingReqQueue.size());
......@@ -909,21 +911,6 @@ public class DistributedGroupStore
}
/**
* ClockService that generates wallclock based timestamps.
*/
private class GroupStoreLogicalClockManager<T, U>
implements ClockService<T, U> {
private final AtomicLong sequenceNumber = new AtomicLong(0);
@Override
public Timestamp getTimestamp(T t1, U u1) {
return new MultiValuedTimestamp<>(System.currentTimeMillis(),
sequenceNumber.getAndIncrement());
}
}
/**
* Map handler to receive any events when the group key map is updated.
*/
private class GroupStoreKeyMapListener implements
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.intent.impl;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -48,6 +49,7 @@ import org.slf4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -83,6 +85,8 @@ public class GossipIntentStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionService partitionService;
private final AtomicLong sequenceNumber = new AtomicLong(0);
@Activate
public void activate() {
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
......@@ -94,14 +98,17 @@ public class GossipIntentStore
currentMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
.withName("intent-current")
.withSerializer(intentSerializer)
.withClockService(new IntentDataLogicalClockManager<>())
.withTimestampProvider((key, intentData) ->
new MultiValuedTimestamp<>(intentData.version(),
sequenceNumber.getAndIncrement()))
.withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
.build();
pendingMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
.withName("intent-pending")
.withSerializer(intentSerializer)
.withClockService(new IntentDataClockManager<>())
.withTimestampProvider((key, intentData) -> new MultiValuedTimestamp<>(intentData.version(),
System.nanoTime()))
.withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
.build();
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.intent.impl;
import org.onosproject.net.intent.IntentData;
import org.onosproject.store.Timestamp;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.MultiValuedTimestamp;
/**
* ClockService that uses IntentData versions as timestamps.
*/
public class IntentDataClockManager<K> implements ClockService<K, IntentData> {
@Override
public Timestamp getTimestamp(K key, IntentData intentData) {
return new MultiValuedTimestamp<>(intentData.version(), System.nanoTime());
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.intent.impl;
import org.onosproject.net.intent.IntentData;
import org.onosproject.store.Timestamp;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.MultiValuedTimestamp;
import java.util.concurrent.atomic.AtomicLong;
/**
* ClockService that generates logical timestamps based on IntentData versions.
*/
public class IntentDataLogicalClockManager<K> implements ClockService<K, IntentData> {
private final AtomicLong sequenceNumber = new AtomicLong(0);
@Override
public Timestamp getTimestamp(K key, IntentData intentData) {
return new MultiValuedTimestamp<>(intentData.version(),
sequenceNumber.getAndIncrement());
}
}
......@@ -35,7 +35,6 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
......@@ -148,7 +147,7 @@ public class EventuallyConsistentMapImplTest {
clusterService, clusterCommunicator)
.withName(MAP_NAME)
.withSerializer(serializer)
.withClockService(clockService)
.withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
.withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
.build();
......@@ -805,12 +804,11 @@ public class EventuallyConsistentMapImplTest {
* @param <T> Type that the clock service will give out timestamps for
* @param <U> Second type that the clock service will give out values for
*/
private class SequentialClockService<T, U> implements ClockService<T, U> {
private class SequentialClockService<T, U> {
private static final long INITIAL_VALUE = 1;
private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
@Override
public Timestamp getTimestamp(T object, U object2) {
return new TestTimestamp(counter.getAndIncrement());
}
......
......@@ -57,7 +57,7 @@ import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallclockClockManager;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
......@@ -114,23 +114,23 @@ public class DistributedTunnelStore
tunnelIdAsKeyStore = storageService
.<TunnelId, Tunnel>eventuallyConsistentMapBuilder()
.withName("all_tunnel").withSerializer(serializer)
.withClockService(new WallclockClockManager<>()).build();
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
tunnelNameAsKeyStore = storageService
.<TunnelName, Set<TunnelId>>eventuallyConsistentMapBuilder()
.withName("tunnel_name_tunnel").withSerializer(serializer)
.withClockService(new WallclockClockManager<>()).build();
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
srcAndDstKeyStore = storageService
.<TunnelKey, Set<TunnelId>>eventuallyConsistentMapBuilder()
.withName("src_dst_tunnel").withSerializer(serializer)
.withClockService(new WallclockClockManager<>()).build();
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
typeKeyStore = storageService
.<Tunnel.Type, Set<TunnelId>>eventuallyConsistentMapBuilder()
.withName("type_tunnel").withSerializer(serializer)
.withClockService(new WallclockClockManager<>()).build();
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
orderRelationship = storageService
.<ApplicationId, Set<TunnelSubscription>>eventuallyConsistentMapBuilder()
.withName("type_tunnel").withSerializer(serializer)
.withClockService(new WallclockClockManager<>()).build();
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
idGenerator = coreService.getIdGenerator(runnelOpTopoic);
log.info("Started");
}
......