Yuta HIGUCHI
Committed by Gerrit Code Review

GossipHostStore: allow location change + update

- Actively sync with peer on anti-entropy message
  to improve convergence speed
- Timestamp not only location
- Refresh timestamp on delta update

Might fix ONOS-436

Change-Id: I271f9af04b87d78124d055e79b93413deaf1fa3c
...@@ -15,13 +15,16 @@ ...@@ -15,13 +15,16 @@
15 */ 15 */
16 package org.onosproject.store.host.impl; 16 package org.onosproject.store.host.impl;
17 17
18 +import static com.google.common.base.Preconditions.checkNotNull;
19 +import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
20 +import static com.google.common.collect.Multimaps.newSetMultimap;
21 +import static com.google.common.collect.Sets.newConcurrentHashSet;
18 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 22 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
19 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId; 23 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
20 import static org.onosproject.net.DefaultAnnotations.merge; 24 import static org.onosproject.net.DefaultAnnotations.merge;
21 import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED; 25 import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
22 -import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
23 import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED; 26 import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
24 -import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED; 27 +import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
25 import static org.onlab.util.Tools.namedThreads; 28 import static org.onlab.util.Tools.namedThreads;
26 import static org.onlab.util.Tools.minPriority; 29 import static org.onlab.util.Tools.minPriority;
27 import static org.slf4j.LoggerFactory.getLogger; 30 import static org.slf4j.LoggerFactory.getLogger;
...@@ -50,6 +53,7 @@ import org.onosproject.cluster.ClusterService; ...@@ -50,6 +53,7 @@ import org.onosproject.cluster.ClusterService;
50 import org.onosproject.cluster.ControllerNode; 53 import org.onosproject.cluster.ControllerNode;
51 import org.onosproject.cluster.NodeId; 54 import org.onosproject.cluster.NodeId;
52 import org.onosproject.net.Annotations; 55 import org.onosproject.net.Annotations;
56 +import org.onosproject.net.AnnotationsUtil;
53 import org.onosproject.net.ConnectPoint; 57 import org.onosproject.net.ConnectPoint;
54 import org.onosproject.net.DefaultAnnotations; 58 import org.onosproject.net.DefaultAnnotations;
55 import org.onosproject.net.DefaultHost; 59 import org.onosproject.net.DefaultHost;
...@@ -61,6 +65,7 @@ import org.onosproject.net.host.DefaultHostDescription; ...@@ -61,6 +65,7 @@ import org.onosproject.net.host.DefaultHostDescription;
61 import org.onosproject.net.host.HostClockService; 65 import org.onosproject.net.host.HostClockService;
62 import org.onosproject.net.host.HostDescription; 66 import org.onosproject.net.host.HostDescription;
63 import org.onosproject.net.host.HostEvent; 67 import org.onosproject.net.host.HostEvent;
68 +import org.onosproject.net.host.HostEvent.Type;
64 import org.onosproject.net.host.HostStore; 69 import org.onosproject.net.host.HostStore;
65 import org.onosproject.net.host.HostStoreDelegate; 70 import org.onosproject.net.host.HostStoreDelegate;
66 import org.onosproject.net.host.PortAddresses; 71 import org.onosproject.net.host.PortAddresses;
...@@ -109,7 +114,9 @@ public class GossipHostStore ...@@ -109,7 +114,9 @@ public class GossipHostStore
109 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16); 114 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
110 115
111 // Hosts tracked by their location 116 // Hosts tracked by their location
112 - private final Multimap<ConnectPoint, Host> locations = HashMultimap.create(); 117 + private final Multimap<ConnectPoint, Host> locations
118 + = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
119 + () -> newConcurrentHashSet()));
113 120
114 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses = 121 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
115 Multimaps.synchronizedSetMultimap( 122 Multimaps.synchronizedSetMultimap(
...@@ -142,16 +149,20 @@ public class GossipHostStore ...@@ -142,16 +149,20 @@ public class GossipHostStore
142 149
143 private ScheduledExecutorService backgroundExecutor; 150 private ScheduledExecutorService backgroundExecutor;
144 151
152 + // TODO: Make these anti-entropy params configurable
153 + private long initialDelaySec = 5;
154 + private long periodSec = 5;
155 +
145 @Activate 156 @Activate
146 public void activate() { 157 public void activate() {
147 clusterCommunicator.addSubscriber( 158 clusterCommunicator.addSubscriber(
148 - GossipHostStoreMessageSubjects.HOST_UPDATED, 159 + HOST_UPDATED_MSG,
149 new InternalHostEventListener()); 160 new InternalHostEventListener());
150 clusterCommunicator.addSubscriber( 161 clusterCommunicator.addSubscriber(
151 - GossipHostStoreMessageSubjects.HOST_REMOVED, 162 + HOST_REMOVED_MSG,
152 new InternalHostRemovedEventListener()); 163 new InternalHostRemovedEventListener());
153 clusterCommunicator.addSubscriber( 164 clusterCommunicator.addSubscriber(
154 - GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, 165 + HOST_ANTI_ENTROPY_ADVERTISEMENT,
155 new InternalHostAntiEntropyAdvertisementListener()); 166 new InternalHostAntiEntropyAdvertisementListener());
156 167
157 executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d")); 168 executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
...@@ -159,9 +170,6 @@ public class GossipHostStore ...@@ -159,9 +170,6 @@ public class GossipHostStore
159 backgroundExecutor = 170 backgroundExecutor =
160 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d"))); 171 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
161 172
162 - // TODO: Make these configurable
163 - long initialDelaySec = 5;
164 - long periodSec = 5;
165 // start anti-entropy thread 173 // start anti-entropy thread
166 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), 174 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
167 initialDelaySec, periodSec, TimeUnit.SECONDS); 175 initialDelaySec, periodSec, TimeUnit.SECONDS);
...@@ -209,66 +217,112 @@ public class GossipHostStore ...@@ -209,66 +217,112 @@ public class GossipHostStore
209 217
210 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId, 218 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
211 HostDescription hostDescription, Timestamp timestamp) { 219 HostDescription hostDescription, Timestamp timestamp) {
220 + // If this host was previously removed, first ensure
221 + // this new request is "newer"
222 + if (isHostRemoved(hostId, timestamp)) {
223 + log.debug("Ignoring update for removed host {}@{}",
224 + hostDescription, timestamp);
225 + return null;
226 + }
212 StoredHost host = hosts.get(hostId); 227 StoredHost host = hosts.get(hostId);
213 if (host == null) { 228 if (host == null) {
214 return createHost(providerId, hostId, hostDescription, timestamp); 229 return createHost(providerId, hostId, hostDescription, timestamp);
215 } 230 }
216 - return updateHost(providerId, host, hostDescription, timestamp); 231 + return updateHost(providerId, hostId, host, hostDescription, timestamp);
232 + }
233 +
234 + /**
235 + * @param hostId host identifier
236 + * @param timestamp timstamp to compare with
237 + * @return true if given timestamp is more recent timestamp compared to
238 + * the timestamp Host was removed.
239 + */
240 + private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
241 + Timestamped<Host> removedInfo = removedHosts.get(hostId);
242 + if (removedInfo != null) {
243 + if (removedInfo.isNewer(timestamp)) {
244 + return true;
245 + }
246 + removedHosts.remove(hostId, removedInfo);
247 + }
248 + return false;
217 } 249 }
218 250
219 // creates a new host and sends HOST_ADDED 251 // creates a new host and sends HOST_ADDED
220 private HostEvent createHost(ProviderId providerId, HostId hostId, 252 private HostEvent createHost(ProviderId providerId, HostId hostId,
221 HostDescription descr, Timestamp timestamp) { 253 HostDescription descr, Timestamp timestamp) {
222 synchronized (this) { 254 synchronized (this) {
223 - // If this host was previously removed, first ensure 255 + StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
224 - // this new request is "newer"
225 - if (removedHosts.containsKey(hostId)) {
226 - if (removedHosts.get(hostId).isNewer(timestamp)) {
227 - return null;
228 - } else {
229 - removedHosts.remove(hostId);
230 - }
231 - }
232 - StoredHost newhost = new StoredHost(providerId, hostId,
233 descr.hwAddress(), 256 descr.hwAddress(),
234 descr.vlan(), 257 descr.vlan(),
235 - new Timestamped<>(descr.location(), timestamp), 258 + descr.location(),
236 ImmutableSet.copyOf(descr.ipAddress())); 259 ImmutableSet.copyOf(descr.ipAddress()));
237 - hosts.put(hostId, newhost); 260 + StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
261 + if (concAdd != null) {
262 + // concurrent add detected, retry from start
263 + return updateHost(providerId, hostId, concAdd, descr, timestamp);
264 + }
238 locations.put(descr.location(), newhost); 265 locations.put(descr.location(), newhost);
239 return new HostEvent(HOST_ADDED, newhost); 266 return new HostEvent(HOST_ADDED, newhost);
240 } 267 }
241 } 268 }
242 269
243 // checks for type of update to host, sends appropriate event 270 // checks for type of update to host, sends appropriate event
244 - private HostEvent updateHost(ProviderId providerId, StoredHost host, 271 + private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
245 HostDescription descr, Timestamp timestamp) { 272 HostDescription descr, Timestamp timestamp) {
246 - HostEvent event;
247 - if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
248 - host.setLocation(new Timestamped<>(descr.location(), timestamp));
249 - return new HostEvent(HOST_MOVED, host);
250 - }
251 273
252 - if (host.ipAddresses().containsAll(descr.ipAddress()) && 274 + if (timestamp.compareTo(oldHost.timestamp()) < 0) {
253 - descr.annotations().keys().isEmpty()) { 275 + // new timestamp is older
276 + log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
254 return null; 277 return null;
255 } 278 }
256 279
257 - Set<IpAddress> addresses = new HashSet<>(host.ipAddresses()); 280 + final boolean hostMoved = !oldHost.location().equals(descr.location());
281 + if (hostMoved ||
282 + !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
283 + !descr.annotations().keys().isEmpty()) {
284 +
285 + Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
258 addresses.addAll(descr.ipAddress()); 286 addresses.addAll(descr.ipAddress());
259 - Annotations annotations = merge((DefaultAnnotations) host.annotations(), 287 + Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
260 descr.annotations()); 288 descr.annotations());
261 - StoredHost updated = new StoredHost(providerId, host.id(), 289 +
262 - host.mac(), host.vlan(), 290 + Timestamp newTimestamp = timestamp;
263 - host.location, addresses, 291 + // if merged Set/Annotation differ from description...
292 + final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
293 + !AnnotationsUtil.isEqual(descr.annotations(), annotations);
294 + if (deltaUpdate) {
295 + // ..then local existing info had something description didn't
296 + newTimestamp = hostClockService.getTimestamp(hostId);
297 + log.debug("delta update detected on {}, substepping timestamp to {}",
298 + hostId, newTimestamp);
299 + }
300 +
301 + StoredHost updated = new StoredHost(newTimestamp,
302 + providerId, oldHost.id(),
303 + oldHost.mac(), oldHost.vlan(),
304 + descr.location(),
305 + addresses,
264 annotations); 306 annotations);
265 - event = new HostEvent(HOST_UPDATED, updated);
266 synchronized (this) { 307 synchronized (this) {
267 - hosts.put(host.id(), updated); 308 + boolean replaced = hosts.replace(hostId, oldHost, updated);
268 - locations.remove(host.location(), host); 309 + if (!replaced) {
310 + // concurrent update, retry
311 + return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
312 + }
313 + locations.remove(oldHost.location(), oldHost);
269 locations.put(updated.location(), updated); 314 locations.put(updated.location(), updated);
315 +
316 + HostEvent.Type eventType;
317 + if (hostMoved) {
318 + eventType = Type.HOST_MOVED;
319 + } else {
320 + eventType = Type.HOST_UPDATED;
270 } 321 }
271 - return event; 322 + return new HostEvent(eventType, updated);
323 + }
324 + }
325 + return null;
272 } 326 }
273 327
274 @Override 328 @Override
...@@ -397,9 +451,8 @@ public class GossipHostStore ...@@ -397,9 +451,8 @@ public class GossipHostStore
397 } 451 }
398 } 452 }
399 453
400 - // Auxiliary extension to allow location to mutate.
401 private static final class StoredHost extends DefaultHost { 454 private static final class StoredHost extends DefaultHost {
402 - private Timestamped<HostLocation> location; 455 + private final Timestamp timestamp;
403 456
404 /** 457 /**
405 * Creates an end-station host using the supplied information. 458 * Creates an end-station host using the supplied information.
...@@ -412,33 +465,24 @@ public class GossipHostStore ...@@ -412,33 +465,24 @@ public class GossipHostStore
412 * @param ips host IP addresses 465 * @param ips host IP addresses
413 * @param annotations optional key/value annotations 466 * @param annotations optional key/value annotations
414 */ 467 */
415 - public StoredHost(ProviderId providerId, HostId id, 468 + public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
416 - MacAddress mac, VlanId vlan, Timestamped<HostLocation> location, 469 + MacAddress mac, VlanId vlan, HostLocation location,
417 Set<IpAddress> ips, Annotations... annotations) { 470 Set<IpAddress> ips, Annotations... annotations) {
418 - super(providerId, id, mac, vlan, location.value(), ips, annotations); 471 + super(providerId, id, mac, vlan, location, ips, annotations);
419 - this.location = location; 472 + this.timestamp = checkNotNull(timestamp);
420 - }
421 -
422 - void setLocation(Timestamped<HostLocation> location) {
423 - this.location = location;
424 - }
425 -
426 - @Override
427 - public HostLocation location() {
428 - return location.value();
429 } 473 }
430 474
431 public Timestamp timestamp() { 475 public Timestamp timestamp() {
432 - return location.timestamp(); 476 + return timestamp;
433 } 477 }
434 } 478 }
435 479
436 private void notifyPeers(InternalHostRemovedEvent event) throws IOException { 480 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
437 - broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event); 481 + broadcastMessage(HOST_REMOVED_MSG, event);
438 } 482 }
439 483
440 private void notifyPeers(InternalHostEvent event) throws IOException { 484 private void notifyPeers(InternalHostEvent event) throws IOException {
441 - broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event); 485 + broadcastMessage(HOST_UPDATED_MSG, event);
442 } 486 }
443 487
444 private void broadcastMessage(MessageSubject subject, Object event) throws IOException { 488 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
...@@ -556,7 +600,7 @@ public class GossipHostStore ...@@ -556,7 +600,7 @@ public class GossipHostStore
556 } 600 }
557 601
558 try { 602 try {
559 - unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad); 603 + unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
560 } catch (IOException e) { 604 } catch (IOException e) {
561 log.debug("Failed to send anti-entropy advertisement to {}", peer); 605 log.debug("Failed to send anti-entropy advertisement to {}", peer);
562 return; 606 return;
...@@ -613,7 +657,7 @@ public class GossipHostStore ...@@ -613,7 +657,7 @@ public class GossipHostStore
613 localHost.location(), 657 localHost.location(),
614 localHost.ipAddresses()); 658 localHost.ipAddresses());
615 try { 659 try {
616 - unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED, 660 + unicastMessage(sender, HOST_UPDATED_MSG,
617 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp())); 661 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
618 } catch (IOException e1) { 662 } catch (IOException e1) {
619 log.debug("Failed to send advertisement response", e1); 663 log.debug("Failed to send advertisement response", e1);
...@@ -642,7 +686,7 @@ public class GossipHostStore ...@@ -642,7 +686,7 @@ public class GossipHostStore
642 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) { 686 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
643 // sender has zombie, push 687 // sender has zombie, push
644 try { 688 try {
645 - unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED, 689 + unicastMessage(sender, HOST_REMOVED_MSG,
646 new InternalHostRemovedEvent(hostId, localDeadTimestamp)); 690 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
647 } catch (IOException e1) { 691 } catch (IOException e1) {
648 log.debug("Failed to send advertisement response", e1); 692 log.debug("Failed to send advertisement response", e1);
...@@ -650,7 +694,6 @@ public class GossipHostStore ...@@ -650,7 +694,6 @@ public class GossipHostStore
650 } 694 }
651 } 695 }
652 696
653 -
654 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) { 697 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
655 // for each remote tombstone advertisement... 698 // for each remote tombstone advertisement...
656 final HostId hostId = e.getKey(); 699 final HostId hostId = e.getKey();
...@@ -665,6 +708,19 @@ public class GossipHostStore ...@@ -665,6 +708,19 @@ public class GossipHostStore
665 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp)); 708 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
666 } 709 }
667 } 710 }
711 +
712 + // if remote ad has something unknown, actively sync
713 + for (HostFragmentId key : ad.timestamps().keySet()) {
714 + if (!hosts.containsKey(key.hostId())) {
715 + HostAntiEntropyAdvertisement myAd = createAdvertisement();
716 + try {
717 + unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
718 + break;
719 + } catch (IOException e) {
720 + log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
721 + }
722 + }
723 + }
668 } 724 }
669 725
670 private final class InternalHostAntiEntropyAdvertisementListener 726 private final class InternalHostAntiEntropyAdvertisementListener
......
...@@ -20,9 +20,9 @@ import org.onosproject.store.cluster.messaging.MessageSubject; ...@@ -20,9 +20,9 @@ import org.onosproject.store.cluster.messaging.MessageSubject;
20 public final class GossipHostStoreMessageSubjects { 20 public final class GossipHostStoreMessageSubjects {
21 private GossipHostStoreMessageSubjects() {} 21 private GossipHostStoreMessageSubjects() {}
22 22
23 - public static final MessageSubject HOST_UPDATED 23 + public static final MessageSubject HOST_UPDATED_MSG
24 = new MessageSubject("peer-host-updated"); 24 = new MessageSubject("peer-host-updated");
25 - public static final MessageSubject HOST_REMOVED 25 + public static final MessageSubject HOST_REMOVED_MSG
26 = new MessageSubject("peer-host-removed"); 26 = new MessageSubject("peer-host-removed");
27 public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT 27 public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT
28 = new MessageSubject("host-enti-entropy-advertisement");; 28 = new MessageSubject("host-enti-entropy-advertisement");;
......