alshabib

Initial implementation of Meter Service (needs testing)

Change-Id: Ie07bd3e2bd7c67a6499c965d8926eb361ad16462

store impl started

Change-Id: Ib8b474f40dcecff335a421c12ad149fe9830c427

full implementation

Change-Id: Ie59fd61d02972bd04d887bdcca9745793b880681
......@@ -69,6 +69,20 @@ public interface Band {
*/
Type type();
/**
* Returns the packets seen by this band.
*
* @return a long value
*/
long packets();
/**
* Return the bytes seen by this band.
*
* @return a byte counter
*/
long bytes();
interface Builder {
/**
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.meter;
/**
* Represents a stored band.
*/
public interface BandEntry extends Band {
/**
* Sets the number of packets seen by this band.
*
* @param packets a packet count
*/
void setPackets(long packets);
/**
* Sets the number of bytes seen by this band.
*
* @param bytes a byte counter
*/
void setBytes(long bytes);
}
......@@ -20,12 +20,14 @@ import static com.google.common.base.Preconditions.checkArgument;
/**
* A default implementation for a Band.
*/
public final class DefaultBand implements Band {
public final class DefaultBand implements Band, BandEntry {
private final Type type;
private final long rate;
private final long burstSize;
private final short prec;
private long packets;
private long bytes;
public DefaultBand(Type type, long rate,
long burstSize, short prec) {
......@@ -55,6 +57,26 @@ public final class DefaultBand implements Band {
return type;
}
@Override
public long packets() {
return packets;
}
@Override
public long bytes() {
return bytes;
}
@Override
public void setPackets(long packets) {
this.packets = packets;
}
@Override
public void setBytes(long bytes) {
this.bytes = bytes;
}
public static Builder builder() {
return new Builder();
}
......@@ -91,7 +113,7 @@ public final class DefaultBand implements Band {
}
@Override
public Band build() {
public DefaultBand build() {
checkArgument(prec != null && type == Type.REMARK,
"Only REMARK bands can have a precendence.");
......
......@@ -28,7 +28,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* A default implementation of a meter.
*/
public final class DefaultMeter implements Meter {
public final class DefaultMeter implements Meter, MeterEntry {
private final MeterId id;
......@@ -39,6 +39,12 @@ public final class DefaultMeter implements Meter {
private final DeviceId deviceId;
private final Optional<MeterContext> context;
private MeterState state;
private long life;
private long refCount;
private long packets;
private long bytes;
private DefaultMeter(DeviceId deviceId, MeterId id, ApplicationId appId,
Unit unit, boolean burst,
Collection<Band> bands, Optional<MeterContext> context) {
......@@ -86,10 +92,60 @@ public final class DefaultMeter implements Meter {
return null;
}
@Override
public MeterState state() {
return state;
}
@Override
public long life() {
return life;
}
@Override
public long referenceCount() {
return refCount;
}
@Override
public long packetsSeen() {
return packets;
}
@Override
public long bytesSeen() {
return bytes;
}
public static Builder builder() {
return new Builder();
}
@Override
public void setState(MeterState state) {
this.state = state;
}
@Override
public void setLife(long life) {
this.life = life;
}
@Override
public void setReferenceCount(long count) {
this.refCount = count;
}
@Override
public void setProcessedPackets(long packets) {
this.packets = packets;
}
@Override
public void setProcessedBytes(long bytes) {
this.bytes = bytes;
}
public static final class Builder implements Meter.Builder {
private MeterId id;
......@@ -108,7 +164,7 @@ public final class DefaultMeter implements Meter {
}
@Override
public Meter.Builder withId(int id) {
public Meter.Builder withId(long id) {
this.id = MeterId.meterId(id);
return this;
}
......@@ -144,7 +200,7 @@ public final class DefaultMeter implements Meter {
}
@Override
public Meter build() {
public DefaultMeter build() {
checkNotNull(deviceId, "Must specify a device");
checkNotNull(bands, "Must have bands.");
checkArgument(bands.size() > 0, "Must have at least one band.");
......
......@@ -89,6 +89,41 @@ public interface Meter {
Optional<MeterContext> context();
/**
* Fetches the state of this meter.
*
* @return a meter state
*/
MeterState state();
/**
* The lifetime in seconds of this meter.
*
* @return number of seconds
*/
long life();
/**
* The number of flows pointing to this meter.
*
* @return a reference count
*/
long referenceCount();
/**
* Number of packets processed by this meter.
*
* @return a packet count
*/
long packetsSeen();
/**
* Number of bytes processed by this meter.
*
* @return a byte count
*/
long bytesSeen();
/**
* A meter builder.
*/
interface Builder {
......@@ -104,10 +139,10 @@ public interface Meter {
/**
* Assigns the id to this meter.
*
* @param id an integer
* @param id a long
* @return this
*/
Builder withId(int id);
Builder withId(long id);
/**
* Assigns the application that built this meter.
......
......@@ -34,6 +34,5 @@ public interface MeterContext {
* @param op a meter operation
* @param reason the reason why it failed
*/
default void onError(MeterOperation op, MeterFailReason reason) {
}
default void onError(MeterOperation op, MeterFailReason reason) {}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.meter;
/**
* Represents a stored meter.
*/
public interface MeterEntry extends Meter {
/**
* Updates the state of this meter.
*
* @param state a meter state
*/
void setState(MeterState state);
/**
* Set the amount of time the meter has existed in seconds.
*
* @param life number of seconds
*/
void setLife(long life);
/**
* Sets the number of flows which are using this meter.
*
* @param count a reference count.
*/
void setReferenceCount(long count);
/**
* Updates the number of packets seen by this meter.
*
* @param packets a packet count.
*/
void setProcessedPackets(long packets);
/**
* Updates the number of bytes seen by the meter.
*
* @param bytes a byte counter.
*/
void setProcessedBytes(long bytes);
}
......@@ -20,20 +20,12 @@ import org.onosproject.event.AbstractEvent;
/**
* Entity that represents Meter events.
*/
public class MeterEvent extends AbstractEvent<MeterEvent.Type, Meter> {
public class MeterEvent extends AbstractEvent<MeterEvent.Type, MeterOperation> {
enum Type {
private final MeterFailReason reason;
/**
* Signals that a new meter has been added.
*/
METER_ADDED,
/**
* Signals that a meter has been removed.
*/
METER_REMOVED,
public enum Type {
/**
* Signals that a meter has been added.
......@@ -41,19 +33,15 @@ public class MeterEvent extends AbstractEvent<MeterEvent.Type, Meter> {
METER_UPDATED,
/**
* Signals that a meter addition failed.
* Signals that a meter update failed.
*/
METER_ADD_FAILED,
METER_OP_FAILED,
/**
* Signals that a meter removal failed.
* A meter operation was requested.
*/
METER_REMOVE_FAILED,
METER_OP_REQ,
/**
* Signals that a meter update failed.
*/
METER_UPDATE_FAILED
}
......@@ -62,21 +50,32 @@ public class MeterEvent extends AbstractEvent<MeterEvent.Type, Meter> {
* current time.
*
* @param type meter event type
* @param meter event subject
* @param op event subject
*/
public MeterEvent(Type type, Meter meter) {
super(type, meter);
public MeterEvent(Type type, MeterOperation op) {
super(type, op);
this.reason = null;
}
/**
* Creates an event of a given type and for the specified meter and time.
*
* @param type meter event type
* @param meter event subject
* @param op event subject
* @param time occurrence time
*/
public MeterEvent(Type type, Meter meter, long time) {
super(type, meter, time);
public MeterEvent(Type type, MeterOperation op, long time) {
super(type, op, time);
this.reason = null;
}
public MeterEvent(Type type, MeterOperation op, MeterFailReason reason) {
super(type, op);
this.reason = reason;
}
public MeterFailReason reason() {
return reason;
}
}
......
......@@ -19,19 +19,19 @@ import static com.google.common.base.Preconditions.checkArgument;
/**
* A representation of a meter id.
* Uniquely identifies a meter for a given device.
* Uniquely identifies a meter system wide.
*/
public final class MeterId {
static final long MAX = 0xFFFF0000;
private final int id;
private final long id;
public static final MeterId SLOWPATH = new MeterId(0xFFFFFFFD);
public static final MeterId CONTROLLER = new MeterId(0xFFFFFFFE);
public static final MeterId ALL = new MeterId(0xFFFFFFFF);
private MeterId(int id) {
private MeterId(long id) {
checkArgument(id <= MAX, "id cannot be larger than 0xFFFF0000");
this.id = id;
}
......@@ -39,9 +39,9 @@ public final class MeterId {
/**
* The integer representation of the meter id.
*
* @return an integer
* @return a long
*/
public int id() {
public long id() {
return id;
}
......@@ -62,10 +62,10 @@ public final class MeterId {
@Override
public int hashCode() {
return id;
return Long.hashCode(id);
}
public static MeterId meterId(int id) {
public static MeterId meterId(long id) {
return new MeterId(id);
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.meter;
/**
* Represents the state of the meter as seen by the store.
*/
public enum MeterState {
/**
* The meter is in the process of being added.
*/
PENDING_ADD,
/**
* THe meter has been added.
*/
ADDED,
/**
* The meter is in the process of being removed.
*/
PENDING_REMOVE,
/**
* The meter has been removed.
*/
REMOVED,
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.meter;
import org.onosproject.store.Store;
import java.util.Collection;
/**
* Entity that stores and distributed meter objects.
*/
public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
/**
* Adds a meter to the store.
*
* @param meter a meter
*/
void storeMeter(Meter meter);
/**
* Deletes a meter from the store.
*
* @param meter a meter
*/
void deleteMeter(Meter meter);
/**
* Updates a meter whose meter id is the same as the passed meter.
*
* @param meter a new meter
*/
void updateMeter(Meter meter);
/**
* Updates a given meter's state with the provided state.
* @param meter a meter
*/
void updateMeterState(Meter meter);
/**
* Obtains a meter matching the given meter id.
*
* @param meterId a meter id
* @return a meter
*/
Meter getMeter(MeterId meterId);
/**
* Returns all meters stored in the store.
*
* @return a collection of meters
*/
Collection<Meter> getAllMeters();
/**
* Update the store by deleting the failed meter.
* Notifies the delegate that the meter failed to allow it
* to nofity the app.
*
* @param op a failed meter operation
* @param reason a failure reason
*/
void failedMeter(MeterOperation op, MeterFailReason reason);
}
......@@ -21,6 +21,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.incubator.net.meter.DefaultMeter;
import org.onosproject.incubator.net.meter.Meter;
import org.onosproject.incubator.net.meter.MeterEvent;
import org.onosproject.incubator.net.meter.MeterFailReason;
......@@ -31,6 +32,8 @@ import org.onosproject.incubator.net.meter.MeterProvider;
import org.onosproject.incubator.net.meter.MeterProviderRegistry;
import org.onosproject.incubator.net.meter.MeterProviderService;
import org.onosproject.incubator.net.meter.MeterService;
import org.onosproject.incubator.net.meter.MeterState;
import org.onosproject.incubator.net.meter.MeterStore;
import org.onosproject.incubator.net.meter.MeterStoreDelegate;
import org.onosproject.net.DeviceId;
import org.onosproject.net.provider.AbstractListenerProviderRegistry;
......@@ -41,6 +44,7 @@ import org.slf4j.Logger;
import java.util.Collection;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -60,6 +64,9 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
MeterStore store;
private AtomicCounter meterIdCounter;
@Activate
......@@ -82,27 +89,35 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
@Override
public void addMeter(Meter meter) {
DefaultMeter m = (DefaultMeter) meter;
m.setState(MeterState.PENDING_ADD);
store.storeMeter(m);
}
@Override
public void updateMeter(Meter meter) {
DefaultMeter m = (DefaultMeter) meter;
m.setState(MeterState.PENDING_ADD);
store.updateMeter(m);
}
@Override
public void removeMeter(Meter meter) {
DefaultMeter m = (DefaultMeter) meter;
m.setState(MeterState.PENDING_REMOVE);
store.deleteMeter(m);
}
@Override
public void removeMeter(MeterId id) {
DefaultMeter meter = (DefaultMeter) store.getMeter(id);
checkNotNull(meter, "No such meter {}", id);
removeMeter(meter);
}
@Override
public Meter getMeter(MeterId id) {
return null;
return store.getMeter(id);
}
@Override
......@@ -125,13 +140,14 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
}
@Override
public void meterOperationFailed(MeterOperation operation, MeterFailReason reason) {
public void meterOperationFailed(MeterOperation operation,
MeterFailReason reason) {
store.failedMeter(operation, reason);
}
@Override
public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
meterEntries.forEach(m -> store.updateMeterState(m));
}
}
......@@ -139,6 +155,21 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
@Override
public void notify(MeterEvent event) {
DeviceId deviceId = event.subject().meter().deviceId();
MeterProvider p = getProvider(event.subject().meter().deviceId());
switch (event.type()) {
case METER_UPDATED:
break;
case METER_OP_FAILED:
event.subject().meter().context().ifPresent(c ->
c.onError(event.subject(), event.reason()));
break;
case METER_OP_REQ:
p.performMeterOperation(deviceId, event.subject());
break;
default:
log.warn("Unknown meter event {}", event.type());
}
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.store.meter.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.incubator.net.meter.DefaultBand;
import org.onosproject.incubator.net.meter.DefaultMeter;
import org.onosproject.incubator.net.meter.Meter;
import org.onosproject.incubator.net.meter.MeterEvent;
import org.onosproject.incubator.net.meter.MeterFailReason;
import org.onosproject.incubator.net.meter.MeterId;
import org.onosproject.incubator.net.meter.MeterOperation;
import org.onosproject.incubator.net.meter.MeterStore;
import org.onosproject.incubator.net.meter.MeterStoreDelegate;
import org.onosproject.mastership.MastershipService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
/**
* A distributed meter store implementation. Meters are stored consistently
* across the cluster.
*/
public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
implements MeterStore {
private Logger log = getLogger(getClass());
private static final String METERSTORE = "onos-meter-store";
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Number of threads in the message handler pool")
private int msgPoolSize;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
private ConsistentMap<MeterId, Meter> meters;
private NodeId local;
private KryoNamespace kryoNameSpace;
private Serializer serializer;
@Activate
public void activate() {
local = clusterService.getLocalNode().id();
kryoNameSpace =
KryoNamespace.newBuilder()
.register(DefaultMeter.class)
.register(DefaultBand.class)
.build();
serializer = Serializer.using(kryoNameSpace);
meters = storageService.<MeterId, Meter>consistentMapBuilder()
.withName(METERSTORE)
.withSerializer(serializer)
.build();
ExecutorService executors = Executors.newFixedThreadPool(
msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
registerMessageHandlers(executors);
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
private void registerMessageHandlers(ExecutorService executor) {
clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
this::notifyDelegate, executor);
}
@Override
public void storeMeter(Meter meter) {
NodeId master = mastershipService.getMasterFor(meter.deviceId());
meters.put(meter.id(), meter);
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
new MeterOperation(meter, MeterOperation.Type.ADD));
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to install meter {} because {} on {}",
meter, error, master);
// notify app of failure
meter.context().ifPresent(c -> c.onError(
event.subject(), MeterFailReason.UNKNOWN));
}
});
}
}
@Override
public void deleteMeter(Meter meter) {
NodeId master = mastershipService.getMasterFor(meter.deviceId());
// update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
meters.put(meter.id(), meter);
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
new MeterOperation(meter, MeterOperation.Type.REMOVE));
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to delete meter {} because {} on {}",
meter, error, master);
// notify app of failure
meter.context().ifPresent(c -> c.onError(
event.subject(), MeterFailReason.UNKNOWN));
}
});
}
}
@Override
public void updateMeter(Meter meter) {
NodeId master = mastershipService.getMasterFor(meter.deviceId());
meters.put(meter.id(), meter);
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
new MeterOperation(meter, MeterOperation.Type.MODIFY));
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to update meter {} because {} on {}",
meter, error, master);
// notify app of failure
meter.context().ifPresent(c -> c.onError(
event.subject(), MeterFailReason.UNKNOWN));
}
});
}
}
@Override
public void updateMeterState(Meter meter) {
meters.compute(meter.id(), (id, v) -> {
DefaultMeter m = (DefaultMeter) v;
m.setState(meter.state());
m.setProcessedPackets(meter.packetsSeen());
m.setProcessedBytes(meter.bytesSeen());
m.setLife(meter.life());
m.setReferenceCount(meter.referenceCount());
return m;
});
}
@Override
public Meter getMeter(MeterId meterId) {
return meters.get(meterId).value();
}
@Override
public Collection<Meter> getAllMeters() {
return meters.values().stream()
.map(v -> v.value()).collect(Collectors.toSet());
}
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
meters.remove(op.meter().id());
MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
if (Objects.equals(local, master)) {
notifyDelegate(event);
} else {
clusterCommunicationService.unicast(
event,
UPDATE_METER,
serializer::encode,
master
).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to delete failed meter {} because {} on {}",
op.meter(), error, master);
// Can't do any more...
}
});
}
}
}
......@@ -46,7 +46,7 @@ public final class MeterModBuilder {
private final OFFactory factory;
private Meter.Unit unit = Meter.Unit.KB_PER_SEC;
private boolean burst = false;
private Integer id;
private Long id;
private Collection<Band> bands;
public MeterModBuilder(long xid, OFFactory factory) {
......
......@@ -20,7 +20,6 @@ package org.onosproject.provider.of.meter.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
......@@ -28,7 +27,10 @@ import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.net.DeviceId;
import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.meter.Band;
import org.onosproject.incubator.net.meter.DefaultBand;
import org.onosproject.incubator.net.meter.DefaultMeter;
import org.onosproject.incubator.net.meter.Meter;
import org.onosproject.incubator.net.meter.MeterFailReason;
import org.onosproject.incubator.net.meter.MeterOperation;
......@@ -36,6 +38,8 @@ import org.onosproject.incubator.net.meter.MeterOperations;
import org.onosproject.incubator.net.meter.MeterProvider;
import org.onosproject.incubator.net.meter.MeterProviderRegistry;
import org.onosproject.incubator.net.meter.MeterProviderService;
import org.onosproject.incubator.net.meter.MeterState;
import org.onosproject.net.DeviceId;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.openflow.controller.Dpid;
......@@ -47,15 +51,23 @@ import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFErrorType;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFMeterBandStats;
import org.projectfloodlight.openflow.protocol.OFMeterConfigStatsReply;
import org.projectfloodlight.openflow.protocol.OFMeterStats;
import org.projectfloodlight.openflow.protocol.OFMeterStatsReply;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.projectfloodlight.openflow.protocol.OFStatsReply;
import org.projectfloodlight.openflow.protocol.OFStatsType;
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.errormsg.OFMeterModFailedErrorMsg;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -74,6 +86,9 @@ public class OpenFlowMeterProvider extends AbstractProvider implements MeterProv
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MeterProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private MeterProviderService providerService;
private static final AtomicLong XID_COUNTER = new AtomicLong(1);
......@@ -81,8 +96,7 @@ public class OpenFlowMeterProvider extends AbstractProvider implements MeterProv
static final int POLL_INTERVAL = 10;
static final long TIMEOUT = 30;
private Cache<Integer, MeterOperation> pendingOperations;
private Cache<Long, MeterOperation> pendingXid;
private Cache<Long, MeterOperation> pendingOperations;
private InternalMeterListener listener = new InternalMeterListener();
......@@ -101,7 +115,7 @@ public class OpenFlowMeterProvider extends AbstractProvider implements MeterProv
pendingOperations = CacheBuilder.newBuilder()
.expireAfterWrite(TIMEOUT, TimeUnit.SECONDS)
.removalListener((RemovalNotification<Integer, MeterOperation> notification) -> {
.removalListener((RemovalNotification<Long, MeterOperation> notification) -> {
if (notification.getCause() == RemovalCause.EXPIRED) {
providerService.meterOperationFailed(notification.getValue(),
MeterFailReason.TIMEOUT);
......@@ -149,6 +163,8 @@ public class OpenFlowMeterProvider extends AbstractProvider implements MeterProv
return;
}
performOperation(sw, meterOp);
}
private void performOperation(OpenFlowSwitch sw, MeterOperation op) {
......@@ -203,7 +219,57 @@ public class OpenFlowMeterProvider extends AbstractProvider implements MeterProv
}
private void pushMeterStats(Dpid dpid, OFStatsReply msg) {
DeviceId deviceId = DeviceId.deviceId(Dpid.uri(dpid));
if (msg.getStatsType() == OFStatsType.METER) {
OFMeterStatsReply reply = (OFMeterStatsReply) msg;
Collection<Meter> meters = buildMeters(deviceId, reply.getEntries());
//TODO do meter accounting here.
providerService.pushMeterMetrics(deviceId, meters);
} else if (msg.getStatsType() == OFStatsType.METER_CONFIG) {
OFMeterConfigStatsReply reply = (OFMeterConfigStatsReply) msg;
// FIXME: Map<Long, Meter> meters = collectMeters(deviceId, reply);
}
}
private Map<Long, Meter> collectMeters(DeviceId deviceId,
OFMeterConfigStatsReply reply) {
return Maps.newHashMap();
//TODO: Needs a fix to be applied to loxi MeterConfig stat is incorrect
}
private Collection<Meter> buildMeters(DeviceId deviceId,
List<OFMeterStats> entries) {
return entries.stream().map(stat -> {
DefaultMeter.Builder builder = DefaultMeter.builder();
Collection<Band> bands = buildBands(stat.getBandStats());
builder.forDevice(deviceId)
.withId(stat.getMeterId())
//FIXME: need to encode appId in meter id, but that makes
// things a little annoying for debugging
.fromApp(coreService.getAppId("org.onosproject.core"))
.withBands(bands);
DefaultMeter meter = builder.build();
meter.setState(MeterState.ADDED);
meter.setLife(stat.getDurationSec());
meter.setProcessedBytes(stat.getByteInCount().getValue());
meter.setProcessedPackets(stat.getPacketInCount().getValue());
meter.setReferenceCount(stat.getFlowCount());
// marks the meter as seen on the dataplane
pendingOperations.invalidate(stat.getMeterId());
return meter;
}).collect(Collectors.toSet());
}
private Collection<Band> buildBands(List<OFMeterBandStats> bandStats) {
return bandStats.stream().map(stat -> {
DefaultBand band = DefaultBand.builder().build();
band.setBytes(stat.getByteBandCount().getValue());
band.setPackets(stat.getPacketBandCount().getValue());
return band;
}).collect(Collectors.toSet());
}
private void signalMeterError(OFMeterModFailedErrorMsg meterError,
......
......@@ -29,4 +29,4 @@ fi
set -x
ssh $ONOS_USER@$node /tmp/$ONOS_BITS/bin/onos-form-cluster -u $user -p $password $nodes
\ No newline at end of file
ssh $ONOS_USER@$node $ONOS_INSTALL_DIR/bin/onos-form-cluster -u $user -p $password $nodes
......