Madan Jampani
Committed by Gerrit Code Review

Code clean up: Removed unused code. Fixed comments. Renamed some files.

Change-Id: I78ca1f4a973c3b5356f749680ebe0f4ccde01279
Showing 37 changed files with 78 additions and 2765 deletions
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cfg;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigEvent;
import org.onosproject.cfg.ComponentConfigStore;
import org.onosproject.cfg.ComponentConfigStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_SET;
import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_UNSET;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of component configurations in a distributed data store
* that uses optimistic replication and gossip based anti-entropy techniques.
*/
@Component(immediate = true, enabled = false)
@Service
public class GossipComponentConfigStore
extends AbstractStore<ComponentConfigEvent, ComponentConfigStoreDelegate>
implements ComponentConfigStore {
private static final String SEP = "#";
private final Logger log = getLogger(getClass());
private EventuallyConsistentMap<String, String> properties;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LogicalClockService clockService;
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
properties = storageService.<String, String>eventuallyConsistentMapBuilder()
.withName("cfg")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
properties.addListener(new InternalPropertiesListener());
log.info("Started");
}
@Deactivate
public void deactivate() {
properties.destroy();
log.info("Stopped");
}
@Override
public void setProperty(String componentName, String name, String value) {
properties.put(key(componentName, name), value);
}
@Override
public void unsetProperty(String componentName, String name) {
properties.remove(key(componentName, name));
}
/**
* Listener to component configuration properties distributed map changes.
*/
private final class InternalPropertiesListener
implements EventuallyConsistentMapListener<String, String> {
@Override
public void event(EventuallyConsistentMapEvent<String, String> event) {
String[] keys = event.key().split(SEP);
String value = event.value();
if (event.type() == PUT) {
delegate.notify(new ComponentConfigEvent(PROPERTY_SET, keys[0], keys[1], value));
} else if (event.type() == REMOVE) {
delegate.notify(new ComponentConfigEvent(PROPERTY_UNSET, keys[0], keys[1], null));
}
}
}
// Generates a key from component name and property name.
private String key(String componentName, String name) {
return componentName + SEP + name;
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import org.onosproject.store.cluster.messaging.MessageSubject;
//Not used right now
public final class ClusterManagementMessageSubjects {
// avoid instantiation
private ClusterManagementMessageSubjects() {}
public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import org.onosproject.cluster.ControllerNode;
//Not used right now
/**
* Contains information that will be published when a cluster membership event occurs.
*/
public class ClusterMembershipEvent {
private final ClusterMembershipEventType type;
private final ControllerNode node;
public ClusterMembershipEvent(ClusterMembershipEventType type, ControllerNode node) {
this.type = type;
this.node = node;
}
public ClusterMembershipEventType type() {
return type;
}
public ControllerNode node() {
return node;
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
//Not used right now
public enum ClusterMembershipEventType {
NEW_MEMBER,
LEAVING_MEMBER,
UNREACHABLE_MEMBER,
HEART_BEAT,
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onlab.packet.IpAddress;
// Not used right now
/**
* Simple back interface through which connection manager can interact with
* the cluster store.
*/
public interface ClusterNodesDelegate {
/**
* Notifies about cluster node coming online.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return the controller node
*/
DefaultControllerNode nodeDetected(NodeId nodeId, IpAddress ip,
int tcpPort);
/**
* Notifies about cluster node going offline.
*
* @param nodeId identifier of the cluster node that vanished
*/
void nodeVanished(NodeId nodeId);
/**
* Notifies about remote request to remove node from cluster.
*
* @param nodeId identifier of the cluster node that was removed
*/
void nodeRemoved(NodeId nodeId);
}
......@@ -17,10 +17,9 @@ package org.onosproject.store.cluster.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -29,39 +28,28 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
* Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
* primitive.
*/
@Service
@Component(immediate = true, enabled = false)
@Component(immediate = true, enabled = true)
public class DistributedLeadershipStore
extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
implements LeadershipStore {
private static final Logger log = getLogger(DistributedLeadershipStore.class);
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -69,20 +57,15 @@ public class DistributedLeadershipStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
protected NodeId localNodeId;
protected ConsistentMap<String, InternalLeadership> leadershipMap;
protected Map<String, Versioned<InternalLeadership>> leadershipCache = Maps.newConcurrentMap();
private NodeId localNodeId;
private LeaderElector leaderElector;
private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
event -> {
Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
boolean leaderChanged =
!Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
boolean candidatesChanged =
!Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
ImmutableSet.<NodeId>of() : oldValue.candidates()),
Sets.newHashSet(newValue.candidates())).isEmpty();
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
Leadership oldValue = change.oldValue();
Leadership newValue = change.newValue();
boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
LeadershipEvent.Type eventType = null;
if (leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
......@@ -93,193 +76,58 @@ public class DistributedLeadershipStore
if (!leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
leadershipCache.compute(event.key(), (k, v) -> {
if (v == null || v.version() < event.newValue().version()) {
return event.newValue();
}
return v;
});
notifyDelegate(new LeadershipEvent(eventType, newValue));
notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
};
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
.withName("onos-leadership")
.withPartitionsDisabled()
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
.build();
leadershipMap.entrySet().forEach(e -> leadershipCache.put(e.getKey(), e.getValue()));
leadershipMap.addListener(leadershipChangeListener);
leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
leadershipMap.removeListener(leadershipChangeListener);
leaderElector.removeChangeListener(leadershipChangeListener);
log.info("Stopped");
}
@Override
public Leadership addRegistration(String topic) {
Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
v -> v == null || !v.candidates().contains(localNodeId),
(k, v) -> {
if (v == null || v.candidates().isEmpty()) {
return new InternalLeadership(topic,
localNodeId,
v == null ? 1 : v.term() + 1,
System.currentTimeMillis(),
ImmutableList.of(localNodeId));
}
List<NodeId> newCandidates = new ArrayList<>(v.candidates());
newCandidates.add(localNodeId);
return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
});
return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
return leaderElector.run(topic, localNodeId);
}
@Override
public void removeRegistration(String topic) {
removeRegistration(topic, localNodeId);
}
private void removeRegistration(String topic, NodeId nodeId) {
leadershipMap.computeIf(topic,
v -> v != null && v.candidates().contains(nodeId),
(k, v) -> {
List<NodeId> newCandidates = v.candidates()
.stream()
.filter(id -> !nodeId.equals(id))
.collect(Collectors.toList());
NodeId newLeader = nodeId.equals(v.leader()) ?
newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
v.term() : v.term() + 1;
long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
v.termStartTime() : System.currentTimeMillis();
return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
});
leaderElector.withdraw(topic);
}
@Override
public void removeRegistration(NodeId nodeId) {
leadershipMap.entrySet()
.stream()
.filter(e -> e.getValue().value().candidates().contains(nodeId))
.map(e -> e.getKey())
.forEach(topic -> this.removeRegistration(topic, nodeId));
leaderElector.evict(nodeId);
}
@Override
public boolean moveLeadership(String topic, NodeId toNodeId) {
Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
v -> v != null &&
v.candidates().contains(toNodeId) &&
!Objects.equal(v.leader(), toNodeId),
(k, v) -> {
List<NodeId> newCandidates = new ArrayList<>();
newCandidates.add(toNodeId);
newCandidates.addAll(v.candidates()
.stream()
.filter(id -> !toNodeId.equals(id))
.collect(Collectors.toList()));
return new InternalLeadership(topic,
toNodeId,
v.term() + 1,
System.currentTimeMillis(),
newCandidates);
});
return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
return leaderElector.anoint(topic, toNodeId);
}
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
v -> v != null &&
v.candidates().contains(nodeId) &&
!v.candidates().get(0).equals(nodeId),
(k, v) -> {
List<NodeId> newCandidates = new ArrayList<>();
newCandidates.add(nodeId);
newCandidates.addAll(v.candidates()
.stream()
.filter(id -> !nodeId.equals(id))
.collect(Collectors.toList()));
return new InternalLeadership(topic,
v.leader(),
v.term(),
System.currentTimeMillis(),
newCandidates);
});
return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
return leaderElector.promote(topic, nodeId);
}
@Override
public Leadership getLeadership(String topic) {
InternalLeadership internalLeadership = Versioned.valueOrNull(leadershipMap.get(topic));
return internalLeadership == null ? null : internalLeadership.asLeadership();
return leaderElector.getLeadership(topic);
}
@Override
public Map<String, Leadership> getLeaderships() {
return ImmutableMap.copyOf(Maps.transformValues(leadershipCache, v -> v.value().asLeadership()));
}
private static class InternalLeadership {
private final String topic;
private final NodeId leader;
private final long term;
private final long termStartTime;
private final List<NodeId> candidates;
public InternalLeadership(String topic,
NodeId leader,
long term,
long termStartTime,
List<NodeId> candidates) {
this.topic = topic;
this.leader = leader;
this.term = term;
this.termStartTime = termStartTime;
this.candidates = ImmutableList.copyOf(candidates);
}
public NodeId leader() {
return this.leader;
}
public long term() {
return term;
}
public long termStartTime() {
return termStartTime;
}
public List<NodeId> candidates() {
return candidates;
}
public Leadership asLeadership() {
return new Leadership(topic, leader == null ?
null : new Leader(leader, term, termStartTime), candidates);
}
public static Leadership toLeadership(InternalLeadership internalLeadership) {
return internalLeadership == null ? null : internalLeadership.asLeadership();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("leader", leader)
.add("term", term)
.add("termStartTime", termStartTime)
.add("candidates", candidates)
.toString();
}
return leaderElector.getLeaderships();
}
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
/**
* Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
* primitive.
*/
@Service
@Component(immediate = true, enabled = true)
public class NewDistributedLeadershipStore
extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
implements LeadershipStore {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private NodeId localNodeId;
private LeaderElector leaderElector;
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
Leadership oldValue = change.oldValue();
Leadership newValue = change.newValue();
boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
LeadershipEvent.Type eventType = null;
if (leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
}
if (leaderChanged && !candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_CHANGED;
}
if (!leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
};
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
leaderElector.removeChangeListener(leadershipChangeListener);
log.info("Stopped");
}
@Override
public Leadership addRegistration(String topic) {
return leaderElector.run(topic, localNodeId);
}
@Override
public void removeRegistration(String topic) {
leaderElector.withdraw(topic);
}
@Override
public void removeRegistration(NodeId nodeId) {
leaderElector.evict(nodeId);
}
@Override
public boolean moveLeadership(String topic, NodeId toNodeId) {
return leaderElector.anoint(topic, toNodeId);
}
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
return leaderElector.promote(topic, nodeId);
}
@Override
public Leadership getLeadership(String topic) {
return leaderElector.getLeadership(topic);
}
@Override
public Map<String, Leadership> getLeaderships() {
return leaderElector.getLeaderships();
}
}
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Objects;
import org.onosproject.cluster.ControllerNode;
/**
* Node info read from configuration files during bootstrap.
*/
public final class NodeInfo {
private final String id;
private final String ip;
private final int tcpPort;
private NodeInfo(String id, String ip, int port) {
this.id = id;
this.ip = ip;
this.tcpPort = port;
}
/*
* Needed for serialization.
*/
private NodeInfo() {
id = null;
ip = null;
tcpPort = 0;
}
/**
* Creates a new instance.
* @param id node id
* @param ip node ip address
* @param port tcp port
* @return NodeInfo
*/
public static NodeInfo from(String id, String ip, int port) {
NodeInfo node = new NodeInfo(id, ip, port);
return node;
}
/**
* Returns the NodeInfo for a controller node.
* @param node controller node
* @return NodeInfo
*/
public static NodeInfo of(ControllerNode node) {
return NodeInfo.from(node.id().toString(), node.ip().toString(), node.tcpPort());
}
/**
* Returns node id.
* @return node id
*/
public String getId() {
return id;
}
/**
* Returns node ip.
* @return node ip
*/
public String getIp() {
return ip;
}
/**
* Returns node port.
* @return port
*/
public int getTcpPort() {
return tcpPort;
}
@Override
public int hashCode() {
return Objects.hash(id, ip, tcpPort);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof NodeInfo) {
NodeInfo that = (NodeInfo) o;
return Objects.equals(this.id, that.id) &&
Objects.equals(this.ip, that.ip) &&
Objects.equals(this.tcpPort, that.tcpPort);
}
return false;
}
@Override
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("ip", ip)
.add("tcpPort", tcpPort).toString();
}
}
\ No newline at end of file
......@@ -15,6 +15,6 @@
*/
/**
* Implementation of a distributed cluster node store using Hazelcast.
* Implementation of a distributed cluster membership store and failure detector.
*/
package org.onosproject.store.cluster.impl;
......
......@@ -15,6 +15,6 @@
*/
/**
* Implementation of the network configuration distributed store.
* Implementation of the distributed network configuration store.
*/
package org.onosproject.store.config.impl;
\ No newline at end of file
......
......@@ -17,28 +17,31 @@ package org.onosproject.store.core.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.ApplicationIdStore;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
......@@ -48,7 +51,7 @@ import com.google.common.collect.Maps;
*/
@Component(immediate = true, enabled = true)
@Service
public class ConsistentApplicationIdStore implements ApplicationIdStore {
public class DistributedApplicationIdStore implements ApplicationIdStore {
private final Logger log = getLogger(getClass());
......@@ -57,13 +60,12 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
private AtomicCounter appIdCounter;
private ConsistentMap<String, ApplicationId> registeredIds;
private Map<String, ApplicationId> nameToAppIdCache = Maps.newConcurrentMap();
private Map<Short, ApplicationId> idToAppIdCache = Maps.newConcurrentMap();
private static final Serializer SERIALIZER = Serializer.using(new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build());
private MapEventListener<String, ApplicationId> mapEventListener = event -> {
if (event.type() == MapEvent.Type.INSERT) {
idToAppIdCache.put(event.newValue().value().id(), event.newValue().value());
}
};
@Activate
public void activate() {
......@@ -71,75 +73,50 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
registeredIds = storageService.<String, ApplicationId>consistentMapBuilder()
.withName("onos-app-ids")
.withSerializer(SERIALIZER)
.withSerializer(Serializer.using(KryoNamespaces.API))
.withRelaxedReadConsistency()
.build();
primeAppIds();
primeIdToAppIdCache();
registeredIds.addListener(mapEventListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
registeredIds.removeListener(mapEventListener);
log.info("Stopped");
}
@Override
public Set<ApplicationId> getAppIds() {
// TODO: Rework this when we have notification support in ConsistentMap.
primeAppIds();
return ImmutableSet.copyOf(nameToAppIdCache.values());
return ImmutableSet.copyOf(registeredIds.asJavaMap().values());
}
@Override
public ApplicationId getAppId(Short id) {
if (!idToAppIdCache.containsKey(id)) {
primeAppIds();
primeIdToAppIdCache();
}
return idToAppIdCache.get(id);
}
@Override
public ApplicationId getAppId(String name) {
ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> {
Versioned<ApplicationId> existingAppId = registeredIds.get(key);
return existingAppId != null ? existingAppId.value() : null;
});
if (appId != null) {
idToAppIdCache.putIfAbsent(appId.id(), appId);
}
return appId;
return registeredIds.asJavaMap().get(name);
}
@Override
public ApplicationId registerApplication(String name) {
ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> {
Versioned<ApplicationId> existingAppId = registeredIds.get(name);
if (existingAppId == null) {
int id = Tools.retryable(appIdCounter::incrementAndGet, StorageException.class, 1, 2000)
.get()
.intValue();
DefaultApplicationId newAppId = new DefaultApplicationId(id, name);
existingAppId = registeredIds.putIfAbsent(name, newAppId);
if (existingAppId != null) {
return existingAppId.value();
} else {
return newAppId;
}
} else {
return existingAppId.value();
}
});
idToAppIdCache.putIfAbsent(appId.id(), appId);
return appId;
return Versioned.valueOrNull(registeredIds.computeIfAbsent(name,
key -> new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name)));
}
private void primeAppIds() {
registeredIds.values()
.stream()
.map(Versioned::value)
private void primeIdToAppIdCache() {
registeredIds.asJavaMap()
.values()
.forEach(appId -> {
nameToAppIdCache.putIfAbsent(appId.name(), appId);
idToAppIdCache.putIfAbsent(appId.id(), appId);
});
}
......
......@@ -38,7 +38,7 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
@Component(immediate = true, enabled = true)
@Service
public class ConsistentIdBlockStore implements IdBlockStore {
public class DistributedIdBlockStore implements IdBlockStore {
private final Logger log = getLogger(getClass());
private final Map<String, AtomicCounter> topicCounters = Maps.newConcurrentMap();
......
......@@ -34,7 +34,7 @@ import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLOCK_WRITE;
/**
* LogicalClockService implementation based on a AtomicCounter.
* LogicalClockService implementation based on a {@link AtomicCounter}.
*/
@Component(immediate = true, enabled = true)
@Service
......
......@@ -15,6 +15,6 @@
*/
/**
* Implementation of a distributed application ID registry store using Hazelcast.
* Implementation of a distributed application registry.
*/
package org.onosproject.store.core.impl;
......
......@@ -14,6 +14,6 @@
* limitations under the License.
*/
/**
* Implementation of the group store.
* Implementation of a distributed group store.
*/
package org.onosproject.store.group.impl;
......
......@@ -15,6 +15,6 @@
*/
/**
* Implementation of the distributed host store using p2p synchronization protocol.
* Implementation of a distributed host store.
*/
package org.onosproject.store.host.impl;
......
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultLink;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.Link.Type;
import org.onosproject.net.LinkKey;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkStore;
import org.onosproject.net.link.LinkStoreDelegate;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.minPriority;
import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.DefaultAnnotations.union;
import static org.onosproject.net.Link.State.ACTIVE;
import static org.onosproject.net.Link.State.INACTIVE;
import static org.onosproject.net.Link.Type.DIRECT;
import static org.onosproject.net.Link.Type.INDIRECT;
import static org.onosproject.net.LinkKey.linkKey;
import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure links in distributed data store
* that uses optimistic replication and gossip based techniques.
*/
//@Component(immediate = true, enabled = false)
@Service
public class GossipLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
// Timeout in milliseconds to process links on remote master node
private static final int REMOTE_MASTER_TIMEOUT = 1000;
// Default delay for ScheduledExecutorService of anti-entropy(BackgroundExecutor)
private static final long DEFAULT_INITIAL_DELAY = 5;
// Default period for ScheduledExecutorService of anti-entropy(BackgroundExecutor)
private static final long DEFAULT_PERIOD = 5;
private static long initialDelaySec = DEFAULT_INITIAL_DELAY;
private static long periodSec = DEFAULT_PERIOD;
private final Logger log = getLogger(getClass());
// Link inventory
private final ConcurrentMap<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>> linkDescs =
new ConcurrentHashMap<>();
// Link instance cache
private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
// Egress and ingress link sets
private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
// Remove links
private final Map<LinkKey, Timestamp> removedLinks = new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalLinkEvent.class)
.register(InternalLinkRemovedEvent.class)
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
.register(LinkInjectedEvent.class)
.build("GossipLink"));
private ExecutorService executor;
private ScheduledExecutorService backgroundExecutors;
@Activate
public void activate() {
executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
backgroundExecutors =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_UPDATE,
new InternalLinkEventListener(), executor);
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_REMOVED,
new InternalLinkRemovedEventListener(), executor);
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_INJECTED,
new LinkInjectedEventListener(), executor);
// start anti-entropy thread
backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() {
executor.shutdownNow();
backgroundExecutors.shutdownNow();
try {
if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
log.error("Timeout during executor shutdown");
}
} catch (InterruptedException e) {
log.error("Error during executor shutdown", e);
}
linkDescs.clear();
links.clear();
srcLinks.clear();
dstLinks.clear();
log.info("Stopped");
}
@Override
public int getLinkCount() {
return links.size();
}
@Override
public Iterable<Link> getLinks() {
return Collections.unmodifiableCollection(links.values());
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
// lock for iteration
synchronized (srcLinks) {
return FluentIterable.from(srcLinks.get(deviceId))
.transform(lookupLink())
.filter(notNull())
.toSet();
}
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
// lock for iteration
synchronized (dstLinks) {
return FluentIterable.from(dstLinks.get(deviceId))
.transform(lookupLink())
.filter(notNull())
.toSet();
}
}
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
return links.get(linkKey(src, dst));
}
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egress = new HashSet<>();
//
// Change `srcLinks` to ConcurrentMap<DeviceId, (Concurrent)Set>
// to remove this synchronized block, if we hit performance issue.
// SetMultiMap#get returns wrapped collection to provide modifiable-view.
// And the wrapped collection is not concurrent access safe.
//
// Our use case here does not require returned collection to be modifiable,
// so the wrapped collection forces us to lock the whole multiset,
// for benefit we don't need.
//
// Same applies to `dstLinks`
synchronized (srcLinks) {
for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
if (linkKey.src().equals(src)) {
Link link = links.get(linkKey);
if (link != null) {
egress.add(link);
} else {
log.debug("Egress link for {} was null, skipped", linkKey);
}
}
}
}
return egress;
}
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingress = new HashSet<>();
synchronized (dstLinks) {
for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
if (linkKey.dst().equals(dst)) {
Link link = links.get(linkKey);
if (link != null) {
ingress.add(link);
} else {
log.debug("Ingress link for {} was null, skipped", linkKey);
}
}
}
}
return ingress;
}
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
final DeviceId dstDeviceId = linkDescription.dst().deviceId();
final NodeId localNode = clusterService.getLocalNode().id();
final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
// Process link update only if we're the master of the destination node,
// otherwise signal the actual master.
LinkEvent linkEvent = null;
if (localNode.equals(dstNode)) {
Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
final Timestamped<LinkDescription> mergedDesc;
Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
synchronized (map) {
linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
mergedDesc = map.get(providerId);
}
if (linkEvent != null) {
log.debug("Notifying peers of a link update topology event from providerId: "
+ "{} between src: {} and dst: {}",
providerId, linkDescription.src(), linkDescription.dst());
notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
}
} else {
// Only forward for ConfigProvider
// Forwarding was added as a workaround for ONOS-490
if (!providerId.scheme().equals("cfg")) {
return null;
}
// FIXME Temporary hack for NPE (ONOS-1171).
// Proper fix is to implement forwarding to master on ConfigProvider
// redo ONOS-490
if (dstNode == null) {
// silently ignore
return null;
}
LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
// TODO check unicast return value
clusterCommunicator.unicast(linkInjectedEvent,
GossipLinkStoreMessageSubjects.LINK_INJECTED,
SERIALIZER::encode,
dstNode);
}
return linkEvent;
}
@Override
public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
Link link = getLink(src, dst);
if (link == null) {
return null;
}
if (link.isDurable()) {
// FIXME: this is not the right thing to call for the gossip store; will not sync link state!!!
return link.state() == INACTIVE ? null :
updateLink(linkKey(link.src(), link.dst()), link,
DefaultLink.builder()
.providerId(link.providerId())
.src(link.src())
.dst(link.dst())
.type(link.type())
.state(INACTIVE)
.isExpected(link.isExpected())
.annotations(link.annotations())
.build());
}
return removeLink(src, dst);
}
private LinkEvent createOrUpdateLinkInternal(
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
final LinkKey key = linkKey(linkDescription.value().src(),
linkDescription.value().dst());
Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
synchronized (descs) {
// if the link was previously removed, we should proceed if and
// only if this request is more recent.
Timestamp linkRemovedTimestamp = removedLinks.get(key);
if (linkRemovedTimestamp != null) {
if (linkDescription.isNewerThan(linkRemovedTimestamp)) {
removedLinks.remove(key);
} else {
log.trace("Link {} was already removed ignoring.", key);
return null;
}
}
final Link oldLink = links.get(key);
// update description
createOrUpdateLinkDescription(descs, providerId, linkDescription);
final Link newLink = composeLink(descs);
if (oldLink == null) {
return createLink(key, newLink);
}
return updateLink(key, oldLink, newLink);
}
}
// Guarded by linkDescs value (=locking each Link)
private Timestamped<LinkDescription> createOrUpdateLinkDescription(
Map<ProviderId, Timestamped<LinkDescription>> descs,
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
// merge existing annotations
Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
log.trace("local info is more up-to-date, ignoring {}.", linkDescription);
return null;
}
Timestamped<LinkDescription> newLinkDescription = linkDescription;
if (existingLinkDescription != null) {
// we only allow transition from INDIRECT -> DIRECT
final Type newType;
if (existingLinkDescription.value().type() == DIRECT) {
newType = DIRECT;
} else {
newType = linkDescription.value().type();
}
SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
linkDescription.value().annotations());
newLinkDescription = new Timestamped<>(
new DefaultLinkDescription(
linkDescription.value().src(),
linkDescription.value().dst(),
newType,
existingLinkDescription.value().isExpected(),
merged),
linkDescription.timestamp());
}
return descs.put(providerId, newLinkDescription);
}
// Creates and stores the link and returns the appropriate event.
// Guarded by linkDescs value (=locking each Link)
private LinkEvent createLink(LinkKey key, Link newLink) {
links.put(key, newLink);
srcLinks.put(newLink.src().deviceId(), key);
dstLinks.put(newLink.dst().deviceId(), key);
return new LinkEvent(LINK_ADDED, newLink);
}
// Updates, if necessary the specified link and returns the appropriate event.
// Guarded by linkDescs value (=locking each Link)
private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
// Note: INDIRECT -> DIRECT transition only
// so that BDDP discovered Link will not overwrite LDDP Link
if (oldLink.state() != newLink.state() ||
(oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
!AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
links.put(key, newLink);
// strictly speaking following can be omitted
srcLinks.put(oldLink.src().deviceId(), key);
dstLinks.put(oldLink.dst().deviceId(), key);
return new LinkEvent(LINK_UPDATED, newLink);
}
return null;
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
final LinkKey key = linkKey(src, dst);
DeviceId dstDeviceId = dst.deviceId();
Timestamp timestamp = null;
try {
timestamp = deviceClockService.getTimestamp(dstDeviceId);
} catch (IllegalStateException e) {
log.debug("Failed to remove link {}, was not the master", key);
// there are times when this is called before mastership
// handoff correctly completes.
return null;
}
LinkEvent event = removeLinkInternal(key, timestamp);
if (event != null) {
log.debug("Notifying peers of a link removed topology event for a link "
+ "between src: {} and dst: {}", src, dst);
notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
}
return event;
}
private static Timestamped<LinkDescription> getPrimaryDescription(
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
synchronized (linkDescriptions) {
for (Entry<ProviderId, Timestamped<LinkDescription>>
e : linkDescriptions.entrySet()) {
if (!e.getKey().isAncillary()) {
return e.getValue();
}
}
}
return null;
}
// TODO: consider slicing out as Timestamp utils
/**
* Checks is timestamp is more recent than timestamped object.
*
* @param timestamp to check if this is more recent then other
* @param timestamped object to be tested against
* @return true if {@code timestamp} is more recent than {@code timestamped}
* or {@code timestamped is null}
*/
private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
checkNotNull(timestamp);
if (timestamped == null) {
return true;
}
return timestamp.compareTo(timestamped.timestamp()) > 0;
}
private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
= getOrCreateLinkDescriptions(key);
synchronized (linkDescriptions) {
if (linkDescriptions.isEmpty()) {
// never seen such link before. keeping timestamp for record
removedLinks.put(key, timestamp);
return null;
}
// accept removal request if given timestamp is newer than
// the latest Timestamp from Primary provider
Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
if (!isMoreRecent(timestamp, prim)) {
// outdated remove request, ignore
return null;
}
removedLinks.put(key, timestamp);
Link link = links.remove(key);
linkDescriptions.clear();
if (link != null) {
srcLinks.remove(link.src().deviceId(), key);
dstLinks.remove(link.dst().deviceId(), key);
return new LinkEvent(LINK_REMOVED, link);
}
return null;
}
}
/**
* Creates concurrent readable, synchronized HashMultimap.
*
* @return SetMultimap
*/
private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
return synchronizedSetMultimap(
Multimaps.newSetMultimap(new ConcurrentHashMap<>(),
() -> Sets.newConcurrentHashSet()));
}
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private static ProviderId pickBaseProviderId(
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
if (!e.getKey().isAncillary()) {
// found primary
return e.getKey();
} else if (fallBackPrimary == null) {
// pick randomly as a fallback in case there is no primary
fallBackPrimary = e.getKey();
}
}
return fallBackPrimary;
}
// Guarded by linkDescs value (=locking each Link)
private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
ProviderId baseProviderId = pickBaseProviderId(descs);
Timestamped<LinkDescription> base = descs.get(baseProviderId);
ConnectPoint src = base.value().src();
ConnectPoint dst = base.value().dst();
Type type = base.value().type();
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
annotations = merge(annotations, base.value().annotations());
for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
if (baseProviderId.equals(e.getKey())) {
continue;
}
// Note: In the long run we should keep track of Description timestamp
// and only merge conflicting keys when timestamp is newer
// Currently assuming there will never be a key conflict between
// providers
// annotation merging. not so efficient, should revisit later
annotations = merge(annotations, e.getValue().value().annotations());
}
//boolean isDurable = Objects.equals(annotations.value(AnnotationKeys.DURABLE), "true");
// TEMP
Link.State initialLinkState = base.value().isExpected() ? ACTIVE : INACTIVE;
return DefaultLink.builder()
.providerId(baseProviderId)
.src(src)
.dst(dst)
.type(type)
.state(initialLinkState)
.isExpected(base.value().isExpected())
.annotations(annotations)
.build();
}
private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
Map<ProviderId, Timestamped<LinkDescription>> r;
r = linkDescs.get(key);
if (r != null) {
return r;
}
r = new HashMap<>();
final Map<ProviderId, Timestamped<LinkDescription>> concurrentlyAdded;
concurrentlyAdded = linkDescs.putIfAbsent(key, r);
if (concurrentlyAdded != null) {
return concurrentlyAdded;
} else {
return r;
}
}
private final Function<LinkKey, Link> lookupLink = new LookupLink();
/**
* Returns a Function to lookup Link instance using LinkKey from cache.
*
* @return lookup link function
*/
private Function<LinkKey, Link> lookupLink() {
return lookupLink;
}
private final class LookupLink implements Function<LinkKey, Link> {
@Override
public Link apply(LinkKey input) {
if (input == null) {
return null;
} else {
return links.get(input);
}
}
}
private void notifyDelegateIfNotNull(LinkEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
private void broadcastMessage(MessageSubject subject, Object event) {
clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
}
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
}
private void notifyPeers(InternalLinkEvent event) {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
}
private void notifyPeers(InternalLinkRemovedEvent event) {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
// notify peer, silently ignoring error
private void notifyPeer(NodeId peer, InternalLinkEvent event) {
try {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
} catch (IOException e) {
log.debug("Failed to notify peer {} with message {}", peer, event);
}
}
// notify peer, silently ignoring error
private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
try {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
} catch (IOException e) {
log.debug("Failed to notify peer {} with message {}", peer, event);
}
}
/**
* sets the time to delay first execution for anti-entropy.
* (scheduleAtFixedRate of ScheduledExecutorService)
*
* @param delay the time to delay first execution for anti-entropy
*/
private void setInitialDelaySec(long delay) {
checkArgument(delay >= 0, "Initial delay of scheduleAtFixedRate() must be 0 or more");
initialDelaySec = delay;
}
/**
* sets the period between successive execution for anti-entropy.
* (scheduleAtFixedRate of ScheduledExecutorService)
*
* @param period the period between successive execution for anti-entropy
*/
private void setPeriodSec(long period) {
checkArgument(period > 0, "Period of scheduleAtFixedRate() must be greater than 0");
periodSec = period;
}
private final class SendAdvertisementTask implements Runnable {
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
log.debug("Interrupted, quitting");
return;
}
try {
final NodeId self = clusterService.getLocalNode().id();
Set<ControllerNode> nodes = clusterService.getNodes();
ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
.transform(toNodeId())
.toList();
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
log.trace("No other peers in the cluster.");
return;
}
NodeId peer;
do {
int idx = RandomUtils.nextInt(0, nodeIds.size());
peer = nodeIds.get(idx);
} while (peer.equals(self));
LinkAntiEntropyAdvertisement ad = createAdvertisement();
if (Thread.currentThread().isInterrupted()) {
log.debug("Interrupted, quitting");
return;
}
try {
unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
} catch (IOException e) {
log.debug("Failed to send anti-entropy advertisement to {}", peer);
return;
}
} catch (Exception e) {
// catch all Exception to avoid Scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
}
}
}
private LinkAntiEntropyAdvertisement createAdvertisement() {
final NodeId self = clusterService.getLocalNode().id();
Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
linkDescs.forEach((linkKey, linkDesc) -> {
synchronized (linkDesc) {
for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
}
}
});
linkTombstones.putAll(removedLinks);
return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
}
private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
final NodeId sender = ad.sender();
boolean localOutdated = false;
for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
l : linkDescs.entrySet()) {
final LinkKey key = l.getKey();
final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
synchronized (link) {
Timestamp localLatest = removedLinks.get(key);
for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
final ProviderId providerId = p.getKey();
final Timestamped<LinkDescription> pDesc = p.getValue();
final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
// remote
Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
if (remoteTimestamp == null) {
remoteTimestamp = ad.linkTombstones().get(key);
}
if (remoteTimestamp == null ||
pDesc.isNewerThan(remoteTimestamp)) {
// I have more recent link description. update peer.
notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
} else {
final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
if (remoteLive != null &&
remoteLive.compareTo(pDesc.timestamp()) > 0) {
// I have something outdated
localOutdated = true;
}
}
// search local latest along the way
if (localLatest == null ||
pDesc.isNewerThan(localLatest)) {
localLatest = pDesc.timestamp();
}
}
// Tests if remote remove is more recent then local latest.
final Timestamp remoteRemove = ad.linkTombstones().get(key);
if (remoteRemove != null) {
if (localLatest != null &&
localLatest.compareTo(remoteRemove) < 0) {
// remote remove is more recent
notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
}
}
}
}
// populate remove info if not known locally
for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
final LinkKey key = remoteRm.getKey();
final Timestamp remoteRemove = remoteRm.getValue();
// relying on removeLinkInternal to ignore stale info
notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
}
if (localOutdated) {
// send back advertisement to speed up convergence
try {
unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
createAdvertisement());
} catch (IOException e) {
log.debug("Failed to send back active advertisement");
}
}
}
private final class InternalLinkEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("Received link event from peer: {}", message.sender());
InternalLinkEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
Timestamped<LinkDescription> linkDescription = event.linkDescription();
try {
notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
} catch (Exception e) {
log.warn("Exception thrown handling link event", e);
}
}
}
private final class InternalLinkRemovedEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("Received link removed event from peer: {}", message.sender());
InternalLinkRemovedEvent event = SERIALIZER.decode(message.payload());
LinkKey linkKey = event.linkKey();
Timestamp timestamp = event.timestamp();
try {
notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
} catch (Exception e) {
log.warn("Exception thrown handling link removed", e);
}
}
}
private final class InternalLinkAntiEntropyAdvertisementListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
try {
handleAntiEntropyAdvertisement(advertisement);
} catch (Exception e) {
log.warn("Exception thrown while handling Link advertisements", e);
throw e;
}
}
}
private final class LinkInjectedEventListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("Received injected link event from peer: {}", message.sender());
LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
ProviderId providerId = linkInjectedEvent.providerId();
LinkDescription linkDescription = linkInjectedEvent.linkDescription();
final DeviceId deviceId = linkDescription.dst().deviceId();
if (!deviceClockService.isTimestampAvailable(deviceId)) {
// workaround for ONOS-1208
log.warn("Not ready to accept update. Dropping {}", linkDescription);
return;
}
try {
createOrUpdateLink(providerId, linkDescription);
} catch (Exception e) {
log.warn("Exception thrown while handling link injected event", e);
}
}
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import org.onosproject.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by GossipLinkStore peer-peer communication.
*/
public final class GossipLinkStoreMessageSubjects {
private GossipLinkStoreMessageSubjects() {}
public static final MessageSubject LINK_UPDATE =
new MessageSubject("peer-link-update");
public static final MessageSubject LINK_REMOVED =
new MessageSubject("peer-link-removed");
public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
new MessageSubject("link-enti-entropy-advertisement");
public static final MessageSubject LINK_INJECTED =
new MessageSubject("peer-link-injected");
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import com.google.common.base.MoreObjects;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.impl.Timestamped;
/**
* Information published by GossipDeviceStore to notify peers of a device
* change event.
*/
public class InternalLinkEvent {
private final ProviderId providerId;
private final Timestamped<LinkDescription> linkDescription;
protected InternalLinkEvent(
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
this.providerId = providerId;
this.linkDescription = linkDescription;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<LinkDescription> linkDescription() {
return linkDescription;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("providerId", providerId)
.add("linkDescription", linkDescription)
.toString();
}
// for serializer
protected InternalLinkEvent() {
this.providerId = null;
this.linkDescription = null;
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import org.onosproject.net.LinkKey;
import org.onosproject.store.Timestamp;
import com.google.common.base.MoreObjects;
/**
* Information published by GossipLinkStore to notify peers of a link
* being removed.
*/
public class InternalLinkRemovedEvent {
private final LinkKey linkKey;
private final Timestamp timestamp;
/**
* Creates a InternalLinkRemovedEvent.
* @param linkKey identifier of the removed link.
* @param timestamp timestamp of when the link was removed.
*/
public InternalLinkRemovedEvent(LinkKey linkKey, Timestamp timestamp) {
this.linkKey = linkKey;
this.timestamp = timestamp;
}
public LinkKey linkKey() {
return linkKey;
}
public Timestamp timestamp() {
return timestamp;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("linkKey", linkKey)
.add("timestamp", timestamp)
.toString();
}
// for serializer
@SuppressWarnings("unused")
private InternalLinkRemovedEvent() {
linkKey = null;
timestamp = null;
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.LinkKey;
import org.onosproject.store.Timestamp;
/**
* Link AE Advertisement message.
*/
public class LinkAntiEntropyAdvertisement {
private final NodeId sender;
private final Map<LinkFragmentId, Timestamp> linkTimestamps;
private final Map<LinkKey, Timestamp> linkTombstones;
public LinkAntiEntropyAdvertisement(NodeId sender,
Map<LinkFragmentId, Timestamp> linkTimestamps,
Map<LinkKey, Timestamp> linkTombstones) {
this.sender = checkNotNull(sender);
this.linkTimestamps = checkNotNull(linkTimestamps);
this.linkTombstones = checkNotNull(linkTombstones);
}
public NodeId sender() {
return sender;
}
public Map<LinkFragmentId, Timestamp> linkTimestamps() {
return linkTimestamps;
}
public Map<LinkKey, Timestamp> linkTombstones() {
return linkTombstones;
}
// For serializer
@SuppressWarnings("unused")
private LinkAntiEntropyAdvertisement() {
this.sender = null;
this.linkTimestamps = null;
this.linkTombstones = null;
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import java.util.Objects;
import org.onosproject.net.LinkKey;
import org.onosproject.net.provider.ProviderId;
import com.google.common.base.MoreObjects;
/**
* Identifier for LinkDescription from a Provider.
*/
public final class LinkFragmentId {
public final ProviderId providerId;
public final LinkKey linkKey;
public LinkFragmentId(LinkKey linkKey, ProviderId providerId) {
this.providerId = providerId;
this.linkKey = linkKey;
}
public LinkKey linkKey() {
return linkKey;
}
public ProviderId providerId() {
return providerId;
}
@Override
public int hashCode() {
return Objects.hash(providerId, linkKey);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof LinkFragmentId)) {
return false;
}
LinkFragmentId that = (LinkFragmentId) obj;
return Objects.equals(this.linkKey, that.linkKey) &&
Objects.equals(this.providerId, that.providerId);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("providerId", providerId)
.add("linkKey", linkKey)
.toString();
}
// for serializer
@SuppressWarnings("unused")
private LinkFragmentId() {
this.providerId = null;
this.linkKey = null;
}
}
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import com.google.common.base.MoreObjects;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.provider.ProviderId;
public class LinkInjectedEvent {
ProviderId providerId;
LinkDescription linkDescription;
public LinkInjectedEvent(ProviderId providerId, LinkDescription linkDescription) {
this.providerId = providerId;
this.linkDescription = linkDescription;
}
public ProviderId providerId() {
return providerId;
}
public LinkDescription linkDescription() {
return linkDescription;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("providerId", providerId)
.add("linkDescription", linkDescription)
.toString();
}
// for serializer
protected LinkInjectedEvent() {
this.providerId = null;
this.linkDescription = null;
}
}
......@@ -15,6 +15,6 @@
*/
/**
* Implementation of distributed link store using p2p synchronization protocol.
* Implementation of distributed link store using eventually consistent map primitive.
*/
package org.onosproject.store.link.impl;
......
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.mastership.impl;
import static org.onosproject.net.MastershipRole.MASTER;
import static org.onosproject.net.MastershipRole.NONE;
import static org.onosproject.net.MastershipRole.STANDBY;
import java.util.Collections;
import java.util.EnumMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.net.MastershipRole;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.Lists;
/**
* A structure that holds node mastership roles associated with a
* {@link org.onosproject.net.DeviceId}. This structure needs to be locked through IMap.
*/
final class RoleValue {
protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
/**
* Constructs empty RoleValue.
*/
public RoleValue() {
value.put(MastershipRole.MASTER, new LinkedList<>());
value.put(MastershipRole.STANDBY, new LinkedList<>());
value.put(MastershipRole.NONE, new LinkedList<>());
}
/**
* Constructs copy of specified RoleValue.
*
* @param original original to create copy from
*/
public RoleValue(final RoleValue original) {
value.put(MASTER, Lists.newLinkedList(original.value.get(MASTER)));
value.put(STANDBY, Lists.newLinkedList(original.value.get(STANDBY)));
value.put(NONE, Lists.newLinkedList(original.value.get(NONE)));
}
// exposing internals for serialization purpose only
Map<MastershipRole, List<NodeId>> value() {
return Collections.unmodifiableMap(value);
}
public List<NodeId> nodesOfRole(MastershipRole type) {
return value.get(type);
}
/**
* Returns the first node to match the MastershipRole, or if there
* are none, null.
*
* @param type the role
* @return a node ID or null
*/
public NodeId get(MastershipRole type) {
return value.get(type).isEmpty() ? null : value.get(type).get(0);
}
public boolean contains(MastershipRole type, NodeId nodeId) {
return value.get(type).contains(nodeId);
}
public MastershipRole getRole(NodeId nodeId) {
if (contains(MASTER, nodeId)) {
return MASTER;
}
if (contains(STANDBY, nodeId)) {
return STANDBY;
}
return NONE;
}
/**
* Associates a node to a certain role.
*
* @param type the role
* @param nodeId the node ID of the node to associate
* @return true if modified
*/
public boolean add(MastershipRole type, NodeId nodeId) {
List<NodeId> nodes = value.get(type);
if (!nodes.contains(nodeId)) {
return nodes.add(nodeId);
}
return false;
}
/**
* Removes a node from a certain role.
*
* @param type the role
* @param nodeId the ID of the node to remove
* @return true if modified
*/
public boolean remove(MastershipRole type, NodeId nodeId) {
List<NodeId> nodes = value.get(type);
if (!nodes.isEmpty()) {
return nodes.remove(nodeId);
} else {
return false;
}
}
/**
* Reassigns a node from one role to another. If the node was not of the
* old role, it will still be assigned the new role.
*
* @param nodeId the Node ID of node changing roles
* @param from the old role
* @param to the new role
* @return true if modified
*/
public boolean reassign(NodeId nodeId, MastershipRole from, MastershipRole to) {
boolean modified = remove(from, nodeId);
modified |= add(to, nodeId);
return modified;
}
/**
* Replaces a node in one role with another node. Even if there is no node to
* replace, the new node is associated to the role.
*
* @param from the old NodeId to replace
* @param to the new NodeId
* @param type the role associated with the old NodeId
* @return true if modified
*/
public boolean replace(NodeId from, NodeId to, MastershipRole type) {
boolean modified = remove(type, from);
modified |= add(type, to);
return modified;
}
/**
* Summarizes this RoleValue as a RoleInfo. Note that master and/or backups
* may be empty, so the values should be checked for safety.
*
* @return the RoleInfo.
*/
public RoleInfo roleInfo() {
return new RoleInfo(
get(MastershipRole.MASTER), nodesOfRole(MastershipRole.STANDBY));
}
@Override
public String toString() {
ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
helper.add(el.getKey().toString(), el.getValue());
}
return helper.toString();
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.mastership.impl;
import java.util.List;
import java.util.Map;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.MastershipRole;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Serializer for RoleValues used by {@link org.onosproject.mastership.MastershipStore}.
*/
public class RoleValueSerializer extends Serializer<RoleValue> {
//RoleValues are assumed to hold a Map of MastershipRoles (an enum)
//to a List of NodeIds.
@Override
public RoleValue read(Kryo kryo, Input input, Class<RoleValue> type) {
RoleValue rv = new RoleValue();
int size = input.readInt();
for (int i = 0; i < size; i++) {
MastershipRole role = MastershipRole.values()[input.readInt()];
int s = input.readInt();
for (int j = 0; j < s; j++) {
rv.add(role, new NodeId(input.readString()));
}
}
return rv;
}
@Override
public void write(Kryo kryo, Output output, RoleValue type) {
final Map<MastershipRole, List<NodeId>> map = type.value();
output.writeInt(map.size());
for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
output.writeInt(el.getKey().ordinal());
List<NodeId> nodes = el.getValue();
output.writeInt(nodes.size());
for (NodeId n : nodes) {
output.writeString(n.toString());
}
}
}
}
......@@ -15,6 +15,6 @@
*/
/**
* Implementation of a distributed mastership store using Hazelcast.
* Implementation of a distributed mastership store.
*/
package org.onosproject.store.mastership.impl;
......
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.serializers.custom;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.MessageSubject;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
/**
* Creates a serializer for {@link ClusterMessage}.
*/
public ClusterMessageSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, ClusterMessage message) {
kryo.writeClassAndObject(output, message.sender());
kryo.writeClassAndObject(output, message.subject());
output.writeInt(message.payload().length);
output.writeBytes(message.payload());
}
@Override
public ClusterMessage read(Kryo kryo, Input input,
Class<ClusterMessage> type) {
NodeId sender = (NodeId) kryo.readClassAndObject(input);
MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
int payloadSize = input.readInt();
byte[] payload = input.readBytes(payloadSize);
return new ClusterMessage(sender, subject, payload);
}
}
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.serializers.custom;
import org.onosproject.store.cluster.messaging.MessageSubject;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
/**
* Creates a serializer for {@link MessageSubject}.
*/
public MessageSubjectSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, MessageSubject object) {
output.writeString(object.value());
}
@Override
public MessageSubject read(Kryo kryo, Input input,
Class<MessageSubject> type) {
return new MessageSubject(input.readString());
}
}
......@@ -15,8 +15,6 @@
*/
/**
* Cluster messaging and distributed store serializers.
* Distributed store serializers.
*/
//FIXME what is the right name for this package?
//FIXME can this be moved to onos-core-serializers?
package org.onosproject.store.serializers.custom;
......
......@@ -17,6 +17,7 @@
package org.onosproject.store.statistic.impl;
import com.google.common.base.Objects;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -38,6 +39,7 @@ import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.FlowStatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
......@@ -59,8 +61,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -89,6 +89,9 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
private Map<ConnectPoint, Set<FlowEntry>> current =
new ConcurrentHashMap<>();
public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
private NodeId local;
......
......@@ -38,6 +38,7 @@ import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
......@@ -59,8 +60,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -85,6 +84,9 @@ public class DistributedStatisticStore implements StatisticStore {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
private Map<ConnectPoint, InternalStatisticRepresentation> representations =
new ConcurrentHashMap<>();
......
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.statistic.impl;
import org.onosproject.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by DistributedStatisticStore peer-peer communication.
*/
public final class StatisticStoreMessageSubjects {
private StatisticStoreMessageSubjects() {}
public static final MessageSubject GET_CURRENT =
new MessageSubject("peer-return-current");
public static final MessageSubject GET_PREVIOUS =
new MessageSubject("peer-return-previous");
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.messaging.impl;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.packet.IpAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests of the cluster communication manager.
*/
public class ClusterCommunicationManagerTest {
private static final NodeId N1 = new NodeId("n1");
private static final NodeId N2 = new NodeId("n2");
private static final int P1 = 9881;
private static final int P2 = 9882;
private static final IpAddress IP = IpAddress.valueOf("127.0.0.1");
private ClusterCommunicationManager ccm1;
private ClusterCommunicationManager ccm2;
private TestDelegate cnd1 = new TestDelegate();
private TestDelegate cnd2 = new TestDelegate();
private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
@Before
public void setUp() throws Exception {
NettyMessagingManager messagingService = new NettyMessagingManager();
messagingService.activate();
ccm1 = new ClusterCommunicationManager();
ccm1.activate();
ccm2 = new ClusterCommunicationManager();
ccm2.activate();
// ccm1.initialize(node1, cnd1);
// ccm2.initialize(node2, cnd2);
}
@After
public void tearDown() {
ccm1.deactivate();
ccm2.deactivate();
}
@Ignore("FIXME: failing randomly?")
@Test
public void connect() throws Exception {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
// ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
}
@Test
@Ignore
public void disconnect() throws Exception {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
// ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.deactivate();
//
// validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
}
private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
throws InterruptedException {
assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
assertEquals("incorrect event", op, delegate.op);
assertEquals("incorrect event node", nodeId, delegate.nodeId);
}
enum Op { DETECTED, VANISHED, REMOVED }
private class TestDelegate implements ClusterNodesDelegate {
Op op;
CountDownLatch latch;
NodeId nodeId;
@Override
public DefaultControllerNode nodeDetected(NodeId nodeId, IpAddress ip, int tcpPort) {
latch(nodeId, Op.DETECTED);
return new DefaultControllerNode(nodeId, ip, tcpPort);
}
@Override
public void nodeVanished(NodeId nodeId) {
latch(nodeId, Op.VANISHED);
}
@Override
public void nodeRemoved(NodeId nodeId) {
latch(nodeId, Op.REMOVED);
}
private void latch(NodeId nodeId, Op op) {
this.op = op;
this.nodeId = nodeId;
latch.countDown();
}
}
}
......@@ -17,7 +17,6 @@ package org.onosproject.store.link.impl;
import com.google.common.collect.Iterables;
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
......@@ -59,8 +58,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
......@@ -76,7 +73,8 @@ import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
/**
* Test of the GossipLinkStoreTest implementation.
*/
public class GossipLinkStoreTest {
@Ignore
public class ECLinkStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
......@@ -114,10 +112,9 @@ public class GossipLinkStoreTest {
private static final ControllerNode ONOS2 =
new DefaultControllerNode(NID2, IpAddress.valueOf("127.0.0.2"));
private GossipLinkStore linkStoreImpl;
private ECLinkStore linkStoreImpl;
private LinkStore linkStore;
private final AtomicLong ticker = new AtomicLong();
private DeviceClockService deviceClockService;
private ClusterCommunicationService clusterCommunicator;
......@@ -139,7 +136,7 @@ public class GossipLinkStoreTest {
expectLastCall().anyTimes();
replay(clusterCommunicator);
linkStoreImpl = new GossipLinkStore();
linkStoreImpl = new ECLinkStore();
linkStoreImpl.deviceClockService = deviceClockService;
linkStoreImpl.clusterCommunicator = clusterCommunicator;
linkStoreImpl.clusterService = new TestClusterService();
......@@ -163,28 +160,10 @@ public class GossipLinkStoreTest {
SparseAnnotations... annotations) {
ConnectPoint src = new ConnectPoint(srcId, srcNum);
ConnectPoint dst = new ConnectPoint(dstId, dstNum);
reset(clusterCommunicator);
clusterCommunicator.<InternalLinkEvent>broadcast(
anyObject(InternalLinkEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
expectLastCall().anyTimes();
replay(clusterCommunicator);
linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
verify(clusterCommunicator);
}
private <T> void resetCommunicatorExpectingSingleBroadcast(
Capture<T> message,
Capture<MessageSubject> subject,
Capture<Function<T, byte[]>> encoder) {
message.reset();
subject.reset();
encoder.reset();
reset(clusterCommunicator);
clusterCommunicator.broadcast(capture(message), capture(subject), capture(encoder));
expectLastCall().once();
replay(clusterCommunicator);
}
private void putLink(LinkKey key, Type type, SparseAnnotations... annotations) {
putLink(key.src().deviceId(), key.src().port(),
key.dst().deviceId(), key.dst().port(),
......@@ -358,57 +337,26 @@ public class GossipLinkStoreTest {
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
Capture<InternalLinkEvent> message = new Capture<>();
Capture<MessageSubject> subject = new Capture<>();
Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
// add link
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT);
LinkEvent event = linkStore.createOrUpdateLink(PID,
linkDescription);
verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject());
assertEquals(LINK_ADDED, event.type());
// update link type
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject());
assertEquals(LINK_UPDATED, event2.type());
// no change
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
verifyNoBroadcastMessage(message);
assertNull("No change event expected", event3);
}
private <T> void verifyNoBroadcastMessage(Capture<T> message) {
assertFalse("No broadcast expected", message.hasCaptured());
}
private void verifyLinkBroadcastMessage(ProviderId providerId,
NodeId sender,
ConnectPoint src,
ConnectPoint dst,
Type type,
Capture<InternalLinkEvent> actualLinkEvent,
Capture<MessageSubject> actualSubject,
Capture<Function<InternalLinkEvent, byte[]>> actualEncoder) {
verify(clusterCommunicator);
assertTrue(actualLinkEvent.hasCaptured());
assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, actualSubject.getValue());
assertEquals(providerId, actualLinkEvent.getValue().providerId());
assertLinkDescriptionEquals(src, dst, type, actualLinkEvent.getValue().linkDescription().value());
}
private static void assertLinkDescriptionEquals(ConnectPoint src,
ConnectPoint dst,
Type type,
......@@ -424,33 +372,23 @@ public class GossipLinkStoreTest {
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
Capture<InternalLinkEvent> message = new Capture<>();
Capture<MessageSubject> subject = new Capture<>();
Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
// add Ancillary link
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, INDIRECT, A1));
verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, message, subject, encoder);
assertNotNull("Ancillary only link is ignored", event);
// add Primary link
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, INDIRECT, A2));
verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
assertEquals(LINK_UPDATED, event2.type());
// update link type
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2));
verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
......@@ -458,38 +396,30 @@ public class GossipLinkStoreTest {
// no change
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event4 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
verifyNoBroadcastMessage(message);
assertNull("No change event expected", event4);
// update link annotation (Primary)
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event5 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2_2));
verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
assertEquals(LINK_UPDATED, event5.type());
// update link annotation (Ancillary)
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, DIRECT, A1_2));
verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
assertEquals(LINK_UPDATED, event6.type());
// update link type (Ancillary) : ignored
resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, EDGE));
verifyNoBroadcastMessage(message);
assertNull("Ancillary change other than annotation is ignored", event7);
}
......
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import static org.onosproject.net.DeviceId.deviceId;
import org.junit.Test;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.LinkKey;
import org.onosproject.net.PortNumber;
import org.onosproject.net.provider.ProviderId;
import com.google.common.testing.EqualsTester;
public class LinkFragmentIdTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final PortNumber P1 = PortNumber.portNumber(1);
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
private static final ConnectPoint CP3 = new ConnectPoint(DID1, P2);
private static final ConnectPoint CP4 = new ConnectPoint(DID2, P3);
private static final LinkKey L1 = LinkKey.linkKey(CP1, CP2);
private static final LinkKey L2 = LinkKey.linkKey(CP3, CP4);
@Test
public void testEquals() {
new EqualsTester()
.addEqualityGroup(new LinkFragmentId(L1, PID),
new LinkFragmentId(L1, PID))
.addEqualityGroup(new LinkFragmentId(L2, PID),
new LinkFragmentId(L2, PID))
.addEqualityGroup(new LinkFragmentId(L1, PIDA),
new LinkFragmentId(L1, PIDA))
.addEqualityGroup(new LinkFragmentId(L2, PIDA),
new LinkFragmentId(L2, PIDA))
.testEquals();
}
}
/*
* Copyright 2014-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.mastership.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.onosproject.net.MastershipRole.*;
import org.junit.Test;
import org.onosproject.cluster.NodeId;
import com.google.common.collect.Sets;
public class RoleValueTest {
private static final RoleValue RV = new RoleValue();
private static final NodeId NID1 = new NodeId("node1");
private static final NodeId NID2 = new NodeId("node2");
private static final NodeId NID3 = new NodeId("node3");
@Test
public void add() {
assertEquals("faulty initialization: ", 3, RV.value.size());
RV.add(MASTER, NID1);
RV.add(STANDBY, NID2);
RV.add(STANDBY, NID3);
assertEquals("wrong nodeID: ", NID1, RV.get(MASTER));
assertTrue("wrong nodeIDs: ",
Sets.newHashSet(NID3, NID2).containsAll(RV.nodesOfRole(STANDBY)));
}
}