Madan Jampani
Committed by Gerrit Code Review

Always use mastershipService for querying device mastership in FlowRuleStore

Change-Id: I68051153e9555bd0e5b632fa30e7c4d844cf2163
......@@ -61,7 +61,6 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
......@@ -320,9 +319,7 @@ public class NewDistributedFlowRuleStore
@Override
public FlowEntry getFlowEntry(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
NodeId master = replicaInfo.master().orNull();
NodeId master = mastershipService.getMasterFor(rule.deviceId());
if (master == null) {
log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
......@@ -348,9 +345,7 @@ public class NewDistributedFlowRuleStore
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
NodeId master = replicaInfo.master().orNull();
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
log.warn("Failed to getFlowEntries: No master for {}", deviceId);
......@@ -391,9 +386,7 @@ public class NewDistributedFlowRuleStore
}
DeviceId deviceId = operation.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
NodeId master = replicaInfo.master().orNull();
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
log.warn("No master for {} : flows will be marked for removal", deviceId);
......@@ -418,7 +411,7 @@ public class NewDistributedFlowRuleStore
APPLY_BATCH_FLOWS,
SERIALIZER::encode,
master)) {
log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
log.warn("Failed to storeBatch: {} to {}", operation, master);
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.target())
......@@ -491,8 +484,8 @@ public class NewDistributedFlowRuleStore
@Override
public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
if (Objects.equal(local, replicaInfo.master().orNull())) {
NodeId master = mastershipService.getMasterFor(rule.deviceId());
if (Objects.equal(local, master)) {
return addOrUpdateFlowRuleInternal(rule);
}
......@@ -524,8 +517,7 @@ public class NewDistributedFlowRuleStore
@Override
public FlowRuleEvent removeFlowRule(FlowEntry rule) {
final DeviceId deviceId = rule.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
NodeId master = replicaInfo.master().orNull();
NodeId master = mastershipService.getMasterFor(deviceId);
if (Objects.equal(local, master)) {
// bypass and handle it locally
......@@ -580,9 +572,8 @@ public class NewDistributedFlowRuleStore
log.debug("received batch request {}", operation);
final DeviceId deviceId = operation.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!local.equals(replicaInfo.master().orNull())) {
NodeId master = mastershipService.getMasterFor(deviceId);
if (!Objects.equal(local, master)) {
Set<FlowRule> failures = new HashSet<>(operation.size());
for (FlowRuleBatchEntry op : operation.getOperations()) {
failures.add(op.target());
......@@ -618,7 +609,8 @@ public class NewDistributedFlowRuleStore
public void event(ReplicaInfoEvent event) {
if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
DeviceId deviceId = event.subject();
if (!Objects.equal(local, replicaInfoManager.getReplicaInfoFor(deviceId).master())) {
NodeId master = mastershipService.getMasterFor(deviceId);
if (!Objects.equal(local, master)) {
// ignore since this event is for a device this node does not manage.
return;
}
......
......@@ -15,9 +15,7 @@
*/
package org.onosproject.store.flow.impl;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -40,8 +38,6 @@ import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
......@@ -67,8 +63,6 @@ public class ReplicaInfoManager implements ReplicaInfoService {
protected final ListenerRegistry<ReplicaInfoEvent, ReplicaInfoEventListener>
listenerRegistry = new ListenerRegistry<>();
private final Map<DeviceId, ReplicaInfo> deviceReplicaInfoMap = Maps.newConcurrentMap();
@Activate
public void activate() {
eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry);
......@@ -85,9 +79,7 @@ public class ReplicaInfoManager implements ReplicaInfoService {
@Override
public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
return deviceReplicaInfoMap.computeIfAbsent(
deviceId,
id -> buildFromRoleInfo(mastershipService.getNodesFor(deviceId)));
return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
}
@Override
......@@ -110,12 +102,7 @@ public class ReplicaInfoManager implements ReplicaInfoService {
@Override
public void event(MastershipEvent event) {
final DeviceId deviceId = event.subject();
final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
ReplicaInfo oldReplicaInfo = deviceReplicaInfoMap.put(deviceId, replicaInfo);
if (Objects.equal(oldReplicaInfo, replicaInfo)) {
return;
}
switch (event.type()) {
case MASTER_CHANGED:
eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
......