Madan Jampani

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

......@@ -188,7 +188,7 @@ public final class DefaultTrafficSelector implements TrafficSelector {
}
@Override
public Builder matchOpticalSignalType(Byte signalType) {
public Builder matchOpticalSignalType(Short signalType) {
return add(Criteria.matchOpticalSignalType(signalType));
}
......
......@@ -27,11 +27,14 @@ public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.T
*/
public enum Type {
// Request has been forwarded to MASTER Node
/**
* Signifies that a batch operation has been initiated.
*/
BATCH_OPERATION_REQUESTED,
// MASTER Node has pushed the batch down to the Device
// (e.g., Received barrier reply)
/**
* Signifies that a batch operation has completed.
*/
......
......@@ -25,29 +25,29 @@ import com.google.common.collect.Lists;
public class FlowRuleBatchRequest {
private final int batchId;
private final List<FlowEntry> toAdd;
private final List<FlowEntry> toRemove;
private final List<FlowRule> toAdd;
private final List<FlowRule> toRemove;
public FlowRuleBatchRequest(int batchId, List<? extends FlowEntry> toAdd, List<? extends FlowEntry> toRemove) {
public FlowRuleBatchRequest(int batchId, List<? extends FlowRule> toAdd, List<? extends FlowRule> toRemove) {
this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
public List<FlowEntry> toAdd() {
public List<FlowRule> toAdd() {
return toAdd;
}
public List<FlowEntry> toRemove() {
public List<FlowRule> toRemove() {
return toRemove;
}
public FlowRuleBatchOperation asBatchOperation() {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
for (FlowEntry e : toAdd) {
for (FlowRule e : toAdd) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
}
for (FlowEntry e : toRemove) {
for (FlowRule e : toRemove) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
}
return new FlowRuleBatchOperation(entries);
......
......@@ -147,7 +147,7 @@ public interface TrafficSelector {
* @param signalType
* @return a selection builder
*/
public Builder matchOpticalSignalType(Byte signalType);
public Builder matchOpticalSignalType(Short signalType);
/**
* Builds an immutable traffic selector.
......
......@@ -161,11 +161,11 @@ public final class Criteria {
/**
* Creates a match on lambda field using the specified value.
*
* @param lambda
* @param sigType
* @return match criterion
*/
public static Criterion matchOpticalSignalType(Byte lambda) {
return new OpticalSignalTypeCriterion(lambda, Type.OCH_SIGTYPE);
public static Criterion matchOpticalSignalType(Short sigType) {
return new OpticalSignalTypeCriterion(sigType, Type.OCH_SIGTYPE);
}
......@@ -587,10 +587,10 @@ public final class Criteria {
public static final class OpticalSignalTypeCriterion implements Criterion {
private final byte signalType;
private final Short signalType;
private final Type type;
public OpticalSignalTypeCriterion(byte signalType, Type type) {
public OpticalSignalTypeCriterion(Short signalType, Type type) {
this.signalType = signalType;
this.type = type;
}
......@@ -600,7 +600,7 @@ public final class Criteria {
return this.type;
}
public Byte signalType() {
public Short signalType() {
return this.signalType;
}
......
......@@ -371,10 +371,11 @@ public class FlowRuleManager
final FlowRuleBatchRequest request = event.subject();
switch (event.type()) {
case BATCH_OPERATION_REQUESTED:
for (FlowEntry entry : request.toAdd()) {
// Request has been forwarded to MASTER Node, and was
for (FlowRule entry : request.toAdd()) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
}
for (FlowEntry entry : request.toRemove()) {
for (FlowRule entry : request.toRemove()) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
}
// FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
......@@ -392,21 +393,15 @@ public class FlowRuleManager
Futures.getUnchecked(result)));
}
}, futureListeners);
break;
case BATCH_OPERATION_COMPLETED:
Set<FlowRule> failedItems = event.result().failedItems();
for (FlowEntry entry : request.toAdd()) {
if (!failedItems.contains(entry)) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
}
}
for (FlowEntry entry : request.toRemove()) {
if (!failedItems.contains(entry)) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
}
}
// MASTER Node has pushed the batch down to the Device
// Note: RULE_ADDED will be posted
// when Flow was actually confirmed by stats reply.
break;
default:
break;
}
......
......@@ -79,6 +79,7 @@ public class OpticalPathIntentInstaller implements IntentInstaller<OpticalPathIn
private ApplicationId appId;
//final short WAVELENGTH = 80;
static final short SIGNAL_TYPE = (short) 1;
@Activate
public void activate() {
......@@ -151,7 +152,9 @@ public class OpticalPathIntentInstaller implements IntentInstaller<OpticalPathIn
prev = link.dst();
selectorBuilder.matchInport(link.dst().port());
selectorBuilder.matchOpticalSignalType(SIGNAL_TYPE); //todo
selectorBuilder.matchLambda((short) la.toInt());
}
// build the last T port rule
......
......@@ -148,7 +148,7 @@ public class FlowRuleManagerTest {
int i = 0;
System.err.println("events :" + listener.events);
for (FlowRuleEvent e : listener.events) {
assertTrue("unexpected event", e.type().equals(events[i]));
assertEquals("unexpected event", events[i], e.type());
i++;
}
......@@ -178,15 +178,13 @@ public class FlowRuleManagerTest {
RULE_ADDED, RULE_ADDED);
addFlowRule(1);
System.err.println("events :" + listener.events);
assertEquals("should still be 2 rules", 2, flowCount());
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1));
validateEvents(RULE_UPDATED);
}
// TODO: If preserving iteration order is a requirement, redo FlowRuleStore.
//backing store is sensitive to the order of additions/removals
private boolean validateState(Map<FlowRule, FlowEntryState> expected) {
Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected);
Iterable<FlowEntry> rules = service.getFlowEntries(DID);
......@@ -539,17 +537,17 @@ public class FlowRuleManagerTest {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
return false;
}
@Override
public boolean isCancelled() {
return true;
return false;
}
@Override
public boolean isDone() {
return false;
return true;
}
@Override
......@@ -562,12 +560,14 @@ public class FlowRuleManagerTest {
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException,
ExecutionException, TimeoutException {
return null;
return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
}
@Override
public void addListener(Runnable task, Executor executor) {
// TODO: add stuff.
if (isDone()) {
executor.execute(task);
}
}
}
......
......@@ -447,7 +447,13 @@ implements MastershipStore {
RoleValue oldValue = event.getOldValue();
RoleValue newValue = event.getValue();
if (Objects.equal(oldValue.get(MASTER), newValue.get(MASTER))) {
NodeId oldMaster = null;
if (oldValue != null) {
oldMaster = oldValue.get(MASTER);
}
NodeId newMaster = newValue.get(MASTER);
if (Objects.equal(oldMaster, newMaster)) {
notifyDelegate(new MastershipEvent(
MASTER_CHANGED, event.getKey(), event.getValue().roleInfo()));
} else {
......
......@@ -16,8 +16,12 @@
package org.onlab.onos.store.trivial.impl;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.FluentIterable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -43,13 +47,15 @@ import org.onlab.onos.store.AbstractStore;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
......@@ -72,6 +78,18 @@ public class SimpleFlowRuleStore
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();
private final AtomicInteger localBatchIdGen = new AtomicInteger();
// TODO: make this configurable
private int pendingFutureTimeoutMinutes = 5;
private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
CacheBuilder.newBuilder()
.expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
// TODO Explicitly fail the future if expired?
//.removalListener(listener)
.build();
@Activate
public void activate() {
log.info("Started");
......@@ -173,10 +191,6 @@ public class SimpleFlowRuleStore
}
// new flow rule added
existing.add(f);
notifyDelegate(FlowRuleBatchEvent.requested(
new FlowRuleBatchRequest(1, /* FIXME generate something */
Arrays.<FlowEntry>asList(f),
Collections.<FlowEntry>emptyList())));
}
}
......@@ -190,11 +204,6 @@ public class SimpleFlowRuleStore
if (entry.equals(rule)) {
synchronized (entry) {
entry.setState(FlowEntryState.PENDING_REMOVE);
// TODO: Should we notify only if it's "remote" event?
notifyDelegate(FlowRuleBatchEvent.requested(
new FlowRuleBatchRequest(1, /* FIXME generate something */
Collections.<FlowEntry>emptyList(),
Arrays.<FlowEntry>asList(entry))));
}
}
}
......@@ -251,20 +260,47 @@ public class SimpleFlowRuleStore
@Override
public Future<CompletedBatchOperation> storeBatch(
FlowRuleBatchOperation batchOperation) {
List<FlowRule> toAdd = new ArrayList<>();
List<FlowRule> toRemove = new ArrayList<>();
for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
final FlowRule flowRule = entry.getTarget();
if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
storeFlowRule(entry.getTarget());
if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
storeFlowRule(flowRule);
toAdd.add(flowRule);
}
} else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
deleteFlowRule(entry.getTarget());
if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
deleteFlowRule(flowRule);
toRemove.add(flowRule);
}
} else {
throw new UnsupportedOperationException("Unsupported operation type");
}
}
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
if (toAdd.isEmpty() && toRemove.isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
}
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
pendingFutures.put(batchId, r);
notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
return r;
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
final Integer batchId = event.subject().batchId();
SettableFuture<CompletedBatchOperation> future
= pendingFutures.getIfPresent(batchId);
if (future != null) {
future.set(event.result());
pendingFutures.invalidate(batchId);
}
notifyDelegate(event);
}
}
......
......@@ -289,7 +289,10 @@ public class FlowEntryBuilder {
case OCH_SIGID:
builder.matchLambda(match.get(MatchField.OCH_SIGID).getChannelNumber());
break;
case OCH_SIGTYPE_BASIC:
case OCH_SIGTYPE:
builder.matchOpticalSignalType(match.get(MatchField
.OCH_SIGTYPE).getValue());
break;
case ARP_OP:
case ARP_SHA:
case ARP_SPA:
......
......@@ -19,6 +19,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.criteria.Criteria;
import org.onlab.onos.net.flow.criteria.Criteria.EthCriterion;
import org.onlab.onos.net.flow.criteria.Criteria.EthTypeCriterion;
import org.onlab.onos.net.flow.criteria.Criteria.IPCriterion;
......@@ -46,6 +47,7 @@ import org.projectfloodlight.openflow.types.Masked;
import org.projectfloodlight.openflow.types.OFPort;
import org.projectfloodlight.openflow.types.OFVlanVidMatch;
import org.projectfloodlight.openflow.types.TransportPort;
import org.projectfloodlight.openflow.types.U8;
import org.projectfloodlight.openflow.types.VlanPcp;
import org.projectfloodlight.openflow.types.VlanVid;
import org.slf4j.Logger;
......@@ -197,6 +199,12 @@ public abstract class FlowModBuilder {
mBuilder.setExact(MatchField.OCH_SIGID,
new CircuitSignalID((byte) 1, (byte) 2, lc.lambda(), (short) 1));
break;
case OCH_SIGTYPE:
Criteria.OpticalSignalTypeCriterion sc =
(Criteria.OpticalSignalTypeCriterion) c;
mBuilder.setExact(MatchField.OCH_SIGTYPE,
U8.of(sc.signalType()));
break;
case ARP_OP:
case ARP_SHA:
case ARP_SPA:
......
......@@ -101,7 +101,7 @@ public class TopologyResource extends BaseResource {
new Prop("Vendor", device.manufacturer()),
new Prop("H/W Version", device.hwVersion()),
new Prop("S/W Version", device.swVersion()),
new Prop("S/W Version", device.serialNumber()),
new Prop("Serial Number", device.serialNumber()),
new Separator(),
new Prop("Latitude", annot.value("latitude")),
new Prop("Longitude", annot.value("longitude")),
......