alshabib
Committed by Gerrit Code Review

adding device specific counters for meter ids in

the meter service.

Change-Id: I38d38a0a85024927f5a74013b2b4d9efa9b32d22
......@@ -43,7 +43,8 @@ public final class DefaultMeterRequest implements MeterRequest {
private DefaultMeterRequest(DeviceId deviceId, ApplicationId appId,
Meter.Unit unit, boolean burst,
Collection<Band> bands, MeterContext context, Type op) {
Collection<Band> bands, MeterContext context,
Type op) {
this.deviceId = deviceId;
this.appId = appId;
this.unit = unit;
......@@ -58,6 +59,7 @@ public final class DefaultMeterRequest implements MeterRequest {
return deviceId;
}
@Override
public ApplicationId appId() {
return appId;
......@@ -107,6 +109,7 @@ public final class DefaultMeterRequest implements MeterRequest {
private Collection<Band> bands;
private DeviceId deviceId;
private MeterContext context;
private Optional<MeterId> desiredId = Optional.empty();
@Override
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.meter;
import com.google.common.base.Objects;
import org.onosproject.net.DeviceId;
/**
* A meter key represents a meter uniquely.
*/
public final class MeterKey {
private final DeviceId deviceId;
private final MeterId id;
private MeterKey(DeviceId deviceId, MeterId id) {
this.deviceId = deviceId;
this.id = id;
}
public DeviceId deviceId() {
return deviceId;
}
public MeterId meterId() {
return id;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MeterKey meterKey = (MeterKey) o;
return Objects.equal(deviceId, meterKey.deviceId) &&
Objects.equal(id, meterKey.id);
}
@Override
public int hashCode() {
return Objects.hashCode(deviceId, id);
}
public static MeterKey key(DeviceId deviceId, MeterId id) {
return new MeterKey(deviceId, id);
}
}
......@@ -16,6 +16,7 @@
package org.onosproject.net.meter;
import org.onosproject.event.ListenerService;
import org.onosproject.net.DeviceId;
import java.util.Collection;
......@@ -46,10 +47,11 @@ public interface MeterService
/**
* Fetch the meter by the meter id.
*
* @param deviceId a device id
* @param id a meter id
* @return a meter
*/
Meter getMeter(MeterId id);
Meter getMeter(DeviceId deviceId, MeterId id);
/**
* Fetches all the meters.
......
......@@ -57,12 +57,12 @@ public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
void updateMeterState(Meter meter);
/**
* Obtains a meter matching the given meter id.
* Obtains a meter matching the given meter key.
*
* @param meterId a meter id
* @param key a meter key
* @return a meter
*/
Meter getMeter(MeterId meterId);
Meter getMeter(MeterKey key);
/**
* Returns all meters stored in the store.
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.incubator.net.meter.impl;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -27,6 +28,7 @@ import org.onosproject.net.meter.Meter;
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterListener;
import org.onosproject.net.meter.MeterOperation;
import org.onosproject.net.meter.MeterProvider;
......@@ -61,7 +63,7 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
MeterProvider, MeterProviderService>
implements MeterService, MeterProviderRegistry {
private final String meterIdentifier = "meter-id-counter";
private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
......@@ -71,15 +73,16 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MeterStore store;
private AtomicCounter meterIdCounter;
private Map<DeviceId, AtomicCounter> meterIdCounters
= Maps.newConcurrentMap();
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
@Activate
public void activate() {
meterIdCounter = storageService.atomicCounterBuilder()
.withName(meterIdentifier)
.build();
/*meterIdCounter = storageService.atomicCounterBuilder()
.withName(METERCOUNTERIDENTIFIER)
.build();*/
store.setDelegate(delegate);
......@@ -115,11 +118,13 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
@Override
public Meter submit(MeterRequest request) {
MeterId id = allocateMeterId(request.deviceId());
Meter.Builder mBuilder = DefaultMeter.builder()
.forDevice(request.deviceId())
.fromApp(request.appId())
.withBands(request.bands())
.withId(allocateMeterId())
.withId(id)
.withUnit(request.unit());
if (request.isBurst()) {
......@@ -152,8 +157,9 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
}
@Override
public Meter getMeter(MeterId id) {
return store.getMeter(id);
public Meter getMeter(DeviceId deviceId, MeterId id) {
MeterKey key = MeterKey.key(deviceId, id);
return store.getMeter(key);
}
@Override
......@@ -161,9 +167,21 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
return store.getAllMeters();
}
private MeterId allocateMeterId() {
// FIXME: This will break one day.
return MeterId.meterId((int) meterIdCounter.incrementAndGet());
private MeterId allocateMeterId(DeviceId deviceId) {
long id = meterIdCounters.compute(deviceId, (k, v) -> {
if (v == null) {
return allocateCounter(k);
}
return v;
}).incrementAndGet();
return MeterId.meterId((int) id);
}
private AtomicCounter allocateCounter(DeviceId deviceId) {
return storageService.atomicCounterBuilder()
.withName(String.format(METERCOUNTERIDENTIFIER, deviceId))
.build();
}
private class InternalMeterProviderService
......
......@@ -130,7 +130,7 @@ public class MeterManagerTest {
m2 = DefaultMeter.builder()
.forDevice(did("2"))
.fromApp(APP_ID)
.withId(MeterId.meterId(2))
.withId(MeterId.meterId(1))
.withUnit(Meter.Unit.KB_PER_SEC)
.withBands(Collections.singletonList(band))
.build();
......@@ -167,7 +167,7 @@ public class MeterManagerTest {
assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
assertThat(manager.getMeter(MeterId.meterId(1)), is(m1));
assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
}
@Test
......@@ -175,7 +175,7 @@ public class MeterManagerTest {
manager.submit(m1Request.add());
manager.withdraw(m1Request.remove(), m1.id());
assertThat(manager.getMeter(MeterId.meterId(1)).state(),
assertThat(manager.getMeter(did("1"), MeterId.meterId(1)).state(),
is(MeterState.PENDING_REMOVE));
providerService.pushMeterMetrics(m1.deviceId(), Collections.emptyList());
......@@ -184,7 +184,16 @@ public class MeterManagerTest {
}
@Test
public void testMultipleDevice() {
manager.submit(m1Request.add());
manager.submit(m2Request.add());
assertTrue("The meters were not added", manager.getAllMeters().size() == 2);
assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
assertThat(manager.getMeter(did("2"), MeterId.meterId(1)), is(m2));
}
public class TestApplicationId extends DefaultApplicationId {
public TestApplicationId(int id, String name) {
......
......@@ -33,6 +33,7 @@ import org.onosproject.net.meter.Meter;
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterOperation;
import org.onosproject.net.meter.MeterState;
import org.onosproject.net.meter.MeterStore;
......@@ -78,12 +79,12 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
private ConsistentMap<MeterId, MeterData> meters;
private ConsistentMap<MeterKey, MeterData> meters;
private NodeId local;
private MapEventListener mapListener = new InternalMapEventListener();
private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
@Activate
......@@ -92,9 +93,10 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
local = clusterService.getLocalNode().id();
meters = storageService.<MeterId, MeterData>consistentMapBuilder()
meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
.withName(METERSTORE)
.withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
MeterKey.class,
MeterData.class,
DefaultMeter.class,
DefaultBand.class,
......@@ -120,11 +122,12 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
@Override
public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
futures.put(meter.id(), future);
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
futures.put(key, future);
MeterData data = new MeterData(meter, null, local);
try {
meters.put(meter.id(), data);
meters.put(key, data);
} catch (StorageException e) {
future.completeExceptionally(e);
}
......@@ -136,14 +139,15 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
@Override
public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
futures.put(meter.id(), future);
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
futures.put(key, future);
MeterData data = new MeterData(meter, null, local);
// update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
try {
if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
if (meters.computeIfPresent(key, (k, v) -> data) == null) {
future.complete(MeterStoreResult.success());
}
} catch (StorageException e) {
......@@ -157,11 +161,12 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
@Override
public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
futures.put(meter.id(), future);
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
futures.put(key, future);
MeterData data = new MeterData(meter, null, local);
try {
if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
if (meters.computeIfPresent(key, (k, v) -> data) == null) {
future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
}
} catch (StorageException e) {
......@@ -172,7 +177,8 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
@Override
public void updateMeterState(Meter meter) {
meters.computeIfPresent(meter.id(), (id, v) -> {
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
meters.computeIfPresent(key, (k, v) -> {
DefaultMeter m = (DefaultMeter) v.meter();
m.setState(meter.state());
m.setProcessedPackets(meter.packetsSeen());
......@@ -185,8 +191,8 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
}
@Override
public Meter getMeter(MeterId meterId) {
MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
public Meter getMeter(MeterKey key) {
MeterData data = Versioned.valueOrElse(meters.get(key), null);
return data == null ? null : data.meter();
}
......@@ -198,14 +204,16 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
meters.computeIfPresent(op.meter().id(), (k, v) ->
MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
meters.computeIfPresent(key, (k, v) ->
new MeterData(v.meter(), reason, v.origin()));
}
@Override
public void deleteMeterNow(Meter m) {
futures.remove(m.id());
meters.remove(m.id());
MeterKey key = MeterKey.key(m.deviceId(), m.id());
futures.remove(key);
meters.remove(key);
}
private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
......