Jonathan Hart
Committed by Gerrit Code Review

Move GossipDeviceStore away from deprecated ClusterCommunicationService API

Change-Id: Ib0ca7125e17013156aac27f8437ca717a96a56f0
......@@ -15,26 +15,10 @@
*/
package org.onosproject.store.device.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -81,8 +65,6 @@ import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoNamespaces;
......@@ -96,10 +78,26 @@ import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
......@@ -117,8 +115,13 @@ import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_UPDATE;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_UPDATE;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -208,29 +211,15 @@ public class GossipDeviceStore
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
new InternalDeviceOfflineEventListener(),
executor);
clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
new InternalRemoveRequestListener(),
executor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
new InternalDeviceAdvertisementListener(),
backgroundExecutor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);
addSubscriber(DEVICE_OFFLINE, this::handleDeviceOfflineEvent);
addSubscriber(DEVICE_REMOVE_REQ, this::handleRemoveRequest);
addSubscriber(DEVICE_REMOVED, this::handleDeviceRemovedEvent);
addSubscriber(PORT_UPDATE, this::handlePortEvent);
addSubscriber(PORT_STATUS_UPDATE, this::handlePortStatusEvent);
addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);
addSubscriber(DEVICE_INJECTED, this::handleDeviceInjectedEvent);
addSubscriber(PORT_INJECTED, this::handlePortInjectedEvent);
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
......@@ -261,6 +250,10 @@ public class GossipDeviceStore
log.info("Started");
}
private <M> void addSubscriber(MessageSubject subject, Consumer<M> handler) {
clusterCommunicator.addSubscriber(subject, SERIALIZER::decode, handler, executor);
}
@Deactivate
public void deactivate() {
devicePortStats.removeListener(portStatsListener);
......@@ -281,24 +274,15 @@ public class GossipDeviceStore
devices.clear();
devicePorts.clear();
availableDevices.clear();
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_REMOVED);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_INJECTED);
clusterCommunicator.removeSubscriber(
GossipDeviceStoreMessageSubjects.PORT_INJECTED);
clusterCommunicator.removeSubscriber(DEVICE_UPDATE);
clusterCommunicator.removeSubscriber(DEVICE_OFFLINE);
clusterCommunicator.removeSubscriber(DEVICE_REMOVE_REQ);
clusterCommunicator.removeSubscriber(DEVICE_REMOVED);
clusterCommunicator.removeSubscriber(PORT_UPDATE);
clusterCommunicator.removeSubscriber(PORT_STATUS_UPDATE);
clusterCommunicator.removeSubscriber(DEVICE_ADVERTISE);
clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
clusterCommunicator.removeSubscriber(PORT_INJECTED);
log.info("Stopped");
}
......@@ -1336,7 +1320,7 @@ public class GossipDeviceStore
}
private void notifyPeers(InternalDeviceEvent event) {
broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
broadcastMessage(DEVICE_UPDATE, event);
}
private void notifyPeers(InternalDeviceOfflineEvent event) {
......@@ -1357,7 +1341,7 @@ public class GossipDeviceStore
private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
try {
unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
unicastMessage(recipient, DEVICE_UPDATE, event);
} catch (IOException e) {
log.error("Failed to send" + event + " to " + recipient, e);
}
......@@ -1631,13 +1615,7 @@ public class GossipDeviceStore
}
}
private final class InternalDeviceEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = SERIALIZER.decode(message.payload());
private void handleDeviceEvent(InternalDeviceEvent event) {
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
......@@ -1649,15 +1627,8 @@ public class GossipDeviceStore
log.warn("Exception thrown handling device update", e);
}
}
}
private final class InternalDeviceOfflineEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device offline event from peer: {}", message.sender());
InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
private void handleDeviceOfflineEvent(InternalDeviceOfflineEvent event) {
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
......@@ -1667,30 +1638,16 @@ public class GossipDeviceStore
log.warn("Exception thrown handling device offline", e);
}
}
}
private final class InternalRemoveRequestListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device remove request from peer: {}", message.sender());
DeviceId did = SERIALIZER.decode(message.payload());
private void handleRemoveRequest(DeviceId did) {
try {
removeDevice(did);
} catch (Exception e) {
log.warn("Exception thrown handling device remove", e);
}
}
}
private final class InternalDeviceRemovedEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device removed event from peer: {}", message.sender());
InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
private void handleDeviceRemovedEvent(InternalDeviceRemovedEvent event) {
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
......@@ -1700,16 +1657,8 @@ public class GossipDeviceStore
log.warn("Exception thrown handling device removed", e);
}
}
}
private final class InternalPortEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received port update event from peer: {}", message.sender());
InternalPortEvent event = SERIALIZER.decode(message.payload());
private void handlePortEvent(InternalPortEvent event) {
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
......@@ -1726,16 +1675,8 @@ public class GossipDeviceStore
log.warn("Exception thrown handling port update", e);
}
}
}
private final class InternalPortStatusEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received port status update event from peer: {}", message.sender());
InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
private void handlePortStatusEvent(InternalPortStatusEvent event) {
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<PortDescription> portDescription = event.portDescription();
......@@ -1752,29 +1693,16 @@ public class GossipDeviceStore
log.warn("Exception thrown handling port update", e);
}
}
}
private final class InternalDeviceAdvertisementListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
private void handleDeviceAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
try {
handleAdvertisement(advertisement);
} catch (Exception e) {
log.warn("Exception thrown handling Device advertisements.", e);
}
}
}
private final class DeviceInjectedEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received injected device event from peer: {}", message.sender());
DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
private void handleDeviceInjectedEvent(DeviceInjectedEvent event) {
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
DeviceDescription deviceDescription = event.deviceDescription();
......@@ -1790,15 +1718,8 @@ public class GossipDeviceStore
log.warn("Exception thrown handling device injected event.", e);
}
}
}
private final class PortInjectedEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received injected port event from peer: {}", message.sender());
PortInjectedEvent event = SERIALIZER.decode(message.payload());
private void handlePortInjectedEvent(PortInjectedEvent event) {
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
List<PortDescription> portDescriptions = event.portDescriptions();
......@@ -1814,7 +1735,6 @@ public class GossipDeviceStore
log.warn("Exception thrown handling port injected event.", e);
}
}
}
private class InternalPortStatsListener
implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
......
......@@ -17,7 +17,6 @@ package org.onosproject.store.device.impl;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
......@@ -55,12 +54,12 @@ import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.StaticClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.StorageService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
......@@ -70,20 +69,37 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import static java.util.Arrays.asList;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.net.DefaultAnnotations.union;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_REMOVED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
// TODO add tests for remote replication
......@@ -157,9 +173,6 @@ public class GossipDeviceStoreTest {
@Before
public void setUp() throws Exception {
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
expectLastCall().anyTimes();
replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService();
......