Madan Jampani
Committed by Gerrit Code Review

Use mastershipService instead of replicaInfoService to determine device mastership

Change-Id: I9d07351bbd024e02b2b116dc011a8eac2f79cda1
......@@ -26,6 +26,8 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
......@@ -37,8 +39,6 @@ import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
......@@ -73,7 +73,7 @@ public class DistributedStatisticStore implements StatisticStore {
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
......@@ -200,12 +200,12 @@ public class DistributedStatisticStore implements StatisticStore {
@Override
public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
final DeviceId deviceId = connectPoint.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!replicaInfo.master().isPresent()) {
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
log.warn("No master for {}", deviceId);
return Collections.emptySet();
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
if (master.equals(clusterService.getLocalNode().id())) {
return getCurrentStatisticInternal(connectPoint);
} else {
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
......@@ -213,7 +213,7 @@ public class DistributedStatisticStore implements StatisticStore {
GET_CURRENT,
SERIALIZER::encode,
SERIALIZER::decode,
replicaInfo.master().get()),
master),
STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
Collections.emptySet());
......@@ -228,12 +228,12 @@ public class DistributedStatisticStore implements StatisticStore {
@Override
public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
final DeviceId deviceId = connectPoint.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!replicaInfo.master().isPresent()) {
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
log.warn("No master for {}", deviceId);
return Collections.emptySet();
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
if (master.equals(clusterService.getLocalNode().id())) {
return getPreviousStatisticInternal(connectPoint);
} else {
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
......@@ -241,7 +241,7 @@ public class DistributedStatisticStore implements StatisticStore {
GET_PREVIOUS,
SERIALIZER::encode,
SERIALIZER::decode,
replicaInfo.master().get()),
master),
STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
Collections.emptySet());
......
......@@ -24,6 +24,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
import org.onosproject.incubator.net.resource.label.LabelResource;
import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
......@@ -33,6 +34,7 @@ import org.onosproject.incubator.net.resource.label.LabelResourceId;
import org.onosproject.incubator.net.resource.label.LabelResourcePool;
import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
import org.onosproject.incubator.net.resource.label.LabelResourceStore;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
......@@ -40,8 +42,6 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
......@@ -72,7 +72,7 @@ public class DistributedLabelResourceStore
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
......@@ -210,27 +210,25 @@ public class DistributedLabelResourceStore
return false;
}
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(pool
.deviceId());
NodeId master = mastershipService.getMasterFor(pool.deviceId());
if (!replicaInfo.master().isPresent()) {
log.warn("Failed to getFlowEntries: No master for {}", pool);
if (master == null) {
log.warn("Failed to create label resource pool: No master for {}", pool);
return false;
}
if (replicaInfo.master().get()
.equals(clusterService.getLocalNode().id())) {
if (master.equals(clusterService.getLocalNode().id())) {
return internalCreate(pool);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), pool.deviceId());
master, pool.deviceId());
return complete(clusterCommunicator
.sendAndReceive(pool,
LabelResourceMessageSubjects.LABEL_POOL_CREATED,
SERIALIZER::encode, SERIALIZER::decode,
replicaInfo.master().get()));
master));
}
private boolean internalCreate(LabelResourcePool pool) {
......@@ -253,27 +251,26 @@ public class DistributedLabelResourceStore
if (device == null) {
return false;
}
ReplicaInfo replicaInfo = replicaInfoManager
.getReplicaInfoFor(deviceId);
if (!replicaInfo.master().isPresent()) {
log.warn("Failed to getFlowEntries: No master for {}", deviceId);
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
return false;
}
if (replicaInfo.master().get()
.equals(clusterService.getLocalNode().id())) {
if (master.equals(clusterService.getLocalNode().id())) {
return internalDestroy(deviceId);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
log.trace("Forwarding request to {}, which is the primary (master) for device {}",
master, deviceId);
return complete(clusterCommunicator
.sendAndReceive(deviceId,
LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
SERIALIZER::encode, SERIALIZER::decode,
replicaInfo.master().get()));
master));
}
private boolean internalDestroy(DeviceId deviceId) {
......@@ -301,27 +298,25 @@ public class DistributedLabelResourceStore
deviceId,
LabelResourceRequest.Type.APPLY,
applyNum, null);
ReplicaInfo replicaInfo = replicaInfoManager
.getReplicaInfoFor(deviceId);
NodeId master = mastershipService.getMasterFor(deviceId);
if (!replicaInfo.master().isPresent()) {
log.warn("Failed to getFlowEntries: No master for {}", deviceId);
if (master == null) {
log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
return Collections.emptyList();
}
if (replicaInfo.master().get()
.equals(clusterService.getLocalNode().id())) {
if (master.equals(clusterService.getLocalNode().id())) {
return internalApply(request);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
log.trace("Forwarding request to {}, which is the primary (master) for device {}",
master, deviceId);
return complete(clusterCommunicator
.sendAndReceive(request,
LabelResourceMessageSubjects.LABEL_POOL_APPLY,
SERIALIZER::encode, SERIALIZER::decode,
replicaInfo.master().get()));
master));
}
private Collection<LabelResource> internalApply(LabelResourceRequest request) {
......@@ -388,27 +383,25 @@ public class DistributedLabelResourceStore
deviceId,
LabelResourceRequest.Type.RELEASE,
0, collection);
ReplicaInfo replicaInfo = replicaInfoManager
.getReplicaInfoFor(deviceId);
NodeId master = mastershipService.getMasterFor(deviceId);
if (!replicaInfo.master().isPresent()) {
log.warn("Failed to getFlowEntries: No master for {}", deviceId);
if (master == null) {
log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
return false;
}
if (replicaInfo.master().get()
.equals(clusterService.getLocalNode().id())) {
if (master.equals(clusterService.getLocalNode().id())) {
return internalRelease(request);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
log.trace("Forwarding request to {}, which is the primary (master) for device {}",
master, deviceId);
return complete(clusterCommunicator
.sendAndReceive(request,
LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
SERIALIZER::encode, SERIALIZER::decode,
replicaInfo.master().get()));
master));
}
return false;
}
......