Yuta HIGUCHI

AntiEntropy sketch + required Serializer work

Change-Id: Ibac5f4eede6b420202683114c3262e01b7264eff
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_ADVERTISEMENT;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.Timestamp;
import com.google.common.collect.ImmutableMap;
/**
* Anti-Entropy advertisement message.
*
* @param <ID> ID type
*/
public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
private final NodeId sender;
private final ImmutableMap<ID, Timestamp> advertisement;
/**
* Creates anti-entropy advertisement message.
*
* @param sender sender of this message
* @param advertisement timestamp information of the data sender holds
*/
public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
super(AE_ADVERTISEMENT);
this.sender = sender;
this.advertisement = ImmutableMap.copyOf(advertisement);
}
public NodeId sender() {
return sender;
}
public ImmutableMap<ID, Timestamp> advertisement() {
return advertisement;
}
// Default constructor for serializer
protected AntiEntropyAdvertisement() {
super(AE_ADVERTISEMENT);
this.sender = null;
this.advertisement = null;
}
}
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_REPLY;
import java.util.Map;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.device.impl.VersionedValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
private final NodeId sender;
private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion;
private final ImmutableSet<ID> request;
/**
* Creates a reply to anti-entropy message.
*
* @param sender sender of this message
* @param suggestion collection of more recent values, sender had
* @param request Collection of identifiers
*/
public AntiEntropyReply(NodeId sender,
Map<ID, VersionedValue<VALUE>> suggestion,
Set<ID> request) {
super(AE_REPLY);
this.sender = sender;
this.suggestion = ImmutableMap.copyOf(suggestion);
this.request = ImmutableSet.copyOf(request);
}
public NodeId sender() {
return sender;
}
public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() {
return suggestion;
}
public ImmutableSet<ID> request() {
return request;
}
// Default constructor for serializer
protected AntiEntropyReply() {
super(AE_REPLY);
this.sender = null;
this.suggestion = null;
this.request = null;
}
}
......@@ -15,6 +15,12 @@ public enum MessageSubject {
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO
ECHO,
/** Anti-Entropy advertisement message. */
AE_ADVERTISEMENT,
/** Anti-Entropy reply message. */
AE_REPLY,
}
......
......@@ -42,4 +42,12 @@ public class VersionedValue<T> {
public Timestamp timestamp() {
return timestamp;
}
// Default constructor for serializer
protected VersionedValue() {
this.entity = null;
this.isUp = false;
this.timestamp = null;
}
}
......
package org.onlab.onos.store.serializers;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
import com.google.common.collect.ImmutableMap;
/**
* Kryo Serializer for {@link ImmutableMap}.
*/
public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
private final MapSerializer mapSerializer = new MapSerializer();
public ImmutableMapSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableMap<?, ?> object) {
// wrapping with unmodifiableMap proxy
// to avoid Kryo from writing only the reference marker of this instance,
// which will be embedded right before this method call.
kryo.writeObject(output, Collections.unmodifiableMap(object), mapSerializer);
}
@Override
public ImmutableMap<?, ?> read(Kryo kryo, Input input,
Class<ImmutableMap<?, ?>> type) {
Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
return ImmutableMap.copyOf(map);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableMap.of().getClass(), this);
kryo.register(ImmutableMap.of(1, 2).getClass(), this);
kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
// TODO register required ImmutableMap variants
}
}
package org.onlab.onos.store.serializers;
import java.util.ArrayList;
import java.util.List;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.google.common.collect.ImmutableSet;
/**
* Kryo Serializer for {@link ImmutableSet}.
*/
public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
private final CollectionSerializer serializer = new CollectionSerializer();
public ImmutableSetSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableSet<?> object) {
kryo.writeObject(output, object.asList(), serializer);
}
@Override
public ImmutableSet<?> read(Kryo kryo, Input input,
Class<ImmutableSet<?>> type) {
List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
return ImmutableSet.copyOf(elms);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableSet.of().getClass(), this);
kryo.register(ImmutableSet.of(1).getClass(), this);
kryo.register(ImmutableSet.of(1, 2).getClass(), this);
// TODO register required ImmutableSet variants
}
}
package org.onlab.onos.store.serializers;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.PortNumber.portNumber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.testing.EqualsTester;
import de.javakaffee.kryoserializers.URISerializer;
public class KryoSerializerTests {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final PortNumber P1 = portNumber(1);
private static final PortNumber P2 = portNumber(2);
private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
private static final String MFR = "whitebox";
private static final String HW = "1.1.x";
private static final String SW1 = "3.8.1";
private static final String SW2 = "3.9.5";
private static final String SN = "43311-12345";
private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
private static KryoPool kryos;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
kryos = KryoPool.newBuilder()
.register(
ArrayList.class,
HashMap.class
)
.register(
Device.Type.class,
Link.Type.class
// ControllerNode.State.class,
// DefaultControllerNode.class,
// MastershipRole.class,
// Port.class,
// Element.class,
)
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(IpPrefix.class, new IpPrefixSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DefaultDevice.class)
.register(URI.class, new URISerializer())
.build();
}
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
// removing Kryo instance to use fresh Kryo on each tests
kryos.getKryo();
}
private static <T> void testSerialized(T original) {
ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
kryos.serialize(original, buffer);
buffer.flip();
T copy = kryos.deserialize(buffer);
new EqualsTester()
.addEqualityGroup(original, copy)
.testEquals();
}
@Test
public final void test() {
testSerialized(new ConnectPoint(DID1, P1));
testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
testSerialized(new DefaultPort(DEV1, P1, true));
testSerialized(DID1);
testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
testSerialized(ImmutableMap.of(DID1, DEV1));
testSerialized(ImmutableMap.of());
testSerialized(ImmutableSet.of(DID1, DID2));
testSerialized(ImmutableSet.of(DID1));
testSerialized(ImmutableSet.of());
testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
testSerialized(new LinkKey(CP1, CP2));
testSerialized(new NodeId("SomeNodeIdentifier"));
testSerialized(P1);
testSerialized(PID);
}
}
......@@ -239,12 +239,41 @@ public final class KryoPool {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(registrationRequired);
for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
if (registry.getRight() == null) {
final Serializer<?> serializer = registry.getRight();
if (serializer == null) {
kryo.register(registry.getLeft());
} else {
kryo.register(registry.getLeft(), registry.getRight());
kryo.register(registry.getLeft(), serializer);
if (serializer instanceof FamilySerializer) {
FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
fser.registerFamilies(kryo);
}
}
}
return kryo;
}
/**
* Serializer implementation, which required registration of family of Classes.
* @param <T> base type of this serializer.
*/
public abstract static class FamilySerializer<T> extends Serializer<T> {
public FamilySerializer(boolean acceptsNull) {
super(acceptsNull);
}
public FamilySerializer(boolean acceptsNull, boolean immutable) {
super(acceptsNull, immutable);
}
/**
* Registers other classes this Serializer supports.
*
* @param kryo instance to register classes to
*/
public void registerFamilies(Kryo kryo) {
}
}
}
......