Madan Jampani
Committed by Gerrit Code Review

Moving the distributed flow rule store's backup mechanism to use an eventually consistent map (EC Map)

Change-Id: I465cc2424004721bf09505ac9cde068884f04940
......@@ -22,7 +22,7 @@ import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
public class DefaultFlowEntry extends DefaultFlowRule
implements FlowEntry, StoredFlowEntry {
implements StoredFlowEntry {
private static final Logger log = getLogger(DefaultFlowEntry.class);
......
......@@ -357,6 +357,10 @@ public class EventuallyConsistentMapImpl<K, V>
}
private boolean removeInternal(K key, Timestamp timestamp) {
if (timestamp == null) {
return false;
}
counter.incrementCount();
final MutableBoolean updated = new MutableBoolean(false);
......
......@@ -15,29 +15,25 @@
*/
package org.onosproject.store.flow.impl;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
......@@ -56,28 +52,31 @@ import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.SMap;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -90,8 +89,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
......@@ -103,7 +100,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
......@@ -111,10 +108,7 @@ public class DistributedFlowRuleStore
// TODO: Make configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private InternalFlowTable flowTable = new InternalFlowTable();
/*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();*/
private InternalFlowTable flowTable;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
......@@ -129,21 +123,15 @@ public class DistributedFlowRuleStore
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
// Cache of SMaps used for backup data. each SMap contain device flow table
private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
private ExecutorService messageHandlingExecutor;
private final ExecutorService backupExecutors =
BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
//Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
private boolean syncBackup = false;
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
......@@ -162,20 +150,13 @@ public class DistributedFlowRuleStore
private IdGenerator idGenerator;
@Override
@Activate
public void activate() {
super.serializer = SERIALIZER;
super.theInstance = storeService.getHazelcastInstance();
flowTable = new InternalFlowTable();
idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
// Cache to create SMap on demand
smaps = CacheBuilder.newBuilder()
.softValues()
.build(new SMapLoader());
final NodeId local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
......@@ -214,7 +195,7 @@ public class DistributedFlowRuleStore
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
Set<StoredFlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
} catch (IOException e) {
......@@ -315,7 +296,7 @@ public class DistributedFlowRuleStore
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return flowTable.getFlowEntries(deviceId);
return flowTable.getFlowEntries(deviceId).stream().collect(Collectors.toSet());
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
......@@ -412,7 +393,6 @@ public class DistributedFlowRuleStore
new CompletedBatchOperation(true, Collections.emptySet(), did)));
return;
}
updateBackup(did, currentOps);
notifyDelegate(FlowRuleBatchEvent.requested(new
FlowRuleBatchRequest(operation.id(),
......@@ -451,19 +431,6 @@ public class DistributedFlowRuleStore
).filter(op -> op != null).collect(Collectors.toSet());
}
/**
 * Submits a backup update for the given device to the backup executor.
 * <p>
 * When {@code syncBackup} is set, the calling thread blocks until the
 * submitted task has finished; failures are logged, not rethrown.
 *
 * @param deviceId device whose backup is to be updated
 * @param entries  batch entries to apply to the backup
 */
private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
    Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
    if (!syncBackup) {
        return;
    }
    // wait for backup to complete
    try {
        backup.get();
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can
        // still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        log.error("Failed to create backups", e);
    } catch (ExecutionException e) {
        log.error("Failed to create backups", e);
    }
}
@Override
public void deleteFlowRule(FlowRule rule) {
storeBatch(
......@@ -479,7 +446,7 @@ public class DistributedFlowRuleStore
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
final NodeId localId = clusterService.getLocalNode().id();
if (localId.equals(replicaInfo.master().orNull())) {
return addOrUpdateFlowRuleInternal(rule);
return addOrUpdateFlowRuleInternal((StoredFlowEntry) rule);
}
log.warn("Tried to update FlowRule {} state,"
......@@ -487,10 +454,7 @@ public class DistributedFlowRuleStore
return null;
}
private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
final DeviceId did = rule.deviceId();
private FlowRuleEvent addOrUpdateFlowRuleInternal(StoredFlowEntry rule) {
// check if this new rule is an update to an existing entry
StoredFlowEntry stored = flowTable.getFlowEntry(rule);
if (stored != null) {
......@@ -499,21 +463,15 @@ public class DistributedFlowRuleStore
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
updateBackup(did, Sets.newHashSet(entry));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
// TODO: also update backup if the behavior is correct.
flowTable.add(rule);
return null;
}
@Override
......@@ -554,9 +512,6 @@ public class DistributedFlowRuleStore
final DeviceId deviceId = rule.deviceId();
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
updateBackup(deviceId, Sets.newHashSet(entry));
if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
......@@ -583,33 +538,10 @@ public class DistributedFlowRuleStore
}
}
/**
 * Copies every flow entry found in the backup map of the given device
 * into the local flow table, replacing entries that compare equal.
 *
 * @param did device whose backed-up flow entries are loaded
 */
private void loadFromBackup(final DeviceId did) {
    final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable;
    try {
        log.debug("Loading FlowRules for {} from backups", did);
        backupFlowTable = smaps.get(did);
    } catch (ExecutionException e) {
        log.error("Failed to load backup flowtable for {}", did, e);
        return;
    }

    for (Entry<FlowId, ImmutableList<StoredFlowEntry>> backedUp : backupFlowTable.entrySet()) {
        log.trace("loading {}", backedUp.getValue());
        backedUp.getValue().forEach(entry -> {
            // remove first so the backed-up instance replaces an equal local one
            flowTable.getFlowEntriesById(entry).remove(entry);
            flowTable.getFlowEntriesById(entry).add(entry);
        });
    }
}
/**
 * Discards the locally held flow entries of the given device by
 * delegating to {@code flowTable.clearDevice}.
 *
 * @param did device whose local flow entries are dropped
 */
private void removeFromPrimary(final DeviceId did) {
flowTable.clearDevice(did);
}
private final class OnStoreBatch implements ClusterMessageHandler {
private final NodeId local;
......@@ -626,10 +558,11 @@ public class DistributedFlowRuleStore
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!local.equals(replicaInfo.master().orNull())) {
Set<FlowRule> failures = new HashSet<>(operation.size());
for (FlowRuleBatchEntry op : operation.getOperations()) {
failures.add(op.target());
}
Set<FlowRule> failures = operation.getOperations()
.stream()
.map(FlowRuleBatchEntry::target)
.collect(Collectors.toSet());
CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
// This node is no longer the master, respond as all failed.
// TODO: we might want to wrap response in envelope
......@@ -650,17 +583,6 @@ public class DistributedFlowRuleStore
}
}
/**
 * Cache loader that, for a device id, fetches the map named
 * {@code "flowtable_" + id} from {@code theInstance} and wraps it in an
 * {@code SMap} that uses {@code SERIALIZER}.
 */
private final class SMapLoader
        extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {

    @Override
    public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
            throws Exception {
        final String mapName = "flowtable_" + id.toString();
        final IMap<byte[], byte[]> rawMap = theInstance.getMap(mapName);
        return new SMap<>(rawMap, SERIALIZER);
    }
}
private final class InternalReplicaInfoEventListener
implements ReplicaInfoEventListener {
......@@ -673,9 +595,10 @@ public class DistributedFlowRuleStore
switch (event.type()) {
case MASTER_CHANGED:
if (local.equals(rInfo.master().orNull())) {
log.info("{} is now the master for {}. Will load flow rules from backup", local, did);
// This node is the new master, populate local structure
// from backup
loadFromBackup(did);
flowTable.loadFromBackup(did);
}
//else {
// This node is no longer the master holder,
......@@ -692,146 +615,122 @@ public class DistributedFlowRuleStore
}
}
// Task to update FlowEntries in backup HZ store
private final class UpdateBackup implements Runnable {
private final DeviceId deviceId;
private final Set<FlowRuleBatchEntry> ops;
private class InternalFlowTable {
public UpdateBackup(DeviceId deviceId,
Set<FlowRuleBatchEntry> ops) {
this.deviceId = checkNotNull(deviceId);
this.ops = checkNotNull(ops);
private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
flowEntries = Maps.newConcurrentMap();
}
@Override
public void run() {
try {
log.trace("update backup {} {}", deviceId, ops
);
final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
ops.stream().forEach(
op -> {
final FlowRule entry = op.target();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
if (original != null) {
list.addAll(original);
}
list.remove(op.target());
if (op.operator() == FlowRuleOperation.ADD) {
list.add((StoredFlowEntry) entry);
}
private final KryoNamespace.Builder flowSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MastershipBasedTimestamp.class);
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
if (original == null) {
success = (backupFlowTable.putIfAbsent(id, newValue) == null);
} else {
success = backupFlowTable.replace(id, original, newValue);
}
if (!success) {
log.error("Updating backup failed.");
}
}
);
} catch (ExecutionException e) {
log.error("Failed to write to backups", e);
// Timestamp source handed to the backup map: a flow entry is stamped with
// the device clock of the device it belongs to; a null entry yields null.
private final ClockService<FlowId, StoredFlowEntry> clockService =
        new ClockService<FlowId, StoredFlowEntry>() {
            @Override
            public Timestamp getTimestamp(FlowId flowId, StoredFlowEntry flowEntry) {
                return flowEntry == null
                        ? null
                        : deviceClockService.getTimestamp(flowEntry.deviceId());
            }
        };
// Backup copy of the flow entries, keyed by flow id and constructed with
// the label "flow-backup", the cluster services, the flow serializer and
// the device-clock based clockService.
// NOTE(review): the meaning of each constructor argument is defined by
// EventuallyConsistentMapImpl, which is not visible here - confirm.
private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
new EventuallyConsistentMapImpl<>("flow-backup",
clusterService,
clusterCommunicator,
flowSerializer,
clockService,
// Ignores key and value; presumably picks the peers that receive each
// update (see getPeerNodes) - confirm against EventuallyConsistentMapImpl.
(key, flowEntry) -> getPeerNodes());
/**
 * Picks one cluster node other than the local node, at random.
 *
 * @return a single-element list holding the chosen node id, or an empty
 *         list when the local node is the only cluster member
 */
private Collection<NodeId> getPeerNodes() {
    List<NodeId> peers = clusterService.getNodes()
            .stream()
            .map(node -> node.id())
            .filter(id -> !id.equals(clusterService.getLocalNode().id()))
            .collect(Collectors.toList());

    if (peers.isEmpty()) {
        return ImmutableList.of();
    }

    // Shuffle and take the head to obtain a random peer.
    Collections.shuffle(peers);
    NodeId chosen = peers.get(0);
    return ImmutableList.of(chosen);
}
}
private class InternalFlowTable {
/*
TODO: This needs to be cleaned up, perhaps by using the eventually consistent
map once it supports distributing updates to a sequence of instances.
*/
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();
public void loadFromBackup(DeviceId deviceId) {
ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>();
private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
}
/**
* Returns the flow table for specified device.
*
* @param deviceId identifier of the device
* @return Map representing Flow Table of given device.
*/
private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
return createIfAbsentUnchecked(flowEntries,
deviceId, lazyEmptyFlowTable());
backupMap.values()
.stream()
.filter(entry -> entry.deviceId().equals(deviceId))
.forEach(entry -> flowTable.computeIfPresent(entry.id(), (k, v) -> {
if (v == null) {
return Sets.newHashSet(entry);
} else {
v.add(entry);
}
return v;
}));
flowEntries.putIfAbsent(deviceId, flowTable);
}
/**
 * Returns the set of flow entries stored for the given device and flow id.
 * <p>
 * The per-device table and the per-flow set are created on first access,
 * so the result is never null. The returned set is the stored instance,
 * not a copy.
 *
 * @param deviceId identifier of the device
 * @param flowId   identifier of the flow
 * @return set holding the entries for that device and flow id
 */
private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
    // Single lookup-or-create chain; a second, unreachable copy of this
    // logic used to follow the first return statement.
    return flowEntries
            .computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
            .computeIfAbsent(flowId, key -> new CopyOnWriteArraySet<>());
}
/**
 * Finds the stored entry that equals the given rule.
 *
 * @param rule flow rule to look up; its device id and flow id select the
 *             set that is searched
 * @return the first stored entry for which {@code equals(rule)} holds,
 *         or null when there is none
 */
private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
    // One implementation only; the earlier loop-based copy made this
    // return statement unreachable.
    return getFlowEntriesInternal(rule.deviceId(), rule.id())
            .stream()
            .filter(element -> element.equals(rule))
            .findFirst()
            .orElse(null);
}
private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
return getFlowTable(deviceId).values().stream()
.flatMap((list -> list.stream())).collect(Collectors.toSet());
private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Set<StoredFlowEntry> entries = Sets.newHashSet();
flowEntries.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
.values()
.forEach(entries::addAll);
return entries;
}
/**
 * Returns the stored flow entry matching the given rule by delegating to
 * getFlowEntryInternal.
 *
 * @param rule flow rule to look up
 * @return the matching stored entry, or null when none is stored
 */
public StoredFlowEntry getFlowEntry(FlowRule rule) {
return getFlowEntryInternal(rule);
}
public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
public Set<StoredFlowEntry> getFlowEntries(DeviceId deviceId) {
return getFlowEntriesInternal(deviceId);
}
/**
 * Returns the set of stored entries that share the device id and flow id
 * of the given entry.
 *
 * @param entry flow entry supplying the device id and flow id
 * @return the set obtained from getFlowEntriesInternal for that device
 *         and flow id
 */
public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
return getFlowEntriesInternal(entry.deviceId(), entry.id());
}
public void add(StoredFlowEntry rule) {
getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule);
public void add(FlowEntry rule) {
((CopyOnWriteArraySet)
getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
try {
backupMap.put(rule.id(), rule);
} catch (Exception e) {
log.warn("Failed to backup flow rule", e);
}
}
/**
 * Removes the given rule from the in-memory table of the device and, when
 * it was present, also removes it from the backup map.
 * <p>
 * Removing the backup is best effort: a failure is logged and does not
 * change the result.
 *
 * @param deviceId device the rule belongs to
 * @param rule     flow entry to remove
 * @return true if the rule was present in the in-memory table
 */
public boolean remove(DeviceId deviceId, FlowEntry rule) {
    boolean status =
            getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
    if (status) {
        try {
            // Cast to the backup map's value type rather than to one
            // concrete implementation.
            backupMap.remove(rule.id(), (StoredFlowEntry) rule);
        } catch (Exception e) {
            log.warn("Failed to remove backup of flow rule", e);
        }
    }
    return status;
}
/**
 * Removes the in-memory flow table of the given device. The backup map
 * is left untouched.
 *
 * @param did device whose in-memory flow entries are dropped
 */
public void clearDevice(DeviceId did) {
flowEntries.remove(did);
// Flow entries should continue to remain in backup map.
}
}
}
}
\ No newline at end of file
......