Yuta HIGUCHI
Committed by Gerrit Code Review

Kryo related fixes

- KryoNamespace to allow control over registration id

Change-Id: Idc2a0e27a09916657c725ee97e4366109144cc66
Showing 38 changed files with 541 additions and 355 deletions
......@@ -18,6 +18,9 @@ package org.onlab.onos.store.cluster.messaging;
import java.io.IOException;
import org.onlab.onos.cluster.NodeId;
import org.onlab.util.ByteArraySizeHashPrinter;
import com.google.common.base.MoreObjects;
// TODO: Should payload type be ByteBuffer?
/**
......@@ -78,4 +81,13 @@ public class ClusterMessage {
public void respond(byte[] data) throws IOException {
throw new IllegalStateException("One can only repond to message recived from others.");
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("sender", sender)
.add("subject", subject)
.add("payload", ByteArraySizeHashPrinter.of(payload))
.toString();
}
}
......
......@@ -34,7 +34,6 @@ import org.onlab.netty.NettyMessagingService;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
......@@ -67,12 +66,10 @@ public class ClusterCommunicationManager
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(ClusterMessage.class, new ClusterMessageSerializer())
.register(ClusterMembershipEvent.class)
.register(byte[].class)
.register(MessageSubject.class, new MessageSubjectSerializer())
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(new ClusterMessageSerializer(), ClusterMessage.class)
.register(new MessageSubjectSerializer(), MessageSubject.class)
.build();
}
};
......@@ -194,11 +191,17 @@ public class ClusterCommunicationManager
@Override
public void handle(Message message) {
final ClusterMessage clusterMessage;
try {
clusterMessage = SERIALIZER.decode(message.payload());
} catch (Exception e) {
log.error("Failed decoding ClusterMessage", e);
throw e;
}
try {
ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
handler.handle(new InternalClusterMessage(clusterMessage, message));
} catch (Exception e) {
log.error("Exception caught during ClusterMessageHandler", e);
log.error("Exception caught handling {}", clusterMessage, e);
throw e;
}
}
......
......@@ -65,8 +65,8 @@ public class DistributedApplicationIdStore
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
......
......@@ -141,18 +141,17 @@ public class GossipDeviceStore
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.COMMON)
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
.register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
.register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
.register(InternalDeviceRemovedEvent.class)
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
.register(new InternalPortEventSerializer(), InternalPortEvent.class)
.register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
.register(DeviceAntiEntropyAdvertisement.class)
.register(DeviceFragmentId.class)
.register(PortFragmentId.class)
.build()
.populate(1);
.build();
}
};
......
......@@ -156,9 +156,9 @@ public class DistributedFlowRuleStore
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.COMMON)
.build()
.populate(1);
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.build();
}
};
......
......@@ -123,13 +123,13 @@ public class GossipHostStore
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.COMMON)
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalHostEvent.class)
.register(InternalHostRemovedEvent.class)
.register(HostFragmentId.class)
.register(HostAntiEntropyAdvertisement.class)
.build()
.populate(1);
.build();
}
};
......
......@@ -93,8 +93,8 @@ public class DistributedIntentStore
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
......
......@@ -86,8 +86,8 @@ public class HazelcastIntentStore
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
......
......@@ -129,13 +129,13 @@ public class GossipLinkStore
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.COMMON)
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalLinkEvent.class)
.register(InternalLinkRemovedEvent.class)
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
.build()
.populate(1);
.build();
}
};
......
......@@ -84,10 +84,9 @@ public class DistributedMastershipStore
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(RoleValue.class, new RoleValueSerializer())
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(new RoleValueSerializer(), RoleValue.class)
.build();
}
};
......
......@@ -72,8 +72,8 @@ public class DistributedPacketStore
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
......
......@@ -23,13 +23,17 @@ import org.onlab.util.KryoNamespace;
public final class DistributedStoreSerializers {
public static final int STORE_CUSTOM_BEGIN = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 10;
/**
* KryoNamespace which can serialize ON.lab misc classes.
*/
public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
public static final KryoNamespace STORE_COMMON = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(Timestamped.class)
.register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
.register(new MastershipBasedTimestampSerializer(), MastershipBasedTimestamp.class)
.register(WallClockTimestamp.class)
.build();
......
......@@ -55,8 +55,8 @@ public class CMap<K, V> {
* Creates a CMap instance.
* It will create the table if necessary.
*
* @param dbAdminService
* @param dbService
* @param dbAdminService DatabaseAdminService to use for this instance
* @param dbService DatabaseService to use for this instance
* @param tableName table which this Map corresponds to
* @param serializer Value serializer
*/
......
......@@ -2,13 +2,6 @@ package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Vector;
import net.kuujo.copycat.cluster.TcpClusterConfig;
......@@ -40,29 +33,14 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ImmutableListSerializer;
import org.onlab.onos.store.serializers.ImmutableMapSerializer;
import org.onlab.onos.store.serializers.ImmutableSetSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.ReadStatus;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.onos.store.service.WriteStatus;
import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* ONOS Cluster messaging based Copycat protocol.
*/
......@@ -88,7 +66,11 @@ public class ClusterMessagingProtocol
public static final MessageSubject COPYCAT_SUBMIT =
new MessageSubject("copycat-raft-consensus-submit");
private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
static final int AFTER_COPYCAT = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50;
static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(PingRequest.class)
.register(PingResponse.class)
.register(PollRequest.class)
......@@ -105,53 +87,23 @@ public class ClusterMessagingProtocol
.register(TcpClusterConfig.class)
.register(TcpMember.class)
.register(LeaderElectEvent.class)
.build();
private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
.register(ReadRequest.class)
.register(WriteRequest.class)
.register(WriteRequest.Type.class)
.register(WriteResult.class)
.register(ReadResult.class)
.register(BatchReadRequest.class)
.register(BatchWriteRequest.class)
.register(ReadStatus.class)
.register(WriteStatus.class)
.register(VersionedValue.class)
.build();
public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
.register(Arrays.asList().getClass(), new CollectionSerializer() {
@Override
@SuppressWarnings("rawtypes")
protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
return new ArrayList();
}
})
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(
Vector.class,
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
LinkedList.class,
Collections.singletonList("").getClass(),
byte[].class)
.register(Vector.class)
.build();
// serializer used for CopyCat Protocol
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
public static final StoreSerializer DB_SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(COPYCAT)
.register(COMMON)
.register(DATABASE)
.build()
.populate(1);
.nextId(AFTER_COPYCAT)
// for snapshot
.register(State.class)
.register(TableMetadata.class)
// TODO: Move this out ?
.register(TableModificationEvent.class)
.register(TableModificationEvent.Type.class)
.build();
}
};
......
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Verify.verifyNotNull;
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -139,7 +139,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
new ClusterMessage(
localNode.id(),
messageType(request),
verifyNotNull(SERIALIZER.encode(request)));
verifyNotNull(DB_SERIALIZER.encode(request)));
this.future = future;
}
......@@ -158,7 +158,8 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
if (!connectionOK.getAndSet(true)) {
log.info("Connectivity to {} restored", remoteNode);
}
future.complete(verifyNotNull(SERIALIZER.decode(response)));
future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
} catch (IOException | TimeoutException e) {
if (connectionOK.getAndSet(false)) {
log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
......
......@@ -66,7 +66,7 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
@Override
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
T request = ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
if (handler == null) {
// there is a slight window of time during state transition,
// where handler becomes null
......@@ -117,7 +117,7 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
} else {
try {
log.trace("responding to {}", message.subject());
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
message.respond(ClusterMessagingProtocol.DB_SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to " + response.getClass().getName(), e);
}
......
......@@ -54,7 +54,7 @@ public class DatabaseClient implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
LeaderElectEvent event =
ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
TcpMember newLeader = event.leader();
long newLeaderTerm = event.term();
if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
......
......@@ -101,7 +101,7 @@ public class DatabaseEntryExpirationTracker implements
log.debug("Broadcasting {} to the entire cluster", event);
clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
DatabaseStateMachine.SERIALIZER.encode(event)));
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast a database row deleted event.", e);
}
......
......@@ -173,7 +173,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
stateMachine.addEventListener(expirationTracker);
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
ClusterMessagingProtocol.DB_SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
......@@ -432,7 +432,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
ClusterMessagingProtocol.SERIALIZER.encode(event)));
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
}
} catch (Exception e) {
log.debug("LeaderAdvertiser failed with exception", e);
......@@ -454,7 +454,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
ClusterMessagingProtocol.SERIALIZER.encode(event)));
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} else {
if (myLeaderEvent != null) {
log.debug("This node is no longer the Leader");
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.service.impl;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
......@@ -20,7 +21,6 @@ import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
......@@ -30,7 +30,6 @@ import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.onos.store.service.WriteStatus;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
......@@ -39,7 +38,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
/**
* StateMachine whose transitions are coordinated/replicated
......@@ -59,33 +57,13 @@ public class DatabaseStateMachine implements StateMachine {
public static final MessageSubject DATABASE_UPDATE_EVENTS =
new MessageSubject("database-update-events");
// serializer used for snapshot
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
.register(TableMetadata.class)
.register(BatchReadRequest.class)
.register(BatchWriteRequest.class)
.register(ReadStatus.class)
.register(WriteStatus.class)
// TODO: Move this out ?
.register(TableModificationEvent.class)
.register(TableModificationEvent.Type.class)
.register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
}
};
private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
// durable internal state of the database.
private State state = new State();
private boolean compressSnapshot = false;
// TODO make this configurable
private boolean compressSnapshot = true;
@Command
public boolean createTable(String tableName) {
......@@ -402,14 +380,14 @@ public class DatabaseStateMachine implements StateMachine {
public byte[] takeSnapshot() {
try {
if (compressSnapshot) {
byte[] input = SERIALIZER.encode(state);
byte[] input = DB_SERIALIZER.encode(state);
ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
compressor.write(input, 0, input.length);
compressor.close();
return comp.toByteArray();
} else {
return SERIALIZER.encode(state);
return DB_SERIALIZER.encode(state);
}
} catch (Exception e) {
log.error("Failed to take snapshot", e);
......@@ -423,10 +401,9 @@ public class DatabaseStateMachine implements StateMachine {
if (compressSnapshot) {
ByteArrayInputStream in = new ByteArrayInputStream(data);
InflaterInputStream decompressor = new InflaterInputStream(in);
ByteStreams.toByteArray(decompressor);
this.state = SERIALIZER.decode(ByteStreams.toByteArray(decompressor));
this.state = DB_SERIALIZER.decode(decompressor);
} else {
this.state = SERIALIZER.decode(data);
this.state = DB_SERIALIZER.decode(data);
}
updatesExecutor.submit(new Runnable() {
......
......@@ -149,7 +149,7 @@ public class DistributedLockManager implements LockService {
private class LockEventMessageListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
TableModificationEvent event = DatabaseStateMachine.SERIALIZER
TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
.decode(message.payload());
if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
......
......@@ -90,9 +90,9 @@ public class DistributedStatisticStore implements StatisticStore {
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
// register this store specific classes here
.build()
.populate(1);
.build();
}
};;
......
......@@ -95,7 +95,7 @@ public class MastershipBasedTimestampTest {
public final void testKryoSerializableWithHandcraftedSerializer() {
final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
final KryoNamespace kryos = KryoNamespace.newBuilder()
.register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
.register(new MastershipBasedTimestampSerializer(), MastershipBasedTimestamp.class)
.build();
kryos.serialize(TS_1_2, buffer);
......
......@@ -22,7 +22,7 @@ import com.google.common.testing.EqualsTester;
*/
public class MapDBLogTest {
private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;
private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.DB_SERIALIZER;
private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.serializers;
import java.util.ArrayList;
import java.util.List;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link java.util.Arrays#asList(Object...)}.
*/
public final class ArraysAsListSerializer extends Serializer<List<?>> {
@Override
public void write(Kryo kryo, Output output, List<?> object) {
output.writeInt(object.size(), true);
for (Object elm : object) {
kryo.writeClassAndObject(output, elm);
}
}
@Override
public List<?> read(Kryo kryo, Input input, Class<List<?>> type) {
final int size = input.readInt(true);
List<Object> list = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
list.add(kryo.readClassAndObject(input));
}
return list;
}
}
......@@ -45,7 +45,7 @@ public class DefaultLinkSerializer extends Serializer<DefaultLink> {
kryo.writeClassAndObject(output, object.dst());
kryo.writeClassAndObject(output, object.type());
kryo.writeClassAndObject(output, object.state());
kryo.writeClassAndObject(output, object.isDurable());
output.writeBoolean(object.isDurable());
}
@Override
......@@ -55,7 +55,7 @@ public class DefaultLinkSerializer extends Serializer<DefaultLink> {
ConnectPoint dst = (ConnectPoint) kryo.readClassAndObject(input);
Type linkType = (Type) kryo.readClassAndObject(input);
State state = (State) kryo.readClassAndObject(input);
boolean isDurable = (boolean) kryo.readClassAndObject(input);
boolean isDurable = input.readBoolean();
return new DefaultLink(providerId, src, dst, linkType, state, isDurable);
}
}
......
......@@ -15,9 +15,8 @@
*/
package org.onlab.onos.store.serializers;
import org.onlab.util.KryoNamespace.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
......@@ -26,7 +25,7 @@ import com.google.common.collect.ImmutableList.Builder;
/**
* Creates {@link ImmutableList} serializer instance.
*/
public class ImmutableListSerializer extends FamilySerializer<ImmutableList<?>> {
public class ImmutableListSerializer extends Serializer<ImmutableList<?>> {
/**
* Creates {@link ImmutableList} serializer instance.
......@@ -53,12 +52,4 @@ public class ImmutableListSerializer extends FamilySerializer<ImmutableList<?>>
}
return builder.build();
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableList.of(1).getClass(), this);
kryo.register(ImmutableList.of(1, 2).getClass(), this);
// TODO register required ImmutableList variants
}
}
......
......@@ -19,9 +19,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.onlab.util.KryoNamespace.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
......@@ -30,7 +29,7 @@ import com.google.common.collect.ImmutableMap;
/**
* Kryo Serializer for {@link ImmutableMap}.
*/
public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
public class ImmutableMapSerializer extends Serializer<ImmutableMap<?, ?>> {
private final MapSerializer mapSerializer = new MapSerializer();
......@@ -56,12 +55,4 @@ public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>>
Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
return ImmutableMap.copyOf(map);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableMap.of().getClass(), this);
kryo.register(ImmutableMap.of(1, 2).getClass(), this);
kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
// TODO register required ImmutableMap variants
}
}
......
......@@ -18,9 +18,8 @@ package org.onlab.onos.store.serializers;
import java.util.ArrayList;
import java.util.List;
import org.onlab.util.KryoNamespace.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
......@@ -29,7 +28,7 @@ import com.google.common.collect.ImmutableSet;
/**
* Kryo Serializer for {@link ImmutableSet}.
*/
public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
public class ImmutableSetSerializer extends Serializer<ImmutableSet<?>> {
private final CollectionSerializer serializer = new CollectionSerializer();
......@@ -39,6 +38,7 @@ public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
public ImmutableSetSerializer() {
// non-null, immutable
super(false, true);
serializer.setElementsCanBeNull(false);
}
@Override
......@@ -52,12 +52,4 @@ public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
return ImmutableSet.copyOf(elms);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableSet.of().getClass(), this);
kryo.register(ImmutableSet.of(1).getClass(), this);
kryo.register(ImmutableSet.of(1, 2).getClass(), this);
// TODO register required ImmutableSet variants
}
}
......
......@@ -19,6 +19,7 @@ import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
......@@ -99,6 +100,15 @@ import org.onlab.onos.net.resource.LambdaResourceAllocation;
import org.onlab.onos.net.resource.LambdaResourceRequest;
import org.onlab.onos.net.resource.LinkResourceRequest;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.ReadStatus;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.onos.store.service.WriteStatus;
import org.onlab.packet.ChassisId;
import org.onlab.packet.IpAddress;
import org.onlab.packet.Ip4Address;
......@@ -117,41 +127,62 @@ import com.google.common.collect.ImmutableSet;
public final class KryoNamespaces {
public static final KryoNamespace BASIC = KryoNamespace.newBuilder()
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
.nextId(KryoNamespace.FLOATING_ID)
.register(byte[].class)
.register(new ImmutableListSerializer(),
ImmutableList.class,
ImmutableList.of(1).getClass(),
ImmutableList.of(1, 2).getClass())
.register(new ImmutableSetSerializer(),
ImmutableSet.class,
ImmutableSet.of().getClass(),
ImmutableSet.of(1).getClass(),
ImmutableSet.of(1, 2).getClass())
.register(new ImmutableMapSerializer(),
ImmutableMap.class,
ImmutableMap.of().getClass(),
ImmutableMap.of("a", 1).getClass(),
ImmutableMap.of("R", 2, "D", 2).getClass())
.register(HashMap.class)
.register(ArrayList.class,
LinkedList.class,
byte[].class,
Duration.class
HashSet.class
)
.register(new ArraysAsListSerializer(), Arrays.asList().getClass())
.register(Collections.singletonList(1).getClass())
.register(Duration.class)
.build();
/**
* KryoNamespace which can serialize ON.lab misc classes.
*/
public static final KryoNamespace MISC = KryoNamespace.newBuilder()
.register(IpPrefix.class, new IpPrefixSerializer())
.register(Ip4Prefix.class, new Ip4PrefixSerializer())
.register(Ip6Prefix.class, new Ip6PrefixSerializer())
.register(IpAddress.class, new IpAddressSerializer())
.register(Ip4Address.class, new Ip4AddressSerializer())
.register(Ip6Address.class, new Ip6AddressSerializer())
.register(MacAddress.class, new MacAddressSerializer())
.nextId(KryoNamespace.FLOATING_ID)
.register(new IpPrefixSerializer(), IpPrefix.class)
.register(new Ip4PrefixSerializer(), Ip4Prefix.class)
.register(new Ip6PrefixSerializer(), Ip6Prefix.class)
.register(new IpAddressSerializer(), IpAddress.class)
.register(new Ip4AddressSerializer(), Ip4Address.class)
.register(new Ip6AddressSerializer(), Ip6Address.class)
.register(new MacAddressSerializer(), MacAddress.class)
.register(VlanId.class)
.build();
/**
* Kryo registration Id for user custom registration.
*/
public static final int BEGIN_USER_CUSTOM_ID = 300;
// TODO: Populate other classes
/**
* KryoNamespace which can serialize API bundle classes.
*/
public static final KryoNamespace API = KryoNamespace.newBuilder()
.register(MISC)
.nextId(KryoNamespace.INITIAL_ID)
.register(BASIC)
.nextId(KryoNamespace.INITIAL_ID + 30)
.register(MISC)
.nextId(KryoNamespace.INITIAL_ID + 30 + 10)
.register(
ControllerNode.State.class,
Device.Type.class,
......@@ -242,19 +273,29 @@ public final class KryoNamespaces {
AnnotationConstraint.class,
BooleanConstraint.class
)
.register(DefaultApplicationId.class, new DefaultApplicationIdSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.register(MastershipTerm.class, new MastershipTermSerializer())
.register(HostLocation.class, new HostLocationSerializer())
.register(DefaultOutboundPacket.class, new DefaultOutboundPacketSerializer())
.register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class)
.register(new URISerializer(), URI.class)
.register(new NodeIdSerializer(), NodeId.class)
.register(new ProviderIdSerializer(), ProviderId.class)
.register(new DeviceIdSerializer(), DeviceId.class)
.register(new PortNumberSerializer(), PortNumber.class)
.register(new DefaultPortSerializer(), DefaultPort.class)
.register(new LinkKeySerializer(), LinkKey.class)
.register(new ConnectPointSerializer(), ConnectPoint.class)
.register(new DefaultLinkSerializer(), DefaultLink.class)
.register(new MastershipTermSerializer(), MastershipTerm.class)
.register(new HostLocationSerializer(), HostLocation.class)
.register(new DefaultOutboundPacketSerializer(), DefaultOutboundPacket.class)
.register(ReadRequest.class)
.register(WriteRequest.class)
.register(WriteRequest.Type.class)
.register(WriteResult.class)
.register(ReadResult.class)
.register(BatchReadRequest.class)
.register(BatchWriteRequest.class)
.register(ReadStatus.class)
.register(WriteStatus.class)
.register(VersionedValue.class)
.build();
......
......@@ -15,10 +15,14 @@
*/
package org.onlab.onos.store.serializers;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.onlab.util.KryoNamespace;
import com.google.common.base.MoreObjects;
/**
* StoreSerializer implementation using Kryo.
*/
......@@ -36,8 +40,8 @@ public class KryoSerializer implements StoreSerializer {
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
@Override
......@@ -63,4 +67,20 @@ public class KryoSerializer implements StoreSerializer {
return serializerPool.deserialize(buffer);
}
@Override
public void encode(Object obj, OutputStream stream) {
serializerPool.serialize(obj, stream);
}
@Override
public <T> T decode(InputStream stream) {
return serializerPool.deserialize(stream);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("serializerPool", serializerPool)
.toString();
}
}
......
......@@ -15,6 +15,8 @@
*/
package org.onlab.onos.store.serializers;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
// TODO: To be replaced with SerializationService from IOLoop activity
......@@ -40,6 +42,14 @@ public interface StoreSerializer {
public void encode(final Object obj, ByteBuffer buffer);
/**
* Serializes the specified object into bytes.
*
* @param obj object to be serialized
* @param stream to write serialized bytes
*/
public void encode(final Object obj, final OutputStream stream);
/**
* Deserializes the specified bytes into an object.
*
* @param bytes bytes to be deserialized
......@@ -56,4 +66,13 @@ public interface StoreSerializer {
* @param <T> decoded type
*/
public <T> T decode(final ByteBuffer buffer);
/**
* Deserializes the specified bytes into an object.
*
* @param stream stream containing the bytes to be deserialized
* @return deserialized object
* @param <T> decoded type
*/
public <T> T decode(final InputStream stream);
}
......
......@@ -70,6 +70,7 @@ import org.onlab.packet.MacAddress;
import org.onlab.util.KryoNamespace;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
......@@ -121,8 +122,8 @@ public class KryoSerializerTest {
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
}
......@@ -326,6 +327,11 @@ public class KryoSerializerTest {
}
@Test
public void testArraysAsList() {
testSerializedEquals(Arrays.asList(1, 2, 3));
}
@Test
public void testAnnotationConstraint() {
testSerializable(new AnnotationConstraint("distance", 100.0));
}
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-of-providers</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-of-provider-host</artifactId>
<packaging>bundle</packaging>
<description>ONOS OpenFlow protocol host provider</description>
</project>
......@@ -15,10 +15,11 @@
*/
package org.onlab.util;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.Pair;
......@@ -27,15 +28,17 @@ import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.pool.KryoCallback;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
// TODO Add tests for this class.
/**
* Pool of Kryo instances, with classes pre-registered.
*/
//@ThreadSafe
public final class KryoNamespace implements KryoFactory {
public final class KryoNamespace implements KryoFactory, KryoPool {
/**
* Default buffer size used for serialization.
......@@ -45,17 +48,35 @@ public final class KryoNamespace implements KryoFactory {
public static final int DEFAULT_BUFFER_SIZE = 4096;
public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
/**
* ID to use if this KryoNamespace does not define registration id.
*/
public static final int FLOATING_ID = -1;
/**
* Smallest ID free to use for user defined registrations.
*/
public static final int INITIAL_ID = 11;
private final KryoPool pool = new KryoPool.Builder(this)
.softReferences()
.build();
private final ImmutableList<RegistrationBlock> registeredBlocks;
private final boolean registrationRequired;
/**
* KryoNamespace builder.
*/
//@NotThreadSafe
public static final class Builder {
private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
private int blockHeadId = INITIAL_ID;
private List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
private List<RegistrationBlock> blocks = new ArrayList<>();
private boolean registrationRequired = true;
/**
......@@ -64,7 +85,27 @@ public final class KryoNamespace implements KryoFactory {
* @return KryoNamespace
*/
public KryoNamespace build() {
return new KryoNamespace(types, registrationRequired);
if (!types.isEmpty()) {
blocks.add(new RegistrationBlock(this.blockHeadId, types));
}
return new KryoNamespace(blocks, registrationRequired).populate(1);
}
/**
* Sets the next Kryo registration Id for following register entries.
*
* @param id Kryo registration Id
* @return this
*
* @see Kryo#register(Class, Serializer, int)
*/
public Builder nextId(final int id) {
if (!types.isEmpty()) {
blocks.add(new RegistrationBlock(this.blockHeadId, types));
types = new ArrayList<>();
}
this.blockHeadId = id;
return this;
}
/**
......@@ -75,7 +116,7 @@ public final class KryoNamespace implements KryoFactory {
*/
public Builder register(final Class<?>... expectedTypes) {
for (Class<?> clazz : expectedTypes) {
types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
types.add(Pair.of(clazz, null));
}
return this;
}
......@@ -83,26 +124,54 @@ public final class KryoNamespace implements KryoFactory {
/**
* Registers a class and it's serializer.
*
* @param clazz the class to register
* @param classes list of classes to register
* @param serializer serializer to use for the class
* @return this
*/
public Builder register(final Class<?> clazz, Serializer<?> serializer) {
types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
public Builder register(Serializer<?> serializer, final Class<?>... classes) {
for (Class<?> clazz : classes) {
types.add(Pair.of(clazz, serializer));
}
return this;
}
private Builder register(RegistrationBlock block) {
if (block.begin() != FLOATING_ID) {
// flush pending types
nextId(block.begin());
blocks.add(block);
nextId(block.begin() + block.types().size());
} else {
// flush pending types
final int addedBlockBegin = blockHeadId + types.size();
nextId(addedBlockBegin);
blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
nextId(addedBlockBegin + block.types().size());
}
return this;
}
/**
* Registers all the class registered to given KryoNamespace.
*
* @param pool KryoNamespace
* @param ns KryoNamespace
* @return this
*/
public Builder register(final KryoNamespace pool) {
types.addAll(pool.registeredTypes);
public Builder register(final KryoNamespace ns) {
for (RegistrationBlock block : ns.registeredBlocks) {
this.register(block);
}
return this;
}
/**
* Sets the registrationRequired flag.
*
* @param registrationRequired Kryo's registrationRequired flag
* @return this
*
* @see Kryo#setRegistrationRequired(boolean)
*/
public Builder setRegistrationRequired(boolean registrationRequired) {
this.registrationRequired = registrationRequired;
return this;
......@@ -124,8 +193,8 @@ public final class KryoNamespace implements KryoFactory {
* @param registeredTypes types to register
* @param registrationRequired
*/
private KryoNamespace(final List<Pair<Class<?>, Serializer<?>>> registeredTypes, boolean registrationRequired) {
this.registeredTypes = ImmutableList.copyOf(registeredTypes);
private KryoNamespace(final List<RegistrationBlock> registeredTypes, boolean registrationRequired) {
this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
this.registrationRequired = registrationRequired;
}
......@@ -136,39 +205,14 @@ public final class KryoNamespace implements KryoFactory {
* @return this
*/
public KryoNamespace populate(int instances) {
List<Kryo> kryos = new ArrayList<>(instances);
for (int i = 0; i < instances; ++i) {
kryos.add(create());
release(create());
}
pool.addAll(kryos);
return this;
}
/**
* Gets a Kryo instance from the pool.
*
* @return Kryo instance
*/
public Kryo getKryo() {
Kryo kryo = pool.poll();
if (kryo == null) {
return create();
}
return kryo;
}
/**
* Returns a Kryo instance to the pool.
*
* @param kryo instance obtained from this pool.
*/
public void putKryo(Kryo kryo) {
if (kryo != null) {
pool.add(kryo);
}
}
/**
* Serializes given object to byte array using Kryo instance in pool.
* <p>
* Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
......@@ -189,13 +233,13 @@ public final class KryoNamespace implements KryoFactory {
*/
public byte[] serialize(final Object obj, final int bufferSize) {
ByteBufferOutput out = new ByteBufferOutput(bufferSize, MAX_BUFFER_SIZE);
Kryo kryo = getKryo();
Kryo kryo = borrow();
try {
kryo.writeClassAndObject(out, obj);
out.flush();
return out.toBytes();
} finally {
putKryo(kryo);
release(kryo);
}
}
......@@ -207,12 +251,40 @@ public final class KryoNamespace implements KryoFactory {
*/
public void serialize(final Object obj, final ByteBuffer buffer) {
ByteBufferOutput out = new ByteBufferOutput(buffer);
Kryo kryo = getKryo();
Kryo kryo = borrow();
try {
kryo.writeClassAndObject(out, obj);
out.flush();
} finally {
release(kryo);
}
}
/**
* Serializes given object to OutputStream using Kryo instance in pool.
*
* @param obj Object to serialize
* @param stream to write to
*/
public void serialize(final Object obj, final OutputStream stream) {
serialize(obj, stream, DEFAULT_BUFFER_SIZE);
}
/**
* Serializes given object to OutputStream using Kryo instance in pool.
*
* @param obj Object to serialize
* @param stream to write to
* @param bufferSize size of the buffer in front of the stream
*/
public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
Kryo kryo = borrow();
try {
kryo.writeClassAndObject(out, obj);
out.flush();
} finally {
putKryo(kryo);
release(kryo);
}
}
......@@ -225,13 +297,13 @@ public final class KryoNamespace implements KryoFactory {
*/
public <T> T deserialize(final byte[] bytes) {
Input in = new Input(bytes);
Kryo kryo = getKryo();
Kryo kryo = borrow();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
putKryo(kryo);
release(kryo);
}
}
......@@ -244,18 +316,49 @@ public final class KryoNamespace implements KryoFactory {
*/
public <T> T deserialize(final ByteBuffer buffer) {
ByteBufferInput in = new ByteBufferInput(buffer);
Kryo kryo = getKryo();
Kryo kryo = borrow();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
release(kryo);
}
}
/**
* Deserializes given InputStream to an Object using Kryo instance in pool.
*
* @param stream input stream
* @param <T> deserialized Object type
* @return deserialized Object
*/
public <T> T deserialize(final InputStream stream) {
return deserialize(stream, DEFAULT_BUFFER_SIZE);
}
/**
* Deserializes given InputStream to an Object using Kryo instance in pool.
*
* @param stream input stream
* @param <T> deserialized Object type
* @return deserialized Object
* @param bufferSize size of the buffer in front of the stream
*/
public <T> T deserialize(final InputStream stream, final int bufferSize) {
ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
Kryo kryo = borrow();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
putKryo(kryo);
release(kryo);
}
}
/**
* Creates a Kryo instance with {@link #registeredTypes} pre-registered.
* Creates a Kryo instance.
*
* @return Kryo instance
*/
......@@ -263,42 +366,68 @@ public final class KryoNamespace implements KryoFactory {
public Kryo create() {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(registrationRequired);
for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
final Serializer<?> serializer = registry.getRight();
for (RegistrationBlock block : registeredBlocks) {
int id = block.begin();
if (id == FLOATING_ID) {
id = kryo.getNextRegistrationId();
}
for (Pair<Class<?>, Serializer<?>> entry : block.types()) {
final Serializer<?> serializer = entry.getRight();
if (serializer == null) {
kryo.register(registry.getLeft());
kryo.register(entry.getLeft(), id++);
} else {
kryo.register(registry.getLeft(), serializer);
if (serializer instanceof FamilySerializer) {
FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
fser.registerFamilies(kryo);
kryo.register(entry.getLeft(), serializer, id++);
}
}
}
return kryo;
}
/**
* Serializer implementation, which required registration of family of Classes.
* @param <T> base type of this serializer.
*/
public abstract static class FamilySerializer<T> extends Serializer<T> {
@Override
public Kryo borrow() {
return pool.borrow();
}
@Override
public void release(Kryo kryo) {
pool.release(kryo);
}
@Override
public <T> T run(KryoCallback<T> callback) {
return pool.run(callback);
}
public FamilySerializer(boolean acceptsNull) {
super(acceptsNull);
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("registeredBlocks", registeredBlocks)
.toString();
}
public FamilySerializer(boolean acceptsNull, boolean immutable) {
super(acceptsNull, immutable);
static final class RegistrationBlock {
private final int begin;
private final ImmutableList<Pair<Class<?>, Serializer<?>>> types;
public RegistrationBlock(int begin, List<Pair<Class<?>, Serializer<?>>> types) {
this.begin = begin;
this.types = ImmutableList.copyOf(types);
}
/**
* Registers other classes this Serializer supports.
*
* @param kryo instance to register classes to
*/
public void registerFamilies(Kryo kryo) {
public int begin() {
return begin;
}
public ImmutableList<Pair<Class<?>, Serializer<?>>> types() {
return types;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("begin", begin)
.add("types", types)
.toString();
}
}
}
......
......@@ -22,20 +22,11 @@ import com.google.common.base.MoreObjects;
/**
* Representation of a TCP/UDP communication end point.
*/
public class Endpoint {
public final class Endpoint {
private final int port;
private final String host;
/**
* Used for serialization.
*/
@SuppressWarnings("unused")
private Endpoint() {
port = 0;
host = null;
}
public Endpoint(String host, int port) {
this.host = host;
this.port = port;
......
......@@ -34,6 +34,13 @@ public final class InternalMessage implements Message {
// Must be created using the Builder.
private InternalMessage() {}
InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
this.id = id;
this.sender = sender;
this.type = type;
this.payload = payload;
}
public long id() {
return id;
}
......
......@@ -17,11 +17,13 @@ package org.onlab.netty;
import org.onlab.util.KryoNamespace;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
//FIXME: Should be move out to test or app
/**
* Kryo Serializer.
*/
......@@ -37,17 +39,11 @@ public class KryoSerializer {
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoNamespace.newBuilder()
.register(ArrayList.class,
HashMap.class,
ArrayList.class,
InternalMessage.class,
Endpoint.class,
byte[].class
)
.build()
.populate(1);
.register(byte[].class)
.register(new InternalMessageSerializer(), InternalMessage.class)
.register(new EndPointSerializer(), Endpoint.class)
.build();
}
......@@ -66,4 +62,45 @@ public class KryoSerializer {
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
public static final class InternalMessageSerializer
extends Serializer<InternalMessage> {
@Override
public void write(Kryo kryo, Output output, InternalMessage object) {
output.writeLong(object.id());
kryo.writeClassAndObject(output, object.sender());
output.writeString(object.type());
output.writeInt(object.payload().length, true);
output.writeBytes(object.payload());
}
@Override
public InternalMessage read(Kryo kryo, Input input,
Class<InternalMessage> type) {
long id = input.readLong();
Endpoint sender = (Endpoint) kryo.readClassAndObject(input);
String msgtype = input.readString();
int length = input.readInt(true);
byte[] payload = input.readBytes(length);
return new InternalMessage(id, sender, msgtype, payload);
}
}
public static final class EndPointSerializer extends Serializer<Endpoint> {
@Override
public void write(Kryo kryo, Output output, Endpoint object) {
output.writeString(object.host());
output.writeInt(object.port());
}
@Override
public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) {
String host = input.readString();
int port = input.readInt();
return new Endpoint(host, port);
}
}
}
......