Slice out SerializationService from StoreService
Change-Id: I23a3616179fd5626277037fd0014670da8e15da4
Showing
14 changed files
with
187 additions
and
123 deletions
| ... | @@ -33,8 +33,11 @@ import org.onlab.onos.net.device.PortDescription; | ... | @@ -33,8 +33,11 @@ import org.onlab.onos.net.device.PortDescription; |
| 33 | import org.onlab.onos.net.provider.AbstractProvider; | 33 | import org.onlab.onos.net.provider.AbstractProvider; |
| 34 | import org.onlab.onos.net.provider.ProviderId; | 34 | import org.onlab.onos.net.provider.ProviderId; |
| 35 | import org.onlab.onos.store.common.StoreManager; | 35 | import org.onlab.onos.store.common.StoreManager; |
| 36 | +import org.onlab.onos.store.common.StoreService; | ||
| 36 | import org.onlab.onos.store.common.TestStoreManager; | 37 | import org.onlab.onos.store.common.TestStoreManager; |
| 37 | import org.onlab.onos.store.device.impl.DistributedDeviceStore; | 38 | import org.onlab.onos.store.device.impl.DistributedDeviceStore; |
| 39 | +import org.onlab.onos.store.serializers.KryoSerializationManager; | ||
| 40 | +import org.onlab.onos.store.serializers.KryoSerializationService; | ||
| 38 | import org.onlab.packet.IpPrefix; | 41 | import org.onlab.packet.IpPrefix; |
| 39 | 42 | ||
| 40 | import java.util.ArrayList; | 43 | import java.util.ArrayList; |
| ... | @@ -92,6 +95,7 @@ public class DistributedDeviceManagerTest { | ... | @@ -92,6 +95,7 @@ public class DistributedDeviceManagerTest { |
| 92 | private DistributedDeviceStore dstore; | 95 | private DistributedDeviceStore dstore; |
| 93 | private TestMastershipManager masterManager; | 96 | private TestMastershipManager masterManager; |
| 94 | private EventDeliveryService eventService; | 97 | private EventDeliveryService eventService; |
| 98 | + private KryoSerializationManager serializationMgr; | ||
| 95 | 99 | ||
| 96 | @Before | 100 | @Before |
| 97 | public void setUp() { | 101 | public void setUp() { |
| ... | @@ -107,7 +111,10 @@ public class DistributedDeviceManagerTest { | ... | @@ -107,7 +111,10 @@ public class DistributedDeviceManagerTest { |
| 107 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); | 111 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); |
| 108 | storeManager.activate(); | 112 | storeManager.activate(); |
| 109 | 113 | ||
| 110 | - dstore = new TestDistributedDeviceStore(); | 114 | + serializationMgr = new KryoSerializationManager(); |
| 115 | + serializationMgr.activate(); | ||
| 116 | + | ||
| 117 | + dstore = new TestDistributedDeviceStore(storeManager, serializationMgr); | ||
| 111 | dstore.activate(); | 118 | dstore.activate(); |
| 112 | 119 | ||
| 113 | mgr.store = dstore; | 120 | mgr.store = dstore; |
| ... | @@ -133,6 +140,7 @@ public class DistributedDeviceManagerTest { | ... | @@ -133,6 +140,7 @@ public class DistributedDeviceManagerTest { |
| 133 | mgr.deactivate(); | 140 | mgr.deactivate(); |
| 134 | 141 | ||
| 135 | dstore.deactivate(); | 142 | dstore.deactivate(); |
| 143 | + serializationMgr.deactivate(); | ||
| 136 | storeManager.deactivate(); | 144 | storeManager.deactivate(); |
| 137 | } | 145 | } |
| 138 | 146 | ||
| ... | @@ -298,8 +306,10 @@ public class DistributedDeviceManagerTest { | ... | @@ -298,8 +306,10 @@ public class DistributedDeviceManagerTest { |
| 298 | 306 | ||
| 299 | private class TestDistributedDeviceStore extends DistributedDeviceStore { | 307 | private class TestDistributedDeviceStore extends DistributedDeviceStore { |
| 300 | 308 | ||
| 301 | - public TestDistributedDeviceStore() { | 309 | + public TestDistributedDeviceStore(StoreService storeService, |
| 302 | - this.storeService = storeManager; | 310 | + KryoSerializationService kryoSerializationService) { |
| 311 | + this.storeService = storeService; | ||
| 312 | + this.kryoSerializationService = kryoSerializationService; | ||
| 303 | } | 313 | } |
| 304 | } | 314 | } |
| 305 | 315 | ... | ... |
| ... | @@ -49,6 +49,7 @@ public class DistributedClusterStore | ... | @@ -49,6 +49,7 @@ public class DistributedClusterStore |
| 49 | private final MembershipListener listener = new InternalMembershipListener(); | 49 | private final MembershipListener listener = new InternalMembershipListener(); |
| 50 | private final Map<NodeId, State> states = new ConcurrentHashMap<>(); | 50 | private final Map<NodeId, State> states = new ConcurrentHashMap<>(); |
| 51 | 51 | ||
| 52 | + @Override | ||
| 52 | @Activate | 53 | @Activate |
| 53 | public void activate() { | 54 | public void activate() { |
| 54 | super.activate(); | 55 | super.activate(); |
| ... | @@ -56,7 +57,7 @@ public class DistributedClusterStore | ... | @@ -56,7 +57,7 @@ public class DistributedClusterStore |
| 56 | 57 | ||
| 57 | rawNodes = theInstance.getMap("nodes"); | 58 | rawNodes = theInstance.getMap("nodes"); |
| 58 | OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader | 59 | OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader |
| 59 | - = new OptionalCacheLoader<>(storeService, rawNodes); | 60 | + = new OptionalCacheLoader<>(kryoSerializationService, rawNodes); |
| 60 | nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); | 61 | nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); |
| 61 | rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true); | 62 | rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true); |
| 62 | 63 | ... | ... |
| ... | @@ -52,7 +52,7 @@ implements MastershipStore { | ... | @@ -52,7 +52,7 @@ implements MastershipStore { |
| 52 | 52 | ||
| 53 | rawMasters = theInstance.getMap("masters"); | 53 | rawMasters = theInstance.getMap("masters"); |
| 54 | OptionalCacheLoader<DeviceId, NodeId> nodeLoader | 54 | OptionalCacheLoader<DeviceId, NodeId> nodeLoader |
| 55 | - = new OptionalCacheLoader<>(storeService, rawMasters); | 55 | + = new OptionalCacheLoader<>(kryoSerializationService, rawMasters); |
| 56 | masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); | 56 | masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); |
| 57 | rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true); | 57 | rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true); |
| 58 | 58 | ... | ... |
| ... | @@ -15,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; | ... | @@ -15,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; |
| 15 | import org.onlab.onos.event.Event; | 15 | import org.onlab.onos.event.Event; |
| 16 | import org.onlab.onos.store.AbstractStore; | 16 | import org.onlab.onos.store.AbstractStore; |
| 17 | import org.onlab.onos.store.StoreDelegate; | 17 | import org.onlab.onos.store.StoreDelegate; |
| 18 | +import org.onlab.onos.store.serializers.KryoSerializationService; | ||
| 18 | import org.slf4j.Logger; | 19 | import org.slf4j.Logger; |
| 19 | 20 | ||
| 20 | import static com.google.common.base.Preconditions.checkNotNull; | 21 | import static com.google.common.base.Preconditions.checkNotNull; |
| ... | @@ -32,6 +33,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel | ... | @@ -32,6 +33,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel |
| 32 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 33 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 33 | protected StoreService storeService; | 34 | protected StoreService storeService; |
| 34 | 35 | ||
| 36 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
| 37 | + protected KryoSerializationService kryoSerializationService; | ||
| 38 | + | ||
| 35 | protected HazelcastInstance theInstance; | 39 | protected HazelcastInstance theInstance; |
| 36 | 40 | ||
| 37 | @Activate | 41 | @Activate |
| ... | @@ -46,7 +50,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel | ... | @@ -46,7 +50,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel |
| 46 | * @return serialized object | 50 | * @return serialized object |
| 47 | */ | 51 | */ |
| 48 | protected byte[] serialize(Object obj) { | 52 | protected byte[] serialize(Object obj) { |
| 49 | - return storeService.serialize(obj); | 53 | + return kryoSerializationService.serialize(obj); |
| 50 | } | 54 | } |
| 51 | 55 | ||
| 52 | /** | 56 | /** |
| ... | @@ -57,7 +61,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel | ... | @@ -57,7 +61,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel |
| 57 | * @return deserialized object | 61 | * @return deserialized object |
| 58 | */ | 62 | */ |
| 59 | protected <T> T deserialize(byte[] bytes) { | 63 | protected <T> T deserialize(byte[] bytes) { |
| 60 | - return storeService.deserialize(bytes); | 64 | + return kryoSerializationService.deserialize(bytes); |
| 61 | } | 65 | } |
| 62 | 66 | ||
| 63 | 67 | ... | ... |
| ... | @@ -2,6 +2,8 @@ package org.onlab.onos.store.common; | ... | @@ -2,6 +2,8 @@ package org.onlab.onos.store.common; |
| 2 | 2 | ||
| 3 | import static com.google.common.base.Preconditions.checkNotNull; | 3 | import static com.google.common.base.Preconditions.checkNotNull; |
| 4 | 4 | ||
| 5 | +import org.onlab.onos.store.serializers.KryoSerializationService; | ||
| 6 | + | ||
| 5 | import com.google.common.base.Optional; | 7 | import com.google.common.base.Optional; |
| 6 | import com.google.common.cache.CacheLoader; | 8 | import com.google.common.cache.CacheLoader; |
| 7 | import com.hazelcast.core.IMap; | 9 | import com.hazelcast.core.IMap; |
| ... | @@ -16,28 +18,28 @@ import com.hazelcast.core.IMap; | ... | @@ -16,28 +18,28 @@ import com.hazelcast.core.IMap; |
| 16 | public final class OptionalCacheLoader<K, V> extends | 18 | public final class OptionalCacheLoader<K, V> extends |
| 17 | CacheLoader<K, Optional<V>> { | 19 | CacheLoader<K, Optional<V>> { |
| 18 | 20 | ||
| 19 | - private final StoreService storeService; | 21 | + private final KryoSerializationService kryoSerializationService; |
| 20 | private IMap<byte[], byte[]> rawMap; | 22 | private IMap<byte[], byte[]> rawMap; |
| 21 | 23 | ||
| 22 | /** | 24 | /** |
| 23 | * Constructor. | 25 | * Constructor. |
| 24 | * | 26 | * |
| 25 | - * @param storeService to use for serialization | 27 | + * @param kryoSerializationService to use for serialization |
| 26 | * @param rawMap underlying IMap | 28 | * @param rawMap underlying IMap |
| 27 | */ | 29 | */ |
| 28 | - public OptionalCacheLoader(StoreService storeService, IMap<byte[], byte[]> rawMap) { | 30 | + public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) { |
| 29 | - this.storeService = checkNotNull(storeService); | 31 | + this.kryoSerializationService = checkNotNull(kryoSerializationService); |
| 30 | this.rawMap = checkNotNull(rawMap); | 32 | this.rawMap = checkNotNull(rawMap); |
| 31 | } | 33 | } |
| 32 | 34 | ||
| 33 | @Override | 35 | @Override |
| 34 | public Optional<V> load(K key) throws Exception { | 36 | public Optional<V> load(K key) throws Exception { |
| 35 | - byte[] keyBytes = storeService.serialize(key); | 37 | + byte[] keyBytes = kryoSerializationService.serialize(key); |
| 36 | byte[] valBytes = rawMap.get(keyBytes); | 38 | byte[] valBytes = rawMap.get(keyBytes); |
| 37 | if (valBytes == null) { | 39 | if (valBytes == null) { |
| 38 | return Optional.absent(); | 40 | return Optional.absent(); |
| 39 | } | 41 | } |
| 40 | - V dev = storeService.deserialize(valBytes); | 42 | + V dev = kryoSerializationService.deserialize(valBytes); |
| 41 | return Optional.of(dev); | 43 | return Optional.of(dev); |
| 42 | } | 44 | } |
| 43 | } | 45 | } | ... | ... |
| ... | @@ -5,46 +5,14 @@ import com.hazelcast.config.FileSystemXmlConfig; | ... | @@ -5,46 +5,14 @@ import com.hazelcast.config.FileSystemXmlConfig; |
| 5 | import com.hazelcast.core.Hazelcast; | 5 | import com.hazelcast.core.Hazelcast; |
| 6 | import com.hazelcast.core.HazelcastInstance; | 6 | import com.hazelcast.core.HazelcastInstance; |
| 7 | 7 | ||
| 8 | -import de.javakaffee.kryoserializers.URISerializer; | ||
| 9 | - | ||
| 10 | import org.apache.felix.scr.annotations.Activate; | 8 | import org.apache.felix.scr.annotations.Activate; |
| 11 | import org.apache.felix.scr.annotations.Component; | 9 | import org.apache.felix.scr.annotations.Component; |
| 12 | import org.apache.felix.scr.annotations.Deactivate; | 10 | import org.apache.felix.scr.annotations.Deactivate; |
| 13 | import org.apache.felix.scr.annotations.Service; | 11 | import org.apache.felix.scr.annotations.Service; |
| 14 | -import org.onlab.onos.cluster.ControllerNode; | ||
| 15 | -import org.onlab.onos.cluster.DefaultControllerNode; | ||
| 16 | -import org.onlab.onos.cluster.NodeId; | ||
| 17 | -import org.onlab.onos.net.ConnectPoint; | ||
| 18 | -import org.onlab.onos.net.DefaultDevice; | ||
| 19 | -import org.onlab.onos.net.DefaultLink; | ||
| 20 | -import org.onlab.onos.net.DefaultPort; | ||
| 21 | -import org.onlab.onos.net.Device; | ||
| 22 | -import org.onlab.onos.net.DeviceId; | ||
| 23 | -import org.onlab.onos.net.Element; | ||
| 24 | -import org.onlab.onos.net.Link; | ||
| 25 | -import org.onlab.onos.net.LinkKey; | ||
| 26 | -import org.onlab.onos.net.MastershipRole; | ||
| 27 | -import org.onlab.onos.net.Port; | ||
| 28 | -import org.onlab.onos.net.PortNumber; | ||
| 29 | -import org.onlab.onos.net.provider.ProviderId; | ||
| 30 | -import org.onlab.onos.store.serializers.ConnectPointSerializer; | ||
| 31 | -import org.onlab.onos.store.serializers.DefaultLinkSerializer; | ||
| 32 | -import org.onlab.onos.store.serializers.DefaultPortSerializer; | ||
| 33 | -import org.onlab.onos.store.serializers.DeviceIdSerializer; | ||
| 34 | -import org.onlab.onos.store.serializers.IpPrefixSerializer; | ||
| 35 | -import org.onlab.onos.store.serializers.LinkKeySerializer; | ||
| 36 | -import org.onlab.onos.store.serializers.NodeIdSerializer; | ||
| 37 | -import org.onlab.onos.store.serializers.PortNumberSerializer; | ||
| 38 | -import org.onlab.onos.store.serializers.ProviderIdSerializer; | ||
| 39 | -import org.onlab.packet.IpPrefix; | ||
| 40 | -import org.onlab.util.KryoPool; | ||
| 41 | import org.slf4j.Logger; | 12 | import org.slf4j.Logger; |
| 42 | import org.slf4j.LoggerFactory; | 13 | import org.slf4j.LoggerFactory; |
| 43 | 14 | ||
| 44 | import java.io.FileNotFoundException; | 15 | import java.io.FileNotFoundException; |
| 45 | -import java.net.URI; | ||
| 46 | -import java.util.ArrayList; | ||
| 47 | -import java.util.HashMap; | ||
| 48 | 16 | ||
| 49 | /** | 17 | /** |
| 50 | * Auxiliary bootstrap of distributed store. | 18 | * Auxiliary bootstrap of distributed store. |
| ... | @@ -58,55 +26,18 @@ public class StoreManager implements StoreService { | ... | @@ -58,55 +26,18 @@ public class StoreManager implements StoreService { |
| 58 | private final Logger log = LoggerFactory.getLogger(getClass()); | 26 | private final Logger log = LoggerFactory.getLogger(getClass()); |
| 59 | 27 | ||
| 60 | protected HazelcastInstance instance; | 28 | protected HazelcastInstance instance; |
| 61 | - private KryoPool serializerPool; | ||
| 62 | - | ||
| 63 | 29 | ||
| 64 | @Activate | 30 | @Activate |
| 65 | public void activate() { | 31 | public void activate() { |
| 66 | try { | 32 | try { |
| 67 | Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE); | 33 | Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE); |
| 68 | instance = Hazelcast.newHazelcastInstance(config); | 34 | instance = Hazelcast.newHazelcastInstance(config); |
| 69 | - setupKryoPool(); | ||
| 70 | log.info("Started"); | 35 | log.info("Started"); |
| 71 | } catch (FileNotFoundException e) { | 36 | } catch (FileNotFoundException e) { |
| 72 | log.error("Unable to configure Hazelcast", e); | 37 | log.error("Unable to configure Hazelcast", e); |
| 73 | } | 38 | } |
| 74 | } | 39 | } |
| 75 | 40 | ||
| 76 | - /** | ||
| 77 | - * Sets up the common serialzers pool. | ||
| 78 | - */ | ||
| 79 | - protected void setupKryoPool() { | ||
| 80 | - // FIXME Slice out types used in common to separate pool/namespace. | ||
| 81 | - serializerPool = KryoPool.newBuilder() | ||
| 82 | - .register(ArrayList.class, | ||
| 83 | - HashMap.class, | ||
| 84 | - | ||
| 85 | - ControllerNode.State.class, | ||
| 86 | - Device.Type.class, | ||
| 87 | - | ||
| 88 | - DefaultControllerNode.class, | ||
| 89 | - DefaultDevice.class, | ||
| 90 | - MastershipRole.class, | ||
| 91 | - Port.class, | ||
| 92 | - Element.class, | ||
| 93 | - | ||
| 94 | - Link.Type.class | ||
| 95 | - ) | ||
| 96 | - .register(IpPrefix.class, new IpPrefixSerializer()) | ||
| 97 | - .register(URI.class, new URISerializer()) | ||
| 98 | - .register(NodeId.class, new NodeIdSerializer()) | ||
| 99 | - .register(ProviderId.class, new ProviderIdSerializer()) | ||
| 100 | - .register(DeviceId.class, new DeviceIdSerializer()) | ||
| 101 | - .register(PortNumber.class, new PortNumberSerializer()) | ||
| 102 | - .register(DefaultPort.class, new DefaultPortSerializer()) | ||
| 103 | - .register(LinkKey.class, new LinkKeySerializer()) | ||
| 104 | - .register(ConnectPoint.class, new ConnectPointSerializer()) | ||
| 105 | - .register(DefaultLink.class, new DefaultLinkSerializer()) | ||
| 106 | - .build() | ||
| 107 | - .populate(10); | ||
| 108 | - } | ||
| 109 | - | ||
| 110 | @Deactivate | 41 | @Deactivate |
| 111 | public void deactivate() { | 42 | public void deactivate() { |
| 112 | instance.shutdown(); | 43 | instance.shutdown(); |
| ... | @@ -118,18 +49,4 @@ public class StoreManager implements StoreService { | ... | @@ -118,18 +49,4 @@ public class StoreManager implements StoreService { |
| 118 | return instance; | 49 | return instance; |
| 119 | } | 50 | } |
| 120 | 51 | ||
| 121 | - | ||
| 122 | - @Override | ||
| 123 | - public byte[] serialize(final Object obj) { | ||
| 124 | - return serializerPool.serialize(obj); | ||
| 125 | - } | ||
| 126 | - | ||
| 127 | - @Override | ||
| 128 | - public <T> T deserialize(final byte[] bytes) { | ||
| 129 | - if (bytes == null) { | ||
| 130 | - return null; | ||
| 131 | - } | ||
| 132 | - return serializerPool.deserialize(bytes); | ||
| 133 | - } | ||
| 134 | - | ||
| 135 | } | 52 | } | ... | ... |
| ... | @@ -15,22 +15,4 @@ public interface StoreService { | ... | @@ -15,22 +15,4 @@ public interface StoreService { |
| 15 | */ | 15 | */ |
| 16 | HazelcastInstance getHazelcastInstance(); | 16 | HazelcastInstance getHazelcastInstance(); |
| 17 | 17 | ||
| 18 | - /** | ||
| 19 | - * Serializes the specified object into bytes using one of the | ||
| 20 | - * pre-registered serializers. | ||
| 21 | - * | ||
| 22 | - * @param obj object to be serialized | ||
| 23 | - * @return serialized bytes | ||
| 24 | - */ | ||
| 25 | - public byte[] serialize(final Object obj); | ||
| 26 | - | ||
| 27 | - /** | ||
| 28 | - * Deserializes the specified bytes into an object using one of the | ||
| 29 | - * pre-registered serializers. | ||
| 30 | - * | ||
| 31 | - * @param bytes bytes to be deserialized | ||
| 32 | - * @return deserialized object | ||
| 33 | - */ | ||
| 34 | - public <T> T deserialize(final byte[] bytes); | ||
| 35 | - | ||
| 36 | } | 18 | } | ... | ... |
| ... | @@ -46,9 +46,8 @@ public class TestStoreManager extends StoreManager { | ... | @@ -46,9 +46,8 @@ public class TestStoreManager extends StoreManager { |
| 46 | this.instance = instance; | 46 | this.instance = instance; |
| 47 | } | 47 | } |
| 48 | 48 | ||
| 49 | - // Hazelcast setup removed from original code. | ||
| 50 | @Override | 49 | @Override |
| 51 | public void activate() { | 50 | public void activate() { |
| 52 | - setupKryoPool(); | 51 | + // Hazelcast setup removed from original code. |
| 53 | } | 52 | } |
| 54 | } | 53 | } | ... | ... |
| ... | @@ -87,7 +87,7 @@ public class DistributedDeviceStore | ... | @@ -87,7 +87,7 @@ public class DistributedDeviceStore |
| 87 | // TODO decide on Map name scheme to avoid collision | 87 | // TODO decide on Map name scheme to avoid collision |
| 88 | rawDevices = theInstance.getMap("devices"); | 88 | rawDevices = theInstance.getMap("devices"); |
| 89 | final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader | 89 | final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader |
| 90 | - = new OptionalCacheLoader<>(storeService, rawDevices); | 90 | + = new OptionalCacheLoader<>(kryoSerializationService, rawDevices); |
| 91 | devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader)); | 91 | devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader)); |
| 92 | // refresh/populate cache based on notification from other instance | 92 | // refresh/populate cache based on notification from other instance |
| 93 | devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue); | 93 | devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue); |
| ... | @@ -97,7 +97,7 @@ public class DistributedDeviceStore | ... | @@ -97,7 +97,7 @@ public class DistributedDeviceStore |
| 97 | 97 | ||
| 98 | rawDevicePorts = theInstance.getMap("devicePorts"); | 98 | rawDevicePorts = theInstance.getMap("devicePorts"); |
| 99 | final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader | 99 | final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader |
| 100 | - = new OptionalCacheLoader<>(storeService, rawDevicePorts); | 100 | + = new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts); |
| 101 | devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader)); | 101 | devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader)); |
| 102 | // refresh/populate cache based on notification from other instance | 102 | // refresh/populate cache based on notification from other instance |
| 103 | portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue); | 103 | portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue); | ... | ... |
| ... | @@ -70,7 +70,7 @@ public class DistributedLinkStore | ... | @@ -70,7 +70,7 @@ public class DistributedLinkStore |
| 70 | // TODO decide on Map name scheme to avoid collision | 70 | // TODO decide on Map name scheme to avoid collision |
| 71 | rawLinks = theInstance.getMap("links"); | 71 | rawLinks = theInstance.getMap("links"); |
| 72 | final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader | 72 | final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader |
| 73 | - = new OptionalCacheLoader<>(storeService, rawLinks); | 73 | + = new OptionalCacheLoader<>(kryoSerializationService, rawLinks); |
| 74 | links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader)); | 74 | links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader)); |
| 75 | // refresh/populate cache based on notification from other instance | 75 | // refresh/populate cache based on notification from other instance |
| 76 | linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue); | 76 | linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue); | ... | ... |
| ... | @@ -36,6 +36,8 @@ import org.onlab.onos.net.provider.ProviderId; | ... | @@ -36,6 +36,8 @@ import org.onlab.onos.net.provider.ProviderId; |
| 36 | import org.onlab.onos.store.common.StoreManager; | 36 | import org.onlab.onos.store.common.StoreManager; |
| 37 | import org.onlab.onos.store.common.StoreService; | 37 | import org.onlab.onos.store.common.StoreService; |
| 38 | import org.onlab.onos.store.common.TestStoreManager; | 38 | import org.onlab.onos.store.common.TestStoreManager; |
| 39 | +import org.onlab.onos.store.serializers.KryoSerializationManager; | ||
| 40 | +import org.onlab.onos.store.serializers.KryoSerializationService; | ||
| 39 | 41 | ||
| 40 | import com.google.common.collect.Iterables; | 42 | import com.google.common.collect.Iterables; |
| 41 | import com.google.common.collect.Sets; | 43 | import com.google.common.collect.Sets; |
| ... | @@ -61,6 +63,7 @@ public class DistributedDeviceStoreTest { | ... | @@ -61,6 +63,7 @@ public class DistributedDeviceStoreTest { |
| 61 | private static final PortNumber P3 = PortNumber.portNumber(3); | 63 | private static final PortNumber P3 = PortNumber.portNumber(3); |
| 62 | 64 | ||
| 63 | private DistributedDeviceStore deviceStore; | 65 | private DistributedDeviceStore deviceStore; |
| 66 | + private KryoSerializationManager serializationMgr; | ||
| 64 | 67 | ||
| 65 | private StoreManager storeManager; | 68 | private StoreManager storeManager; |
| 66 | 69 | ||
| ... | @@ -82,7 +85,10 @@ public class DistributedDeviceStoreTest { | ... | @@ -82,7 +85,10 @@ public class DistributedDeviceStoreTest { |
| 82 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); | 85 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); |
| 83 | storeManager.activate(); | 86 | storeManager.activate(); |
| 84 | 87 | ||
| 85 | - deviceStore = new TestDistributedDeviceStore(storeManager); | 88 | + serializationMgr = new KryoSerializationManager(); |
| 89 | + serializationMgr.activate(); | ||
| 90 | + | ||
| 91 | + deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr); | ||
| 86 | deviceStore.activate(); | 92 | deviceStore.activate(); |
| 87 | } | 93 | } |
| 88 | 94 | ||
| ... | @@ -90,6 +96,8 @@ public class DistributedDeviceStoreTest { | ... | @@ -90,6 +96,8 @@ public class DistributedDeviceStoreTest { |
| 90 | public void tearDown() throws Exception { | 96 | public void tearDown() throws Exception { |
| 91 | deviceStore.deactivate(); | 97 | deviceStore.deactivate(); |
| 92 | 98 | ||
| 99 | + serializationMgr.deactivate(); | ||
| 100 | + | ||
| 93 | storeManager.deactivate(); | 101 | storeManager.deactivate(); |
| 94 | } | 102 | } |
| 95 | 103 | ||
| ... | @@ -384,8 +392,10 @@ public class DistributedDeviceStoreTest { | ... | @@ -384,8 +392,10 @@ public class DistributedDeviceStoreTest { |
| 384 | } | 392 | } |
| 385 | 393 | ||
| 386 | private class TestDistributedDeviceStore extends DistributedDeviceStore { | 394 | private class TestDistributedDeviceStore extends DistributedDeviceStore { |
| 387 | - public TestDistributedDeviceStore(StoreService storeService) { | 395 | + public TestDistributedDeviceStore(StoreService storeService, |
| 396 | + KryoSerializationService kryoSerializationService) { | ||
| 388 | this.storeService = storeService; | 397 | this.storeService = storeService; |
| 398 | + this.kryoSerializationService = kryoSerializationService; | ||
| 389 | } | 399 | } |
| 390 | } | 400 | } |
| 391 | } | 401 | } | ... | ... |
| ... | @@ -30,6 +30,8 @@ import org.onlab.onos.net.provider.ProviderId; | ... | @@ -30,6 +30,8 @@ import org.onlab.onos.net.provider.ProviderId; |
| 30 | import org.onlab.onos.store.common.StoreManager; | 30 | import org.onlab.onos.store.common.StoreManager; |
| 31 | import org.onlab.onos.store.common.StoreService; | 31 | import org.onlab.onos.store.common.StoreService; |
| 32 | import org.onlab.onos.store.common.TestStoreManager; | 32 | import org.onlab.onos.store.common.TestStoreManager; |
| 33 | +import org.onlab.onos.store.serializers.KryoSerializationManager; | ||
| 34 | +import org.onlab.onos.store.serializers.KryoSerializationService; | ||
| 33 | 35 | ||
| 34 | import com.google.common.collect.Iterables; | 36 | import com.google.common.collect.Iterables; |
| 35 | import com.hazelcast.config.Config; | 37 | import com.hazelcast.config.Config; |
| ... | @@ -49,6 +51,7 @@ public class DistributedLinkStoreTest { | ... | @@ -49,6 +51,7 @@ public class DistributedLinkStoreTest { |
| 49 | private static final PortNumber P3 = PortNumber.portNumber(3); | 51 | private static final PortNumber P3 = PortNumber.portNumber(3); |
| 50 | 52 | ||
| 51 | private StoreManager storeManager; | 53 | private StoreManager storeManager; |
| 54 | + private KryoSerializationManager serializationMgr; | ||
| 52 | 55 | ||
| 53 | private DistributedLinkStore linkStore; | 56 | private DistributedLinkStore linkStore; |
| 54 | 57 | ||
| ... | @@ -68,13 +71,17 @@ public class DistributedLinkStoreTest { | ... | @@ -68,13 +71,17 @@ public class DistributedLinkStoreTest { |
| 68 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); | 71 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); |
| 69 | storeManager.activate(); | 72 | storeManager.activate(); |
| 70 | 73 | ||
| 71 | - linkStore = new TestDistributedLinkStore(storeManager); | 74 | + serializationMgr = new KryoSerializationManager(); |
| 75 | + serializationMgr.activate(); | ||
| 76 | + | ||
| 77 | + linkStore = new TestDistributedLinkStore(storeManager, serializationMgr); | ||
| 72 | linkStore.activate(); | 78 | linkStore.activate(); |
| 73 | } | 79 | } |
| 74 | 80 | ||
| 75 | @After | 81 | @After |
| 76 | public void tearDown() throws Exception { | 82 | public void tearDown() throws Exception { |
| 77 | linkStore.deactivate(); | 83 | linkStore.deactivate(); |
| 84 | + serializationMgr.deactivate(); | ||
| 78 | storeManager.deactivate(); | 85 | storeManager.deactivate(); |
| 79 | } | 86 | } |
| 80 | 87 | ||
| ... | @@ -354,8 +361,10 @@ public class DistributedLinkStoreTest { | ... | @@ -354,8 +361,10 @@ public class DistributedLinkStoreTest { |
| 354 | 361 | ||
| 355 | 362 | ||
| 356 | class TestDistributedLinkStore extends DistributedLinkStore { | 363 | class TestDistributedLinkStore extends DistributedLinkStore { |
| 357 | - TestDistributedLinkStore(StoreService storeService) { | 364 | + TestDistributedLinkStore(StoreService storeService, |
| 365 | + KryoSerializationService kryoSerializationService) { | ||
| 358 | this.storeService = storeService; | 366 | this.storeService = storeService; |
| 367 | + this.kryoSerializationService = kryoSerializationService; | ||
| 359 | } | 368 | } |
| 360 | } | 369 | } |
| 361 | } | 370 | } | ... | ... |
core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
0 → 100644
| 1 | +package org.onlab.onos.store.serializers; | ||
| 2 | + | ||
| 3 | +import java.net.URI; | ||
| 4 | +import java.util.ArrayList; | ||
| 5 | +import java.util.HashMap; | ||
| 6 | + | ||
| 7 | +import org.apache.felix.scr.annotations.Activate; | ||
| 8 | +import org.apache.felix.scr.annotations.Component; | ||
| 9 | +import org.apache.felix.scr.annotations.Deactivate; | ||
| 10 | +import org.apache.felix.scr.annotations.Service; | ||
| 11 | +import org.onlab.onos.cluster.ControllerNode; | ||
| 12 | +import org.onlab.onos.cluster.DefaultControllerNode; | ||
| 13 | +import org.onlab.onos.cluster.NodeId; | ||
| 14 | +import org.onlab.onos.net.ConnectPoint; | ||
| 15 | +import org.onlab.onos.net.DefaultDevice; | ||
| 16 | +import org.onlab.onos.net.DefaultLink; | ||
| 17 | +import org.onlab.onos.net.DefaultPort; | ||
| 18 | +import org.onlab.onos.net.Device; | ||
| 19 | +import org.onlab.onos.net.DeviceId; | ||
| 20 | +import org.onlab.onos.net.Element; | ||
| 21 | +import org.onlab.onos.net.Link; | ||
| 22 | +import org.onlab.onos.net.LinkKey; | ||
| 23 | +import org.onlab.onos.net.MastershipRole; | ||
| 24 | +import org.onlab.onos.net.Port; | ||
| 25 | +import org.onlab.onos.net.PortNumber; | ||
| 26 | +import org.onlab.onos.net.provider.ProviderId; | ||
| 27 | +import org.onlab.packet.IpPrefix; | ||
| 28 | +import org.onlab.util.KryoPool; | ||
| 29 | +import org.slf4j.Logger; | ||
| 30 | +import org.slf4j.LoggerFactory; | ||
| 31 | + | ||
| 32 | +import de.javakaffee.kryoserializers.URISerializer; | ||
| 33 | + | ||
| 34 | +/** | ||
| 35 | + * Serialization service using Kryo. | ||
| 36 | + */ | ||
| 37 | +@Component(immediate = true) | ||
| 38 | +@Service | ||
| 39 | +public class KryoSerializationManager implements KryoSerializationService { | ||
| 40 | + | ||
| 41 | + private final Logger log = LoggerFactory.getLogger(getClass()); | ||
| 42 | + private KryoPool serializerPool; | ||
| 43 | + | ||
| 44 | + | ||
| 45 | + @Activate | ||
| 46 | + public void activate() { | ||
| 47 | + setupKryoPool(); | ||
| 48 | + log.info("Started"); | ||
| 49 | + } | ||
| 50 | + | ||
| 51 | + @Deactivate | ||
| 52 | + public void deactivate() { | ||
| 53 | + log.info("Stopped"); | ||
| 54 | + } | ||
| 55 | + | ||
| 56 | + /** | ||
| 57 | + * Sets up the common serialzers pool. | ||
| 58 | + */ | ||
| 59 | + protected void setupKryoPool() { | ||
| 60 | + // FIXME Slice out types used in common to separate pool/namespace. | ||
| 61 | + serializerPool = KryoPool.newBuilder() | ||
| 62 | + .register(ArrayList.class, | ||
| 63 | + HashMap.class, | ||
| 64 | + | ||
| 65 | + ControllerNode.State.class, | ||
| 66 | + Device.Type.class, | ||
| 67 | + | ||
| 68 | + DefaultControllerNode.class, | ||
| 69 | + DefaultDevice.class, | ||
| 70 | + MastershipRole.class, | ||
| 71 | + Port.class, | ||
| 72 | + Element.class, | ||
| 73 | + | ||
| 74 | + Link.Type.class | ||
| 75 | + ) | ||
| 76 | + .register(IpPrefix.class, new IpPrefixSerializer()) | ||
| 77 | + .register(URI.class, new URISerializer()) | ||
| 78 | + .register(NodeId.class, new NodeIdSerializer()) | ||
| 79 | + .register(ProviderId.class, new ProviderIdSerializer()) | ||
| 80 | + .register(DeviceId.class, new DeviceIdSerializer()) | ||
| 81 | + .register(PortNumber.class, new PortNumberSerializer()) | ||
| 82 | + .register(DefaultPort.class, new DefaultPortSerializer()) | ||
| 83 | + .register(LinkKey.class, new LinkKeySerializer()) | ||
| 84 | + .register(ConnectPoint.class, new ConnectPointSerializer()) | ||
| 85 | + .register(DefaultLink.class, new DefaultLinkSerializer()) | ||
| 86 | + .build() | ||
| 87 | + .populate(1); | ||
| 88 | + } | ||
| 89 | + | ||
| 90 | + @Override | ||
| 91 | + public byte[] serialize(final Object obj) { | ||
| 92 | + return serializerPool.serialize(obj); | ||
| 93 | + } | ||
| 94 | + | ||
| 95 | + @Override | ||
| 96 | + public <T> T deserialize(final byte[] bytes) { | ||
| 97 | + if (bytes == null) { | ||
| 98 | + return null; | ||
| 99 | + } | ||
| 100 | + return serializerPool.deserialize(bytes); | ||
| 101 | + } | ||
| 102 | + | ||
| 103 | +} |
core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
0 → 100644
| 1 | +package org.onlab.onos.store.serializers; | ||
| 2 | + | ||
| 3 | +// TODO: To be replaced with SerializationService from IOLoop activity | ||
| 4 | +/** | ||
| 5 | + * Service to serialize Objects into byte array. | ||
| 6 | + */ | ||
| 7 | +public interface KryoSerializationService { | ||
| 8 | + | ||
| 9 | + /** | ||
| 10 | + * Serializes the specified object into bytes using one of the | ||
| 11 | + * pre-registered serializers. | ||
| 12 | + * | ||
| 13 | + * @param obj object to be serialized | ||
| 14 | + * @return serialized bytes | ||
| 15 | + */ | ||
| 16 | + public byte[] serialize(final Object obj); | ||
| 17 | + | ||
| 18 | + /** | ||
| 19 | + * Deserializes the specified bytes into an object using one of the | ||
| 20 | + * pre-registered serializers. | ||
| 21 | + * | ||
| 22 | + * @param bytes bytes to be deserialized | ||
| 23 | + * @return deserialized object | ||
| 24 | + */ | ||
| 25 | + public <T> T deserialize(final byte[] bytes); | ||
| 26 | + | ||
| 27 | +} |
-
Please register or login to post a comment