Yuta HIGUCHI

initial Device Anti-Entropy

Change-Id: I2c9928b3dd195d857815b9d94cbf0f79f26e4435
...@@ -18,10 +18,10 @@ import com.google.common.collect.ImmutableSet; ...@@ -18,10 +18,10 @@ import com.google.common.collect.ImmutableSet;
18 * Suggest to the sender about the more up-to-date data this node has, 18 * Suggest to the sender about the more up-to-date data this node has,
19 * and request for more recent data that the receiver has. 19 * and request for more recent data that the receiver has.
20 */ 20 */
21 -public class AntiEntropyReply<ID, VALUE> extends ClusterMessage { 21 +public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
22 22
23 private final NodeId sender; 23 private final NodeId sender;
24 - private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion; 24 + private final ImmutableMap<ID, V> suggestion;
25 private final ImmutableSet<ID> request; 25 private final ImmutableSet<ID> request;
26 26
27 /** 27 /**
...@@ -32,7 +32,7 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage { ...@@ -32,7 +32,7 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
32 * @param request Collection of identifiers 32 * @param request Collection of identifiers
33 */ 33 */
34 public AntiEntropyReply(NodeId sender, 34 public AntiEntropyReply(NodeId sender,
35 - Map<ID, VersionedValue<VALUE>> suggestion, 35 + Map<ID, V> suggestion,
36 Set<ID> request) { 36 Set<ID> request) {
37 super(AE_REPLY); 37 super(AE_REPLY);
38 this.sender = sender; 38 this.sender = sender;
...@@ -44,14 +44,34 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage { ...@@ -44,14 +44,34 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
44 return sender; 44 return sender;
45 } 45 }
46 46
47 - public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() { 47 + /**
48 + * Returns collection of values, which the recipient of this reply is likely
49 + * to be missing or has outdated version.
50 + *
51 + * @return
52 + */
53 + public ImmutableMap<ID, V> suggestion() {
48 return suggestion; 54 return suggestion;
49 } 55 }
50 56
57 + /**
58 + * Returns collection of identifier to request.
59 + *
60 + * @return collection of identifier to request
61 + */
51 public ImmutableSet<ID> request() { 62 public ImmutableSet<ID> request() {
52 return request; 63 return request;
53 } 64 }
54 65
66 + /**
67 + * Checks if reply contains any suggestion or request.
68 + *
69 + * @return true if nothing is suggested and requested
70 + */
71 + public boolean isEmpty() {
72 + return suggestion.isEmpty() && request.isEmpty();
73 + }
74 +
55 // Default constructor for serializer 75 // Default constructor for serializer
56 protected AntiEntropyReply() { 76 protected AntiEntropyReply() {
57 super(AE_REPLY); 77 super(AE_REPLY);
......
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import java.util.Collection;
4 +import java.util.HashMap;
5 +import java.util.Map;
6 +
7 +import org.onlab.onos.cluster.NodeId;
8 +import org.onlab.onos.net.Device;
9 +import org.onlab.onos.net.DeviceId;
10 +import org.onlab.onos.store.Timestamp;
11 +import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
12 +
13 +// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
14 +// TODO: Handle Port as part of these messages, or separate messages for Ports?
15 +
16 +public class DeviceAntiEntropyAdvertisement
17 + extends AntiEntropyAdvertisement<DeviceId> {
18 +
19 +
20 + public DeviceAntiEntropyAdvertisement(NodeId sender,
21 + Map<DeviceId, Timestamp> advertisement) {
22 + super(sender, advertisement);
23 + }
24 +
25 + // May need to add ProviderID, etc.
26 + public static DeviceAntiEntropyAdvertisement create(
27 + NodeId self,
28 + Collection<VersionedValue<Device>> localValues) {
29 +
30 + Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
31 + for (VersionedValue<Device> e : localValues) {
32 + ads.put(e.entity().id(), e.timestamp());
33 + }
34 + return new DeviceAntiEntropyAdvertisement(self, ads);
35 + }
36 +
37 + // For serializer
38 + protected DeviceAntiEntropyAdvertisement() {}
39 +}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import java.util.Collection;
4 +import java.util.HashMap;
5 +import java.util.HashSet;
6 +import java.util.Map;
7 +import java.util.Set;
8 +
9 +import org.onlab.onos.cluster.NodeId;
10 +import org.onlab.onos.net.Device;
11 +import org.onlab.onos.net.DeviceId;
12 +import org.onlab.onos.store.Timestamp;
13 +import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
14 +
15 +import com.google.common.collect.ImmutableMap;
16 +import com.google.common.collect.ImmutableSet;
17 +
18 +public class DeviceAntiEntropyReply
19 + extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
20 +
21 +
22 + public DeviceAntiEntropyReply(NodeId sender,
23 + Map<DeviceId, VersionedValue<Device>> suggestion,
24 + Set<DeviceId> request) {
25 + super(sender, suggestion, request);
26 + }
27 +
28 + /**
29 + * Creates a reply to Anti-Entropy advertisement.
30 + *
31 + * @param advertisement to respond to
32 + * @param self node identifier representing local node
33 + * @param localValues local values held on this node
34 + * @return reply message
35 + */
36 + public static DeviceAntiEntropyReply reply(
37 + DeviceAntiEntropyAdvertisement advertisement,
38 + NodeId self,
39 + Collection<VersionedValue<Device>> localValues
40 + ) {
41 +
42 + ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
43 +
44 + ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
45 + sug = ImmutableMap.builder();
46 +
47 + Set<DeviceId> req = new HashSet<>(ads.keySet());
48 +
49 + for (VersionedValue<Device> e : localValues) {
50 + final DeviceId id = e.entity().id();
51 + final Timestamp local = e.timestamp();
52 + final Timestamp theirs = ads.get(id);
53 + if (theirs == null) {
54 + // they don't have it, suggest
55 + sug.put(id, e);
56 + // don't need theirs
57 + req.remove(id);
58 + } else if (local.compareTo(theirs) < 0) {
59 + // they got older one, suggest
60 + sug.put(id, e);
61 + // don't need theirs
62 + req.remove(id);
63 + } else if (local.equals(theirs)) {
64 + // same, don't need theirs
65 + req.remove(id);
66 + }
67 + }
68 +
69 + return new DeviceAntiEntropyReply(self, sug.build(), req);
70 + }
71 +
72 + /**
73 + * Creates a reply to request for values held locally.
74 + *
75 + * @param requests message containing the request
76 + * @param self node identifier representing local node
77 + * @param localValues local valeds held on this node
78 + * @return reply message
79 + */
80 + public static DeviceAntiEntropyReply reply(
81 + DeviceAntiEntropyReply requests,
82 + NodeId self,
83 + Map<DeviceId, VersionedValue<Device>> localValues
84 + ) {
85 +
86 + Set<DeviceId> reqs = requests.request();
87 +
88 + Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
89 + for (DeviceId id : reqs) {
90 + final VersionedValue<Device> value = localValues.get(id);
91 + if (value != null) {
92 + requested.put(id, value);
93 + }
94 + }
95 +
96 + Set<DeviceId> empty = ImmutableSet.of();
97 + return new DeviceAntiEntropyReply(self, requested, empty);
98 + }
99 +
100 + // For serializer
101 + protected DeviceAntiEntropyReply() {}
102 +}
...@@ -40,6 +40,7 @@ import java.util.Map; ...@@ -40,6 +40,7 @@ import java.util.Map;
40 import java.util.Objects; 40 import java.util.Objects;
41 import java.util.Set; 41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ConcurrentHashMap;
43 +import java.util.concurrent.ConcurrentMap;
43 44
44 import static com.google.common.base.Preconditions.checkArgument; 45 import static com.google.common.base.Preconditions.checkArgument;
45 import static org.onlab.onos.net.device.DeviceEvent.Type.*; 46 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
...@@ -59,8 +60,8 @@ public class OnosDistributedDeviceStore ...@@ -59,8 +60,8 @@ public class OnosDistributedDeviceStore
59 60
60 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; 61 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
61 62
62 - private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices; 63 + private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
63 - private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts; 64 + private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
64 65
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClockService clockService; 67 protected ClockService clockService;
......