Yuta HIGUCHI

Device Anti-Entropy

- create Advertisement
- handler for Advertisement
- register handler, background thread to send advertisement

Change-Id: I99e8a7d68747970c34b3c25c6d0489769d251446
Showing 28 changed files with 805 additions and 383 deletions
...@@ -4,6 +4,8 @@ import org.onlab.onos.net.AbstractDescription; ...@@ -4,6 +4,8 @@ import org.onlab.onos.net.AbstractDescription;
4 import org.onlab.onos.net.PortNumber; 4 import org.onlab.onos.net.PortNumber;
5 import org.onlab.onos.net.SparseAnnotations; 5 import org.onlab.onos.net.SparseAnnotations;
6 6
7 +import com.google.common.base.MoreObjects;
8 +
7 /** 9 /**
8 * Default implementation of immutable port description. 10 * Default implementation of immutable port description.
9 */ 11 */
...@@ -48,6 +50,15 @@ public class DefaultPortDescription extends AbstractDescription ...@@ -48,6 +50,15 @@ public class DefaultPortDescription extends AbstractDescription
48 return isEnabled; 50 return isEnabled;
49 } 51 }
50 52
53 + @Override
54 + public String toString() {
55 + return MoreObjects.toStringHelper(getClass())
56 + .add("number", number)
57 + .add("isEnabled", isEnabled)
58 + .add("annotations", annotations())
59 + .toString();
60 + }
61 +
51 // default constructor for serialization 62 // default constructor for serialization
52 private DefaultPortDescription() { 63 private DefaultPortDescription() {
53 this.number = null; 64 this.number = null;
......
1 -package org.onlab.onos.store.common.impl;
2 -
3 -import java.util.Map;
4 -
5 -import org.onlab.onos.cluster.NodeId;
6 -import org.onlab.onos.store.Timestamp;
7 -
8 -import com.google.common.collect.ImmutableMap;
9 -
10 -/**
11 - * Anti-Entropy advertisement message.
12 - * <p>
13 - * Message to advertise the information this node holds.
14 - *
15 - * @param <ID> ID type
16 - */
17 -public class AntiEntropyAdvertisement<ID> {
18 -
19 - private final NodeId sender;
20 - private final ImmutableMap<ID, Timestamp> advertisement;
21 -
22 - /**
23 - * Creates anti-entropy advertisement message.
24 - *
25 - * @param sender sender of this message
26 - * @param advertisement timestamp information of the data sender holds
27 - */
28 - public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
29 - this.sender = sender;
30 - this.advertisement = ImmutableMap.copyOf(advertisement);
31 - }
32 -
33 - public NodeId sender() {
34 - return sender;
35 - }
36 -
37 - public ImmutableMap<ID, Timestamp> advertisement() {
38 - return advertisement;
39 - }
40 -
41 - // Default constructor for serializer
42 - protected AntiEntropyAdvertisement() {
43 - this.sender = null;
44 - this.advertisement = null;
45 - }
46 -}
1 -package org.onlab.onos.store.common.impl;
2 -
3 -import java.util.Map;
4 -import java.util.Set;
5 -
6 -import org.onlab.onos.cluster.NodeId;
7 -import org.onlab.onos.store.device.impl.VersionedValue;
8 -
9 -import com.google.common.collect.ImmutableMap;
10 -import com.google.common.collect.ImmutableSet;
11 -
12 -/**
13 - * Anti-Entropy reply message.
14 - * <p>
15 - * Message to send in reply to advertisement or another reply.
16 - * Suggest to the sender about the more up-to-date data this node has,
17 - * and request for more recent data that the receiver has.
18 - */
19 -public class AntiEntropyReply<ID, V extends VersionedValue<?>> {
20 -
21 - private final NodeId sender;
22 - private final ImmutableMap<ID, V> suggestion;
23 - private final ImmutableSet<ID> request;
24 -
25 - /**
26 - * Creates a reply to anti-entropy message.
27 - *
28 - * @param sender sender of this message
29 - * @param suggestion collection of more recent values, sender had
30 - * @param request Collection of identifiers
31 - */
32 - public AntiEntropyReply(NodeId sender,
33 - Map<ID, V> suggestion,
34 - Set<ID> request) {
35 - this.sender = sender;
36 - this.suggestion = ImmutableMap.copyOf(suggestion);
37 - this.request = ImmutableSet.copyOf(request);
38 - }
39 -
40 - public NodeId sender() {
41 - return sender;
42 - }
43 -
44 - /**
45 - * Returns collection of values, which the recipient of this reply is likely
46 - * to be missing or has outdated version.
47 - *
48 - * @return
49 - */
50 - public ImmutableMap<ID, V> suggestion() {
51 - return suggestion;
52 - }
53 -
54 - /**
55 - * Returns collection of identifier to request.
56 - *
57 - * @return collection of identifier to request
58 - */
59 - public ImmutableSet<ID> request() {
60 - return request;
61 - }
62 -
63 - /**
64 - * Checks if reply contains any suggestion or request.
65 - *
66 - * @return true if nothing is suggested and requested
67 - */
68 - public boolean isEmpty() {
69 - return suggestion.isEmpty() && request.isEmpty();
70 - }
71 -
72 - // Default constructor for serializer
73 - protected AntiEntropyReply() {
74 - this.sender = null;
75 - this.suggestion = null;
76 - this.request = null;
77 - }
78 -}
1 +package org.onlab.onos.store.common.impl;
2 +
3 +import org.onlab.onos.cluster.ControllerNode;
4 +import org.onlab.onos.cluster.NodeId;
5 +
6 +import com.google.common.base.Function;
7 +
8 +/**
9 + * Function to convert ControllerNode to NodeId.
10 + */
11 +public final class ControllerNodeToNodeId
12 + implements Function<ControllerNode, NodeId> {
13 +
14 + private static final ControllerNodeToNodeId INSTANCE = new ControllerNodeToNodeId();
15 +
16 + @Override
17 + public NodeId apply(ControllerNode input) {
18 + return input.id();
19 + }
20 +
21 + public static ControllerNodeToNodeId toNodeId() {
22 + return INSTANCE;
23 + }
24 +}
...@@ -30,6 +30,7 @@ public final class Timestamped<T> { ...@@ -30,6 +30,7 @@ public final class Timestamped<T> {
30 30
31 /** 31 /**
32 * Returns the value. 32 * Returns the value.
33 + *
33 * @return value 34 * @return value
34 */ 35 */
35 public T value() { 36 public T value() {
...@@ -38,6 +39,7 @@ public final class Timestamped<T> { ...@@ -38,6 +39,7 @@ public final class Timestamped<T> {
38 39
39 /** 40 /**
40 * Returns the time stamp. 41 * Returns the time stamp.
42 + *
41 * @return time stamp 43 * @return time stamp
42 */ 44 */
43 public Timestamp timestamp() { 45 public Timestamp timestamp() {
...@@ -51,7 +53,16 @@ public final class Timestamped<T> { ...@@ -51,7 +53,16 @@ public final class Timestamped<T> {
51 * @return true if this instance is newer. 53 * @return true if this instance is newer.
52 */ 54 */
53 public boolean isNewer(Timestamped<T> other) { 55 public boolean isNewer(Timestamped<T> other) {
54 - return this.timestamp.compareTo(checkNotNull(other).timestamp()) > 0; 56 + return isNewer(checkNotNull(other).timestamp());
57 + }
58 +
59 + /**
60 + * Tests if this timestamp is newer thatn the specified timestamp.
61 + * @param timestamp to compare agains
62 + * @return true if this instance is newer
63 + */
64 + public boolean isNewer(Timestamp timestamp) {
65 + return this.timestamp.compareTo(checkNotNull(timestamp)) > 0;
55 } 66 }
56 67
57 @Override 68 @Override
......
1 -package org.onlab.onos.store.device.impl;
2 -
3 -import java.util.Collection;
4 -import java.util.HashMap;
5 -import java.util.Map;
6 -
7 -import org.onlab.onos.cluster.NodeId;
8 -import org.onlab.onos.net.Device;
9 -import org.onlab.onos.net.DeviceId;
10 -import org.onlab.onos.store.Timestamp;
11 -import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
12 -
13 -// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
14 -// TODO: Handle Port as part of these messages, or separate messages for Ports?
15 -
16 -public class DeviceAntiEntropyAdvertisement
17 - extends AntiEntropyAdvertisement<DeviceId> {
18 -
19 -
20 - public DeviceAntiEntropyAdvertisement(NodeId sender,
21 - Map<DeviceId, Timestamp> advertisement) {
22 - super(sender, advertisement);
23 - }
24 -
25 - // May need to add ProviderID, etc.
26 - public static DeviceAntiEntropyAdvertisement create(
27 - NodeId self,
28 - Collection<VersionedValue<Device>> localValues) {
29 -
30 - Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
31 - for (VersionedValue<Device> e : localValues) {
32 - ads.put(e.entity().id(), e.timestamp());
33 - }
34 - return new DeviceAntiEntropyAdvertisement(self, ads);
35 - }
36 -
37 - // For serializer
38 - protected DeviceAntiEntropyAdvertisement() {}
39 -}
1 -package org.onlab.onos.store.device.impl;
2 -
3 -import java.util.Collection;
4 -import java.util.HashMap;
5 -import java.util.HashSet;
6 -import java.util.Map;
7 -import java.util.Set;
8 -
9 -import org.onlab.onos.cluster.NodeId;
10 -import org.onlab.onos.net.Device;
11 -import org.onlab.onos.net.DeviceId;
12 -import org.onlab.onos.store.Timestamp;
13 -import org.onlab.onos.store.common.impl.AntiEntropyReply;
14 -
15 -import com.google.common.collect.ImmutableMap;
16 -import com.google.common.collect.ImmutableSet;
17 -
18 -public class DeviceAntiEntropyReply
19 - extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
20 -
21 -
22 - public DeviceAntiEntropyReply(NodeId sender,
23 - Map<DeviceId, VersionedValue<Device>> suggestion,
24 - Set<DeviceId> request) {
25 - super(sender, suggestion, request);
26 - }
27 -
28 - /**
29 - * Creates a reply to Anti-Entropy advertisement.
30 - *
31 - * @param advertisement to respond to
32 - * @param self node identifier representing local node
33 - * @param localValues local values held on this node
34 - * @return reply message
35 - */
36 - public static DeviceAntiEntropyReply reply(
37 - DeviceAntiEntropyAdvertisement advertisement,
38 - NodeId self,
39 - Collection<VersionedValue<Device>> localValues
40 - ) {
41 -
42 - ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
43 -
44 - ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
45 - sug = ImmutableMap.builder();
46 -
47 - Set<DeviceId> req = new HashSet<>(ads.keySet());
48 -
49 - for (VersionedValue<Device> e : localValues) {
50 - final DeviceId id = e.entity().id();
51 - final Timestamp local = e.timestamp();
52 - final Timestamp theirs = ads.get(id);
53 - if (theirs == null) {
54 - // they don't have it, suggest
55 - sug.put(id, e);
56 - // don't need theirs
57 - req.remove(id);
58 - } else if (local.compareTo(theirs) < 0) {
59 - // they got older one, suggest
60 - sug.put(id, e);
61 - // don't need theirs
62 - req.remove(id);
63 - } else if (local.equals(theirs)) {
64 - // same, don't need theirs
65 - req.remove(id);
66 - }
67 - }
68 -
69 - return new DeviceAntiEntropyReply(self, sug.build(), req);
70 - }
71 -
72 - /**
73 - * Creates a reply to request for values held locally.
74 - *
75 - * @param requests message containing the request
76 - * @param self node identifier representing local node
77 - * @param localValues local valeds held on this node
78 - * @return reply message
79 - */
80 - public static DeviceAntiEntropyReply reply(
81 - DeviceAntiEntropyReply requests,
82 - NodeId self,
83 - Map<DeviceId, VersionedValue<Device>> localValues
84 - ) {
85 -
86 - Set<DeviceId> reqs = requests.request();
87 -
88 - Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
89 - for (DeviceId id : reqs) {
90 - final VersionedValue<Device> value = localValues.get(id);
91 - if (value != null) {
92 - requested.put(id, value);
93 - }
94 - }
95 -
96 - Set<DeviceId> empty = ImmutableSet.of();
97 - return new DeviceAntiEntropyReply(self, requested, empty);
98 - }
99 -
100 - // For serializer
101 - protected DeviceAntiEntropyReply() {}
102 -}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +import static org.onlab.onos.net.DefaultAnnotations.union;
5 +
6 +import java.util.Collections;
7 +import java.util.Map;
8 +import java.util.concurrent.ConcurrentHashMap;
9 +import java.util.concurrent.ConcurrentMap;
10 +
11 +import org.onlab.onos.net.PortNumber;
12 +import org.onlab.onos.net.SparseAnnotations;
13 +import org.onlab.onos.net.device.DefaultDeviceDescription;
14 +import org.onlab.onos.net.device.DefaultPortDescription;
15 +import org.onlab.onos.net.device.DeviceDescription;
16 +import org.onlab.onos.net.device.PortDescription;
17 +import org.onlab.onos.store.Timestamp;
18 +import org.onlab.onos.store.common.impl.Timestamped;
19 +
20 +/*
21 + * Collection of Description of a Device and Ports, given from a Provider.
22 + */
23 +class DeviceDescriptions {
24 +
25 + private volatile Timestamped<DeviceDescription> deviceDesc;
26 +
27 + private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
28 +
29 + public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
30 + this.deviceDesc = checkNotNull(desc);
31 + this.portDescs = new ConcurrentHashMap<>();
32 + }
33 +
34 + public Timestamp getLatestTimestamp() {
35 + Timestamp latest = deviceDesc.timestamp();
36 + for (Timestamped<PortDescription> desc : portDescs.values()) {
37 + if (desc.timestamp().compareTo(latest) > 0) {
38 + latest = desc.timestamp();
39 + }
40 + }
41 + return latest;
42 + }
43 +
44 + public Timestamped<DeviceDescription> getDeviceDesc() {
45 + return deviceDesc;
46 + }
47 +
48 + public Timestamped<PortDescription> getPortDesc(PortNumber number) {
49 + return portDescs.get(number);
50 + }
51 +
52 + public Map<PortNumber, Timestamped<PortDescription>> getPortDescs() {
53 + return Collections.unmodifiableMap(portDescs);
54 + }
55 +
56 + /**
57 + * Puts DeviceDescription, merging annotations as necessary.
58 + *
59 + * @param newDesc new DeviceDescription
60 + */
61 + public synchronized void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
62 + Timestamped<DeviceDescription> oldOne = deviceDesc;
63 + Timestamped<DeviceDescription> newOne = newDesc;
64 + if (oldOne != null) {
65 + SparseAnnotations merged = union(oldOne.value().annotations(),
66 + newDesc.value().annotations());
67 + newOne = new Timestamped<DeviceDescription>(
68 + new DefaultDeviceDescription(newDesc.value(), merged),
69 + newDesc.timestamp());
70 + }
71 + deviceDesc = newOne;
72 + }
73 +
74 + /**
75 + * Puts PortDescription, merging annotations as necessary.
76 + *
77 + * @param newDesc new PortDescription
78 + */
79 + public synchronized void putPortDesc(Timestamped<PortDescription> newDesc) {
80 + Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
81 + Timestamped<PortDescription> newOne = newDesc;
82 + if (oldOne != null) {
83 + SparseAnnotations merged = union(oldOne.value().annotations(),
84 + newDesc.value().annotations());
85 + newOne = new Timestamped<PortDescription>(
86 + new DefaultPortDescription(newDesc.value(), merged),
87 + newDesc.timestamp());
88 + }
89 + portDescs.put(newOne.value().portNumber(), newOne);
90 + }
91 +}
...@@ -5,8 +5,7 @@ import com.google.common.collect.ImmutableList; ...@@ -5,8 +5,7 @@ import com.google.common.collect.ImmutableList;
5 import com.google.common.collect.Maps; 5 import com.google.common.collect.Maps;
6 import com.google.common.collect.Sets; 6 import com.google.common.collect.Sets;
7 7
8 -import org.apache.commons.lang3.concurrent.ConcurrentException; 8 +import org.apache.commons.lang3.RandomUtils;
9 -import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10 import org.apache.felix.scr.annotations.Activate; 9 import org.apache.felix.scr.annotations.Activate;
11 import org.apache.felix.scr.annotations.Component; 10 import org.apache.felix.scr.annotations.Component;
12 import org.apache.felix.scr.annotations.Deactivate; 11 import org.apache.felix.scr.annotations.Deactivate;
...@@ -14,6 +13,8 @@ import org.apache.felix.scr.annotations.Reference; ...@@ -14,6 +13,8 @@ import org.apache.felix.scr.annotations.Reference;
14 import org.apache.felix.scr.annotations.ReferenceCardinality; 13 import org.apache.felix.scr.annotations.ReferenceCardinality;
15 import org.apache.felix.scr.annotations.Service; 14 import org.apache.felix.scr.annotations.Service;
16 import org.onlab.onos.cluster.ClusterService; 15 import org.onlab.onos.cluster.ClusterService;
16 +import org.onlab.onos.cluster.ControllerNode;
17 +import org.onlab.onos.cluster.NodeId;
17 import org.onlab.onos.net.AnnotationsUtil; 18 import org.onlab.onos.net.AnnotationsUtil;
18 import org.onlab.onos.net.DefaultAnnotations; 19 import org.onlab.onos.net.DefaultAnnotations;
19 import org.onlab.onos.net.DefaultDevice; 20 import org.onlab.onos.net.DefaultDevice;
...@@ -23,9 +24,6 @@ import org.onlab.onos.net.Device.Type; ...@@ -23,9 +24,6 @@ import org.onlab.onos.net.Device.Type;
23 import org.onlab.onos.net.DeviceId; 24 import org.onlab.onos.net.DeviceId;
24 import org.onlab.onos.net.Port; 25 import org.onlab.onos.net.Port;
25 import org.onlab.onos.net.PortNumber; 26 import org.onlab.onos.net.PortNumber;
26 -import org.onlab.onos.net.SparseAnnotations;
27 -import org.onlab.onos.net.device.DefaultDeviceDescription;
28 -import org.onlab.onos.net.device.DefaultPortDescription;
29 import org.onlab.onos.net.device.DeviceDescription; 27 import org.onlab.onos.net.device.DeviceDescription;
30 import org.onlab.onos.net.device.DeviceEvent; 28 import org.onlab.onos.net.device.DeviceEvent;
31 import org.onlab.onos.net.device.DeviceStore; 29 import org.onlab.onos.net.device.DeviceStore;
...@@ -38,18 +36,22 @@ import org.onlab.onos.store.Timestamp; ...@@ -38,18 +36,22 @@ import org.onlab.onos.store.Timestamp;
38 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 36 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 37 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 38 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
41 -import org.onlab.onos.store.common.impl.MastershipBasedTimestamp; 39 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
42 import org.onlab.onos.store.common.impl.Timestamped; 40 import org.onlab.onos.store.common.impl.Timestamped;
43 -import org.onlab.onos.store.serializers.KryoPoolUtil; 41 +import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
42 +import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
43 +import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
44 import org.onlab.onos.store.serializers.KryoSerializer; 44 import org.onlab.onos.store.serializers.KryoSerializer;
45 -import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer; 45 +import org.onlab.onos.store.serializers.DistributedStoreSerializers;
46 import org.onlab.util.KryoPool; 46 import org.onlab.util.KryoPool;
47 import org.onlab.util.NewConcurrentHashMap; 47 import org.onlab.util.NewConcurrentHashMap;
48 import org.slf4j.Logger; 48 import org.slf4j.Logger;
49 49
50 import java.io.IOException; 50 import java.io.IOException;
51 import java.util.ArrayList; 51 import java.util.ArrayList;
52 +import java.util.Collection;
52 import java.util.Collections; 53 import java.util.Collections;
54 +import java.util.HashMap;
53 import java.util.HashSet; 55 import java.util.HashSet;
54 import java.util.Iterator; 56 import java.util.Iterator;
55 import java.util.List; 57 import java.util.List;
...@@ -57,19 +59,21 @@ import java.util.Map; ...@@ -57,19 +59,21 @@ import java.util.Map;
57 import java.util.Map.Entry; 59 import java.util.Map.Entry;
58 import java.util.Objects; 60 import java.util.Objects;
59 import java.util.Set; 61 import java.util.Set;
60 -import java.util.concurrent.ConcurrentHashMap;
61 import java.util.concurrent.ConcurrentMap; 62 import java.util.concurrent.ConcurrentMap;
62 -import java.util.concurrent.atomic.AtomicReference; 63 +import java.util.concurrent.ScheduledExecutorService;
64 +import java.util.concurrent.TimeUnit;
63 65
64 import static com.google.common.base.Preconditions.checkArgument; 66 import static com.google.common.base.Preconditions.checkArgument;
65 -import static com.google.common.base.Preconditions.checkNotNull;
66 import static com.google.common.base.Predicates.notNull; 67 import static com.google.common.base.Predicates.notNull;
67 import static org.onlab.onos.net.device.DeviceEvent.Type.*; 68 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
68 import static org.slf4j.LoggerFactory.getLogger; 69 import static org.slf4j.LoggerFactory.getLogger;
69 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; 70 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
70 import static org.onlab.onos.net.DefaultAnnotations.merge; 71 import static org.onlab.onos.net.DefaultAnnotations.merge;
71 -import static org.onlab.onos.net.DefaultAnnotations.union;
72 import static com.google.common.base.Verify.verify; 72 import static com.google.common.base.Verify.verify;
73 +import static org.onlab.util.Tools.namedThreads;
74 +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
75 +import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
76 +import static org.onlab.onos.store.common.impl.ControllerNodeToNodeId.toNodeId;
73 77
74 // TODO: give me a better name 78 // TODO: give me a better name
75 /** 79 /**
...@@ -86,8 +90,9 @@ public class GossipDeviceStore ...@@ -86,8 +90,9 @@ public class GossipDeviceStore
86 90
87 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; 91 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
88 92
89 - // TODO: Check if inner Map can be replaced with plain Map 93 + // TODO: Check if inner Map can be replaced with plain Map.
90 // innerMap is used to lock a Device, thus instance should never be replaced. 94 // innerMap is used to lock a Device, thus instance should never be replaced.
95 +
91 // collection of Description given from various providers 96 // collection of Description given from various providers
92 private final ConcurrentMap<DeviceId, 97 private final ConcurrentMap<DeviceId,
93 ConcurrentMap<ProviderId, DeviceDescriptions>> 98 ConcurrentMap<ProviderId, DeviceDescriptions>>
...@@ -117,21 +122,23 @@ public class GossipDeviceStore ...@@ -117,21 +122,23 @@ public class GossipDeviceStore
117 @Override 122 @Override
118 protected void setupKryoPool() { 123 protected void setupKryoPool() {
119 serializerPool = KryoPool.newBuilder() 124 serializerPool = KryoPool.newBuilder()
120 - .register(KryoPoolUtil.API) 125 + .register(DistributedStoreSerializers.COMMON)
126 +
121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer()) 127 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
122 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer()) 128 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
123 .register(InternalDeviceRemovedEvent.class) 129 .register(InternalDeviceRemovedEvent.class)
124 .register(InternalPortEvent.class, new InternalPortEventSerializer()) 130 .register(InternalPortEvent.class, new InternalPortEventSerializer())
125 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer()) 131 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
126 - .register(Timestamp.class) 132 + .register(DeviceAntiEntropyAdvertisement.class)
127 - .register(Timestamped.class) 133 + .register(DeviceFragmentId.class)
128 - .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer()) 134 + .register(PortFragmentId.class)
129 .build() 135 .build()
130 .populate(1); 136 .populate(1);
131 } 137 }
132 -
133 }; 138 };
134 139
140 + private ScheduledExecutorService executor;
141 +
135 @Activate 142 @Activate
136 public void activate() { 143 public void activate() {
137 clusterCommunicator.addSubscriber( 144 clusterCommunicator.addSubscriber(
...@@ -144,11 +151,35 @@ public class GossipDeviceStore ...@@ -144,11 +151,35 @@ public class GossipDeviceStore
144 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener()); 151 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
145 clusterCommunicator.addSubscriber( 152 clusterCommunicator.addSubscriber(
146 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener()); 153 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
154 + clusterCommunicator.addSubscriber(
155 + GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
156 +
157 + executor =
158 + newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
159 +
160 + // TODO: Make these configurable
161 + long initialDelaySec = 5;
162 + long periodSec = 5;
163 + // start anti-entropy thread
164 + executor.scheduleAtFixedRate(new SendAdvertisementTask(),
165 + initialDelaySec, periodSec, TimeUnit.SECONDS);
166 +
147 log.info("Started"); 167 log.info("Started");
148 } 168 }
149 169
150 @Deactivate 170 @Deactivate
151 public void deactivate() { 171 public void deactivate() {
172 +
173 + executor.shutdownNow();
174 + try {
175 + boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
176 + if (timedout) {
177 + log.error("Timeout during executor shutdown");
178 + }
179 + } catch (InterruptedException e) {
180 + log.error("Error during executor shutdown", e);
181 + }
182 +
152 deviceDescs.clear(); 183 deviceDescs.clear();
153 devices.clear(); 184 devices.clear();
154 devicePorts.clear(); 185 devicePorts.clear();
...@@ -543,14 +574,14 @@ public class GossipDeviceStore ...@@ -543,14 +574,14 @@ public class GossipDeviceStore
543 574
544 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); 575 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
545 if (existingPortDesc == null || 576 if (existingPortDesc == null ||
546 - deltaDesc == existingPortDesc ||
547 deltaDesc.isNewer(existingPortDesc)) { 577 deltaDesc.isNewer(existingPortDesc)) {
548 // on new port or valid update 578 // on new port or valid update
549 // update description 579 // update description
550 descs.putPortDesc(deltaDesc); 580 descs.putPortDesc(deltaDesc);
551 newPort = composePort(device, number, descsMap); 581 newPort = composePort(device, number, descsMap);
552 } else { 582 } else {
553 - // outdated event, ignored. 583 + // same or outdated event, ignored.
584 + log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
554 return null; 585 return null;
555 } 586 }
556 587
...@@ -627,6 +658,14 @@ public class GossipDeviceStore ...@@ -627,6 +658,14 @@ public class GossipDeviceStore
627 } 658 }
628 } 659 }
629 660
661 + /**
662 + * Checks if given timestamp is superseded by removal request
663 + * with more recent timestamp.
664 + *
665 + * @param deviceId identifier of a device
666 + * @param timestampToCheck timestamp of an event to check
667 + * @return true if device is already removed
668 + */
630 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) { 669 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
631 Timestamp removalTimestamp = removalRequest.get(deviceId); 670 Timestamp removalTimestamp = removalRequest.get(deviceId);
632 if (removalTimestamp != null && 671 if (removalTimestamp != null &&
...@@ -667,7 +706,7 @@ public class GossipDeviceStore ...@@ -667,7 +706,7 @@ public class GossipDeviceStore
667 continue; 706 continue;
668 } 707 }
669 // TODO: should keep track of Description timestamp 708 // TODO: should keep track of Description timestamp
670 - // and only merge conflicting keys when timestamp is newer 709 + // and only merge conflicting keys when timestamp is newer.
671 // Currently assuming there will never be a key conflict between 710 // Currently assuming there will never be a key conflict between
672 // providers 711 // providers
673 712
...@@ -708,7 +747,7 @@ public class GossipDeviceStore ...@@ -708,7 +747,7 @@ public class GossipDeviceStore
708 continue; 747 continue;
709 } 748 }
710 // TODO: should keep track of Description timestamp 749 // TODO: should keep track of Description timestamp
711 - // and only merge conflicting keys when timestamp is newer 750 + // and only merge conflicting keys when timestamp is newer.
712 // Currently assuming there will never be a key conflict between 751 // Currently assuming there will never be a key conflict between
713 // providers 752 // providers
714 753
...@@ -745,129 +784,258 @@ public class GossipDeviceStore ...@@ -745,129 +784,258 @@ public class GossipDeviceStore
745 return providerDescs.get(pid); 784 return providerDescs.get(pid);
746 } 785 }
747 786
748 - public static final class InitDeviceDescs 787 + // TODO: should we be throwing exception?
749 - implements ConcurrentInitializer<DeviceDescriptions> { 788 + private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
750 - 789 + ClusterMessage message = new ClusterMessage(
751 - private final Timestamped<DeviceDescription> deviceDesc; 790 + clusterService.getLocalNode().id(),
791 + subject,
792 + SERIALIZER.encode(event));
793 + clusterCommunicator.unicast(message, recipient);
794 + }
752 795
753 - public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) { 796 + // TODO: should we be throwing exception?
754 - this.deviceDesc = checkNotNull(deviceDesc); 797 + private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
798 + ClusterMessage message = new ClusterMessage(
799 + clusterService.getLocalNode().id(),
800 + subject,
801 + SERIALIZER.encode(event));
802 + clusterCommunicator.broadcast(message);
755 } 803 }
756 - @Override 804 +
757 - public DeviceDescriptions get() throws ConcurrentException { 805 + private void notifyPeers(InternalDeviceEvent event) throws IOException {
758 - return new DeviceDescriptions(deviceDesc); 806 + broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
759 } 807 }
808 +
809 + private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
810 + broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
760 } 811 }
761 812
813 + private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
814 + broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
815 + }
762 816
763 - /** 817 + private void notifyPeers(InternalPortEvent event) throws IOException {
764 - * Collection of Description of a Device and it's Ports given from a Provider. 818 + broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
765 - */ 819 + }
766 - public static class DeviceDescriptions {
767 820
768 - private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc; 821 + private void notifyPeers(InternalPortStatusEvent event) throws IOException {
769 - private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs; 822 + broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
823 + }
770 824
771 - public DeviceDescriptions(Timestamped<DeviceDescription> desc) { 825 + private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
772 - this.deviceDesc = new AtomicReference<>(checkNotNull(desc)); 826 + try {
773 - this.portDescs = new ConcurrentHashMap<>(); 827 + unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
828 + } catch (IOException e) {
829 + log.error("Failed to send" + event + " to " + recipient, e);
830 + }
774 } 831 }
775 832
776 - Timestamp getLatestTimestamp() { 833 + private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
777 - Timestamp latest = deviceDesc.get().timestamp(); 834 + try {
778 - for (Timestamped<PortDescription> desc : portDescs.values()) { 835 + unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
779 - if (desc.timestamp().compareTo(latest) > 0) { 836 + } catch (IOException e) {
780 - latest = desc.timestamp(); 837 + log.error("Failed to send" + event + " to " + recipient, e);
838 + }
781 } 839 }
840 +
841 + private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
842 + try {
843 + unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
844 + } catch (IOException e) {
845 + log.error("Failed to send" + event + " to " + recipient, e);
782 } 846 }
783 - return latest;
784 } 847 }
785 848
786 - public Timestamped<DeviceDescription> getDeviceDesc() { 849 + private void notifyPeer(NodeId recipient, InternalPortEvent event) {
787 - return deviceDesc.get(); 850 + try {
851 + unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
852 + } catch (IOException e) {
853 + log.error("Failed to send" + event + " to " + recipient, e);
854 + }
788 } 855 }
789 856
790 - public Timestamped<PortDescription> getPortDesc(PortNumber number) { 857 + private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
791 - return portDescs.get(number); 858 + try {
859 + unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
860 + } catch (IOException e) {
861 + log.error("Failed to send" + event + " to " + recipient, e);
862 + }
792 } 863 }
793 864
794 - /** 865 + private DeviceAntiEntropyAdvertisement createAdvertisement() {
795 - * Puts DeviceDescription, merging annotations as necessary. 866 + final NodeId self = clusterService.getLocalNode().id();
796 - * 867 +
797 - * @param newDesc new DeviceDescription 868 + Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size());
798 - * @return previous DeviceDescription 869 + final int portsPerDevice = 8; // random guess to minimize reallocation
799 - */ 870 + Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice);
800 - public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) { 871 + Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size());
801 - Timestamped<DeviceDescription> oldOne = deviceDesc.get(); 872 +
802 - Timestamped<DeviceDescription> newOne = newDesc; 873 + for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>>
803 - if (oldOne != null) { 874 + provs : deviceDescs.entrySet()) {
804 - SparseAnnotations merged = union(oldOne.value().annotations(), 875 +
805 - newDesc.value().annotations()); 876 + final DeviceId deviceId = provs.getKey();
806 - newOne = new Timestamped<DeviceDescription>( 877 + final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
807 - new DefaultDeviceDescription(newDesc.value(), merged), 878 + synchronized (devDescs) {
808 - newDesc.timestamp()); 879 +
880 + offline.put(deviceId, this.offline.get(deviceId));
881 +
882 + for (Entry<ProviderId, DeviceDescriptions>
883 + prov : devDescs.entrySet()) {
884 +
885 + final ProviderId provId = prov.getKey();
886 + final DeviceDescriptions descs = prov.getValue();
887 +
888 + devices.put(new DeviceFragmentId(deviceId, provId),
889 + descs.getDeviceDesc().timestamp());
890 +
891 + for (Entry<PortNumber, Timestamped<PortDescription>>
892 + portDesc : descs.getPortDescs().entrySet()) {
893 +
894 + final PortNumber number = portDesc.getKey();
895 + ports.put(new PortFragmentId(deviceId, provId, number),
896 + portDesc.getValue().timestamp());
897 + }
898 + }
809 } 899 }
810 - return deviceDesc.getAndSet(newOne); 900 + }
901 +
902 + return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
811 } 903 }
812 904
813 /** 905 /**
814 - * Puts PortDescription, merging annotations as necessary. 906 + * Responds to anti-entropy advertisement message.
907 + * <P>
908 + * Notify sender about out-dated information using regular replication message.
909 + * Send back advertisement to sender if not in sync.
815 * 910 *
816 - * @param newDesc new PortDescription 911 + * @param advertisement to respond to
817 - * @return previous PortDescription
818 */ 912 */
819 - public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) { 913 + private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
820 - Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber()); 914 +
821 - Timestamped<PortDescription> newOne = newDesc; 915 + final NodeId sender = advertisement.sender();
822 - if (oldOne != null) { 916 +
823 - SparseAnnotations merged = union(oldOne.value().annotations(), 917 + Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
824 - newDesc.value().annotations()); 918 + Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
825 - newOne = new Timestamped<PortDescription>( 919 + Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
826 - new DefaultPortDescription(newDesc.value(), merged), 920 +
827 - newDesc.timestamp()); 921 + // Fragments to request
922 + Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
923 + Collection<PortFragmentId> reqPorts = new ArrayList<>();
924 +
925 + for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
926 + final DeviceId deviceId = de.getKey();
927 + final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
928 +
929 + synchronized (lDevice) {
930 + // latestTimestamp across provider
931 + // Note: can be null initially
932 + Timestamp localLatest = offline.get(deviceId);
933 +
934 + // handle device Ads
935 + for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
936 + final ProviderId provId = prov.getKey();
937 + final DeviceDescriptions lDeviceDescs = prov.getValue();
938 +
939 + final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
940 +
941 +
942 + Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
943 + Timestamp advDevTimestamp = devAds.get(devFragId);
944 +
945 + if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
946 + // remote does not have it or outdated, suggest
947 + notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
948 + } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
949 + // local is outdated, request
950 + reqDevices.add(devFragId);
828 } 951 }
829 - return portDescs.put(newOne.value().portNumber(), newOne); 952 +
953 + // handle port Ads
954 + for (Entry<PortNumber, Timestamped<PortDescription>>
955 + pe : lDeviceDescs.getPortDescs().entrySet()) {
956 +
957 + final PortNumber num = pe.getKey();
958 + final Timestamped<PortDescription> lPort = pe.getValue();
959 +
960 + final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
961 +
962 + Timestamp advPortTimestamp = portAds.get(portFragId);
963 + if ( advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
964 + // remote does not have it or outdated, suggest
965 + notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
966 + } else if (!lPort.timestamp().equals(advPortTimestamp)) {
967 + // local is outdated, request
968 + log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
969 + reqPorts.add(portFragId);
830 } 970 }
971 +
972 + // remove port Ad already processed
973 + portAds.remove(portFragId);
974 + } // end local port loop
975 +
976 + // remove device Ad already processed
977 + devAds.remove(devFragId);
978 +
979 + // find latest and update
980 + final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
981 + if (localLatest == null ||
982 + providerLatest.compareTo(localLatest) > 0) {
983 + localLatest = providerLatest;
831 } 984 }
985 + } // end local provider loop
832 986
833 - private void notifyPeers(InternalDeviceEvent event) throws IOException { 987 + // checking if remote timestamp is more recent.
834 - ClusterMessage message = new ClusterMessage( 988 + Timestamp rOffline = offlineAds.get(deviceId);
835 - clusterService.getLocalNode().id(), 989 + if (rOffline != null &&
836 - GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, 990 + rOffline.compareTo(localLatest) > 0) {
837 - SERIALIZER.encode(event)); 991 + // remote offline timestamp suggests that the
838 - clusterCommunicator.broadcast(message); 992 + // device is off-line
993 + markOfflineInternal(deviceId, rOffline);
839 } 994 }
840 995
841 - private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException { 996 + Timestamp lOffline = offline.get(deviceId);
842 - ClusterMessage message = new ClusterMessage( 997 + if (lOffline != null && rOffline == null) {
843 - clusterService.getLocalNode().id(), 998 + // locally offline, but remote is online, suggest offline
844 - GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, 999 + notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
845 - SERIALIZER.encode(event));
846 - clusterCommunicator.broadcast(message);
847 } 1000 }
848 1001
849 - private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException { 1002 + // remove device offline Ad already processed
850 - ClusterMessage message = new ClusterMessage( 1003 + offlineAds.remove(deviceId);
851 - clusterService.getLocalNode().id(), 1004 + } // end local device loop
852 - GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, 1005 + } // device lock
853 - SERIALIZER.encode(event)); 1006 +
854 - clusterCommunicator.broadcast(message); 1007 + // If there is any Ads left, request them
1008 + log.trace("Ads left {}, {}", devAds, portAds);
1009 + reqDevices.addAll(devAds.keySet());
1010 + reqPorts.addAll(portAds.keySet());
1011 +
1012 + if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1013 + log.trace("Nothing to request to remote peer {}", sender);
1014 + return;
855 } 1015 }
856 1016
857 - private void notifyPeers(InternalPortEvent event) throws IOException { 1017 + log.info("Need to sync {} {}", reqDevices, reqPorts);
858 - ClusterMessage message = new ClusterMessage( 1018 +
859 - clusterService.getLocalNode().id(), 1019 + // 2-way Anti-Entropy for now
860 - GossipDeviceStoreMessageSubjects.PORT_UPDATE, 1020 + try {
861 - SERIALIZER.encode(event)); 1021 + unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
862 - clusterCommunicator.broadcast(message); 1022 + } catch (IOException e) {
1023 + log.error("Failed to send response advertisement to " + sender, e);
863 } 1024 }
864 1025
865 - private void notifyPeers(InternalPortStatusEvent event) throws IOException { 1026 +// Sketch of 3-way Anti-Entropy
866 - ClusterMessage message = new ClusterMessage( 1027 +// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
867 - clusterService.getLocalNode().id(), 1028 +// ClusterMessage message = new ClusterMessage(
868 - GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, 1029 +// clusterService.getLocalNode().id(),
869 - SERIALIZER.encode(event)); 1030 +// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
870 - clusterCommunicator.broadcast(message); 1031 +// SERIALIZER.encode(request));
1032 +//
1033 +// try {
1034 +// clusterCommunicator.unicast(message, advertisement.sender());
1035 +// } catch (IOException e) {
1036 +// log.error("Failed to send advertisement reply to "
1037 +// + advertisement.sender(), e);
1038 +// }
871 } 1039 }
872 1040
873 private void notifyDelegateIfNotNull(DeviceEvent event) { 1041 private void notifyDelegateIfNotNull(DeviceEvent event) {
...@@ -876,6 +1044,54 @@ public class GossipDeviceStore ...@@ -876,6 +1044,54 @@ public class GossipDeviceStore
876 } 1044 }
877 } 1045 }
878 1046
1047 + private final class SendAdvertisementTask implements Runnable {
1048 +
1049 + @Override
1050 + public void run() {
1051 + if (Thread.currentThread().isInterrupted()) {
1052 + log.info("Interrupted, quitting");
1053 + return;
1054 + }
1055 +
1056 + try {
1057 + final NodeId self = clusterService.getLocalNode().id();
1058 + Set<ControllerNode> nodes = clusterService.getNodes();
1059 +
1060 + ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1061 + .transform(toNodeId())
1062 + .toList();
1063 +
1064 + if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1065 + log.info("No other peers in the cluster.");
1066 + return;
1067 + }
1068 +
1069 + NodeId peer;
1070 + do {
1071 + int idx = RandomUtils.nextInt(0, nodeIds.size());
1072 + peer = nodeIds.get(idx);
1073 + } while (peer.equals(self));
1074 +
1075 + DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1076 +
1077 + if (Thread.currentThread().isInterrupted()) {
1078 + log.info("Interrupted, quitting");
1079 + return;
1080 + }
1081 +
1082 + try {
1083 + unicastMessage(peer, DEVICE_ADVERTISE, ad);
1084 + } catch (IOException e) {
1085 + log.error("Failed to send anti-entropy advertisement", e);
1086 + return;
1087 + }
1088 + } catch (Exception e) {
1089 + // catch all Exception to avoid Scheduled task being suppressed.
1090 + log.error("Exception thrown while sending advertisement", e);
1091 + }
1092 + }
1093 + }
1094 +
879 private class InternalDeviceEventListener implements ClusterMessageHandler { 1095 private class InternalDeviceEventListener implements ClusterMessageHandler {
880 @Override 1096 @Override
881 public void handle(ClusterMessage message) { 1097 public void handle(ClusterMessage message) {
...@@ -940,6 +1156,7 @@ public class GossipDeviceStore ...@@ -940,6 +1156,7 @@ public class GossipDeviceStore
940 1156
941 log.info("Received port status update event from peer: {}", message.sender()); 1157 log.info("Received port status update event from peer: {}", message.sender());
942 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload()); 1158 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
1159 + log.info("{}", event);
943 1160
944 ProviderId providerId = event.providerId(); 1161 ProviderId providerId = event.providerId();
945 DeviceId deviceId = event.deviceId(); 1162 DeviceId deviceId = event.deviceId();
...@@ -948,4 +1165,15 @@ public class GossipDeviceStore ...@@ -948,4 +1165,15 @@ public class GossipDeviceStore
948 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription)); 1165 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
949 } 1166 }
950 } 1167 }
1168 +
1169 + private final class InternalDeviceAdvertisementListener
1170 + implements ClusterMessageHandler {
1171 +
1172 + @Override
1173 + public void handle(ClusterMessage message) {
1174 + log.info("Received Device advertisement from peer: {}", message.sender());
1175 + DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1176 + handleAdvertisement(advertisement);
1177 + }
1178 + }
951 } 1179 }
......
...@@ -2,6 +2,7 @@ package org.onlab.onos.store.device.impl; ...@@ -2,6 +2,7 @@ package org.onlab.onos.store.device.impl;
2 2
3 import org.onlab.onos.store.cluster.messaging.MessageSubject; 3 import org.onlab.onos.store.cluster.messaging.MessageSubject;
4 4
5 +// TODO: add prefix to assure uniqueness.
5 /** 6 /**
6 * MessageSubjects used by GossipDeviceStore peer-peer communication. 7 * MessageSubjects used by GossipDeviceStore peer-peer communication.
7 */ 8 */
...@@ -14,4 +15,8 @@ public final class GossipDeviceStoreMessageSubjects { ...@@ -14,4 +15,8 @@ public final class GossipDeviceStoreMessageSubjects {
14 public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed"); 15 public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
15 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update"); 16 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
16 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update"); 17 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
18 +
19 + public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements");
20 + // to be used with 3-way anti-entropy process
21 + public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request");
17 } 22 }
......
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
5 +import org.apache.commons.lang3.concurrent.ConcurrentException;
6 +import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
7 +import org.onlab.onos.net.device.DeviceDescription;
8 +import org.onlab.onos.store.common.impl.Timestamped;
9 +
10 +// FIXME: consider removing this class
11 +public final class InitDeviceDescs
12 + implements ConcurrentInitializer<DeviceDescriptions> {
13 +
14 + private final Timestamped<DeviceDescription> deviceDesc;
15 +
16 + public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
17 + this.deviceDesc = checkNotNull(deviceDesc);
18 + }
19 + @Override
20 + public DeviceDescriptions get() throws ConcurrentException {
21 + return new DeviceDescriptions(deviceDesc);
22 + }
23 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -5,6 +5,8 @@ import org.onlab.onos.net.device.DeviceDescription; ...@@ -5,6 +5,8 @@ import org.onlab.onos.net.device.DeviceDescription;
5 import org.onlab.onos.net.provider.ProviderId; 5 import org.onlab.onos.net.provider.ProviderId;
6 import org.onlab.onos.store.common.impl.Timestamped; 6 import org.onlab.onos.store.common.impl.Timestamped;
7 7
8 +import com.google.common.base.MoreObjects;
9 +
8 /** 10 /**
9 * Information published by GossipDeviceStore to notify peers of a device 11 * Information published by GossipDeviceStore to notify peers of a device
10 * change event. 12 * change event.
...@@ -36,6 +38,15 @@ public class InternalDeviceEvent { ...@@ -36,6 +38,15 @@ public class InternalDeviceEvent {
36 return deviceDescription; 38 return deviceDescription;
37 } 39 }
38 40
41 + @Override
42 + public String toString() {
43 + return MoreObjects.toStringHelper(getClass())
44 + .add("providerId", providerId)
45 + .add("deviceId", deviceId)
46 + .add("deviceDescription", deviceDescription)
47 + .toString();
48 + }
49 +
39 // for serializer 50 // for serializer
40 protected InternalDeviceEvent() { 51 protected InternalDeviceEvent() {
41 this.providerId = null; 52 this.providerId = null;
......
...@@ -3,6 +3,8 @@ package org.onlab.onos.store.device.impl; ...@@ -3,6 +3,8 @@ package org.onlab.onos.store.device.impl;
3 import org.onlab.onos.net.DeviceId; 3 import org.onlab.onos.net.DeviceId;
4 import org.onlab.onos.store.Timestamp; 4 import org.onlab.onos.store.Timestamp;
5 5
6 +import com.google.common.base.MoreObjects;
7 +
6 /** 8 /**
7 * Information published by GossipDeviceStore to notify peers of a device 9 * Information published by GossipDeviceStore to notify peers of a device
8 * going offline. 10 * going offline.
...@@ -30,6 +32,14 @@ public class InternalDeviceOfflineEvent { ...@@ -30,6 +32,14 @@ public class InternalDeviceOfflineEvent {
30 return timestamp; 32 return timestamp;
31 } 33 }
32 34
35 + @Override
36 + public String toString() {
37 + return MoreObjects.toStringHelper(getClass())
38 + .add("deviceId", deviceId)
39 + .add("timestamp", timestamp)
40 + .toString();
41 + }
42 +
33 // for serializer 43 // for serializer
34 @SuppressWarnings("unused") 44 @SuppressWarnings("unused")
35 private InternalDeviceOfflineEvent() { 45 private InternalDeviceOfflineEvent() {
......
...@@ -3,6 +3,8 @@ package org.onlab.onos.store.device.impl; ...@@ -3,6 +3,8 @@ package org.onlab.onos.store.device.impl;
3 import org.onlab.onos.net.DeviceId; 3 import org.onlab.onos.net.DeviceId;
4 import org.onlab.onos.store.Timestamp; 4 import org.onlab.onos.store.Timestamp;
5 5
6 +import com.google.common.base.MoreObjects;
7 +
6 /** 8 /**
7 * Information published by GossipDeviceStore to notify peers of a device 9 * Information published by GossipDeviceStore to notify peers of a device
8 * being administratively removed. 10 * being administratively removed.
...@@ -30,6 +32,14 @@ public class InternalDeviceRemovedEvent { ...@@ -30,6 +32,14 @@ public class InternalDeviceRemovedEvent {
30 return timestamp; 32 return timestamp;
31 } 33 }
32 34
35 + @Override
36 + public String toString() {
37 + return MoreObjects.toStringHelper(getClass())
38 + .add("deviceId", deviceId)
39 + .add("timestamp", timestamp)
40 + .toString();
41 + }
42 +
33 // for serializer 43 // for serializer
34 @SuppressWarnings("unused") 44 @SuppressWarnings("unused")
35 private InternalDeviceRemovedEvent() { 45 private InternalDeviceRemovedEvent() {
......
...@@ -7,6 +7,8 @@ import org.onlab.onos.net.device.PortDescription; ...@@ -7,6 +7,8 @@ import org.onlab.onos.net.device.PortDescription;
7 import org.onlab.onos.net.provider.ProviderId; 7 import org.onlab.onos.net.provider.ProviderId;
8 import org.onlab.onos.store.common.impl.Timestamped; 8 import org.onlab.onos.store.common.impl.Timestamped;
9 9
10 +import com.google.common.base.MoreObjects;
11 +
10 /** 12 /**
11 * Information published by GossipDeviceStore to notify peers of a port 13 * Information published by GossipDeviceStore to notify peers of a port
12 * change event. 14 * change event.
...@@ -38,6 +40,15 @@ public class InternalPortEvent { ...@@ -38,6 +40,15 @@ public class InternalPortEvent {
38 return portDescriptions; 40 return portDescriptions;
39 } 41 }
40 42
43 + @Override
44 + public String toString() {
45 + return MoreObjects.toStringHelper(getClass())
46 + .add("providerId", providerId)
47 + .add("deviceId", deviceId)
48 + .add("portDescriptions", portDescriptions)
49 + .toString();
50 + }
51 +
41 // for serializer 52 // for serializer
42 protected InternalPortEvent() { 53 protected InternalPortEvent() {
43 this.providerId = null; 54 this.providerId = null;
......
...@@ -5,6 +5,8 @@ import org.onlab.onos.net.device.PortDescription; ...@@ -5,6 +5,8 @@ import org.onlab.onos.net.device.PortDescription;
5 import org.onlab.onos.net.provider.ProviderId; 5 import org.onlab.onos.net.provider.ProviderId;
6 import org.onlab.onos.store.common.impl.Timestamped; 6 import org.onlab.onos.store.common.impl.Timestamped;
7 7
8 +import com.google.common.base.MoreObjects;
9 +
8 /** 10 /**
9 * Information published by GossipDeviceStore to notify peers of a port 11 * Information published by GossipDeviceStore to notify peers of a port
10 * status change event. 12 * status change event.
...@@ -36,6 +38,15 @@ public class InternalPortStatusEvent { ...@@ -36,6 +38,15 @@ public class InternalPortStatusEvent {
36 return portDescription; 38 return portDescription;
37 } 39 }
38 40
41 + @Override
42 + public String toString() {
43 + return MoreObjects.toStringHelper(getClass())
44 + .add("providerId", providerId)
45 + .add("deviceId", deviceId)
46 + .add("portDescription", portDescription)
47 + .toString();
48 + }
49 +
39 // for serializer 50 // for serializer
40 protected InternalPortStatusEvent() { 51 protected InternalPortStatusEvent() {
41 this.providerId = null; 52 this.providerId = null;
......
...@@ -35,6 +35,7 @@ public class InternalPortStatusEventSerializer extends Serializer<InternalPortSt ...@@ -35,6 +35,7 @@ public class InternalPortStatusEventSerializer extends Serializer<InternalPortSt
35 Class<InternalPortStatusEvent> type) { 35 Class<InternalPortStatusEvent> type) {
36 ProviderId providerId = (ProviderId) kryo.readClassAndObject(input); 36 ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
37 DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input); 37 DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
38 + @SuppressWarnings("unchecked")
38 Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input); 39 Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input);
39 40
40 return new InternalPortStatusEvent(providerId, deviceId, portDescription); 41 return new InternalPortStatusEvent(providerId, deviceId, portDescription);
......
1 +package org.onlab.onos.store.device.impl.peermsg;
2 +
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
5 +import java.util.Map;
6 +
7 +import org.onlab.onos.cluster.NodeId;
8 +import org.onlab.onos.net.DeviceId;
9 +import org.onlab.onos.store.Timestamp;
10 +
11 +
12 +/**
13 + * Device Advertisement message.
14 + */
15 +public class DeviceAntiEntropyAdvertisement {
16 +
17 + private final NodeId sender;
18 + private final Map<DeviceFragmentId, Timestamp> deviceFingerPrints;
19 + private final Map<PortFragmentId, Timestamp> portFingerPrints;
20 + private final Map<DeviceId, Timestamp> offline;
21 +
22 +
23 + public DeviceAntiEntropyAdvertisement(NodeId sender,
24 + Map<DeviceFragmentId, Timestamp> devices,
25 + Map<PortFragmentId, Timestamp> ports,
26 + Map<DeviceId, Timestamp> offline) {
27 + this.sender = checkNotNull(sender);
28 + this.deviceFingerPrints = checkNotNull(devices);
29 + this.portFingerPrints = checkNotNull(ports);
30 + this.offline = checkNotNull(offline);
31 + }
32 +
33 + public NodeId sender() {
34 + return sender;
35 + }
36 +
37 + public Map<DeviceFragmentId, Timestamp> deviceFingerPrints() {
38 + return deviceFingerPrints;
39 + }
40 +
41 + public Map<PortFragmentId, Timestamp> ports() {
42 + return portFingerPrints;
43 + }
44 +
45 + public Map<DeviceId, Timestamp> offline() {
46 + return offline;
47 + }
48 +
49 + // For serializer
50 + @SuppressWarnings("unused")
51 + private DeviceAntiEntropyAdvertisement() {
52 + this.sender = null;
53 + this.deviceFingerPrints = null;
54 + this.portFingerPrints = null;
55 + this.offline = null;
56 + }
57 +}
1 +package org.onlab.onos.store.device.impl.peermsg;
2 +
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
5 +import java.util.Collection;
6 +
7 +import org.onlab.onos.cluster.NodeId;
8 +
9 +/**
10 + * Message to request for other peers information.
11 + */
12 +public class DeviceAntiEntropyRequest {
13 +
14 + private final NodeId sender;
15 + private final Collection<DeviceFragmentId> devices;
16 + private final Collection<PortFragmentId> ports;
17 +
18 + public DeviceAntiEntropyRequest(NodeId sender,
19 + Collection<DeviceFragmentId> devices,
20 + Collection<PortFragmentId> ports) {
21 +
22 + this.sender = checkNotNull(sender);
23 + this.devices = checkNotNull(devices);
24 + this.ports = checkNotNull(ports);
25 + }
26 +
27 + public NodeId sender() {
28 + return sender;
29 + }
30 +
31 + public Collection<DeviceFragmentId> devices() {
32 + return devices;
33 + }
34 +
35 + public Collection<PortFragmentId> ports() {
36 + return ports;
37 + }
38 +
39 + // For serializer
40 + @SuppressWarnings("unused")
41 + private DeviceAntiEntropyRequest() {
42 + this.sender = null;
43 + this.devices = null;
44 + this.ports = null;
45 + }
46 +}
1 +package org.onlab.onos.store.device.impl.peermsg;
2 +
3 +import java.util.Objects;
4 +
5 +import org.onlab.onos.net.DeviceId;
6 +import org.onlab.onos.net.provider.ProviderId;
7 +
8 +import com.google.common.base.MoreObjects;
9 +
10 +/**
11 + * Identifier for DeviceDesctiption from a Provider.
12 + */
13 +public final class DeviceFragmentId {
14 + public final ProviderId providerId;
15 + public final DeviceId deviceId;
16 +
17 + public DeviceFragmentId(DeviceId deviceId, ProviderId providerId) {
18 + this.providerId = providerId;
19 + this.deviceId = deviceId;
20 + }
21 +
22 + @Override
23 + public int hashCode() {
24 + return Objects.hash(providerId, deviceId);
25 + }
26 +
27 + @Override
28 + public boolean equals(Object obj) {
29 + if (this == obj) {
30 + return true;
31 + }
32 + if (!(obj instanceof DeviceFragmentId)) {
33 + return false;
34 + }
35 + DeviceFragmentId that = (DeviceFragmentId) obj;
36 + return Objects.equals(this.deviceId, that.deviceId) &&
37 + Objects.equals(this.providerId, that.providerId);
38 + }
39 +
40 + @Override
41 + public String toString() {
42 + return MoreObjects.toStringHelper(getClass())
43 + .add("providerId", providerId)
44 + .add("deviceId", deviceId)
45 + .toString();
46 + }
47 +
48 + // for serializer
49 + @SuppressWarnings("unused")
50 + private DeviceFragmentId() {
51 + this.providerId = null;
52 + this.deviceId = null;
53 + }
54 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.device.impl.peermsg;
2 +
3 +import java.util.Objects;
4 +
5 +import org.onlab.onos.net.DeviceId;
6 +import org.onlab.onos.net.PortNumber;
7 +import org.onlab.onos.net.provider.ProviderId;
8 +
9 +import com.google.common.base.MoreObjects;
10 +
11 +/**
12 + * Identifier for PortDescription from a Provider.
13 + */
14 +public final class PortFragmentId {
15 + public final ProviderId providerId;
16 + public final DeviceId deviceId;
17 + public final PortNumber portNumber;
18 +
19 + public PortFragmentId(DeviceId deviceId, ProviderId providerId,
20 + PortNumber portNumber) {
21 + this.providerId = providerId;
22 + this.deviceId = deviceId;
23 + this.portNumber = portNumber;
24 + }
25 +
26 + @Override
27 + public int hashCode() {
28 + return Objects.hash(providerId, deviceId, portNumber);
29 + };
30 +
31 + @Override
32 + public boolean equals(Object obj) {
33 + if (this == obj) {
34 + return true;
35 + }
36 + if (!(obj instanceof PortFragmentId)) {
37 + return false;
38 + }
39 + PortFragmentId that = (PortFragmentId) obj;
40 + return Objects.equals(this.deviceId, that.deviceId) &&
41 + Objects.equals(this.portNumber, that.portNumber) &&
42 + Objects.equals(this.providerId, that.providerId);
43 + }
44 +
45 + @Override
46 + public String toString() {
47 + return MoreObjects.toStringHelper(getClass())
48 + .add("providerId", providerId)
49 + .add("deviceId", deviceId)
50 + .add("portNumber", portNumber)
51 + .toString();
52 + }
53 +
54 + // for serializer
55 + @SuppressWarnings("unused")
56 + private PortFragmentId() {
57 + this.providerId = null;
58 + this.deviceId = null;
59 + this.portNumber = null;
60 + }
61 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/**
2 + * Structure and utilities used for inter-Node messaging.
3 + */
4 +package org.onlab.onos.store.device.impl.peermsg;
...@@ -31,7 +31,6 @@ import org.onlab.onos.net.provider.ProviderId; ...@@ -31,7 +31,6 @@ import org.onlab.onos.net.provider.ProviderId;
31 import org.onlab.onos.store.AbstractStore; 31 import org.onlab.onos.store.AbstractStore;
32 import org.onlab.onos.store.ClockService; 32 import org.onlab.onos.store.ClockService;
33 import org.onlab.onos.store.Timestamp; 33 import org.onlab.onos.store.Timestamp;
34 -import org.onlab.onos.store.device.impl.VersionedValue;
35 import org.slf4j.Logger; 34 import org.slf4j.Logger;
36 35
37 import com.google.common.collect.HashMultimap; 36 import com.google.common.collect.HashMultimap;
......
1 -package org.onlab.onos.store.device.impl; 1 +package org.onlab.onos.store.link.impl;
2 2
3 import java.util.Objects; 3 import java.util.Objects;
4 4
5 import org.onlab.onos.store.Timestamp; 5 import org.onlab.onos.store.Timestamp;
6 6
7 +// TODO: remove once we stop using this
7 /** 8 /**
8 * Wrapper class for a entity that is versioned 9 * Wrapper class for a entity that is versioned
9 * and can either be up or down. 10 * and can either be up or down.
......
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
4 +import org.onlab.onos.store.common.impl.Timestamped;
5 +import org.onlab.util.KryoPool;
6 +
7 +public final class DistributedStoreSerializers {
8 +
9 + /**
10 + * KryoPool which can serialize ON.lab misc classes.
11 + */
12 + public static final KryoPool COMMON = KryoPool.newBuilder()
13 + .register(KryoPoolUtil.API)
14 + .register(Timestamped.class)
15 + .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
16 + .build();
17 +
18 + // avoid instantiation
19 + private DistributedStoreSerializers() {}
20 +}
...@@ -25,6 +25,7 @@ import org.onlab.onos.net.PortNumber; ...@@ -25,6 +25,7 @@ import org.onlab.onos.net.PortNumber;
25 import org.onlab.onos.net.device.DefaultDeviceDescription; 25 import org.onlab.onos.net.device.DefaultDeviceDescription;
26 import org.onlab.onos.net.device.DefaultPortDescription; 26 import org.onlab.onos.net.device.DefaultPortDescription;
27 import org.onlab.onos.net.provider.ProviderId; 27 import org.onlab.onos.net.provider.ProviderId;
28 +import org.onlab.onos.store.Timestamp;
28 import org.onlab.packet.IpAddress; 29 import org.onlab.packet.IpAddress;
29 import org.onlab.packet.IpPrefix; 30 import org.onlab.packet.IpPrefix;
30 import org.onlab.util.KryoPool; 31 import org.onlab.util.KryoPool;
...@@ -63,7 +64,9 @@ public final class KryoPoolUtil { ...@@ -63,7 +64,9 @@ public final class KryoPoolUtil {
63 Port.class, 64 Port.class,
64 DefaultPortDescription.class, 65 DefaultPortDescription.class,
65 Element.class, 66 Element.class,
66 - Link.Type.class 67 + Link.Type.class,
68 + Timestamp.class
69 +
67 ) 70 )
68 .register(URI.class, new URISerializer()) 71 .register(URI.class, new URISerializer())
69 .register(NodeId.class, new NodeIdSerializer()) 72 .register(NodeId.class, new NodeIdSerializer())
......
1 package org.onlab.onos.store.serializers; 1 package org.onlab.onos.store.serializers;
2 2
3 import org.onlab.util.KryoPool; 3 import org.onlab.util.KryoPool;
4 -import org.slf4j.Logger;
5 -import org.slf4j.LoggerFactory;
6 -
7 import java.nio.ByteBuffer; 4 import java.nio.ByteBuffer;
8 5
9 /** 6 /**
...@@ -11,10 +8,8 @@ import java.nio.ByteBuffer; ...@@ -11,10 +8,8 @@ import java.nio.ByteBuffer;
11 */ 8 */
12 public class KryoSerializer implements StoreSerializer { 9 public class KryoSerializer implements StoreSerializer {
13 10
14 - private final Logger log = LoggerFactory.getLogger(getClass());
15 protected KryoPool serializerPool; 11 protected KryoPool serializerPool;
16 12
17 -
18 public KryoSerializer() { 13 public KryoSerializer() {
19 setupKryoPool(); 14 setupKryoPool();
20 } 15 }
......