Jonathan Hart

Remove "throws IOException" from ClusterCommunicationService APIs

that never throw IOExceptions. These APIs already return boolean to indicate
if sending failed.

Change-Id: I339949fe59f3b8b18a117aabc8d67402dc66c2a3
......@@ -15,12 +15,11 @@
*/
package org.onosproject.store.cluster.messaging;
import java.io.IOException;
import java.util.Set;
import com.google.common.util.concurrent.ListenableFuture;
import org.onosproject.cluster.NodeId;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Set;
// TODO: remove IOExceptions?
/**
......@@ -33,18 +32,16 @@ public interface ClusterCommunicationService {
*
* @param message message to send
* @return true if the message was sent successfully to all nodes; false otherwise.
* @throws IOException when I/O exception of some sort has occurred
*/
boolean broadcast(ClusterMessage message) throws IOException;
boolean broadcast(ClusterMessage message);
/**
* Broadcast a message to all controller nodes including self.
*
* @param message message to send
* @return true if the message was sent successfully to all nodes; false otherwise.
* @throws IOException when I/O exception of some sort has occurred
*/
boolean broadcastIncludeSelf(ClusterMessage message) throws IOException;
boolean broadcastIncludeSelf(ClusterMessage message);
/**
* Sends a message to the specified controller node.
......@@ -62,9 +59,8 @@ public interface ClusterCommunicationService {
* @param message message to send
* @param nodeIds recipient node identifiers
* @return true if the message was sent successfully to all nodes in the group; false otherwise.
* @throws IOException when I/O exception of some sort has occurred
*/
boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException;
boolean multicast(ClusterMessage message, Set<NodeId> nodeIds);
/**
* Sends a message synchronously.
......
......@@ -15,23 +15,16 @@
*/
package org.onosproject.store.cluster.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
......@@ -47,12 +40,17 @@ import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Lock;
import org.onosproject.store.service.LockService;
import org.onosproject.store.service.impl.DistributedLockManager;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Distributed implementation of LeadershipService that is based on the primitives exposed by
......@@ -286,15 +284,11 @@ public class LeadershipManager implements LeadershipService {
public void event(LeadershipEvent event) {
// publish events originating on this host.
if (event.subject().leader().equals(localNodeId)) {
try {
clusterCommunicator.broadcast(
new ClusterMessage(
localNodeId,
LEADERSHIP_UPDATES,
SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast leadership update message", e);
}
clusterCommunicator.broadcast(
new ClusterMessage(
localNodeId,
LEADERSHIP_UPDATES,
SERIALIZER.encode(event)));
}
}
}
......
......@@ -15,11 +15,7 @@
*/
package org.onosproject.store.cluster.messaging.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.Set;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -31,6 +27,7 @@ import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -42,11 +39,13 @@ import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.ClusterMessageSerializer;
import org.onosproject.store.serializers.impl.MessageSubjectSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
@Component(immediate = true)
@Service
......@@ -101,7 +100,7 @@ public class ClusterCommunicationManager
}
@Override
public boolean broadcast(ClusterMessage message) throws IOException {
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
for (ControllerNode node : clusterService.getNodes()) {
......@@ -113,7 +112,7 @@ public class ClusterCommunicationManager
}
@Override
public boolean broadcastIncludeSelf(ClusterMessage message) throws IOException {
public boolean broadcastIncludeSelf(ClusterMessage message) {
boolean ok = true;
for (ControllerNode node : clusterService.getNodes()) {
ok = unicastUnchecked(message, node.id()) && ok;
......@@ -122,7 +121,7 @@ public class ClusterCommunicationManager
}
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
for (NodeId nodeId : nodes) {
......@@ -148,7 +147,7 @@ public class ClusterCommunicationManager
}
}
private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) throws IOException {
private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) {
try {
return unicast(message, toNodeId);
} catch (IOException e) {
......
......@@ -263,12 +263,7 @@ public class GossipDeviceStore
if (event != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
}
return event;
}
......@@ -376,12 +371,7 @@ public class GossipDeviceStore
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
deviceId, timestamp);
try {
notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
deviceId);
}
notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
}
return event;
}
......@@ -487,12 +477,7 @@ public class GossipDeviceStore
if (!events.isEmpty()) {
log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
}
return events;
}
......@@ -667,12 +652,7 @@ public class GossipDeviceStore
if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
}
return event;
}
......@@ -793,12 +773,7 @@ public class GossipDeviceStore
if (event != null) {
log.debug("Notifying peers of a device removed topology event for deviceId: {}",
deviceId);
try {
notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
deviceId);
}
notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
}
if (relinquishAtEnd) {
log.debug("Relinquishing temporary role acquired for {}", deviceId);
......@@ -973,7 +948,7 @@ public class GossipDeviceStore
clusterCommunicator.unicast(message, recipient);
}
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
private void broadcastMessage(MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
......@@ -981,23 +956,23 @@ public class GossipDeviceStore
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
private void notifyPeers(InternalDeviceEvent event) {
broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
}
private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
private void notifyPeers(InternalDeviceOfflineEvent event) {
broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
}
private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
private void notifyPeers(InternalDeviceRemovedEvent event) {
broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
private void notifyPeers(InternalPortEvent event) {
broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
private void notifyPeers(InternalPortStatusEvent event) {
broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
}
......
......@@ -15,33 +15,13 @@
*/
package org.onosproject.store.host.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
import static org.onlab.util.Tools.namedThreads;
import static org.onlab.util.Tools.minPriority;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -49,6 +29,10 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -79,19 +63,36 @@ import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT;
import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.HOST_REMOVED_MSG;
import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.HOST_UPDATED_MSG;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of end-station hosts in distributed data store
......@@ -205,12 +206,7 @@ public class GossipHostStore
if (event != null) {
log.debug("Notifying peers of a host topology event for providerId: "
+ "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
try {
notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a host topology event for providerId: "
+ "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
}
notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
}
return event;
}
......@@ -331,11 +327,7 @@ public class GossipHostStore
HostEvent event = removeHostInternal(hostId, timestamp);
if (event != null) {
log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
try {
notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
} catch (IOException e) {
log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
}
notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
}
return event;
}
......@@ -477,15 +469,15 @@ public class GossipHostStore
}
}
private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
private void notifyPeers(InternalHostRemovedEvent event) {
broadcastMessage(HOST_REMOVED_MSG, event);
}
private void notifyPeers(InternalHostEvent event) throws IOException {
private void notifyPeers(InternalHostEvent event) {
broadcastMessage(HOST_UPDATED_MSG, event);
}
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
private void broadcastMessage(MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
......
......@@ -366,26 +366,14 @@ public class EventuallyConsistentMapImpl<K, V>
}
private void notifyPeers(InternalPutEvent event) {
try {
log.debug("sending put {}", event);
broadcastMessage(updateMessageSubject, event);
} catch (IOException e) {
// TODO this won't happen; remove from API
log.debug("IOException broadcasting update", e);
}
broadcastMessage(updateMessageSubject, event);
}
private void notifyPeers(InternalRemoveEvent event) {
try {
broadcastMessage(removeMessageSubject, event);
} catch (IOException e) {
// TODO this won't happen; remove from API
log.debug("IOException broadcasting update", e);
}
broadcastMessage(removeMessageSubject, event);
}
private void broadcastMessage(MessageSubject subject, Object event) throws
IOException {
private void broadcastMessage(MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
......
......@@ -327,25 +327,14 @@ public class GossipIntentStore
}
private void notifyPeers(InternalIntentEvent event) {
try {
broadcastMessage(INTENT_UPDATED_MSG, event);
} catch (IOException e) {
// TODO this won't happen; remove from API
log.debug("IOException broadcasting update", e);
}
broadcastMessage(INTENT_UPDATED_MSG, event);
}
private void notifyPeers(InternalSetInstallablesEvent event) {
try {
broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
} catch (IOException e) {
// TODO this won't happen; remove from API
log.debug("IOException broadcasting update", e);
}
broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
}
private void broadcastMessage(MessageSubject subject, Object event) throws
IOException {
private void broadcastMessage(MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
......
......@@ -15,22 +15,12 @@
*/
package org.onosproject.store.link.impl;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -70,12 +60,21 @@ import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
......@@ -289,13 +288,7 @@ public class GossipLinkStore
log.info("Notifying peers of a link update topology event from providerId: "
+ "{} between src: {} and dst: {}",
providerId, linkDescription.src(), linkDescription.dst());
try {
notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
} catch (IOException e) {
log.debug("Failed to notify peers of a link update topology event from providerId: "
+ "{} between src: {} and dst: {}",
providerId, linkDescription.src(), linkDescription.dst());
}
notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
}
return event;
}
......@@ -432,12 +425,7 @@ public class GossipLinkStore
if (event != null) {
log.info("Notifying peers of a link removed topology event for a link "
+ "between src: {} and dst: {}", src, dst);
try {
notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a link removed topology event for a link "
+ "between src: {} and dst: {}", src, dst);
}
notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
}
return event;
}
......@@ -607,7 +595,7 @@ public class GossipLinkStore
}
}
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
private void broadcastMessage(MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
......@@ -623,11 +611,11 @@ public class GossipLinkStore
clusterCommunicator.unicast(message, recipient);
}
private void notifyPeers(InternalLinkEvent event) throws IOException {
private void notifyPeers(InternalLinkEvent event) {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
}
private void notifyPeers(InternalLinkRemovedEvent event) throws IOException {
private void notifyPeers(InternalLinkRemovedEvent event) {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
......
......@@ -16,24 +16,13 @@
package org.onosproject.store.service.impl;
import static org.onlab.util.Tools.namedThreads;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.MoreObjects;
import net.jodah.expiringmap.ExpiringMap;
import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
......@@ -44,7 +33,15 @@ import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.onlab.util.Tools.namedThreads;
/**
* Plugs into the database update stream and track the TTL of entries added to
......@@ -97,14 +94,10 @@ public class DatabaseEntryExpirationTracker implements
case ROW_DELETED:
map.remove(row, eventVersion);
if (isLocalMemberLeader.get()) {
try {
log.debug("Broadcasting {} to the entire cluster", event);
clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast a database row deleted event.", e);
}
log.debug("Broadcasting {} to the entire cluster", event);
clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
}
break;
case ROW_ADDED:
......
......@@ -15,23 +15,7 @@
*/
package org.onosproject.store.service.impl;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.CopycatConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
......@@ -42,13 +26,13 @@ import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
......@@ -69,10 +53,24 @@ import org.onosproject.store.service.ReadStatus;
import org.onosproject.store.service.VersionedValue;
import org.onosproject.store.service.WriteResult;
import org.onosproject.store.service.WriteStatus;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Strongly consistent and durable state management service based on
......@@ -488,25 +486,21 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
@Override
public void handle(LeaderElectEvent event) {
try {
log.debug("Received LeaderElectEvent: {}", event);
if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
myLeaderEvent = event;
// This node just became the leader.
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} else {
if (myLeaderEvent != null) {
log.debug("This node is no longer the Leader");
}
myLeaderEvent = null;
log.debug("Received LeaderElectEvent: {}", event);
if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
myLeaderEvent = event;
// This node just became the leader.
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} else {
if (myLeaderEvent != null) {
log.debug("This node is no longer the Leader");
}
} catch (IOException e) {
log.error("Failed to broadcast raft leadership change event", e);
myLeaderEvent = null;
}
}
}
......
......@@ -15,25 +15,8 @@
*/
package org.onosproject.store.device.impl;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onosproject.cluster.ControllerNode.State.*;
import static org.onosproject.net.DefaultAnnotations.union;
import static java.util.Arrays.asList;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
......@@ -41,6 +24,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.ChassisId;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
......@@ -68,11 +53,25 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onlab.packet.ChassisId;
import org.onlab.packet.IpAddress;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.net.DefaultAnnotations.union;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.device.DeviceEvent.Type.*;
// TODO add tests for remote replication
......@@ -180,12 +179,8 @@ public class GossipDeviceStoreTest {
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN, CID, annotations);
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
.andReturn(true).anyTimes();
} catch (IOException e) {
fail("Should never reach here");
}
expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
.andReturn(true).anyTimes();
replay(clusterCommunicator);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
verify(clusterCommunicator);
......@@ -664,11 +659,7 @@ public class GossipDeviceStoreTest {
bcast.reset();
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
} catch (IOException e) {
fail("Should never reach here");
}
expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
replay(clusterCommunicator);
}
......
......@@ -16,7 +16,6 @@
package org.onosproject.store.link.impl;
import com.google.common.collect.Iterables;
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
......@@ -24,6 +23,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -49,29 +49,24 @@ import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.device.impl.DeviceClockManager;
import org.onlab.packet.IpAddress;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.Link.Type.*;
import static org.onosproject.net.link.LinkEvent.Type.*;
import static org.onosproject.net.Link.Type.DIRECT;
import static org.onosproject.net.Link.Type.EDGE;
import static org.onosproject.net.Link.Type.INDIRECT;
import static org.onosproject.net.NetTestTools.assertAnnotationsEquals;
import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
/**
* Test of the GossipLinkStoreTest implementation.
......@@ -169,12 +164,8 @@ public class GossipLinkStoreTest {
ConnectPoint src = new ConnectPoint(srcId, srcNum);
ConnectPoint dst = new ConnectPoint(dstId, dstNum);
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
.andReturn(true).anyTimes();
} catch (IOException e) {
fail("Should never reach here");
}
expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
.andReturn(true).anyTimes();
replay(clusterCommunicator);
linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
verify(clusterCommunicator);
......@@ -192,11 +183,7 @@ public class GossipLinkStoreTest {
bcast.reset();
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
} catch (IOException e) {
fail("Should never reach here");
}
expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
replay(clusterCommunicator);
}
......