alshabib

functional stats service

Change-Id: I90de3aa5d7721db8ef6a154e122af8b446243f60
package org.onlab.onos.net.statistic;
import org.onlab.onos.net.flow.FlowRuleProvider;
/**
* Implementation of a load.
*/
public class DefaultLoad implements Load {
private final boolean isValid;
private final long current;
private final long previous;
private final long time;
/**
* Creates an invalid load.
*/
public DefaultLoad() {
this.isValid = false;
this.time = System.currentTimeMillis();
this.current = -1;
this.previous = -1;
}
/**
* Creates a load value from the parameters.
* @param current the current value
* @param previous the previous value
*/
public DefaultLoad(long current, long previous) {
this.current = current;
this.previous = previous;
this.time = System.currentTimeMillis();
this.isValid = true;
}
@Override
public long rate() {
return (current - previous) / FlowRuleProvider.POLL_INTERVAL;
}
@Override
public long latest() {
return current;
}
@Override
public boolean isValid() {
return isValid;
}
@Override
public long time() {
return time;
}
}
......@@ -6,15 +6,27 @@ package org.onlab.onos.net.statistic;
public interface Load {
/**
* Obtain the current observed rate on a link.
* Obtain the current observed rate (in bytes/s) on a link.
* @return long value
*/
long rate();
/**
* Obtain the latest counter viewed on that link.
* Obtain the latest bytes counter viewed on that link.
* @return long value
*/
long latest();
/**
* Indicates whether this load was built on valid values.
* @return boolean
*/
boolean isValid();
/**
* Returns when this value was seen.
* @return epoch time
*/
long time();
}
......
......@@ -108,6 +108,9 @@ public class FlowRuleManager
if (local) {
// TODO: aggregate all local rules and push down once?
applyFlowRulesToProviders(f);
eventDispatcher.post(
new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f));
}
}
}
......@@ -136,6 +139,8 @@ public class FlowRuleManager
if (local) {
// TODO: aggregate all local rules and push down once?
removeFlowRulesFromProviders(f);
eventDispatcher.post(
new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f));
}
}
}
......
......@@ -68,7 +68,9 @@ implements PacketService, PacketProviderRegistry {
checkNotNull(packet, "Packet cannot be null");
final Device device = deviceService.getDevice(packet.sendThrough());
final PacketProvider packetProvider = getProvider(device.providerId());
packetProvider.emit(packet);
if (packetProvider != null) {
packetProvider.emit(packet);
}
}
@Override
......
......@@ -10,15 +10,19 @@ import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.statistic.DefaultLoad;
import org.onlab.onos.net.statistic.Load;
import org.onlab.onos.net.statistic.StatisticService;
import org.onlab.onos.net.statistic.StatisticStore;
import org.slf4j.Logger;
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -54,12 +58,12 @@ public class StatisticManager implements StatisticService {
@Override
public Load load(Link link) {
return null;
return load(link.src());
}
@Override
public Load load(ConnectPoint connectPoint) {
return null;
return loadInternal(connectPoint);
}
@Override
......@@ -77,6 +81,35 @@ public class StatisticManager implements StatisticService {
return null;
}
private Load loadInternal(ConnectPoint connectPoint) {
Set<FlowEntry> current;
Set<FlowEntry> previous;
synchronized (statisticStore) {
current = statisticStore.getCurrentStatistic(connectPoint);
previous = statisticStore.getPreviousStatistic(connectPoint);
}
if (current == null || previous == null) {
return new DefaultLoad();
}
long currentAggregate = aggregate(current);
long previousAggregate = aggregate(previous);
return new DefaultLoad(currentAggregate, previousAggregate);
}
/**
* Aggregates a set of values.
* @param values the values to aggregate
* @return a long value
*/
private long aggregate(Set<FlowEntry> values) {
long sum = 0;
for (FlowEntry f : values) {
sum += f.bytes();
}
return sum;
}
/**
* Internal flow rule event listener.
*/
......@@ -84,22 +117,29 @@ public class StatisticManager implements StatisticService {
@Override
public void event(FlowRuleEvent event) {
// FlowRule rule = event.subject();
// switch (event.type()) {
// case RULE_ADDED:
// case RULE_UPDATED:
// if (rule instanceof FlowEntry) {
// statisticStore.addOrUpdateStatistic((FlowEntry) rule);
// }
// break;
// case RULE_ADD_REQUESTED:
// statisticStore.prepareForStatistics(rule);
// break;
// case RULE_REMOVE_REQUESTED:
// case RULE_REMOVED:
// statisticStore.removeFromStatistics(rule);
// break;
// }
FlowRule rule = event.subject();
switch (event.type()) {
case RULE_ADDED:
case RULE_UPDATED:
if (rule instanceof FlowEntry) {
statisticStore.addOrUpdateStatistic((FlowEntry) rule);
} else {
log.warn("IT AIN'T A FLOWENTRY");
}
break;
case RULE_ADD_REQUESTED:
log.info("Preparing for stats");
statisticStore.prepareForStatistics(rule);
break;
case RULE_REMOVE_REQUESTED:
log.info("Removing stats");
statisticStore.removeFromStatistics(rule);
break;
case RULE_REMOVED:
break;
default:
log.warn("Unknown flow rule event {}", event);
}
}
}
......
......@@ -6,9 +6,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*;
import java.util.ArrayList;
import java.util.Collections;
......@@ -164,7 +162,8 @@ public class FlowRuleManagerTest {
assertEquals("2 rules should exist", 2, flowCount());
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
validateEvents(RULE_ADDED, RULE_ADDED);
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
RULE_ADDED, RULE_ADDED);
addFlowRule(1);
assertEquals("should still be 2 rules", 2, flowCount());
......@@ -217,11 +216,12 @@ public class FlowRuleManagerTest {
FlowEntry fe2 = new DefaultFlowEntry(f2);
FlowEntry fe3 = new DefaultFlowEntry(f3);
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2, fe3));
validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED);
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
RULE_ADDED, RULE_ADDED, RULE_ADDED);
mgr.removeFlowRules(f1, f2);
//removing from north, so no events generated
validateEvents();
validateEvents(RULE_REMOVE_REQUESTED, RULE_REMOVE_REQUESTED);
assertEquals("3 rule should exist", 3, flowCount());
assertTrue("Entries should be pending remove.",
validateState(ImmutableMap.of(
......@@ -243,7 +243,8 @@ public class FlowRuleManagerTest {
service.removeFlowRules(f1);
fe1.setState(FlowEntryState.REMOVED);
providerService.flowRemoved(fe1);
validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
providerService.flowRemoved(fe1);
validateEvents();
......@@ -252,7 +253,7 @@ public class FlowRuleManagerTest {
FlowEntry fe3 = new DefaultFlowEntry(f3);
service.applyFlowRules(f3);
providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
validateEvents(RULE_ADDED);
validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
providerService.flowRemoved(fe3);
validateEvents();
......@@ -281,7 +282,8 @@ public class FlowRuleManagerTest {
f2, FlowEntryState.ADDED,
f3, FlowEntryState.PENDING_ADD)));
validateEvents(RULE_ADDED, RULE_ADDED);
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
RULE_ADDED, RULE_ADDED);
}
@Test
......@@ -301,7 +303,7 @@ public class FlowRuleManagerTest {
providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2, fe3));
validateEvents(RULE_ADDED, RULE_ADDED);
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED, RULE_ADDED);
}
......@@ -326,7 +328,8 @@ public class FlowRuleManagerTest {
providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2));
validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
RULE_REMOVE_REQUESTED, RULE_ADDED, RULE_ADDED, RULE_REMOVED);
}
......
package org.onlab.onos.store.statistic.impl;
import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions;
import org.onlab.onos.net.statistic.StatisticStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Maintains statistics using RPC calls to collect stats from remote instances
* on demand.
*/
@Component(immediate = true)
@Service
public class DistributedStatisticStore implements StatisticStore {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ReplicaInfoService replicaInfoManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
private Map<ConnectPoint, InternalStatisticRepresentation> representations =
new ConcurrentHashMap<>();
private Map<ConnectPoint, Set<FlowEntry>> previous =
new ConcurrentHashMap<>();
private Map<ConnectPoint, Set<FlowEntry>> current =
new ConcurrentHashMap<>();
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespaces.API.newBuilder()
.build()
.populate(1);
}
};;
private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
ConnectPoint cp = SERIALIZER.decode(message.payload());
try {
message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
});
clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
ConnectPoint cp = SERIALIZER.decode(message.payload());
try {
message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
});
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public void prepareForStatistics(FlowRule rule) {
ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) {
return;
}
InternalStatisticRepresentation rep;
synchronized (representations) {
rep = getOrCreateRepresentation(cp);
}
rep.prepare();
}
@Override
public void removeFromStatistics(FlowRule rule) {
ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) {
return;
}
InternalStatisticRepresentation rep = representations.get(cp);
if (rep != null) {
rep.remove(rule);
}
}
@Override
public void addOrUpdateStatistic(FlowEntry rule) {
ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) {
return;
}
InternalStatisticRepresentation rep = representations.get(cp);
if (rep != null && rep.submit(rule)) {
updatePublishedStats(cp, rep.get());
}
}
private synchronized void updatePublishedStats(ConnectPoint cp,
Set<FlowEntry> flowEntries) {
Set<FlowEntry> curr = current.get(cp);
if (curr == null) {
curr = new HashSet<>();
}
previous.put(cp, curr);
current.put(cp, flowEntries);
}
@Override
public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getCurrentStatisticInternal(connectPoint);
} else {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GET_CURRENT,
SERIALIZER.encode(connectPoint));
try {
ClusterMessageResponse response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
}
}
private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
return current.get(connectPoint);
}
@Override
public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getPreviousStatisticInternal(connectPoint);
} else {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GET_CURRENT,
SERIALIZER.encode(connectPoint));
try {
ClusterMessageResponse response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
}
}
private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
return previous.get(connectPoint);
}
private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
if (representations.containsKey(cp)) {
return representations.get(cp);
} else {
InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
representations.put(cp, rep);
return rep;
}
}
private ConnectPoint buildConnectPoint(FlowRule rule) {
PortNumber port = getOutput(rule);
if (port == null) {
log.warn("Rule {} has no output.", rule);
return null;
}
ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
return cp;
}
private PortNumber getOutput(FlowRule rule) {
for (Instruction i : rule.treatment().instructions()) {
if (i.type() == Instruction.Type.OUTPUT) {
Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
return out.port();
}
if (i.type() == Instruction.Type.DROP) {
return PortNumber.P0;
}
}
return null;
}
private class InternalStatisticRepresentation {
private final AtomicInteger counter = new AtomicInteger(0);
private final Set<FlowEntry> rules = new HashSet<>();
public void prepare() {
counter.incrementAndGet();
}
public synchronized void remove(FlowRule rule) {
rules.remove(rule);
counter.decrementAndGet();
}
public synchronized boolean submit(FlowEntry rule) {
if (rules.contains(rule)) {
rules.remove(rule);
}
rules.add(rule);
if (counter.get() == 0) {
return true;
} else {
return counter.decrementAndGet() == 0;
}
}
public synchronized Set<FlowEntry> get() {
counter.set(rules.size());
return ImmutableSet.copyOf(rules);
}
}
}
package org.onlab.onos.store.statistic.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by DistributedStatisticStore peer-peer communication.
*/
public final class StatisticStoreMessageSubjects {
private StatisticStoreMessageSubjects() {}
public static final MessageSubject GET_CURRENT =
new MessageSubject("peer-return-current");
public static final MessageSubject GET_PREVIOUS =
new MessageSubject("peer-return-previous");
}
/**
* Implementation of the statistic store.
*/
package org.onlab.onos.store.statistic.impl;
\ No newline at end of file
......@@ -4,6 +4,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
......@@ -26,6 +27,7 @@ import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
......@@ -75,6 +77,7 @@ public final class KryoNamespaces {
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
//
//
ControllerNode.State.class,
......@@ -94,6 +97,7 @@ public final class KryoNamespaces {
HostDescription.class,
DefaultHostDescription.class,
DefaultFlowRule.class,
DefaultFlowEntry.class,
FlowId.class,
DefaultTrafficSelector.class,
Criteria.PortCriterion.class,
......
......@@ -16,6 +16,7 @@
package org.onlab.onos.provider.lldp.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
......@@ -95,11 +96,13 @@ public class LinkDiscovery implements TimerTask {
*/
public LinkDiscovery(Device device, PacketService pktService,
MastershipService masterService, LinkProviderService providerService, Boolean... useBDDP) {
this.device = device;
this.probeRate = 3000;
this.linkProvider = providerService;
this.pktService = pktService;
this.mastershipService = masterService;
this.mastershipService = checkNotNull(masterService, "WTF!");
this.slowPorts = Collections.synchronizedSet(new HashSet<Long>());
this.fastPorts = Collections.synchronizedSet(new HashSet<Long>());
this.portProbeCount = new HashMap<>();
......@@ -344,6 +347,12 @@ public class LinkDiscovery implements TimerTask {
}
private void sendProbes(Long portNumber) {
if (device == null) {
log.warn("CRAZY SHIT");
}
if (mastershipService == null) {
log.warn("INSANE");
}
if (device.type() != Device.Type.ROADM &&
mastershipService.getLocalRole(this.device.id()) ==
MastershipRole.MASTER) {
......
......@@ -103,6 +103,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final Map<Long, InstallationFuture> pendingFMs =
new ConcurrentHashMap<Long, InstallationFuture>();
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
/**
* Creates an OpenFlow host provider.
*/
......@@ -115,6 +117,14 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
providerService = providerRegistry.register(this);
controller.addListener(listener);
controller.addEventListener(listener);
for (OpenFlowSwitch sw : controller.getSwitches()) {
FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
fsc.start();
collectors.put(new Dpid(sw.getId()), fsc);
}
log.info("Started");
}
......@@ -213,7 +223,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
private final Multimap<DeviceId, FlowEntry> completeEntries =
ArrayListMultimap.create();
......