Yuta HIGUCHI

Fix for Kryo related issue crossing OSGi bundle boundaries.

Change-Id: I121dfe360de14a5b4760e5d2fd8cb2db93e0be63
......@@ -7,9 +7,6 @@ import java.net.URI;
*/
public final class DeviceId extends ElementId {
// Default constructor for serialization
protected DeviceId() {}
// Public construction is prohibited
private DeviceId(URI uri) {
super(uri);
......
......@@ -12,12 +12,6 @@ public class ProviderId {
private final String scheme;
private final String id;
// Default constructor for serialization
protected ProviderId() {
scheme = null;
id = null;
}
/**
* Creates a new provider identifier from the specified string.
* The providers are expected to follow the reverse DNS convention, e.g.
......@@ -40,6 +34,15 @@ public class ProviderId {
return scheme;
}
/**
* Returns the device URI scheme specific id portion.
*
* @return id
*/
public String id() {
return id;
}
@Override
public int hashCode() {
return Objects.hash(scheme, id);
......
......@@ -112,7 +112,7 @@ public class DistributedDeviceManagerTest {
mgr.deactivate();
dstore.deactivate();
dstore.theInstance.shutdown();
((TestDistributedDeviceStore) dstore).shutdownHz();
}
private void connectDevice(DeviceId deviceId, String swVersion) {
......@@ -290,5 +290,12 @@ public class DistributedDeviceManagerTest {
}
};
}
/**
* Shutdowns the hazelcast instance.
*/
public void shutdownHz() {
theInstance.shutdown();
}
}
}
......
......@@ -16,6 +16,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.google.common.collect.ImmutableSet;
// TODO move to util, etc.
/**
* Kryo Serializer for {@link DefaultPort}.
*/
public final class DefaultPortSerializer extends
Serializer<DefaultPort> {
......@@ -23,6 +26,9 @@ public final class DefaultPortSerializer extends
= new CollectionSerializer(IpPrefix.class,
new IpPrefixSerializer(), false);
/**
* Default constructor.
*/
public DefaultPortSerializer() {
// non-null, immutable
super(false, true);
......
......@@ -44,6 +44,10 @@ import org.onlab.onos.store.StoreService;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
......@@ -68,103 +72,26 @@ import de.javakaffee.kryoserializers.URISerializer;
@Service
public class DistributedDeviceStore implements DeviceStore {
/**
* An IMap EntryListener, which reflects each remote event to cache.
*
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public static final class RemoteEventHandler<K, V> extends
EntryAdapter<byte[], byte[]> {
private LoadingCache<K, Optional<V>> cache;
/**
* Constructor.
*
* @param cache cache to update
*/
public RemoteEventHandler(
LoadingCache<K, Optional<V>> cache) {
this.cache = checkNotNull(cache);
}
@Override
public void mapCleared(MapEvent event) {
cache.invalidateAll();
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
cache.put(POOL.<K>deserialize(event.getKey()),
Optional.of(POOL.<V>deserialize(
event.getValue())));
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
cache.invalidate(POOL.<DeviceId>deserialize(event.getKey()));
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
entryUpdated(event);
}
}
/**
* CacheLoader to wrap Map value with Optional,
* to handle negative hit on underlying IMap.
*
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public static final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(IMap<byte[], byte[]> rawMap) {
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = serialize(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = deserialize(valBytes);
return Optional.of(dev);
}
}
private final Logger log = getLogger(getClass());
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
// FIXME Slice out types used in common to separate pool/namespace.
private static final KryoPool POOL = KryoPool.newBuilder()
.register(URI.class, new URISerializer())
.register(
ArrayList.class,
HashMap.class,
ProviderId.class,
Device.Type.class,
DeviceId.class,
DefaultDevice.class,
MastershipRole.class,
HashMap.class,
Port.class,
Element.class
)
.register(URI.class, new URISerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.build()
......@@ -190,7 +117,7 @@ public class DistributedDeviceStore implements DeviceStore {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
/*protected*/public HazelcastInstance theInstance;
protected HazelcastInstance theInstance;
@Activate
......@@ -517,4 +444,94 @@ public class DistributedDeviceStore implements DeviceStore {
return POOL.deserialize(bytes);
}
public static final class DeviceIdSerializer extends Serializer<DeviceId> {
@Override
public void write(Kryo kryo, Output output, DeviceId object) {
kryo.writeObject(output, object.uri());
}
@Override
public DeviceId read(Kryo kryo, Input input, Class<DeviceId> type) {
final URI uri = kryo.readObject(input, URI.class);
return DeviceId.deviceId(uri);
}
}
/**
* An IMap EntryListener, which reflects each remote event to cache.
*
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public static final class RemoteEventHandler<K, V> extends
EntryAdapter<byte[], byte[]> {
private LoadingCache<K, Optional<V>> cache;
/**
* Constructor.
*
* @param cache cache to update
*/
public RemoteEventHandler(
LoadingCache<K, Optional<V>> cache) {
this.cache = checkNotNull(cache);
}
@Override
public void mapCleared(MapEvent event) {
cache.invalidateAll();
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
cache.put(POOL.<K>deserialize(event.getKey()),
Optional.of(POOL.<V>deserialize(
event.getValue())));
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
cache.invalidate(POOL.<DeviceId>deserialize(event.getKey()));
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
entryUpdated(event);
}
}
/**
* CacheLoader to wrap Map value with Optional,
* to handle negative hit on underlying IMap.
*
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public static final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(IMap<byte[], byte[]> rawMap) {
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = serialize(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = deserialize(valBytes);
return Optional.of(dev);
}
}
}
......
......@@ -8,8 +8,14 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
// TODO move to util, etc.
/**
* Kryo Serializer for {@link IpPrefix}.
*/
public final class IpPrefixSerializer extends Serializer<IpPrefix> {
/**
* Default constructor.
*/
public IpPrefixSerializer() {
// non-null, immutable
super(false, true);
......
......@@ -8,9 +8,15 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
// TODO move to util, etc.
/**
* Serializer for {@link PortNumber}.
*/
public final class PortNumberSerializer extends
Serializer<PortNumber> {
/**
* Default constructor.
*/
public PortNumberSerializer() {
// non-null, immutable
super(false, true);
......
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.provider.ProviderId;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
//TODO move to util, etc.
/**
* Serializer for {@link ProviderId}.
*/
public class ProviderIdSerializer extends Serializer<ProviderId> {
/**
* Default constructor.
*/
public ProviderIdSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ProviderId object) {
output.writeString(object.scheme());
output.writeString(object.id());
}
@Override
public ProviderId read(Kryo kryo, Input input, Class<ProviderId> type) {
String scheme = input.readString();
String id = input.readString();
return new ProviderId(scheme, id);
}
}