Aaron Kruglikov
Committed by Gerrit Code Review

Adding an option for persistent flow storage.

Change-Id: I1dd70c9f2ea9cd99ef5a55eaa4b54968f7d3c55f
......@@ -59,6 +59,7 @@ import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
......@@ -74,6 +75,7 @@ import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
......@@ -113,6 +115,7 @@ public class NewDistributedFlowRuleStore
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final boolean DEFAULT_BACKUP_ENABLED = true;
private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
// number of devices whose flow entries will be backed up in one communication round
......@@ -129,6 +132,9 @@ public class NewDistributedFlowRuleStore
@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
label = "Delay in ms between successive backup runs")
private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
@Property(name = "persistenceEnabled", boolValue = false,
label = "Indicates whether or not changes in the flow table should be persisted to disk.")
private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
private InternalFlowTable flowTable = new InternalFlowTable();
......@@ -153,6 +159,9 @@ public class NewDistributedFlowRuleStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
private ExecutorService messageHandlingExecutor;
......@@ -716,7 +725,25 @@ public class NewDistributedFlowRuleStore
* @return Map representing Flow Table of given device.
*/
private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
if (persistenceEnabled) {
return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
.<FlowId, Set<StoredFlowEntry>>persistentMapBuilder()
.withName("FlowTable:" + deviceId.toString())
.withSerializer(new Serializer() {
@Override
public <T> byte[] encode(T object) {
return SERIALIZER.encode(object);
}
@Override
public <T> T decode(byte[] bytes) {
return SERIALIZER.decode(bytes);
}
})
.build());
} else {
return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
}
}
private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
......