Jonathan Hart
Committed by Ray Milkey

Implemented a Builder pattern for EventuallyConsistentMaps.

EventuallyConsistentMap has been moved to the API package so is now available outside the stores.

ONOS-1357

Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
Showing 17 changed files with 589 additions and 237 deletions
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
package org.onosproject.store.service;
import org.onosproject.store.Timestamp;
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.ecmap;
package org.onosproject.store.service;
import java.util.Collection;
import java.util.Map;
......@@ -114,7 +114,7 @@ public interface EventuallyConsistentMap<K, V> {
* Removes the given key-value mapping from the map, if it exists.
* <p>
* This actually means remove any values up to and including the timestamp
* given by {@link org.onosproject.store.impl.ClockService#getTimestamp(Object, Object)}.
* given by {@link org.onosproject.store.service.ClockService#getTimestamp(Object, Object)}.
* Any mappings that produce an earlier timestamp than this given key-value
* pair will be removed, and any mappings that produce a later timestamp
* will supersede this remove.
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.NodeId;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
/**
* Builder for eventually consistent maps.
*
* @param <K> type for map keys
* @param <V> type for map values
*/
public interface EventuallyConsistentMapBuilder<K, V> {
/**
* Sets the name of the map.
* <p>
* Each map is identified by a string map name. EventuallyConsistentMapImpl
* objects in different JVMs that use the same map name will form a
* distributed map across JVMs (provided the cluster service is aware of
* both nodes).
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param name name of the map
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withName(String name);
/**
* Sets a serializer builder that can be used to create a serializer that
* can serialize both the keys and values put into the map. The serializer
* builder should be pre-populated with any classes that will be put into
* the map.
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param serializerBuilder serializer builder
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withSerializer(
KryoNamespace.Builder serializerBuilder);
/**
* Sets the clock service to use for generating timestamps for map updates.
* <p>
* The client must provide an {@link org.onosproject.store.service.ClockService}
* which can generate timestamps for a given key. The clock service is free
* to generate timestamps however it wishes, however these timestamps will
* be used to serialize updates to the map so they must be strict enough
* to ensure updates are properly ordered for the use case (i.e. in some
* cases wallclock time will suffice, whereas in other cases logical time
* will be necessary).
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param clockService clock service
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withClockService(
ClockService<K, V> clockService);
/**
* Sets the executor to use for processing events coming in from peers.
*
* @param executor event executor
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withEventExecutor(
ExecutorService executor);
/**
* Sets the executor to use for sending events to peers.
*
* @param executor event executor
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
ExecutorService executor);
/**
* Sets the executor to use for background anti-entropy tasks.
*
* @param executor event executor
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(
ScheduledExecutorService executor);
/**
* Sets a function that can determine which peers to replicate updates to.
* <p>
* The default function replicates to all nodes.
* </p>
*
* @param peerUpdateFunction function that takes a K, V input and returns
* a collection of NodeIds to replicate the event
* to
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction(
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction);
/**
* Prevents this map from writing tombstones of items that have been
* removed. This may result in zombie items reappearing after they have
* been removed.
* <p>
* The default behavior is tombstones are enabled.
* </p>
*
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled();
/**
* Configures how often to run the anti-entropy background task.
* <p>
* The default anti-entropy period is 5 seconds.
* </p>
*
* @param period anti-entropy period
* @param unit time unit for the period
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(
long period, TimeUnit unit);
/**
* Configure anti-entropy to converge faster at the cost of doing more work
* for each anti-entropy cycle. Suited to maps with low update rate where
* convergence time is more important than throughput.
* <p>
* The default behavior is to do less anti-entropy work at the cost of
* slower convergence.
* </p>
*
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withFasterConvergence();
/**
* Builds an eventually consistent map based on the configuration options
* supplied to this builder.
*
* @return new eventually consistent map
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
public EventuallyConsistentMap<K, V> build();
}
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.ecmap;
package org.onosproject.store.service;
import com.google.common.base.MoreObjects;
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.ecmap;
package org.onosproject.store.service;
/**
* Listener interested in receiving modification events for an
......
......@@ -30,6 +30,7 @@ public interface StorageService {
/**
* Creates a ConsistentMap.
*
* @param name map name
* @param serializer serializer to use for serializing keys and values
* @return consistent map.
......@@ -40,6 +41,7 @@ public interface StorageService {
/**
* Creates a AsyncConsistentMap.
*
* @param name map name
* @param serializer serializer to use for serializing keys and values
* @return async consistent map
......@@ -50,7 +52,18 @@ public interface StorageService {
/**
* Creates a new transaction context.
*
* @return transaction context
*/
TransactionContext createTransactionContext();
}
\ No newline at end of file
/**
* Creates a new EventuallyConsistentMapBuilder.
*
* @param <K> key type
* @param <V> value type
* @return builder for an eventually consistent map
*/
<K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
}
......
......@@ -43,14 +43,14 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
......@@ -66,10 +66,16 @@ import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.app.ApplicationEvent.Type.*;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -106,6 +112,9 @@ public class GossipApplicationStore extends ApplicationArchive
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
private final AtomicLong sequence = new AtomicLong();
......@@ -130,27 +139,29 @@ public class GossipApplicationStore extends ApplicationArchive
new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
sequence.incrementAndGet());
apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
clusterCommunicator,
serializer,
appsClockService);
apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
.withName("apps")
.withSerializer(serializer)
.withClockService(appsClockService)
.build();
ClockService<Application, InternalState> statesClockService = (app, state) ->
new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
sequence.incrementAndGet());
states = new EventuallyConsistentMapImpl<>("app-states",
clusterService,
clusterCommunicator,
serializer,
statesClockService);
states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
.withName("app-states")
.withSerializer(serializer)
.withClockService(statesClockService)
.build();
states.addListener(new InternalAppStatesListener());
permissions = new EventuallyConsistentMapImpl<>("app-permissions",
clusterService,
clusterCommunicator,
serializer,
new WallclockClockManager<>());
permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
.withName("app-permissions")
.withSerializer(serializer)
.withClockService(new WallclockClockManager<>())
.build();
log.info("Started");
}
......
......@@ -25,21 +25,19 @@ import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigEvent;
import org.onosproject.cfg.ComponentConfigStore;
import org.onosproject.cfg.ComponentConfigStoreDelegate;
import org.onosproject.cluster.ClusterService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_SET;
import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_UNSET;
import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -59,20 +57,19 @@ public class GossipComponentConfigStore
private EventuallyConsistentMap<String, String> properties;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
protected StorageService storageService;
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
properties = new EventuallyConsistentMapImpl<>("cfg", clusterService,
clusterCommunicator,
serializer,
new WallclockClockManager<>());
properties = storageService.<String, String>eventuallyConsistentMapBuilder()
.withName("cfg")
.withSerializer(serializer)
.withClockService(new WallclockClockManager<>())
.build();
properties.addListener(new InternalPropertiesListener());
log.info("Started");
}
......
......@@ -17,13 +17,11 @@
package org.onosproject.store.consistent.impl;
import com.google.common.collect.Sets;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.log.FileLog;
import net.kuujo.copycat.netty.NettyTcpProtocol;
import net.kuujo.copycat.protocol.Consistency;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -32,8 +30,11 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
......@@ -71,6 +72,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
protected String nodeToUri(NodeInfo node) {
return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
}
......@@ -169,12 +173,12 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Override
public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
return new DefaultConsistentMap<K, V>(name, partitionedDatabase, serializer);
return new DefaultConsistentMap<>(name, partitionedDatabase, serializer);
}
@Override
public <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer) {
return new DefaultAsyncConsistentMap<K, V>(name, partitionedDatabase, serializer);
return new DefaultAsyncConsistentMap<>(name, partitionedDatabase, serializer);
}
@Override
......@@ -207,4 +211,12 @@ public class DatabaseManager implements StorageService, StorageAdminService {
database.cluster().leader() != null ?
database.cluster().leader().uri() : null);
}
@Override
public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
return new EventuallyConsistentMapBuilderImpl<>(clusterService,
clusterCommunicator);
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.ecmap;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Eventually consistent map builder.
*/
public class EventuallyConsistentMapBuilderImpl<K, V>
implements EventuallyConsistentMapBuilder<K, V> {
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private String name;
private KryoNamespace.Builder serializerBuilder;
private ExecutorService eventExecutor;
private ExecutorService communicationExecutor;
private ScheduledExecutorService backgroundExecutor;
private ClockService<K, V> clockService;
private BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
private boolean tombstonesDisabled = false;
private long antiEntropyPeriod = 5;
private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
private boolean convergeFaster = false;
/**
* Creates a new eventually consistent map builder.
*
* @param clusterService cluster service
* @param clusterCommunicator cluster communication service
*/
public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
}
@Override
public EventuallyConsistentMapBuilder withName(String name) {
this.name = checkNotNull(name);
return this;
}
@Override
public EventuallyConsistentMapBuilder withSerializer(
KryoNamespace.Builder serializerBuilder) {
this.serializerBuilder = checkNotNull(serializerBuilder);
return this;
}
@Override
public EventuallyConsistentMapBuilder withClockService(
ClockService<K, V> clockService) {
this.clockService = checkNotNull(clockService);
return this;
}
@Override
public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
this.eventExecutor = checkNotNull(executor);
return this;
}
@Override
public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
ExecutorService executor) {
communicationExecutor = checkNotNull(executor);
return this;
}
@Override
public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
this.backgroundExecutor = checkNotNull(executor);
return this;
}
@Override
public EventuallyConsistentMapBuilder withPeerUpdateFunction(
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
return this;
}
@Override
public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled() {
tombstonesDisabled = true;
return this;
}
@Override
public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(long period, TimeUnit unit) {
checkArgument(period > 0, "anti-entropy period must be greater than 0");
antiEntropyPeriod = period;
antiEntropyTimeUnit = checkNotNull(unit);
return this;
}
@Override
public EventuallyConsistentMapBuilder<K, V> withFasterConvergence() {
convergeFaster = true;
return this;
}
@Override
public EventuallyConsistentMap<K, V> build() {
checkNotNull(name, "name is a mandatory parameter");
checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
checkNotNull(clockService, "clockService is a mandatory parameter");
return new EventuallyConsistentMapImpl<>(name,
clusterService,
clusterCommunicator,
serializerBuilder,
clockService,
peerUpdateFunction,
eventExecutor,
communicationExecutor,
backgroundExecutor,
tombstonesDisabled,
antiEntropyPeriod,
antiEntropyTimeUnit,
convergeFaster);
}
}
......@@ -32,10 +32,13 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -54,7 +57,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
......@@ -93,8 +95,8 @@ public class EventuallyConsistentMapImpl<K, V>
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
private ExecutorService communicationExecutor;
private Map<NodeId, EventAccumulator> senderPending;
private final ExecutorService communicationExecutor;
private final Map<NodeId, EventAccumulator> senderPending;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
......@@ -103,130 +105,115 @@ public class EventuallyConsistentMapImpl<K, V>
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
// TODO: Make these anti-entropy params configurable
private long initialDelaySec = 5;
private long periodSec = 5;
private boolean lightweightAntiEntropy = true;
private boolean tombstonesDisabled = false;
private final long initialDelaySec = 5;
private final boolean lightweightAntiEntropy;
private final boolean tombstonesDisabled;
private static final int WINDOW_SIZE = 5;
private static final int HIGH_LOAD_THRESHOLD = 0;
private static final int LOAD_WINDOW = 2;
SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
AtomicLong operations = new AtomicLong();
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
* Each map is identified by a string map name. EventuallyConsistentMapImpl
* objects in different JVMs that use the same map name will form a
* distributed map across JVMs (provided the cluster service is aware of
* both nodes).
* </p>
* <p>
* The client is expected to provide an
* {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
* will be stored in this map have been registered (including referenced
* classes). This serializer will be used to serialize both K and V for
* inter-node notifications.
* </p>
* <p>
* The client must provide an {@link org.onosproject.store.impl.ClockService}
* which can generate timestamps for a given key. The clock service is free
* to generate timestamps however it wishes, however these timestamps will
* be used to serialize updates to the map so they must be strict enough
* to ensure updates are properly ordered for the use case (i.e. in some
* cases wallclock time will suffice, whereas in other cases logical time
* will be necessary).
* See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
* for more description of the parameters expected by the map.
* </p>
*
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
* @param clusterCommunicator the cluster communications service
* @param serializerBuilder a Kryo namespace builder that can serialize
* both K and V
* @param clockService a clock service able to generate timestamps
* for K
* @param peerUpdateFunction function that provides a set of nodes to immediately
* update to when there writes to the map
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
* @param clusterCommunicator the cluster communications service
* @param serializerBuilder a Kryo namespace builder that can serialize
* both K and V
* @param clockService a clock service able to generate timestamps
* for K and V
* @param peerUpdateFunction function that provides a set of nodes to immediately
* update to when there writes to the map
* @param eventExecutor executor to use for processing incoming
* events from peers
* @param communicationExecutor executor to use for sending events to peers
* @param backgroundExecutor executor to use for background anti-entropy
* tasks
* @param tombstonesDisabled true if this map should not maintain
* tombstones
* @param antiEntropyPeriod period that the anti-entropy task should run
* in seconds
* @param convergeFaster make anti-entropy try to converge faster
*/
public EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K, V> clockService,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
serializer = createSerializer(checkNotNull(serializerBuilder));
EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K, V> clockService,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
ExecutorService eventExecutor,
ExecutorService communicationExecutor,
ScheduledExecutorService backgroundExecutor,
boolean tombstonesDisabled,
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster) {
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
this.clockService = checkNotNull(clockService);
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
this.serializer = createSerializer(serializerBuilder);
// should be a normal executor; it's used for receiving messages
//TODO make # of threads configurable
executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
this.clockService = clockService;
// sending executor; should be capped
//TODO make # of threads configurable
//TODO this probably doesn't need to be bounded anymore
communicationExecutor =
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
senderPending = Maps.newConcurrentMap();
if (peerUpdateFunction != null) {
this.peerUpdateFunction = peerUpdateFunction;
} else {
this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
.map(ControllerNode::id)
.filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
.collect(Collectors.toList());
}
if (eventExecutor != null) {
this.executor = eventExecutor;
} else {
// should be a normal executor; it's used for receiving messages
this.executor =
Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
}
if (communicationExecutor != null) {
this.communicationExecutor = communicationExecutor;
} else {
// sending executor; should be capped
//TODO this probably doesn't need to be bounded anymore
this.communicationExecutor =
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
}
backgroundExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
} else {
this.backgroundExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
}
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec,
TimeUnit.SECONDS);
this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, antiEntropyPeriod,
antiEntropyTimeUnit);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
new InternalEventListener(), executor);
new InternalEventListener(), this.executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
new InternalAntiEntropyListener(), backgroundExecutor);
}
new InternalAntiEntropyListener(), this.backgroundExecutor);
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
* Take a look at the other constructor for usage information. The only difference
* is that a BiFunction is provided that returns all nodes in the cluster, so
* all nodes will be sent write updates immediately.
* </p>
*
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
* @param clusterCommunicator the cluster communications service
* @param serializerBuilder a Kryo namespace builder that can serialize
* both K and V
* @param clockService a clock service able to generate timestamps
* for K
*/
public EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K, V> clockService) {
this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
(key, value) -> clusterService.getNodes().stream()
.map(ControllerNode::id)
.filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
.collect(Collectors.toList()));
}
public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
tombstonesDisabled = status;
return this;
this.tombstonesDisabled = tombstonesDisabled;
this.lightweightAntiEntropy = !convergeFaster;
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
......@@ -246,19 +233,6 @@ public class EventuallyConsistentMapImpl<K, V>
};
}
/**
* Sets the executor to use for broadcasting messages and returns this
* instance for method chaining.
*
* @param executor executor service
* @return this instance
*/
public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
checkNotNull(executor, "Null executor");
communicationExecutor = executor;
return this;
}
@Override
public int size() {
checkState(!destroyed, destroyedMessage);
......
......@@ -15,23 +15,8 @@
*/
package org.onosproject.store.group.impl;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -75,17 +60,32 @@ import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of group entries using trivial in-memory implementation.
......@@ -108,6 +108,9 @@ public class DistributedGroupStore
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
// Per device group table with (device id + app cookie) as key
......@@ -192,31 +195,38 @@ public class DistributedGroupStore
messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
groupStoreEntriesByKey =
new EventuallyConsistentMapImpl<>("groupstorekeymap",
clusterService,
clusterCommunicator,
kryoBuilder,
new GroupStoreLogicalClockManager<>());
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
groupStoreEntriesByKey = keyMapBuilder
.withName("groupstorekeymap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
log.trace("Current size {}", groupStoreEntriesByKey.size());
log.debug("Creating EC map groupstoreidmap");
groupStoreEntriesById =
new EventuallyConsistentMapImpl<>("groupstoreidmap",
clusterService,
clusterCommunicator,
kryoBuilder,
new GroupStoreLogicalClockManager<>());
EventuallyConsistentMapBuilder<GroupStoreIdMapKey, StoredGroupEntry>
idMapBuilder = storageService.eventuallyConsistentMapBuilder();
groupStoreEntriesById = idMapBuilder
.withName("groupstoreidmap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
log.trace("Current size {}", groupStoreEntriesById.size());
log.debug("Creating EC map pendinggroupkeymap");
auditPendingReqQueue =
new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
clusterService,
clusterCommunicator,
kryoBuilder,
new GroupStoreLogicalClockManager<>());
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
auditPendingReqQueue = auditMapBuilder
.withName("pendinggroupkeymap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
log.trace("Current size {}", auditPendingReqQueue.size());
log.info("Started");
......@@ -819,11 +829,11 @@ public class DistributedGroupStore
* Map handler to receive any events when the group map is updated.
*/
private class GroupStoreIdMapListener implements
EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
@Override
public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
StoredGroupEntry> mapEvent) {
StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
log.trace("GroupStoreIdMapListener: received groupid map event {}",
mapEvent.type());
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.impl;
import org.onosproject.store.Timestamp;
import org.onosproject.store.service.ClockService;
/**
* A clock service which hands out wallclock-based timestamps.
......
......@@ -36,14 +36,13 @@ import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.Collection;
......@@ -52,7 +51,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.intent.IntentState.*;
import static org.onosproject.net.intent.IntentState.PURGE_REQ;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -61,7 +60,7 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
//FIXME we should listen for leadership changes. if the local instance has just
// ... become a leader, scan the pending map and process those
@Component(immediate = false, enabled = true)
@Component(immediate = true, enabled = true)
@Service
public class GossipIntentStore
extends AbstractStore<IntentEvent, IntentStoreDelegate>
......@@ -76,10 +75,10 @@ public class GossipIntentStore
private EventuallyConsistentMap<Key, IntentData> pendingMap;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionService partitionService;
......@@ -92,19 +91,19 @@ public class GossipIntentStore
.register(MultiValuedTimestamp.class)
.register(WallClockTimestamp.class);
currentMap = new EventuallyConsistentMapImpl<>("intent-current",
clusterService,
clusterCommunicator,
intentSerializer,
new IntentDataLogicalClockManager<>(),
(key, intentData) -> getPeerNodes(key, intentData));
pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer,
new IntentDataClockManager<>(),
(key, intentData) -> getPeerNodes(key, intentData));
currentMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
.withName("intent-current")
.withSerializer(intentSerializer)
.withClockService(new IntentDataLogicalClockManager<>())
.withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
.build();
pendingMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
.withName("intent-pending")
.withSerializer(intentSerializer)
.withClockService(new IntentDataClockManager<>())
.withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
.build();
currentMap.addListener(new InternalCurrentListener());
pendingMap.addListener(new InternalPendingListener());
......
......@@ -17,7 +17,7 @@ package org.onosproject.store.intent.impl;
import org.onosproject.net.intent.IntentData;
import org.onosproject.store.Timestamp;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
/**
......
......@@ -17,7 +17,7 @@ package org.onosproject.store.intent.impl;
import org.onosproject.net.intent.IntentData;
import org.onosproject.store.Timestamp;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import java.util.concurrent.atomic.AtomicLong;
......
......@@ -34,10 +34,13 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import java.io.IOException;
import java.util.ArrayList;
......@@ -134,10 +137,13 @@ public class EventuallyConsistentMapImplTest {
.register(KryoNamespaces.API)
.register(TestTimestamp.class);
ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
clusterCommunicator,
serializer, clockService)
.withBroadcastMessageExecutor(MoreExecutors.newDirectExecutorService());
ecMap = new EventuallyConsistentMapBuilderImpl<>(
clusterService, clusterCommunicator)
.withName(MAP_NAME)
.withSerializer(serializer)
.withClockService(clockService)
.withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
.build();
// Reset ready for tests to add their own expectations
reset(clusterCommunicator);
......