Yuta HIGUCHI

FlowEntry must not be modified outside store.

- Remove set method from FlowEntry
- Storing last seen timestamp for FlowEntry eviction locally on FlowManager.
  FlowEntry eviction based on packet counter will take longer time to timeout
  after master Node change.

Change-Id: I7134d698dd5b9bf7cca379c5ba7c4fbcc2e3d5f3
......@@ -6,7 +6,8 @@ import static org.slf4j.LoggerFactory.getLogger;
import org.onlab.onos.net.DeviceId;
import org.slf4j.Logger;
public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
public class DefaultFlowEntry extends DefaultFlowRule
implements FlowEntry, StoredFlowEntry {
private static final Logger log = getLogger(DefaultFlowEntry.class);
......
......@@ -65,6 +65,7 @@ public interface FlowEntry extends FlowRule {
*/
long bytes();
// TODO: consider removing this attribute
/**
* When this flow entry was last deemed active.
* @return epoch time of last activity
......@@ -72,35 +73,6 @@ public interface FlowEntry extends FlowRule {
long lastSeen();
/**
* Sets the last active epoch time.
*/
void setLastSeen();
/**
* Sets the new state for this entry.
* @param newState new flow entry state.
*/
void setState(FlowEntryState newState);
/**
* Sets how long this entry has been entered in the system.
* @param life epoch time
*/
void setLife(long life);
/**
* Number of packets seen by this entry.
* @param packets a long value
*/
void setPackets(long packets);
/**
* Number of bytes seen by this rule.
* @param bytes a long value
*/
void setBytes(long bytes);
/**
* Indicates the error type.
* @return an integer value of the error
*/
......
package org.onlab.onos.net.flow;
public interface StoredFlowEntry extends FlowEntry {
/**
* Sets the last active epoch time.
*/
void setLastSeen();
/**
* Sets the new state for this entry.
* @param newState new flow entry state.
*/
void setState(FlowEntryState newState);
/**
* Sets how long this entry has been entered in the system.
* @param life epoch time
*/
void setLife(long life);
/**
* Number of packets seen by this entry.
* @param packets a long value
*/
void setPackets(long packets);
/**
* Number of bytes seen by this rule.
* @param bytes a long value
*/
void setBytes(long bytes);
}
......@@ -3,8 +3,8 @@ package org.onlab.onos.net.flow.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
......@@ -45,6 +45,7 @@ import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
/**
......@@ -197,6 +198,8 @@ public class FlowRuleManager
extends AbstractProviderService<FlowRuleProvider>
implements FlowRuleProviderService {
final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
super(provider);
}
......@@ -205,6 +208,7 @@ public class FlowRuleManager
public void flowRemoved(FlowEntry flowEntry) {
checkNotNull(flowEntry, FLOW_RULE_NULL);
checkValidity();
lastSeen.remove(flowEntry);
FlowEntry stored = store.getFlowEntry(flowEntry);
if (stored == null) {
log.info("Rule already evicted from store: {}", flowEntry);
......@@ -292,14 +296,25 @@ public class FlowRuleManager
if (storedRule == null) {
return false;
}
long timeout = storedRule.timeout() * 1000;
Long currentTime = System.currentTimeMillis();
final long timeout = storedRule.timeout() * 1000;
final long currentTime = System.currentTimeMillis();
if (storedRule.packets() != swRule.packets()) {
storedRule.setLastSeen();
lastSeen.put(storedRule, currentTime);
return true;
}
if (!lastSeen.containsKey(storedRule)) {
// checking for the first time
lastSeen.put(storedRule, storedRule.lastSeen());
// Use following if lastSeen attr. was removed.
//lastSeen.put(storedRule, currentTime);
}
Long last = lastSeen.get(storedRule);
if (last == null) {
// concurrently removed? let the liveness check fail
return false;
}
if ((currentTime - storedRule.lastSeen()) <= timeout) {
if ((currentTime - last) <= timeout) {
return true;
}
return false;
......@@ -316,10 +331,7 @@ public class FlowRuleManager
public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
Iterator<FlowEntry> switchRulesIterator = flowEntries.iterator();
while (switchRulesIterator.hasNext()) {
FlowEntry rule = switchRulesIterator.next();
for (FlowEntry rule : flowEntries) {
if (storedRules.remove(rule)) {
// we both have the rule, let's update some info then.
flowAdded(rule);
......
......@@ -46,6 +46,7 @@ import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criterion;
......@@ -232,7 +233,7 @@ public class FlowRuleManagerTest {
public void flowRemoved() {
FlowRule f1 = addFlowRule(1);
FlowRule f2 = addFlowRule(2);
FlowEntry fe1 = new DefaultFlowEntry(f1);
StoredFlowEntry fe1 = new DefaultFlowEntry(f1);
FlowEntry fe2 = new DefaultFlowEntry(f2);
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
service.removeFlowRules(f1);
......
......@@ -26,6 +26,7 @@ import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
......@@ -53,8 +54,8 @@ public class DistributedFlowRuleStore
private final Logger log = getLogger(getClass());
// store entries as a pile of rules, no info about device tables
private final Multimap<DeviceId, FlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, FlowEntry>create();
private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
private final Multimap<Short, FlowRule> flowEntriesById =
ArrayListMultimap.<Short, FlowRule>create();
......@@ -99,7 +100,11 @@ public class DistributedFlowRuleStore
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
for (FlowEntry f : flowEntries.get(rule.deviceId())) {
return getFlowEntryInternal(rule);
}
private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
......@@ -109,7 +114,7 @@ public class DistributedFlowRuleStore
@Override
public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Collection<FlowEntry> rules = flowEntries.get(deviceId);
Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptyList();
}
......@@ -148,7 +153,7 @@ public class DistributedFlowRuleStore
}
private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
FlowEntry flowEntry = new DefaultFlowEntry(flowRule);
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
// write to local copy.
if (!flowEntries.containsEntry(deviceId, flowEntry)) {
......@@ -182,7 +187,7 @@ public class DistributedFlowRuleStore
}
private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
FlowEntry entry = getFlowEntry(flowRule);
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry == null) {
return;
}
......@@ -215,7 +220,7 @@ public class DistributedFlowRuleStore
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
FlowEntry stored = getFlowEntry(rule);
StoredFlowEntry stored = getFlowEntryInternal(rule);
if (stored != null) {
stored.setBytes(rule.bytes());
stored.setLife(rule.life());
......@@ -227,7 +232,8 @@ public class DistributedFlowRuleStore
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
flowEntries.put(did, rule);
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
flowEntries.put(did, new DefaultFlowEntry(rule));
return null;
// TODO: also update backup.
......
......@@ -5,6 +5,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static java.util.Collections.unmodifiableCollection;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
......@@ -25,6 +26,7 @@ import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.store.AbstractStore;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
......@@ -43,7 +45,7 @@ public class SimpleFlowRuleStore
// inner Map is Device flow table
// Assumption: FlowId cannot have synonyms
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, FlowEntry>>
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, StoredFlowEntry>>
flowEntries = new ConcurrentHashMap<>();
@Activate
......@@ -61,14 +63,14 @@ public class SimpleFlowRuleStore
@Override
public int getFlowRuleCount() {
int sum = 0;
for (ConcurrentMap<FlowId, FlowEntry> ft : flowEntries.values()) {
for (ConcurrentMap<FlowId, StoredFlowEntry> ft : flowEntries.values()) {
sum += ft.size();
}
return sum;
}
private static NewConcurrentHashMap<FlowId, FlowEntry> lazyEmptyFlowTable() {
return NewConcurrentHashMap.<FlowId, FlowEntry>ifNeeded();
private static NewConcurrentHashMap<FlowId, StoredFlowEntry> lazyEmptyFlowTable() {
return NewConcurrentHashMap.<FlowId, StoredFlowEntry>ifNeeded();
}
/**
......@@ -77,12 +79,12 @@ public class SimpleFlowRuleStore
* @param deviceId identifier of the device
* @return Map representing Flow Table of given device.
*/
private ConcurrentMap<FlowId, FlowEntry> getFlowTable(DeviceId deviceId) {
private ConcurrentMap<FlowId, StoredFlowEntry> getFlowTable(DeviceId deviceId) {
return createIfAbsentUnchecked(flowEntries,
deviceId, lazyEmptyFlowTable());
}
private FlowEntry getFlowEntry(DeviceId deviceId, FlowId flowId) {
private StoredFlowEntry getFlowEntry(DeviceId deviceId, FlowId flowId) {
return getFlowTable(deviceId).get(flowId);
}
......@@ -93,7 +95,8 @@ public class SimpleFlowRuleStore
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
return unmodifiableCollection(getFlowTable(deviceId).values());
return unmodifiableCollection((Collection<? extends FlowEntry>)
getFlowTable(deviceId).values());
}
@Override
......@@ -101,7 +104,7 @@ public class SimpleFlowRuleStore
Set<FlowRule> rules = new HashSet<>();
for (DeviceId did : flowEntries.keySet()) {
ConcurrentMap<FlowId, FlowEntry> ft = getFlowTable(did);
ConcurrentMap<FlowId, StoredFlowEntry> ft = getFlowTable(did);
for (FlowEntry fe : ft.values()) {
if (fe.appId() == appId.id()) {
rules.add(fe);
......@@ -117,7 +120,7 @@ public class SimpleFlowRuleStore
}
private boolean storeFlowRuleInternal(FlowRule rule) {
FlowEntry f = new DefaultFlowEntry(rule);
StoredFlowEntry f = new DefaultFlowEntry(rule);
final DeviceId did = f.deviceId();
final FlowId fid = f.id();
FlowEntry existing = getFlowTable(did).putIfAbsent(fid, f);
......@@ -133,7 +136,7 @@ public class SimpleFlowRuleStore
@Override
public void deleteFlowRule(FlowRule rule) {
FlowEntry entry = getFlowEntry(rule.deviceId(), rule.id());
StoredFlowEntry entry = getFlowEntry(rule.deviceId(), rule.id());
if (entry == null) {
//log.warn("Cannot find rule {}", rule);
return;
......@@ -146,7 +149,7 @@ public class SimpleFlowRuleStore
@Override
public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
// check if this new rule is an update to an existing entry
FlowEntry stored = getFlowEntry(rule.deviceId(), rule.id());
StoredFlowEntry stored = getFlowEntry(rule.deviceId(), rule.id());
if (stored != null) {
synchronized (stored) {
stored.setBytes(rule.bytes());
......@@ -174,7 +177,7 @@ public class SimpleFlowRuleStore
// This is where one could mark a rule as removed and still keep it in the store.
final DeviceId did = rule.deviceId();
ConcurrentMap<FlowId, FlowEntry> ft = getFlowTable(did);
ConcurrentMap<FlowId, StoredFlowEntry> ft = getFlowTable(did);
if (ft.remove(rule.id(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
......