tom

Merge remote-tracking branch 'origin/master'

......@@ -9,6 +9,7 @@ import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.Path;
......@@ -53,14 +54,18 @@ public class ReactiveForwarding {
private ReactivePacketProcessor processor = new ReactivePacketProcessor();
private ApplicationId appId;
@Activate
public void activate() {
appId = ApplicationId.getAppId();
packetService.addProcessor(processor, PacketProcessor.ADVISOR_MAX + 1);
log.info("Started");
log.info("Started with Application ID {}", appId.id());
}
@Deactivate
public void deactivate() {
flowRuleService.removeFlowRulesById(appId);
packetService.removeProcessor(processor);
processor = null;
log.info("Stopped");
......@@ -169,7 +174,7 @@ public class ReactiveForwarding {
treat.add(Instructions.createOutput(portNumber));
FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(),
builder.build(), treat.build(), 0);
builder.build(), treat.build(), 0, appId);
flowRuleService.applyFlowRules(f);
}
......
package org.onlab.onos;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Application id generator class.
*/
public final class ApplicationId {
private static AtomicInteger idDispenser;
private final Integer id;
// Ban public construction
private ApplicationId(Integer id) {
this.id = id;
}
public Integer id() {
return id;
}
public static ApplicationId valueOf(Integer id) {
return new ApplicationId(id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof ApplicationId)) {
return false;
}
ApplicationId other = (ApplicationId) obj;
return Objects.equals(this.id, other.id);
}
/**
* Returns a new application id.
*
* @return app id
*/
public static ApplicationId getAppId() {
if (ApplicationId.idDispenser == null) {
ApplicationId.idDispenser = new AtomicInteger(1);
}
return new ApplicationId(ApplicationId.idDispenser.getAndIncrement());
}
}
......@@ -5,6 +5,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Objects;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.slf4j.Logger;
......@@ -24,6 +25,8 @@ public class DefaultFlowRule implements FlowRule {
private final FlowId id;
private final ApplicationId appId;
public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowRuleState state,
long life, long packets, long bytes, long flowId) {
......@@ -32,7 +35,7 @@ public class DefaultFlowRule implements FlowRule {
this.selector = selector;
this.treatment = treatment;
this.state = state;
this.appId = ApplicationId.valueOf((int) (flowId >> 32));
this.id = FlowId.valueOf(flowId);
this.life = life;
......@@ -42,18 +45,18 @@ public class DefaultFlowRule implements FlowRule {
}
public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatement, int priority) {
this(deviceId, selector, treatement, priority, FlowRuleState.CREATED);
TrafficTreatment treatement, int priority, ApplicationId appId) {
this(deviceId, selector, treatement, priority, FlowRuleState.CREATED, appId);
}
public DefaultFlowRule(FlowRule rule, FlowRuleState state) {
this(rule.deviceId(), rule.selector(), rule.treatment(),
rule.priority(), state, rule.id());
rule.priority(), state, rule.id(), rule.appId());
}
private DefaultFlowRule(DeviceId deviceId,
TrafficSelector selector, TrafficTreatment treatment,
int priority, FlowRuleState state) {
int priority, FlowRuleState state, ApplicationId appId) {
this.deviceId = deviceId;
this.priority = priority;
this.selector = selector;
......@@ -62,13 +65,15 @@ public class DefaultFlowRule implements FlowRule {
this.life = 0;
this.packets = 0;
this.bytes = 0;
this.id = FlowId.valueOf(this.hashCode());
this.appId = appId;
this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL));
this.created = System.currentTimeMillis();
}
private DefaultFlowRule(DeviceId deviceId,
TrafficSelector selector, TrafficTreatment treatment,
int priority, FlowRuleState state, FlowId flowId) {
int priority, FlowRuleState state, FlowId flowId, ApplicationId appId) {
this.deviceId = deviceId;
this.priority = priority;
this.selector = selector;
......@@ -77,6 +82,7 @@ public class DefaultFlowRule implements FlowRule {
this.life = 0;
this.packets = 0;
this.bytes = 0;
this.appId = appId;
this.id = flowId;
this.created = System.currentTimeMillis();
}
......@@ -88,6 +94,11 @@ public class DefaultFlowRule implements FlowRule {
}
@Override
public ApplicationId appId() {
return appId;
}
@Override
public int priority() {
return priority;
}
......@@ -136,7 +147,11 @@ public class DefaultFlowRule implements FlowRule {
* @see java.lang.Object#equals(java.lang.Object)
*/
public int hashCode() {
return Objects.hash(deviceId, selector, treatment);
return Objects.hash(deviceId, id);
}
public int hash() {
return Objects.hash(deviceId, selector, id);
}
@Override
......
package org.onlab.onos.net.flow;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
/**
......@@ -53,6 +54,13 @@ public interface FlowRule {
FlowId id();
/**
* Returns the application id of this flow.
*
* @return an applicationId
*/
ApplicationId appId();
/**
* Returns the flow rule priority given in natural order; higher numbers
* mean higher priorities.
*
......
package org.onlab.onos.net.flow;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.provider.Provider;
/**
......@@ -25,4 +26,10 @@ public interface FlowRuleProvider extends Provider {
*/
void removeFlowRule(FlowRule... flowRules);
/**
* Removes rules by their id.
* @param id the id to remove
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
}
......
package org.onlab.onos.net.flow;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
/**
......@@ -43,6 +44,20 @@ public interface FlowRuleService {
*/
void removeFlowRules(FlowRule... flowRules);
/**
* Removes all rules by id.
*
* @param appId id to remove
*/
void removeFlowRulesById(ApplicationId appId);
/**
* Returns a list of rules with this application id.
*
* @param id the id to look up
* @return collection of flow rules
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
/**
* Adds the specified flow rule listener.
......@@ -58,4 +73,6 @@ public interface FlowRuleService {
*/
void removeListener(FlowRuleListener listener);
}
......
package org.onlab.onos.net.flow;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Store;
......@@ -9,6 +10,13 @@ import org.onlab.onos.store.Store;
public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegate> {
/**
* Returns the stored flow.
* @param rule the rule to look for
* @return a flow rule
*/
FlowRule getFlowRule(FlowRule rule);
/**
* Returns the flow entries associated with a device.
*
* @param deviceId the device ID
......@@ -17,6 +25,14 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
Iterable<FlowRule> getFlowEntries(DeviceId deviceId);
/**
* Returns the flow entries associated with an application.
*
* @param appId the application id
* @return the flow entries
*/
Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId);
/**
* Stores a new flow rule without generating events.
*
* @param rule the flow rule to add
......
......@@ -7,14 +7,13 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
......@@ -81,7 +80,7 @@ implements FlowRuleService, FlowRuleProviderRegistry {
@Override
public void applyFlowRules(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
FlowRule f = new DefaultFlowRule(flowRules[i], FlowRuleState.PENDING_ADD);
FlowRule f = flowRules[i];
final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId());
store.storeFlowRule(f);
......@@ -92,14 +91,33 @@ implements FlowRuleService, FlowRuleProviderRegistry {
@Override
public void removeFlowRules(FlowRule... flowRules) {
FlowRule f;
FlowRuleProvider frp;
Device device;
for (int i = 0; i < flowRules.length; i++) {
f = new DefaultFlowRule(flowRules[i], FlowRuleState.PENDING_REMOVE);
final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId());
f = flowRules[i];
device = deviceService.getDevice(f.deviceId());
frp = getProvider(device.providerId());
store.deleteFlowRule(f);
frp.removeFlowRule(f);
}
}
@Override
public void removeFlowRulesById(ApplicationId id) {
Iterable<FlowRule> rules = getFlowRulesById(id);
FlowRuleProvider frp;
Device device;
for (FlowRule f : rules) {
store.deleteFlowRule(f);
device = deviceService.getDevice(f.deviceId());
frp = getProvider(device.providerId());
frp.removeRulesById(id, f);
}
}
@Override
public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
return store.getFlowEntriesByAppId(id);
}
@Override
......@@ -130,8 +148,27 @@ implements FlowRuleService, FlowRuleProviderRegistry {
public void flowRemoved(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity();
FlowRuleEvent event = store.removeFlowRule(flowRule);
FlowRule stored = store.getFlowRule(flowRule);
if (stored == null) {
log.debug("Rule already evicted from store: {}", flowRule);
return;
}
Device device = deviceService.getDevice(flowRule.deviceId());
FlowRuleProvider frp = getProvider(device.providerId());
FlowRuleEvent event = null;
switch (stored.state()) {
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(flowRule);
break;
case PENDING_REMOVE:
case REMOVED:
event = store.removeFlowRule(flowRule);
break;
default:
break;
}
if (event != null) {
log.debug("Flow {} removed", flowRule);
post(event);
......@@ -142,7 +179,22 @@ implements FlowRuleService, FlowRuleProviderRegistry {
public void flowMissing(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity();
log.debug("Flow {} has not been installed.", flowRule);
Device device = deviceService.getDevice(flowRule.deviceId());
FlowRuleProvider frp = getProvider(device.providerId());
switch (flowRule.state()) {
case PENDING_REMOVE:
case REMOVED:
store.removeFlowRule(flowRule);
frp.removeFlowRule(flowRule);
break;
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(flowRule);
break;
default:
log.debug("Flow {} has not been installed.", flowRule);
}
}
......@@ -150,6 +202,7 @@ implements FlowRuleService, FlowRuleProviderRegistry {
public void extraneousFlow(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity();
removeFlowRules(flowRule);
log.debug("Flow {} is on switch but not in store.", flowRule);
}
......
......@@ -43,8 +43,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
......@@ -182,7 +185,7 @@ public class DistributedDeviceManagerTest {
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
connectDevice(DID1, SW2);
validateEvents(DEVICE_UPDATED);
validateEvents(DEVICE_UPDATED, DEVICE_UPDATED);
}
@Test
......@@ -251,12 +254,16 @@ public class DistributedDeviceManagerTest {
}
protected void validateEvents(Enum... types) {
int i = 0;
assertEquals("wrong events received", types.length, listener.events.size());
for (Event event : listener.events) {
assertEquals("incorrect event type", types[i], event.type());
i++;
for (Enum type : types) {
try {
Event event = listener.events.poll(1, TimeUnit.SECONDS);
assertNotNull("Timed out waiting for " + event, event);
assertEquals("incorrect event type", type, event.type());
} catch (InterruptedException e) {
fail("Unexpected interrupt");
}
}
assertTrue("Unexpected events left", listener.events.isEmpty());
listener.events.clear();
}
......@@ -281,7 +288,7 @@ public class DistributedDeviceManagerTest {
}
private static class TestListener implements DeviceListener {
final List<DeviceEvent> events = new ArrayList<>();
final BlockingQueue<DeviceEvent> events = new LinkedBlockingQueue<>();
@Override
public void event(DeviceEvent event) {
......
......@@ -14,6 +14,7 @@ import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.Device;
......@@ -61,6 +62,7 @@ public class FlowRuleManagerTest {
protected FlowRuleProviderService providerService;
protected TestProvider provider;
protected TestListener listener = new TestListener();
private ApplicationId appId;
@Before
public void setUp() {
......@@ -75,6 +77,7 @@ public class FlowRuleManagerTest {
mgr.addListener(listener);
provider = new TestProvider(PID);
providerService = registry.register(provider);
appId = ApplicationId.getAppId();
assertTrue("provider should be registered",
registry.getProviders().contains(provider.id()));
}
......@@ -93,7 +96,7 @@ public class FlowRuleManagerTest {
private FlowRule flowRule(int tsval, int trval) {
TestSelector ts = new TestSelector(tsval);
TestTreatment tr = new TestTreatment(trval);
return new DefaultFlowRule(DID, ts, tr, 0);
return new DefaultFlowRule(DID, ts, tr, 0, appId);
}
private FlowRule flowRule(FlowRule rule, FlowRuleState state) {
......@@ -159,8 +162,8 @@ public class FlowRuleManagerTest {
public void applyFlowRules() {
FlowRule r1 = flowRule(1, 1);
FlowRule r2 = flowRule(1, 2);
FlowRule r3 = flowRule(1, 3);
FlowRule r2 = flowRule(2, 2);
FlowRule r3 = flowRule(3, 3);
assertTrue("store should be empty",
Sets.newHashSet(service.getFlowEntries(DID)).isEmpty());
......@@ -196,6 +199,7 @@ public class FlowRuleManagerTest {
@Test
public void flowRemoved() {
FlowRule f1 = addFlowRule(1);
service.removeFlowRules(f1);
addFlowRule(2);
FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED);
providerService.flowRemoved(rem1);
......@@ -293,6 +297,11 @@ public class FlowRuleManagerTest {
public void removeFlowRule(FlowRule... flowRules) {
}
@Override
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
}
}
private class TestSelector implements TrafficSelector {
......
package org.onlab.onos.store.device.impl;
import static com.google.common.base.Predicates.notNull;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ISet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -95,6 +98,7 @@ public class DistributedDeviceStore
rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
loadDeviceCache();
loadDevicePortsCache();
log.info("Started");
}
......@@ -122,13 +126,16 @@ public class DistributedDeviceStore
}
private void loadDeviceCache() {
log.info("{}:{}", rawDevices.size(), devices.size());
if (rawDevices.size() != devices.size()) {
for (Map.Entry<byte[], byte[]> e : rawDevices.entrySet()) {
final DeviceId key = deserialize(e.getKey());
final DefaultDevice val = deserialize(e.getValue());
devices.put(key, Optional.of(val));
}
for (byte[] keyBytes : rawDevices.keySet()) {
final DeviceId id = deserialize(keyBytes);
devices.refresh(id);
}
}
private void loadDevicePortsCache() {
for (byte[] keyBytes : rawDevicePorts.keySet()) {
final DeviceId id = deserialize(keyBytes);
devicePorts.refresh(id);
}
}
......@@ -180,10 +187,12 @@ public class DistributedDeviceStore
desc.swVersion(),
desc.serialNumber());
synchronized (this) {
final byte[] deviceIdBytes = serialize(device.id());
rawDevices.put(deviceIdBytes, serialize(updated));
devices.put(device.id(), Optional.of(updated));
availableDevices.add(serialize(device.id()));
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, device, null);
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
}
// Otherwise merely attempt to change availability
......@@ -227,7 +236,7 @@ public class DistributedDeviceStore
events.addAll(pruneOldPorts(device, ports, processed));
}
return events;
return FluentIterable.from(events).filter(notNull()).toList();
}
// Creates a new port based on the port description adds it to the map and
......@@ -254,7 +263,7 @@ public class DistributedDeviceStore
portDescription.isEnabled());
ports.put(port.number(), updatedPort);
updatePortMap(device.id(), ports);
return new DeviceEvent(PORT_UPDATED, device, port);
return new DeviceEvent(PORT_UPDATED, device, updatedPort);
}
return null;
}
......@@ -351,17 +360,17 @@ public class DistributedDeviceStore
@Override
protected void onAdd(DeviceId deviceId, DefaultDevice device) {
delegate.notify(new DeviceEvent(DEVICE_ADDED, device));
notifyDelegate(new DeviceEvent(DEVICE_ADDED, device));
}
@Override
protected void onRemove(DeviceId deviceId, DefaultDevice device) {
delegate.notify(new DeviceEvent(DEVICE_REMOVED, device));
notifyDelegate(new DeviceEvent(DEVICE_REMOVED, device));
}
@Override
protected void onUpdate(DeviceId deviceId, DefaultDevice device) {
delegate.notify(new DeviceEvent(DEVICE_UPDATED, device));
notifyDelegate(new DeviceEvent(DEVICE_UPDATED, device));
}
}
......@@ -372,17 +381,17 @@ public class DistributedDeviceStore
@Override
protected void onAdd(DeviceId deviceId, Map<PortNumber, Port> ports) {
// delegate.notify(new DeviceEvent(PORT_ADDED, getDevice(deviceId)));
// notifyDelegate(new DeviceEvent(PORT_ADDED, getDevice(deviceId)));
}
@Override
protected void onRemove(DeviceId deviceId, Map<PortNumber, Port> ports) {
// delegate.notify(new DeviceEvent(PORT_REMOVED, getDevice(deviceId)));
// notifyDelegate(new DeviceEvent(PORT_REMOVED, getDevice(deviceId)));
}
@Override
protected void onUpdate(DeviceId deviceId, Map<PortNumber, Port> ports) {
// delegate.notify(new DeviceEvent(PORT_UPDATED, getDevice(deviceId)));
// notifyDelegate(new DeviceEvent(PORT_UPDATED, getDevice(deviceId)));
}
}
......
......@@ -107,7 +107,7 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
K key = deserialize(event.getKey());
V val = deserialize(event.getValue());
V val = deserialize(event.getOldValue());
cache.invalidate(key);
onRemove(key, val);
}
......
package org.onlab.onos.net.trivial.impl;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -33,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -123,7 +126,7 @@ public class SimpleDeviceStore
devices.put(device.id(), updated);
availableDevices.add(device.id());
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, device, null);
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
}
// Otherwise merely attempt to change availability
......@@ -165,7 +168,7 @@ public class SimpleDeviceStore
events.addAll(pruneOldPorts(device, ports, processed));
}
return events;
return FluentIterable.from(events).filter(notNull()).toList();
}
// Creates a new port based on the port description adds it to the map and
......
......@@ -4,12 +4,18 @@ import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Collections;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
......@@ -33,7 +39,11 @@ public class SimpleFlowRuleStore
private final Logger log = getLogger(getClass());
// store entries as a pile of rules, no info about device tables
private final Multimap<DeviceId, FlowRule> flowEntries = ArrayListMultimap.create();
private final Multimap<DeviceId, FlowRule> flowEntries =
ArrayListMultimap.<DeviceId, FlowRule>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
@Activate
public void activate() {
......@@ -45,48 +55,76 @@ public class SimpleFlowRuleStore
log.info("Stopped");
}
@Override
public Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
return ImmutableSet.copyOf(flowEntries.get(deviceId));
public synchronized FlowRule getFlowRule(FlowRule rule) {
for (FlowRule f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
}
return null;
}
@Override
public void storeFlowRule(FlowRule rule) {
DeviceId did = rule.deviceId();
flowEntries.put(did, rule);
public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
Collection<FlowRule> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptyList();
}
return ImmutableSet.copyOf(rules);
}
@Override
public void deleteFlowRule(FlowRule rule) {
DeviceId did = rule.deviceId();
public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
if (rules == null) {
return Collections.emptyList();
}
return ImmutableSet.copyOf(rules);
}
@Override
public synchronized void storeFlowRule(FlowRule rule) {
FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
DeviceId did = f.deviceId();
if (!flowEntries.containsEntry(did, f)) {
flowEntries.put(did, f);
flowEntriesById.put(rule.appId(), f);
}
}
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
DeviceId did = f.deviceId();
/*
* find the rule and mark it for deletion.
* Ultimately a flow removed will come remove it.
*/
if (flowEntries.containsEntry(did, rule)) {
synchronized (flowEntries) {
flowEntries.remove(did, rule);
flowEntries.put(did, rule);
}
if (flowEntries.containsEntry(did, f)) {
//synchronized (flowEntries) {
flowEntries.remove(did, f);
flowEntries.put(did, f);
flowEntriesById.remove(rule.appId(), rule);
//}
}
}
@Override
public FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
if (flowEntries.containsEntry(did, rule)) {
synchronized (flowEntries) {
// Multimaps support duplicates so we have to remove our rule
// and replace it with the current version.
flowEntries.remove(did, rule);
flowEntries.put(did, rule);
}
//synchronized (flowEntries) {
// Multimaps support duplicates so we have to remove our rule
// and replace it with the current version.
flowEntries.remove(did, rule);
flowEntries.put(did, rule);
//}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
......@@ -95,16 +133,20 @@ public class SimpleFlowRuleStore
}
@Override
public FlowRuleEvent removeFlowRule(FlowRule rule) {
synchronized (this) {
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
//synchronized (this) {
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
//}
}
}
......
......@@ -183,7 +183,7 @@ public class FlowRuleBuilder {
break;
case ETH_DST:
MacAddress dMac = MacAddress.valueOf(match.get(MatchField.ETH_DST).getLong());
builder.add(Criteria.matchEthSrc(dMac));
builder.add(Criteria.matchEthDst(dMac));
break;
case ETH_TYPE:
int ethType = match.get(MatchField.ETH_TYPE).getValue();
......
......@@ -9,6 +9,7 @@ import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleProvider;
......@@ -102,12 +103,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
private void removeRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
}
@Override
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
// TODO: optimize using the ApplicationId
removeFlowRule(flowRules);
}
//TODO: InternalFlowRuleProvider listening to stats and error and flowremoved.
// possibly barriers as well. May not be internal at all...
......@@ -179,4 +185,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
}
......