alshabib

implementing flowremoved handling

package org.onlab.onos.of.controller.impl;
import static org.onlab.util.Tools.namedThreads;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
......@@ -13,6 +19,7 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.of.controller.DefaultOpenFlowPacketContext;
import org.onlab.onos.of.controller.Dpid;
import org.onlab.onos.of.controller.OpenFlowController;
import org.onlab.onos.of.controller.OpenFlowEventListener;
import org.onlab.onos.of.controller.OpenFlowPacketContext;
import org.onlab.onos.of.controller.OpenFlowSwitch;
import org.onlab.onos.of.controller.OpenFlowSwitchListener;
......@@ -22,10 +29,12 @@ import org.onlab.onos.of.controller.driver.OpenFlowAgent;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPacketIn;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.projectfloodlight.openflow.protocol.OFType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@Component(immediate = true)
......@@ -35,6 +44,10 @@ public class OpenFlowControllerImpl implements OpenFlowController {
private static final Logger log =
LoggerFactory.getLogger(OpenFlowControllerImpl.class);
private final ExecutorService executor = Executors.newFixedThreadPool(16,
namedThreads("of-event-dispatch-%d"));
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
......@@ -43,11 +56,12 @@ public class OpenFlowControllerImpl implements OpenFlowController {
new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
protected Set<OpenFlowSwitchListener> ofEventListener = new HashSet<>();
protected Set<OpenFlowSwitchListener> ofSwitchListener = new HashSet<>();
protected Multimap<Integer, PacketListener> ofPacketListener =
ArrayListMultimap.create();
protected Map<OFType, List<OpenFlowEventListener>> ofEventListener = Maps.newHashMap();
private final Controller ctrl = new Controller();
......@@ -93,14 +107,14 @@ public class OpenFlowControllerImpl implements OpenFlowController {
@Override
public void addListener(OpenFlowSwitchListener listener) {
if (!ofEventListener.contains(listener)) {
this.ofEventListener.add(listener);
if (!ofSwitchListener.contains(listener)) {
this.ofSwitchListener.add(listener);
}
}
@Override
public void removeListener(OpenFlowSwitchListener listener) {
this.ofEventListener.remove(listener);
this.ofSwitchListener.remove(listener);
}
@Override
......@@ -122,7 +136,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
public void processPacket(Dpid dpid, OFMessage msg) {
switch (msg.getType()) {
case PORT_STATUS:
for (OpenFlowSwitchListener l : ofEventListener) {
for (OpenFlowSwitchListener l : ofSwitchListener) {
l.portChanged(dpid, (OFPortStatus) msg);
}
break;
......@@ -134,6 +148,12 @@ public class OpenFlowControllerImpl implements OpenFlowController {
p.handlePacket(pktCtx);
}
break;
case FLOW_REMOVED:
case ERROR:
case STATS_REPLY:
case BARRIER_REPLY:
executor.submit(new OFMessageHandler(dpid, msg));
break;
default:
log.warn("Handling message type {} not yet implemented {}",
msg.getType(), msg);
......@@ -164,7 +184,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
} else {
log.error("Added switch {}", dpid);
connectedSwitches.put(dpid, sw);
for (OpenFlowSwitchListener l : ofEventListener) {
for (OpenFlowSwitchListener l : ofSwitchListener) {
l.switchAdded(dpid);
}
return true;
......@@ -277,7 +297,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
if (sw == null) {
sw = activeEqualSwitches.remove(dpid);
}
for (OpenFlowSwitchListener l : ofEventListener) {
for (OpenFlowSwitchListener l : ofSwitchListener) {
l.switchRemoved(dpid);
}
}
......@@ -288,5 +308,26 @@ public class OpenFlowControllerImpl implements OpenFlowController {
}
}
private final class OFMessageHandler implements Runnable {
private final OFMessage msg;
private final Dpid dpid;
public OFMessageHandler(Dpid dpid, OFMessage msg) {
this.msg = msg;
this.dpid = dpid;
}
@Override
public void run() {
List<OpenFlowEventListener> listeners =
ofEventListener.get(OFType.FLOW_REMOVED);
for (OpenFlowEventListener listener : listeners) {
listener.handleMessage(dpid, msg);
}
}
}
}
......
......@@ -46,7 +46,7 @@ public class OpenFlowCorePacketContext extends DefaultPacketContext {
private void sendBufferedPacket() {
List<Instruction> ins = treatmentBuilder().build().instructions();
OFPort p = null;
//TODO: support arbitrary list of treatments
//TODO: support arbitrary list of treatments must be supported in ofPacketContext
for (Instruction i : ins) {
if (i.type() == Type.OUTPUT) {
p = buildPort(((OutputInstruction) i).port());
......
......@@ -118,7 +118,7 @@ public final class IpAddress {
if (mask > MAX_INET_MASK) {
throw new IllegalArgumentException(
"Value of subnet mask cannot exceed "
+ MAX_INET_MASK);
+ MAX_INET_MASK);
}
}
......@@ -200,7 +200,7 @@ public final class IpAddress {
byte [] net = new byte [4];
byte [] mask = bytes(mask());
for (int i = 0; i < INET_LEN; i++) {
net[i] = (byte) (octets[i] & mask[i]);
net[i] = (byte) (octets[i] & mask[i]);
}
return new IpAddress(version, net, netmask);
}
......@@ -221,11 +221,15 @@ public final class IpAddress {
byte [] host = new byte [INET_LEN];
byte [] mask = bytes(mask());
for (int i = 0; i < INET_LEN; i++) {
host[i] = (byte) (octets[i] & ~mask[i]);
host[i] = (byte) (octets[i] & ~mask[i]);
}
return new IpAddress(version, host, netmask);
}
public boolean isMasked() {
return mask() != 0;
}
@Override
public int hashCode() {
final int prime = 31;
......