HIGUCHI Yuta
Committed by Gerrit Code Review

Some improvements around EventuallyConsistentMapBuilder serializer handling.

- Allow directly passing KryoNamespace
- Add some registration id gap before ECMap's internal registration
- Some improvements for ease of registration issue investigation
-- Add friendly name to ECMap's internal KryoNamespace
-- Add backtrace information

Change-Id: I7c87b3aefbaea4b2ed12b38c3e0813e9d195c7a9
......@@ -182,6 +182,11 @@ public final class VtnEventuallyConsistentMapTest<K, V> extends VtnEventuallyCon
}
@Override
public EventuallyConsistentMapBuilder<K, V> withSerializer(KryoNamespace serializer) {
return this;
}
@Override
public EventuallyConsistentMapBuilder<K, V>
withTimestampProvider(BiFunction<K, V, Timestamp> timestampProvider) {
return this;
......
......@@ -67,6 +67,20 @@ public interface EventuallyConsistentMapBuilder<K, V> {
KryoNamespace.Builder serializerBuilder);
/**
* Sets a serializer that can be used to create a serializer that
* can serialize both the keys and values put into the map. The serializer
* builder should be pre-populated with any classes that will be put into
* the map.
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param serializer serializer
* @return this EventuallyConsistentMapBuilder
*/
EventuallyConsistentMapBuilder<K, V> withSerializer(KryoNamespace serializer);
/**
* Sets the function to use for generating timestamps for map updates.
* <p>
* The client must provide an {@code BiFunction<K, V, Timestamp>}
......
......@@ -179,6 +179,11 @@ public final class TestEventuallyConsistentMap<K, V> extends EventuallyConsisten
}
@Override
public EventuallyConsistentMapBuilder<K, V> withSerializer(KryoNamespace serializer) {
return this;
}
@Override
public EventuallyConsistentMapBuilder<K, V>
withTimestampProvider(BiFunction<K, V, Timestamp> timestampProvider) {
return this;
......
......@@ -42,6 +42,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
private final ClusterCommunicationService clusterCommunicator;
private String name;
private KryoNamespace serializer;
private KryoNamespace.Builder serializerBuilder;
private ExecutorService eventExecutor;
private ExecutorService communicationExecutor;
......@@ -85,6 +86,12 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
}
@Override
public EventuallyConsistentMapBuilder<K, V> withSerializer(KryoNamespace serializer) {
this.serializer = checkNotNull(serializer);
return this;
}
@Override
public EventuallyConsistentMapBuilder<K, V> withTimestampProvider(
BiFunction<K, V, Timestamp> timestampProvider) {
this.timestampProvider = checkNotNull(timestampProvider);
......@@ -147,13 +154,16 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
@Override
public EventuallyConsistentMap<K, V> build() {
checkNotNull(name, "name is a mandatory parameter");
checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
checkNotNull(timestampProvider, "timestampProvider is a mandatory parameter");
if (serializer == null && serializerBuilder != null) {
serializer = serializerBuilder.build(name);
}
checkNotNull(serializer, "serializer is a mandatory parameter");
return new EventuallyConsistentMapImpl<>(name,
clusterService,
clusterCommunicator,
serializerBuilder,
serializer,
timestampProvider,
peerUpdateFunction,
eventExecutor,
......
......@@ -58,7 +58,6 @@ import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -138,7 +137,7 @@ public class EventuallyConsistentMapImpl<K, V>
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
* @param clusterCommunicator the cluster communications service
* @param serializerBuilder a Kryo namespace builder that can serialize
* @param serializer a Kryo namespace that can serialize
* both K and V
* @param timestampProvider provider of timestamps for K and V
* @param peerUpdateFunction function that provides a set of nodes to immediately
......@@ -159,7 +158,7 @@ public class EventuallyConsistentMapImpl<K, V>
EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
KryoNamespace ns,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
ExecutorService eventExecutor,
......@@ -172,25 +171,14 @@ public class EventuallyConsistentMapImpl<K, V>
boolean persistent,
PersistenceService persistenceService) {
this.mapName = mapName;
this.serializer = createSerializer(serializerBuilder);
this.serializer = createSerializer(ns);
this.persistenceService = persistenceService;
this.persistent =
persistent;
if (persistent) {
items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
.withName(PERSISTENT_LOCAL_MAP_NAME)
.withSerializer(new Serializer() {
@Override
public <T> byte[] encode(T object) {
return EventuallyConsistentMapImpl.this.serializer.encode(object);
}
@Override
public <T> T decode(byte[] bytes) {
return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
}
})
.withSerializer(this.serializer)
.build();
} else {
items = Maps.newConcurrentMap();
......@@ -268,18 +256,21 @@ public class EventuallyConsistentMapImpl<K, V>
this.lightweightAntiEntropy = !convergeFaster;
}
private StoreSerializer createSerializer(KryoNamespace.Builder builder) {
return StoreSerializer.using(builder
.register(KryoNamespaces.BASIC)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(AntiEntropyAdvertisement.class)
.register(AntiEntropyResponse.class)
.register(UpdateEntry.class)
.register(MapValue.class)
.register(MapValue.Digest.class)
.build(name()));
private StoreSerializer createSerializer(KryoNamespace ns) {
return StoreSerializer.using(KryoNamespace.newBuilder()
.register(ns)
// not so robust way to avoid collision with other
// user supplied registrations
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
.register(KryoNamespaces.BASIC)
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(AntiEntropyAdvertisement.class)
.register(AntiEntropyResponse.class)
.register(UpdateEntry.class)
.register(MapValue.class)
.register(MapValue.Digest.class)
.build(name() + "-ecmap"));
}
@Override
......
......@@ -20,11 +20,12 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.service.Serializer;
/**
* Service to serialize Objects into byte array.
*/
public interface StoreSerializer {
public interface StoreSerializer extends Serializer {
/**
* Serializes the specified object into bytes.
......@@ -32,6 +33,7 @@ public interface StoreSerializer {
* @param obj object to be serialized
* @return serialized bytes
*/
@Override
byte[] encode(final Object obj);
/**
......@@ -57,6 +59,7 @@ public interface StoreSerializer {
* @return deserialized object
* @param <T> decoded type
*/
@Override
<T> T decode(final byte[] bytes);
/**
......
......@@ -125,9 +125,11 @@ public final class KryoNamespace implements KryoFactory, KryoPool {
if (!types.isEmpty()) {
if (id != FLOATING_ID && id < blockHeadId + types.size()) {
log.warn("requested nextId {} could potentially overlap " +
"with existing registrations {}+{} ",
id, blockHeadId, types.size());
if (log.isWarnEnabled()) {
log.warn("requested nextId {} could potentially overlap " +
"with existing registrations {}+{} ",
id, blockHeadId, types.size(), new RuntimeException());
}
}
blocks.add(new RegistrationBlock(this.blockHeadId, types));
types = new ArrayList<>();
......