Yuta HIGUCHI

Kryo related changes.

Change-Id: I5b4fab63d6ece084b65aa712971a22d953d0caf0
......@@ -7,6 +7,9 @@ import java.net.URI;
*/
public final class DeviceId extends ElementId {
// Default constructor for serialization
protected DeviceId() {}
// Public construction is prohibited
private DeviceId(URI uri) {
super(uri);
......
......@@ -10,6 +10,11 @@ public abstract class ElementId {
private final URI uri;
// Default constructor for serialization
protected ElementId() {
this.uri = null;
}
/**
* Creates an element identifier using the supplied URI.
*
......
......@@ -12,6 +12,12 @@ public class ProviderId {
private final String scheme;
private final String id;
// Default constructor for serialization
protected ProviderId() {
scheme = null;
id = null;
}
/**
* Creates a new provider identifier from the specified string.
* The providers are expected to follow the reverse DNS convention, e.g.
......
......@@ -25,6 +25,14 @@
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
......
package org.onlab.onos.store.device.impl;
import java.util.ArrayList;
import java.util.Collection;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.PortNumber;
import org.onlab.packet.IpPrefix;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.google.common.collect.ImmutableSet;
// TODO move to util, etc.
public final class DefaultPortSerializer extends
Serializer<DefaultPort> {
private final CollectionSerializer ipAddrSerializer
= new CollectionSerializer(IpPrefix.class,
new IpPrefixSerializer(), false);
public DefaultPortSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, DefaultPort object) {
kryo.writeClassAndObject(output, object.element());
kryo.writeObject(output, object.number());
output.writeBoolean(object.isEnabled());
kryo.writeObject(output, object.ipAddresses(),
ipAddrSerializer);
}
@Override
public DefaultPort read(Kryo kryo, Input input,
Class<DefaultPort> type) {
Element element = (Element) kryo.readClassAndObject(input);
PortNumber number = kryo.readObject(input, PortNumber.class);
boolean isEnabled = input.readBoolean();
@SuppressWarnings("unchecked")
Collection<IpPrefix> ipAddresses = kryo.readObject(
input, ArrayList.class, ipAddrSerializer);
return new DefaultPort(element, number, isEnabled,
ImmutableSet.copyOf(ipAddresses));
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.packet.IpPrefix;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
// TODO move to util, etc.
public final class IpPrefixSerializer extends Serializer<IpPrefix> {
public IpPrefixSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output,
IpPrefix object) {
byte[] octs = object.toOctets();
output.writeInt(octs.length);
output.writeBytes(octs);
output.writeInt(object.prefixLength());
}
@Override
public IpPrefix read(Kryo kryo, Input input,
Class<IpPrefix> type) {
int octLen = input.readInt();
byte[] octs = new byte[octLen];
input.read(octs);
int prefLen = input.readInt();
return IpPrefix.valueOf(octs, prefLen);
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.PortNumber;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
// TODO move to util, etc.
public final class PortNumberSerializer extends
Serializer<PortNumber> {
public PortNumberSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, PortNumber object) {
output.writeLong(object.toLong());
}
@Override
public PortNumber read(Kryo kryo, Input input,
Class<PortNumber> type) {
return PortNumber.portNumber(input.readLong());
}
}
......@@ -128,6 +128,28 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>3.3</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.24.0</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.27</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<!-- ONOS related -->
<dependency>
<groupId>org.onlab.onos</groupId>
......
......@@ -30,6 +30,14 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
......
package org.onlab.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.Pair;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
// TODO Add tests for this class.
/**
* Pool of Kryo instances, with classes pre-registered.
*/
//@ThreadSafe
public final class KryoPool {
/**
* Default buffer size used for serialization.
*
* @see #serialize(Object)
*/
public static final int DEFAULT_BUFFER_SIZE = 1 * 1000 * 1000;
private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
private final boolean registrationRequired;
/**
* KryoPool builder.
*/
//@NotThreadSafe
public static final class Builder {
private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
/**
* Builds a {@link KryoPool} instance.
*
* @return KryoPool
*/
public KryoPool build() {
return new KryoPool(types);
}
/**
* Registers classes to be serialized using Kryo default serializer.
*
* @param expectedTypes list of classes
* @return this
*/
public Builder register(final Class<?>... expectedTypes) {
for (Class<?> clazz : expectedTypes) {
types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
}
return this;
}
/**
* Registers a class and it's serializer.
*
* @param clazz the class to register
* @param serializer serializer to use for the class
* @return this
*/
public Builder register(final Class<?> clazz, Serializer<?> serializer) {
types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
return this;
}
/**
* Registers all the class registered to given KryoPool.
*
* @param pool KryoPool
* @return this
*/
public Builder register(final KryoPool pool) {
types.addAll(pool.registeredTypes);
return this;
}
}
/**
* Creates a new {@link KryoPool} builder.
*
* @return builder
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Creates a Kryo instance pool.
*
* @param registerdTypes types to register
*/
private KryoPool(final List<Pair<Class<?>, Serializer<?>>> registerdTypes) {
this.registeredTypes = ImmutableList.copyOf(registerdTypes);
// always true for now
this.registrationRequired = true;
}
/**
* Populates the Kryo pool.
*
* @param instances to add to the pool
* @return this
*/
public KryoPool populate(int instances) {
List<Kryo> kryos = new ArrayList<>(instances);
for (int i = 0; i < instances; ++i) {
kryos.add(newKryoInstance());
}
pool.addAll(kryos);
return this;
}
/**
* Gets a Kryo instance from the pool.
*
* @return Kryo instance
*/
public Kryo getKryo() {
Kryo kryo = pool.poll();
if (kryo == null) {
return newKryoInstance();
}
return kryo;
}
/**
* Returns a Kryo instance to the pool.
*
* @param kryo instance obtained from this pool.
*/
public void putKryo(Kryo kryo) {
if (kryo != null) {
pool.add(kryo);
}
}
/**
* Serializes given object to byte array using Kryo instance in pool.
* <p>
* Note: Serialized bytes must be smaller than {@link #DEFAULT_BUFFER_SIZE}.
*
* @param obj Object to serialize
* @return serialized bytes
*/
public byte[] serialize(final Object obj) {
return serialize(obj, DEFAULT_BUFFER_SIZE);
}
/**
* Serializes given object to byte array using Kryo instance in pool.
*
* @param obj Object to serialize
* @param bufferSize maximum size of serialized bytes
* @return serialized bytes
*/
public byte[] serialize(final Object obj, final int bufferSize) {
Output out = new Output(bufferSize);
Kryo kryo = getKryo();
try {
kryo.writeClassAndObject(out, obj);
return out.toBytes();
} finally {
putKryo(kryo);
}
}
/**
* Deserializes given byte array to Object using Kryo instance in pool.
*
* @param bytes serialized bytes
* @param <T> deserialized Object type
* @return deserialized Object
*/
public <T> T deserialize(final byte[] bytes) {
Input in = new Input(bytes);
Kryo kryo = getKryo();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
putKryo(kryo);
}
}
/**
* Creates a Kryo instance with {@link #registeredTypes} pre-registered.
*
* @return Kryo instance
*/
private Kryo newKryoInstance() {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(registrationRequired);
for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
if (registry.getRight() == null) {
kryo.register(registry.getLeft());
} else {
kryo.register(registry.getLeft(), registry.getRight());
}
}
return kryo;
}
}