Yuta HIGUCHI
Committed by Yuta Higuchi

DistributedFlowRuleStore: synchronized -> Reader/Writer lock

fix for ONOS-195

Change-Id: I3e15104225878d1616fa790095695400bcc43697
......@@ -60,6 +60,7 @@ public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDe
* Stores a batch of flow rules.
*
* @param batchOperation batch of flow rules.
* A batch can contain flow rules for a single device only.
* @return Future response indicating success/failure of the batch operation
* all the way down to the device.
*/
......
......@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
......@@ -109,7 +110,8 @@ public class DistributedFlowRuleStore
private final Logger log = getLogger(getClass());
// primary data:
// read/write needs to be synchronized
// read/write needs to be locked
private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
// store entries as a pile of rules, no info about device tables
private final Multimap<DeviceId, StoredFlowEntry> flowEntries
= ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
......@@ -186,7 +188,7 @@ public class DistributedFlowRuleStore
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received get flow entry request for {}", rule);
log.debug("received get flow entry request for {}", rule);
FlowEntry flowEntry = getFlowEntryInternal(rule);
try {
message.respond(SERIALIZER.encode(flowEntry));
......@@ -201,7 +203,7 @@ public class DistributedFlowRuleStore
@Override
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
log.debug("Received get flow entries request for {} from {}", deviceId, message.sender());
Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
......@@ -240,21 +242,20 @@ public class DistributedFlowRuleStore
}
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
public FlowEntry getFlowEntry(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
if (!replicaInfo.master().isPresent()) {
log.warn("No master for {}", rule);
// TODO: revisit if this should be returning null.
// FIXME: throw a FlowStoreException
throw new RuntimeException("No master for " + rule);
// TODO: should we try returning from backup?
return null;
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntryInternal(rule);
}
log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
log.debug("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), rule.deviceId());
ClusterMessage message = new ClusterMessage(
......@@ -271,25 +272,28 @@ public class DistributedFlowRuleStore
}
}
private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
flowEntriesLock.readLock().lock();
try {
for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
}
} finally {
flowEntriesLock.readLock().unlock();
}
return null;
}
@Override
public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!replicaInfo.master().isPresent()) {
log.warn("No master for {}", deviceId);
// TODO: revisit if this should be returning empty collection or throwing exception.
// FIXME: throw a FlowStoreException
//throw new RuntimeException("No master for " + deviceId);
// TODO: should we try returning from backup?
return Collections.emptyList();
}
......@@ -297,7 +301,7 @@ public class DistributedFlowRuleStore
return getFlowEntriesInternal(deviceId);
}
log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
log.debug("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
......@@ -314,12 +318,17 @@ public class DistributedFlowRuleStore
}
}
private synchronized Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
flowEntriesLock.readLock().lock();
try {
Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptySet();
}
return ImmutableSet.copyOf(rules);
} finally {
flowEntriesLock.readLock().unlock();
}
}
@Override
......@@ -327,7 +336,6 @@ public class DistributedFlowRuleStore
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
}
// FIXME document that all of the FlowEntries must be about same device
@Override
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
......@@ -351,7 +359,7 @@ public class DistributedFlowRuleStore
return storeBatchInternal(operation);
}
log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
log.debug("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
......@@ -368,13 +376,16 @@ public class DistributedFlowRuleStore
}
}
private synchronized ListenableFuture<CompletedBatchOperation>
private ListenableFuture<CompletedBatchOperation>
storeBatchInternal(FlowRuleBatchOperation operation) {
final List<StoredFlowEntry> toRemove = new ArrayList<>();
final List<StoredFlowEntry> toAdd = new ArrayList<>();
DeviceId did = null;
flowEntriesLock.writeLock().lock();
try {
for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
FlowRule flowRule = batchEntry.getTarget();
FlowRuleOperation op = batchEntry.getOperator();
......@@ -401,8 +412,10 @@ public class DistributedFlowRuleStore
}
// create remote backup copies
final DeviceId deviceId = did;
updateBackup(deviceId, toAdd, toRemove);
updateBackup(did, toAdd, toRemove);
} finally {
flowEntriesLock.writeLock().unlock();
}
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
......@@ -451,9 +464,11 @@ public class DistributedFlowRuleStore
return null;
}
private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
final DeviceId did = rule.deviceId();
flowEntriesLock.writeLock().lock();
try {
// check if this new rule is an update to an existing entry
StoredFlowEntry stored = getFlowEntryInternal(rule);
if (stored != null) {
......@@ -472,6 +487,9 @@ public class DistributedFlowRuleStore
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
// TODO: also update backup.
flowEntries.put(did, new DefaultFlowEntry(rule));
} finally {
flowEntriesLock.writeLock().unlock();
}
return null;
}
......@@ -491,8 +509,10 @@ public class DistributedFlowRuleStore
return null;
}
private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
final DeviceId deviceId = rule.deviceId();
flowEntriesLock.writeLock().lock();
try {
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowEntries.remove(deviceId, rule);
updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
......@@ -501,6 +521,9 @@ public class DistributedFlowRuleStore
} else {
return null;
}
} finally {
flowEntriesLock.writeLock().unlock();
}
}
@Override
......@@ -515,9 +538,9 @@ public class DistributedFlowRuleStore
notifyDelegate(event);
}
private synchronized void loadFromBackup(final DeviceId did) {
// should relax synchronized condition
private void loadFromBackup(final DeviceId did) {
flowEntriesLock.writeLock().lock();
try {
log.info("Loading FlowRules for {} from backups", did);
SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
......@@ -534,11 +557,19 @@ public class DistributedFlowRuleStore
}
} catch (ExecutionException e) {
log.error("Failed to load backup flowtable for {}", did, e);
} finally {
flowEntriesLock.writeLock().unlock();
}
}
private synchronized void removeFromPrimary(final DeviceId did) {
Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
private void removeFromPrimary(final DeviceId did) {
Collection<StoredFlowEntry> removed = null;
flowEntriesLock.writeLock().lock();
try {
removed = flowEntries.removeAll(did);
} finally {
flowEntriesLock.writeLock().unlock();
}
log.debug("removedFromPrimary {}", removed);
}
......