HIGUCHI Yuta
Committed by Thomas Vachuska

Deprecate KryoSerializer.

Change-Id: I2403b95c2d7a8af69dff55a0e40a35b223127c85
Showing 16 changed files with 97 additions and 156 deletions
......@@ -43,7 +43,7 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
......@@ -90,16 +90,12 @@ public class DistributedClusterStore
label = "the value of Phi threshold to detect accrual failure")
private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
private static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(HeartbeatMessage.class)
.build()
.populate(1);
}
};
.build("ClusterStore"));
private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
......
......@@ -63,7 +63,7 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.EventuallyConsistentMap;
......@@ -157,17 +157,13 @@ public class ECDeviceStore
private final SetEventListener<DeviceId> deviceStatusTracker =
new InternalDeviceStatusTracker();
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(DeviceInjectedEvent.class)
.register(PortInjectedEvent.class)
.build();
}
};
.build("ECDevice"));
protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
......
......@@ -72,7 +72,7 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
......@@ -170,10 +170,7 @@ public class GossipDeviceStore
protected MastershipTermService termService;
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
......@@ -186,9 +183,7 @@ public class GossipDeviceStore
.register(PortFragmentId.class)
.register(DeviceInjectedEvent.class)
.register(PortInjectedEvent.class)
.build();
}
};
.build("GossipDevice"));
private ExecutorService executor;
......
......@@ -67,7 +67,6 @@ package org.onosproject.store.flow.impl;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
......@@ -176,15 +175,11 @@ public class DistributedFlowRuleStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.build();
}
};
.build("FlowRuleStore"));
protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
......
......@@ -61,7 +61,7 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
......@@ -155,16 +155,12 @@ public class ECLinkStore
protected LinkDiscoveryMode linkDiscoveryMode = LinkDiscoveryMode.STRICT;
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(Provided.class)
.build();
}
};
.build("ECLink"));
@Activate
public void activate() {
......
......@@ -63,7 +63,7 @@ import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.slf4j.Logger;
......@@ -145,10 +145,8 @@ public class GossipLinkStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalLinkEvent.class)
......@@ -156,9 +154,7 @@ public class GossipLinkStore
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
.register(LinkInjectedEvent.class)
.build();
}
};
.build("GossipLink"));
private ExecutorService executor;
......
......@@ -58,7 +58,6 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.slf4j.Logger;
......@@ -107,17 +106,13 @@ public class ConsistentDeviceMastershipStore
private static final String DEVICE_ID_NULL = "Device ID cannot be null";
private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
public static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
public static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MastershipRole.class)
.register(MastershipEvent.class)
.register(MastershipEvent.Type.class)
.build();
}
};
.build("MastershipStore"));
@Activate
public void activate() {
......
......@@ -26,7 +26,6 @@ import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
......@@ -41,7 +40,7 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
......@@ -95,15 +94,7 @@ public class DistributedPacketStore
private static final MessageSubject PACKET_OUT_SUBJECT =
new MessageSubject("packet-out");
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
private ExecutorService messageHandlingExecutor;
......
......@@ -37,7 +37,7 @@ import org.onosproject.net.proxyarp.ProxyArpStoreDelegate;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -60,16 +60,13 @@ public class DistributedProxyArpStore implements ProxyArpStore {
private static final MessageSubject ARP_RESPONSE_MESSAGE =
new MessageSubject("onos-arp-response");
protected final KryoSerializer serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
protected final StoreSerializer serializer = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(ArpResponseMessage.class)
.register(ByteBuffer.class)
.build();
}
};
.build("ProxyArpStore"));
private ProxyArpStoreDelegate delegate;
......
......@@ -25,7 +25,6 @@ import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
......@@ -40,7 +39,7 @@ import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.FlowStatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
......@@ -90,16 +89,7 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
private Map<ConnectPoint, Set<FlowEntry>> current =
new ConcurrentHashMap<>();
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
// register this store specific classes here
.build();
}
};
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
private NodeId local;
private ExecutorService messageHandlingExecutor;
......
......@@ -25,7 +25,6 @@ import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
......@@ -40,7 +39,7 @@ import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
......@@ -95,16 +94,7 @@ public class DistributedStatisticStore implements StatisticStore {
private Map<ConnectPoint, Set<FlowEntry>> current =
new ConcurrentHashMap<>();
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
// register this store specific classes here
.build();
}
};
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
private ExecutorService messageHandlingExecutor;
......
......@@ -54,7 +54,7 @@ import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
......@@ -83,7 +83,7 @@ public class EventuallyConsistentMapImpl<K, V>
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
private final StoreSerializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
......@@ -268,12 +268,8 @@ public class EventuallyConsistentMapImpl<K, V>
this.lightweightAntiEntropy = !convergeFaster;
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
return new KryoSerializer() {
@Override
protected void setupKryoPool() {
// Add the map's internal helper classes to the user-supplied serializer
serializerPool = builder
private StoreSerializer createSerializer(KryoNamespace.Builder builder) {
return StoreSerializer.using(builder
.register(KryoNamespaces.BASIC)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(LogicalTimestamp.class)
......@@ -283,9 +279,7 @@ public class EventuallyConsistentMapImpl<K, V>
.register(UpdateEntry.class)
.register(MapValue.class)
.register(MapValue.Digest.class)
.build();
}
};
.build(name()));
}
@Override
......@@ -340,7 +334,7 @@ public class EventuallyConsistentMapImpl<K, V>
MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
if (putInternal(key, newValue)) {
notifyPeers(new UpdateEntry<K, V>(key, newValue), peerUpdateFunction.apply(key, value));
notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
}
}
......@@ -595,7 +589,7 @@ public class EventuallyConsistentMapImpl<K, V>
}
private AntiEntropyAdvertisement<K> createAdvertisement() {
return new AntiEntropyAdvertisement<K>(localNodeId,
return new AntiEntropyAdvertisement<>(localNodeId,
ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
}
......
......@@ -37,7 +37,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
......@@ -58,19 +57,15 @@ import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractEvent;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.persistence.TestPersistenceService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.WallClockTimestamp;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
......@@ -105,31 +100,6 @@ public class EventuallyConsistentMapImplTest {
private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
/*
* Serialization is a bit tricky here. We need to serialize in the tests
* to set the expectations, which will use this serializer here, but the
* EventuallyConsistentMap will use its own internal serializer. This means
* this serializer must be set up exactly the same as map's internal
* serializer.
*/
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
// Classes we give to the map
.register(KryoNamespaces.API)
.register(TestTimestamp.class)
// Below is the classes that the map internally registers
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
.register(Optional.class)
.build();
}
};
@Before
public void setUp() throws Exception {
clusterService = createMock(ClusterService.class);
......
......@@ -25,7 +25,10 @@ import com.google.common.base.MoreObjects;
/**
* StoreSerializer implementation using Kryo.
*
* @deprecated in Goldeneye (1.6.0)
*/
@Deprecated
public class KryoSerializer implements StoreSerializer {
protected KryoNamespace serializerPool;
......
......@@ -19,7 +19,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
// TODO: To be replaced with SerializationService from IOLoop activity
import org.onlab.util.KryoNamespace;
/**
* Service to serialize Objects into byte array.
*/
......@@ -84,4 +85,50 @@ public interface StoreSerializer {
* @param <T> object type
*/
<T> T copy(final T object);
/**
* Creates a new StoreSerializer instance from a KryoNamespace.
*
* @param ns kryo namespace
* @return StoreSerializer instance
*/
static StoreSerializer using(KryoNamespace ns) {
return new StoreSerializer() {
@Override
public void encode(Object obj, OutputStream stream) {
ns.serialize(obj, stream);
}
@Override
public void encode(Object obj, ByteBuffer buffer) {
ns.serialize(obj, buffer);
}
@Override
public byte[] encode(Object obj) {
return ns.serialize(obj);
}
@Override
public <T> T decode(InputStream stream) {
return ns.deserialize(stream);
}
@Override
public <T> T decode(ByteBuffer buffer) {
return ns.deserialize(buffer);
}
@Override
public <T> T decode(byte[] bytes) {
return ns.deserialize(bytes);
}
@Override
public <T> T copy(T object) {
return ns.run(kryo -> kryo.copy(object));
}
};
}
}
......
......@@ -84,7 +84,6 @@ import org.onlab.packet.IpPrefix;
import org.onlab.packet.Ip4Prefix;
import org.onlab.packet.Ip6Prefix;
import org.onlab.packet.MacAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.resource.VlanCodec;
import java.nio.ByteBuffer;
......@@ -130,7 +129,7 @@ public class KryoSerializerTest {
GridType.DWDM, ChannelSpacing.CHL_100GHZ, -8, 4);
private static final VlanId VLAN1 = VlanId.vlanId((short) 100);
private KryoSerializer serializer;
private StoreSerializer serializer;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
......@@ -138,16 +137,7 @@ public class KryoSerializerTest {
@Before
public void setUp() throws Exception {
serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
serializer = StoreSerializer.using(KryoNamespaces.API);
}
@After
......