Ayaka Koshibe

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

1 /** 1 /**
2 * Distributed cluster store and messaging subsystem implementation. 2 * Distributed cluster store and messaging subsystem implementation.
3 */ 3 */
4 -package org.onlab.onos.store.cluster.impl;
...\ No newline at end of file ...\ No newline at end of file
4 +package org.onlab.onos.store.cluster.impl;
......
...@@ -10,6 +10,8 @@ import com.google.common.collect.ImmutableMap; ...@@ -10,6 +10,8 @@ import com.google.common.collect.ImmutableMap;
10 10
11 /** 11 /**
12 * Anti-Entropy advertisement message. 12 * Anti-Entropy advertisement message.
13 + * <p>
14 + * Message to advertise the information this node holds.
13 * 15 *
14 * @param <ID> ID type 16 * @param <ID> ID type
15 */ 17 */
......
...@@ -11,10 +11,17 @@ import org.onlab.onos.store.device.impl.VersionedValue; ...@@ -11,10 +11,17 @@ import org.onlab.onos.store.device.impl.VersionedValue;
11 import com.google.common.collect.ImmutableMap; 11 import com.google.common.collect.ImmutableMap;
12 import com.google.common.collect.ImmutableSet; 12 import com.google.common.collect.ImmutableSet;
13 13
14 -public class AntiEntropyReply<ID, VALUE> extends ClusterMessage { 14 +/**
15 + * Anti-Entropy reply message.
16 + * <p>
17 + * Message to send in reply to advertisement or another reply.
18 + * Suggest to the sender about the more up-to-date data this node has,
19 + * and request for more recent data that the receiver has.
20 + */
21 +public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
15 22
16 private final NodeId sender; 23 private final NodeId sender;
17 - private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion; 24 + private final ImmutableMap<ID, V> suggestion;
18 private final ImmutableSet<ID> request; 25 private final ImmutableSet<ID> request;
19 26
20 /** 27 /**
...@@ -25,7 +32,7 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage { ...@@ -25,7 +32,7 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
25 * @param request Collection of identifiers 32 * @param request Collection of identifiers
26 */ 33 */
27 public AntiEntropyReply(NodeId sender, 34 public AntiEntropyReply(NodeId sender,
28 - Map<ID, VersionedValue<VALUE>> suggestion, 35 + Map<ID, V> suggestion,
29 Set<ID> request) { 36 Set<ID> request) {
30 super(AE_REPLY); 37 super(AE_REPLY);
31 this.sender = sender; 38 this.sender = sender;
...@@ -37,14 +44,34 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage { ...@@ -37,14 +44,34 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
37 return sender; 44 return sender;
38 } 45 }
39 46
40 - public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() { 47 + /**
48 + * Returns collection of values, which the recipient of this reply is likely
49 + * to be missing or has outdated version.
50 + *
51 + * @return
52 + */
53 + public ImmutableMap<ID, V> suggestion() {
41 return suggestion; 54 return suggestion;
42 } 55 }
43 56
57 + /**
58 + * Returns collection of identifier to request.
59 + *
60 + * @return collection of identifier to request
61 + */
44 public ImmutableSet<ID> request() { 62 public ImmutableSet<ID> request() {
45 return request; 63 return request;
46 } 64 }
47 65
66 + /**
67 + * Checks if reply contains any suggestion or request.
68 + *
69 + * @return true if nothing is suggested and requested
70 + */
71 + public boolean isEmpty() {
72 + return suggestion.isEmpty() && request.isEmpty();
73 + }
74 +
48 // Default constructor for serializer 75 // Default constructor for serializer
49 protected AntiEntropyReply() { 76 protected AntiEntropyReply() {
50 super(AE_REPLY); 77 super(AE_REPLY);
......
1 /** 1 /**
2 * Cluster messaging APIs for the use by the various distributed stores. 2 * Cluster messaging APIs for the use by the various distributed stores.
3 */ 3 */
4 -package org.onlab.onos.store.cluster.messaging;
...\ No newline at end of file ...\ No newline at end of file
4 +package org.onlab.onos.store.cluster.messaging;
......
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import java.util.Collection;
4 +import java.util.HashMap;
5 +import java.util.Map;
6 +
7 +import org.onlab.onos.cluster.NodeId;
8 +import org.onlab.onos.net.Device;
9 +import org.onlab.onos.net.DeviceId;
10 +import org.onlab.onos.store.Timestamp;
11 +import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
12 +
13 +// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
14 +// TODO: Handle Port as part of these messages, or separate messages for Ports?
15 +
16 +public class DeviceAntiEntropyAdvertisement
17 + extends AntiEntropyAdvertisement<DeviceId> {
18 +
19 +
20 + public DeviceAntiEntropyAdvertisement(NodeId sender,
21 + Map<DeviceId, Timestamp> advertisement) {
22 + super(sender, advertisement);
23 + }
24 +
25 + // May need to add ProviderID, etc.
26 + public static DeviceAntiEntropyAdvertisement create(
27 + NodeId self,
28 + Collection<VersionedValue<Device>> localValues) {
29 +
30 + Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
31 + for (VersionedValue<Device> e : localValues) {
32 + ads.put(e.entity().id(), e.timestamp());
33 + }
34 + return new DeviceAntiEntropyAdvertisement(self, ads);
35 + }
36 +
37 + // For serializer
38 + protected DeviceAntiEntropyAdvertisement() {}
39 +}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import java.util.Collection;
4 +import java.util.HashMap;
5 +import java.util.HashSet;
6 +import java.util.Map;
7 +import java.util.Set;
8 +
9 +import org.onlab.onos.cluster.NodeId;
10 +import org.onlab.onos.net.Device;
11 +import org.onlab.onos.net.DeviceId;
12 +import org.onlab.onos.store.Timestamp;
13 +import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
14 +
15 +import com.google.common.collect.ImmutableMap;
16 +import com.google.common.collect.ImmutableSet;
17 +
18 +public class DeviceAntiEntropyReply
19 + extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
20 +
21 +
22 + public DeviceAntiEntropyReply(NodeId sender,
23 + Map<DeviceId, VersionedValue<Device>> suggestion,
24 + Set<DeviceId> request) {
25 + super(sender, suggestion, request);
26 + }
27 +
28 + /**
29 + * Creates a reply to Anti-Entropy advertisement.
30 + *
31 + * @param advertisement to respond to
32 + * @param self node identifier representing local node
33 + * @param localValues local values held on this node
34 + * @return reply message
35 + */
36 + public static DeviceAntiEntropyReply reply(
37 + DeviceAntiEntropyAdvertisement advertisement,
38 + NodeId self,
39 + Collection<VersionedValue<Device>> localValues
40 + ) {
41 +
42 + ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
43 +
44 + ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
45 + sug = ImmutableMap.builder();
46 +
47 + Set<DeviceId> req = new HashSet<>(ads.keySet());
48 +
49 + for (VersionedValue<Device> e : localValues) {
50 + final DeviceId id = e.entity().id();
51 + final Timestamp local = e.timestamp();
52 + final Timestamp theirs = ads.get(id);
53 + if (theirs == null) {
54 + // they don't have it, suggest
55 + sug.put(id, e);
56 + // don't need theirs
57 + req.remove(id);
58 + } else if (local.compareTo(theirs) < 0) {
59 + // they got older one, suggest
60 + sug.put(id, e);
61 + // don't need theirs
62 + req.remove(id);
63 + } else if (local.equals(theirs)) {
64 + // same, don't need theirs
65 + req.remove(id);
66 + }
67 + }
68 +
69 + return new DeviceAntiEntropyReply(self, sug.build(), req);
70 + }
71 +
72 + /**
73 + * Creates a reply to request for values held locally.
74 + *
75 + * @param requests message containing the request
76 + * @param self node identifier representing local node
77 + * @param localValues local valeds held on this node
78 + * @return reply message
79 + */
80 + public static DeviceAntiEntropyReply reply(
81 + DeviceAntiEntropyReply requests,
82 + NodeId self,
83 + Map<DeviceId, VersionedValue<Device>> localValues
84 + ) {
85 +
86 + Set<DeviceId> reqs = requests.request();
87 +
88 + Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
89 + for (DeviceId id : reqs) {
90 + final VersionedValue<Device> value = localValues.get(id);
91 + if (value != null) {
92 + requested.put(id, value);
93 + }
94 + }
95 +
96 + Set<DeviceId> empty = ImmutableSet.of();
97 + return new DeviceAntiEntropyReply(self, requested, empty);
98 + }
99 +
100 + // For serializer
101 + protected DeviceAntiEntropyReply() {}
102 +}
...@@ -40,6 +40,7 @@ import java.util.Map; ...@@ -40,6 +40,7 @@ import java.util.Map;
40 import java.util.Objects; 40 import java.util.Objects;
41 import java.util.Set; 41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ConcurrentHashMap;
43 +import java.util.concurrent.ConcurrentMap;
43 44
44 import static com.google.common.base.Preconditions.checkArgument; 45 import static com.google.common.base.Preconditions.checkArgument;
45 import static org.onlab.onos.net.device.DeviceEvent.Type.*; 46 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
...@@ -59,8 +60,8 @@ public class OnosDistributedDeviceStore ...@@ -59,8 +60,8 @@ public class OnosDistributedDeviceStore
59 60
60 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; 61 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
61 62
62 - private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices; 63 + private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
63 - private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts; 64 + private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
64 65
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClockService clockService; 67 protected ClockService clockService;
......
1 package org.onlab.onos.store.device.impl; 1 package org.onlab.onos.store.device.impl;
2 2
3 +import java.util.Objects;
4 +
3 import org.onlab.onos.store.Timestamp; 5 import org.onlab.onos.store.Timestamp;
4 6
5 /** 7 /**
...@@ -44,6 +46,29 @@ public class VersionedValue<T> { ...@@ -44,6 +46,29 @@ public class VersionedValue<T> {
44 } 46 }
45 47
46 48
49 + @Override
50 + public int hashCode() {
51 + return Objects.hash(entity, timestamp, isUp);
52 + }
53 +
54 + @Override
55 + public boolean equals(Object obj) {
56 + if (this == obj) {
57 + return true;
58 + }
59 + if (obj == null) {
60 + return false;
61 + }
62 + if (getClass() != obj.getClass()) {
63 + return false;
64 + }
65 + @SuppressWarnings("unchecked")
66 + VersionedValue<T> that = (VersionedValue<T>) obj;
67 + return Objects.equals(this.entity, that.entity) &&
68 + Objects.equals(this.timestamp, that.timestamp) &&
69 + Objects.equals(this.isUp, that.isUp);
70 + }
71 +
47 // Default constructor for serializer 72 // Default constructor for serializer
48 protected VersionedValue() { 73 protected VersionedValue() {
49 this.entity = null; 74 this.entity = null;
......
1 package org.onlab.onos.store.serializers; 1 package org.onlab.onos.store.serializers;
2 2
3 import java.net.URI; 3 import java.net.URI;
4 +import java.nio.ByteBuffer;
4 import java.util.ArrayList; 5 import java.util.ArrayList;
5 import java.util.HashMap; 6 import java.util.HashMap;
6 7
...@@ -100,4 +101,14 @@ public class KryoSerializationManager implements KryoSerializationService { ...@@ -100,4 +101,14 @@ public class KryoSerializationManager implements KryoSerializationService {
100 return serializerPool.deserialize(bytes); 101 return serializerPool.deserialize(bytes);
101 } 102 }
102 103
104 + @Override
105 + public void serialize(Object obj, ByteBuffer buffer) {
106 + serializerPool.serialize(obj, buffer);
107 + }
108 +
109 + @Override
110 + public <T> T deserialize(ByteBuffer buffer) {
111 + return serializerPool.deserialize(buffer);
112 + }
113 +
103 } 114 }
......
1 package org.onlab.onos.store.serializers; 1 package org.onlab.onos.store.serializers;
2 2
3 +import java.nio.ByteBuffer;
4 +
3 // TODO: To be replaced with SerializationService from IOLoop activity 5 // TODO: To be replaced with SerializationService from IOLoop activity
4 /** 6 /**
5 * Service to serialize Objects into byte array. 7 * Service to serialize Objects into byte array.
...@@ -16,6 +18,15 @@ public interface KryoSerializationService { ...@@ -16,6 +18,15 @@ public interface KryoSerializationService {
16 public byte[] serialize(final Object obj); 18 public byte[] serialize(final Object obj);
17 19
18 /** 20 /**
21 + * Serializes the specified object into bytes using one of the
22 + * pre-registered serializers.
23 + *
24 + * @param obj object to be serialized
25 + * @param buffer to write serialized bytes
26 + */
27 + public void serialize(final Object obj, ByteBuffer buffer);
28 +
29 + /**
19 * Deserializes the specified bytes into an object using one of the 30 * Deserializes the specified bytes into an object using one of the
20 * pre-registered serializers. 31 * pre-registered serializers.
21 * 32 *
...@@ -24,4 +35,12 @@ public interface KryoSerializationService { ...@@ -24,4 +35,12 @@ public interface KryoSerializationService {
24 */ 35 */
25 public <T> T deserialize(final byte[] bytes); 36 public <T> T deserialize(final byte[] bytes);
26 37
38 + /**
39 + * Deserializes the specified bytes into an object using one of the
40 + * pre-registered serializers.
41 + *
42 + * @param buffer bytes to be deserialized
43 + * @return deserialized object
44 + */
45 + public <T> T deserialize(final ByteBuffer buffer);
27 } 46 }
......