alshabib

Moving meter store implementation to use map events

Change-Id: I338473b7286d7b9e5cdfb938f16c7b6155d4cbb5
...@@ -20,7 +20,6 @@ import org.onosproject.net.DeviceId; ...@@ -20,7 +20,6 @@ import org.onosproject.net.DeviceId;
20 20
21 import java.util.Collection; 21 import java.util.Collection;
22 import java.util.Collections; 22 import java.util.Collections;
23 -import java.util.Optional;
24 23
25 import static com.google.common.base.Preconditions.checkArgument; 24 import static com.google.common.base.Preconditions.checkArgument;
26 import static com.google.common.base.Preconditions.checkNotNull; 25 import static com.google.common.base.Preconditions.checkNotNull;
...@@ -37,7 +36,6 @@ public final class DefaultMeter implements Meter, MeterEntry { ...@@ -37,7 +36,6 @@ public final class DefaultMeter implements Meter, MeterEntry {
37 private final boolean burst; 36 private final boolean burst;
38 private final Collection<Band> bands; 37 private final Collection<Band> bands;
39 private final DeviceId deviceId; 38 private final DeviceId deviceId;
40 - private final Optional<MeterContext> context;
41 39
42 private MeterState state; 40 private MeterState state;
43 private long life; 41 private long life;
...@@ -47,14 +45,13 @@ public final class DefaultMeter implements Meter, MeterEntry { ...@@ -47,14 +45,13 @@ public final class DefaultMeter implements Meter, MeterEntry {
47 45
48 private DefaultMeter(DeviceId deviceId, MeterId id, ApplicationId appId, 46 private DefaultMeter(DeviceId deviceId, MeterId id, ApplicationId appId,
49 Unit unit, boolean burst, 47 Unit unit, boolean burst,
50 - Collection<Band> bands, Optional<MeterContext> context) { 48 + Collection<Band> bands) {
51 this.deviceId = deviceId; 49 this.deviceId = deviceId;
52 this.id = id; 50 this.id = id;
53 this.appId = appId; 51 this.appId = appId;
54 this.unit = unit; 52 this.unit = unit;
55 this.burst = burst; 53 this.burst = burst;
56 this.bands = bands; 54 this.bands = bands;
57 - this.context = context;
58 } 55 }
59 56
60 @Override 57 @Override
...@@ -88,11 +85,6 @@ public final class DefaultMeter implements Meter, MeterEntry { ...@@ -88,11 +85,6 @@ public final class DefaultMeter implements Meter, MeterEntry {
88 } 85 }
89 86
90 @Override 87 @Override
91 - public Optional<MeterContext> context() {
92 - return null;
93 - }
94 -
95 - @Override
96 public MeterState state() { 88 public MeterState state() {
97 return state; 89 return state;
98 } 90 }
...@@ -154,7 +146,6 @@ public final class DefaultMeter implements Meter, MeterEntry { ...@@ -154,7 +146,6 @@ public final class DefaultMeter implements Meter, MeterEntry {
154 private boolean burst = false; 146 private boolean burst = false;
155 private Collection<Band> bands; 147 private Collection<Band> bands;
156 private DeviceId deviceId; 148 private DeviceId deviceId;
157 - private Optional<MeterContext> context;
158 149
159 150
160 @Override 151 @Override
...@@ -194,19 +185,13 @@ public final class DefaultMeter implements Meter, MeterEntry { ...@@ -194,19 +185,13 @@ public final class DefaultMeter implements Meter, MeterEntry {
194 } 185 }
195 186
196 @Override 187 @Override
197 - public Meter.Builder withContext(MeterContext context) {
198 - this.context = Optional.<MeterContext>ofNullable(context);
199 - return this;
200 - }
201 -
202 - @Override
203 public DefaultMeter build() { 188 public DefaultMeter build() {
204 checkNotNull(deviceId, "Must specify a device"); 189 checkNotNull(deviceId, "Must specify a device");
205 checkNotNull(bands, "Must have bands."); 190 checkNotNull(bands, "Must have bands.");
206 checkArgument(bands.size() > 0, "Must have at least one band."); 191 checkArgument(bands.size() > 0, "Must have at least one band.");
207 checkNotNull(appId, "Must have an application id"); 192 checkNotNull(appId, "Must have an application id");
208 checkNotNull(id, "Must specify a meter id"); 193 checkNotNull(id, "Must specify a meter id");
209 - return new DefaultMeter(deviceId, id, appId, unit, burst, bands, context); 194 + return new DefaultMeter(deviceId, id, appId, unit, burst, bands);
210 } 195 }
211 196
212 197
......
...@@ -19,7 +19,6 @@ import org.onosproject.core.ApplicationId; ...@@ -19,7 +19,6 @@ import org.onosproject.core.ApplicationId;
19 import org.onosproject.net.DeviceId; 19 import org.onosproject.net.DeviceId;
20 20
21 import java.util.Collection; 21 import java.util.Collection;
22 -import java.util.Optional;
23 22
24 /** 23 /**
25 * Represents a generalized meter to be deployed on a device. 24 * Represents a generalized meter to be deployed on a device.
...@@ -81,14 +80,6 @@ public interface Meter { ...@@ -81,14 +80,6 @@ public interface Meter {
81 Collection<Band> bands(); 80 Collection<Band> bands();
82 81
83 /** 82 /**
84 - * Obtains an optional context.
85 - *
86 - * @return optional; which will be empty if there is no context.
87 - * Otherwise it will return the context.
88 - */
89 - Optional<MeterContext> context();
90 -
91 - /**
92 * Fetches the state of this meter. 83 * Fetches the state of this meter.
93 * 84 *
94 * @return a meter state 85 * @return a meter state
...@@ -177,8 +168,6 @@ public interface Meter { ...@@ -177,8 +168,6 @@ public interface Meter {
177 */ 168 */
178 Builder withBands(Collection<Band> bands); 169 Builder withBands(Collection<Band> bands);
179 170
180 - Builder withContext(MeterContext context);
181 -
182 /** 171 /**
183 * Builds the meter based on the specified parameters. 172 * Builds the meter based on the specified parameters.
184 * 173 *
......
...@@ -16,23 +16,23 @@ ...@@ -16,23 +16,23 @@
16 package org.onosproject.incubator.net.meter; 16 package org.onosproject.incubator.net.meter;
17 17
18 /** 18 /**
19 - * Created by ash on 01/08/15. 19 + * A context permitting the application to be notified when the
20 + * meter installation has been successful.
20 */ 21 */
21 public interface MeterContext { 22 public interface MeterContext {
22 23
23 /** 24 /**
24 * Invoked on successful installation of the meter. 25 * Invoked on successful installation of the meter.
25 * 26 *
26 - * @param op a meter operation 27 + * @param op a meter
27 */ 28 */
28 - default void onSuccess(MeterOperation op) { 29 + default void onSuccess(Meter op) {}
29 - }
30 30
31 /** 31 /**
32 * Invoked when error is encountered while installing a meter. 32 * Invoked when error is encountered while installing a meter.
33 * 33 *
34 - * @param op a meter operation 34 + * @param op a meter
35 * @param reason the reason why it failed 35 * @param reason the reason why it failed
36 */ 36 */
37 - default void onError(MeterOperation op, MeterFailReason reason) {} 37 + default void onError(Meter op, MeterFailReason reason) {}
38 } 38 }
......
...@@ -20,28 +20,19 @@ import org.onosproject.event.AbstractEvent; ...@@ -20,28 +20,19 @@ import org.onosproject.event.AbstractEvent;
20 /** 20 /**
21 * Entity that represents Meter events. 21 * Entity that represents Meter events.
22 */ 22 */
23 -public class MeterEvent extends AbstractEvent<MeterEvent.Type, MeterOperation> { 23 +public class MeterEvent extends AbstractEvent<MeterEvent.Type, Meter> {
24 24
25 25
26 - private final MeterFailReason reason;
27 -
28 public enum Type { 26 public enum Type {
29 -
30 - /**
31 - * Signals that a meter has been added.
32 - */
33 - METER_UPDATED,
34 -
35 /** 27 /**
36 - * Signals that a meter update failed. 28 + * A meter addition was requested.
37 */ 29 */
38 - METER_OP_FAILED, 30 + METER_ADD_REQ,
39 31
40 /** 32 /**
41 - * A meter operation was requested. 33 + * A meter removal was requested.
42 */ 34 */
43 - METER_OP_REQ, 35 + METER_REM_REQ
44 -
45 } 36 }
46 37
47 38
...@@ -50,32 +41,22 @@ public class MeterEvent extends AbstractEvent<MeterEvent.Type, MeterOperation> { ...@@ -50,32 +41,22 @@ public class MeterEvent extends AbstractEvent<MeterEvent.Type, MeterOperation> {
50 * current time. 41 * current time.
51 * 42 *
52 * @param type meter event type 43 * @param type meter event type
53 - * @param op event subject 44 + * @param meter event subject
54 */ 45 */
55 - public MeterEvent(Type type, MeterOperation op) { 46 + public MeterEvent(Type type, Meter meter) {
56 - super(type, op); 47 + super(type, meter);
57 - this.reason = null;
58 } 48 }
59 49
60 /** 50 /**
61 * Creates an event of a given type and for the specified meter and time. 51 * Creates an event of a given type and for the specified meter and time.
62 * 52 *
63 * @param type meter event type 53 * @param type meter event type
64 - * @param op event subject 54 + * @param meter event subject
65 * @param time occurrence time 55 * @param time occurrence time
66 */ 56 */
67 - public MeterEvent(Type type, MeterOperation op, long time) { 57 + public MeterEvent(Type type, Meter meter, long time) {
68 - super(type, op, time); 58 + super(type, meter, time);
69 - this.reason = null;
70 - }
71 -
72 - public MeterEvent(Type type, MeterOperation op, MeterFailReason reason) {
73 - super(type, op);
74 - this.reason = reason;
75 } 59 }
76 60
77 - public MeterFailReason reason() {
78 - return reason;
79 - }
80 61
81 } 62 }
......
...@@ -17,11 +17,15 @@ package org.onosproject.incubator.net.meter; ...@@ -17,11 +17,15 @@ package org.onosproject.incubator.net.meter;
17 17
18 import com.google.common.base.MoreObjects; 18 import com.google.common.base.MoreObjects;
19 19
20 +import java.util.Optional;
21 +
20 /** 22 /**
21 * Representation of an operation on the meter table. 23 * Representation of an operation on the meter table.
22 */ 24 */
23 public class MeterOperation { 25 public class MeterOperation {
24 26
27 + private final Optional<MeterContext> context;
28 +
25 /** 29 /**
26 * Tyoe of meter operation. 30 * Tyoe of meter operation.
27 */ 31 */
...@@ -35,9 +39,10 @@ public class MeterOperation { ...@@ -35,9 +39,10 @@ public class MeterOperation {
35 private final Type type; 39 private final Type type;
36 40
37 41
38 - public MeterOperation(Meter meter, Type type) { 42 + public MeterOperation(Meter meter, Type type, MeterContext context) {
39 this.meter = meter; 43 this.meter = meter;
40 this.type = type; 44 this.type = type;
45 + this.context = Optional.ofNullable(context);
41 } 46 }
42 47
43 /** 48 /**
...@@ -58,6 +63,16 @@ public class MeterOperation { ...@@ -58,6 +63,16 @@ public class MeterOperation {
58 return meter; 63 return meter;
59 } 64 }
60 65
66 + /**
67 + * Returns a context which allows application to
68 + * be notified on the success value of this operation.
69 + *
70 + * @return a meter context
71 + */
72 + public Optional<MeterContext> context() {
73 + return this.context;
74 + }
75 +
61 @Override 76 @Override
62 public String toString() { 77 public String toString() {
63 return MoreObjects.toStringHelper(this) 78 return MoreObjects.toStringHelper(this)
......
...@@ -30,28 +30,21 @@ public interface MeterService ...@@ -30,28 +30,21 @@ public interface MeterService
30 * 30 *
31 * @param meter a meter. 31 * @param meter a meter.
32 */ 32 */
33 - void addMeter(Meter meter); 33 + void addMeter(MeterOperation meter);
34 34
35 /** 35 /**
36 * Updates a meter by adding statistic information to the meter. 36 * Updates a meter by adding statistic information to the meter.
37 * 37 *
38 * @param meter an updated meter 38 * @param meter an updated meter
39 */ 39 */
40 - void updateMeter(Meter meter); 40 + void updateMeter(MeterOperation meter);
41 41
42 /** 42 /**
43 * Remove a meter from the system and the dataplane. 43 * Remove a meter from the system and the dataplane.
44 * 44 *
45 * @param meter a meter to remove 45 * @param meter a meter to remove
46 */ 46 */
47 - void removeMeter(Meter meter); 47 + void removeMeter(MeterOperation meter);
48 -
49 - /**
50 - * Remove a meter from the system and the dataplane by meter id.
51 - *
52 - * @param id a meter id
53 - */
54 - void removeMeter(MeterId id);
55 48
56 /** 49 /**
57 * Fetch the meter by the meter id. 50 * Fetch the meter by the meter id.
......
...@@ -18,6 +18,7 @@ package org.onosproject.incubator.net.meter; ...@@ -18,6 +18,7 @@ package org.onosproject.incubator.net.meter;
18 import org.onosproject.store.Store; 18 import org.onosproject.store.Store;
19 19
20 import java.util.Collection; 20 import java.util.Collection;
21 +import java.util.concurrent.CompletableFuture;
21 22
22 /** 23 /**
23 * Entity that stores and distributed meter objects. 24 * Entity that stores and distributed meter objects.
...@@ -28,25 +29,29 @@ public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> { ...@@ -28,25 +29,29 @@ public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
28 * Adds a meter to the store. 29 * Adds a meter to the store.
29 * 30 *
30 * @param meter a meter 31 * @param meter a meter
32 + * @return a future indicating the result of the store operation
31 */ 33 */
32 - void storeMeter(Meter meter); 34 + CompletableFuture<MeterStoreResult> storeMeter(Meter meter);
33 35
34 /** 36 /**
35 * Deletes a meter from the store. 37 * Deletes a meter from the store.
36 * 38 *
37 * @param meter a meter 39 * @param meter a meter
40 + * @return a future indicating the result of the store operation
38 */ 41 */
39 - void deleteMeter(Meter meter); 42 + CompletableFuture<MeterStoreResult> deleteMeter(Meter meter);
40 43
41 /** 44 /**
42 * Updates a meter whose meter id is the same as the passed meter. 45 * Updates a meter whose meter id is the same as the passed meter.
43 * 46 *
44 * @param meter a new meter 47 * @param meter a new meter
48 + * @return a future indicating the result of the store operation
45 */ 49 */
46 - void updateMeter(Meter meter); 50 + CompletableFuture<MeterStoreResult> updateMeter(Meter meter);
47 51
48 /** 52 /**
49 * Updates a given meter's state with the provided state. 53 * Updates a given meter's state with the provided state.
54 + *
50 * @param meter a meter 55 * @param meter a meter
51 */ 56 */
52 void updateMeterState(Meter meter); 57 void updateMeterState(Meter meter);
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.incubator.net.meter;
17 +
18 +import java.util.Optional;
19 +
20 +/**
21 + * An entity used to indicate whether the store operation passed.
22 + */
23 +public final class MeterStoreResult {
24 +
25 +
26 + private final Type type;
27 + private final Optional<MeterFailReason> reason;
28 +
29 + public enum Type {
30 + SUCCESS,
31 + FAIL
32 + }
33 +
34 + private MeterStoreResult(Type type, MeterFailReason reason) {
35 + this.type = type;
36 + this.reason = Optional.ofNullable(reason);
37 + }
38 +
39 + public Type type() {
40 + return type;
41 + }
42 +
43 + public Optional<MeterFailReason> reason() {
44 + return reason;
45 + }
46 +
47 + /**
48 + * A successful store opertion.
49 + *
50 + * @return a meter store result
51 + */
52 + public static MeterStoreResult success() {
53 + return new MeterStoreResult(Type.SUCCESS, null);
54 + }
55 +
56 + /**
57 + * A failed store operation.
58 + *
59 + * @param reason a failure reason
60 + * @return a meter store result
61 + */
62 + public static MeterStoreResult fail(MeterFailReason reason) {
63 + return new MeterStoreResult(Type.FAIL, reason);
64 + }
65 +
66 +}
...@@ -21,6 +21,7 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -21,6 +21,7 @@ import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service; 23 import org.apache.felix.scr.annotations.Service;
24 +import org.onlab.util.TriConsumer;
24 import org.onosproject.incubator.net.meter.DefaultMeter; 25 import org.onosproject.incubator.net.meter.DefaultMeter;
25 import org.onosproject.incubator.net.meter.Meter; 26 import org.onosproject.incubator.net.meter.Meter;
26 import org.onosproject.incubator.net.meter.MeterEvent; 27 import org.onosproject.incubator.net.meter.MeterEvent;
...@@ -35,6 +36,7 @@ import org.onosproject.incubator.net.meter.MeterService; ...@@ -35,6 +36,7 @@ import org.onosproject.incubator.net.meter.MeterService;
35 import org.onosproject.incubator.net.meter.MeterState; 36 import org.onosproject.incubator.net.meter.MeterState;
36 import org.onosproject.incubator.net.meter.MeterStore; 37 import org.onosproject.incubator.net.meter.MeterStore;
37 import org.onosproject.incubator.net.meter.MeterStoreDelegate; 38 import org.onosproject.incubator.net.meter.MeterStoreDelegate;
39 +import org.onosproject.incubator.net.meter.MeterStoreResult;
38 import org.onosproject.net.DeviceId; 40 import org.onosproject.net.DeviceId;
39 import org.onosproject.net.provider.AbstractListenerProviderRegistry; 41 import org.onosproject.net.provider.AbstractListenerProviderRegistry;
40 import org.onosproject.net.provider.AbstractProviderService; 42 import org.onosproject.net.provider.AbstractProviderService;
...@@ -44,7 +46,6 @@ import org.slf4j.Logger; ...@@ -44,7 +46,6 @@ import org.slf4j.Logger;
44 46
45 import java.util.Collection; 47 import java.util.Collection;
46 48
47 -import static com.google.common.base.Preconditions.checkNotNull;
48 import static org.slf4j.LoggerFactory.getLogger; 49 import static org.slf4j.LoggerFactory.getLogger;
49 50
50 51
...@@ -69,11 +70,29 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M ...@@ -69,11 +70,29 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
69 70
70 private AtomicCounter meterIdCounter; 71 private AtomicCounter meterIdCounter;
71 72
73 + private TriConsumer<MeterOperation, MeterStoreResult, Throwable> onComplete;
74 +
72 @Activate 75 @Activate
73 public void activate() { 76 public void activate() {
74 meterIdCounter = storageService.atomicCounterBuilder() 77 meterIdCounter = storageService.atomicCounterBuilder()
75 .withName(meterIdentifier) 78 .withName(meterIdentifier)
76 .build(); 79 .build();
80 +
81 + onComplete = (op, result, error) ->
82 + {
83 + op.context().ifPresent(c -> {
84 + if (error != null) {
85 + c.onError(op.meter(), MeterFailReason.UNKNOWN);
86 + } else {
87 + if (result.reason().isPresent()) {
88 + c.onError(op.meter(), result.reason().get());
89 + } else {
90 + c.onSuccess(op.meter());
91 + }
92 + }
93 + });
94 +
95 + };
77 log.info("Started"); 96 log.info("Started");
78 } 97 }
79 98
...@@ -88,31 +107,27 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M ...@@ -88,31 +107,27 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
88 } 107 }
89 108
90 @Override 109 @Override
91 - public void addMeter(Meter meter) { 110 + public void addMeter(MeterOperation op) {
92 - DefaultMeter m = (DefaultMeter) meter; 111 + DefaultMeter m = (DefaultMeter) op.meter();
93 m.setState(MeterState.PENDING_ADD); 112 m.setState(MeterState.PENDING_ADD);
94 - store.storeMeter(m); 113 + store.storeMeter(m).whenComplete((result, error) ->
114 + onComplete.accept(op, result, error));
95 } 115 }
96 116
97 @Override 117 @Override
98 - public void updateMeter(Meter meter) { 118 + public void updateMeter(MeterOperation op) {
99 - DefaultMeter m = (DefaultMeter) meter; 119 + DefaultMeter m = (DefaultMeter) op.meter();
100 m.setState(MeterState.PENDING_ADD); 120 m.setState(MeterState.PENDING_ADD);
101 - store.updateMeter(m); 121 + store.updateMeter(m).whenComplete((result, error) ->
122 + onComplete.accept(op, result, error));
102 } 123 }
103 124
104 @Override 125 @Override
105 - public void removeMeter(Meter meter) { 126 + public void removeMeter(MeterOperation op) {
106 - DefaultMeter m = (DefaultMeter) meter; 127 + DefaultMeter m = (DefaultMeter) op.meter();
107 m.setState(MeterState.PENDING_REMOVE); 128 m.setState(MeterState.PENDING_REMOVE);
108 - store.deleteMeter(m); 129 + store.deleteMeter(m).whenComplete((result, error) ->
109 - } 130 + onComplete.accept(op, result, error));
110 -
111 - @Override
112 - public void removeMeter(MeterId id) {
113 - DefaultMeter meter = (DefaultMeter) store.getMeter(id);
114 - checkNotNull(meter, "No such meter {}", id);
115 - removeMeter(meter);
116 } 131 }
117 132
118 @Override 133 @Override
...@@ -155,17 +170,18 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M ...@@ -155,17 +170,18 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
155 170
156 @Override 171 @Override
157 public void notify(MeterEvent event) { 172 public void notify(MeterEvent event) {
158 - DeviceId deviceId = event.subject().meter().deviceId(); 173 + DeviceId deviceId = event.subject().deviceId();
159 - MeterProvider p = getProvider(event.subject().meter().deviceId()); 174 + MeterProvider p = getProvider(event.subject().deviceId());
160 switch (event.type()) { 175 switch (event.type()) {
161 - case METER_UPDATED: 176 + case METER_ADD_REQ:
162 - break; 177 + p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
163 - case METER_OP_FAILED: 178 + MeterOperation.Type.ADD,
164 - event.subject().meter().context().ifPresent(c -> 179 + null));
165 - c.onError(event.subject(), event.reason()));
166 break; 180 break;
167 - case METER_OP_REQ: 181 + case METER_REM_REQ:
168 - p.performMeterOperation(deviceId, event.subject()); 182 + p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
183 + MeterOperation.Type.REMOVE,
184 + null));
169 break; 185 break;
170 default: 186 default:
171 log.warn("Unknown meter event {}", event.type()); 187 log.warn("Unknown meter event {}", event.type());
......
...@@ -15,38 +15,40 @@ ...@@ -15,38 +15,40 @@
15 */ 15 */
16 package org.onosproject.incubator.store.meter.impl; 16 package org.onosproject.incubator.store.meter.impl;
17 17
18 +import com.google.common.collect.Collections2;
19 +import com.google.common.collect.Maps;
18 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Deactivate; 21 import org.apache.felix.scr.annotations.Deactivate;
20 -import org.apache.felix.scr.annotations.Property;
21 import org.apache.felix.scr.annotations.Reference; 22 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 23 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 -import org.onlab.util.KryoNamespace;
24 -import org.onlab.util.Tools;
25 import org.onosproject.cluster.ClusterService; 24 import org.onosproject.cluster.ClusterService;
26 import org.onosproject.cluster.NodeId; 25 import org.onosproject.cluster.NodeId;
27 -import org.onosproject.incubator.net.meter.DefaultBand;
28 import org.onosproject.incubator.net.meter.DefaultMeter; 26 import org.onosproject.incubator.net.meter.DefaultMeter;
29 import org.onosproject.incubator.net.meter.Meter; 27 import org.onosproject.incubator.net.meter.Meter;
30 import org.onosproject.incubator.net.meter.MeterEvent; 28 import org.onosproject.incubator.net.meter.MeterEvent;
31 import org.onosproject.incubator.net.meter.MeterFailReason; 29 import org.onosproject.incubator.net.meter.MeterFailReason;
32 import org.onosproject.incubator.net.meter.MeterId; 30 import org.onosproject.incubator.net.meter.MeterId;
33 import org.onosproject.incubator.net.meter.MeterOperation; 31 import org.onosproject.incubator.net.meter.MeterOperation;
32 +import org.onosproject.incubator.net.meter.MeterState;
34 import org.onosproject.incubator.net.meter.MeterStore; 33 import org.onosproject.incubator.net.meter.MeterStore;
35 import org.onosproject.incubator.net.meter.MeterStoreDelegate; 34 import org.onosproject.incubator.net.meter.MeterStoreDelegate;
35 +import org.onosproject.incubator.net.meter.MeterStoreResult;
36 import org.onosproject.mastership.MastershipService; 36 import org.onosproject.mastership.MastershipService;
37 import org.onosproject.store.AbstractStore; 37 import org.onosproject.store.AbstractStore;
38 -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 38 +import org.onosproject.store.serializers.KryoNamespaces;
39 -import org.onosproject.store.cluster.messaging.MessageSubject;
40 import org.onosproject.store.service.ConsistentMap; 39 import org.onosproject.store.service.ConsistentMap;
40 +import org.onosproject.store.service.MapEvent;
41 +import org.onosproject.store.service.MapEventListener;
41 import org.onosproject.store.service.Serializer; 42 import org.onosproject.store.service.Serializer;
43 +import org.onosproject.store.service.StorageException;
42 import org.onosproject.store.service.StorageService; 44 import org.onosproject.store.service.StorageService;
45 +import org.onosproject.store.service.Versioned;
43 import org.slf4j.Logger; 46 import org.slf4j.Logger;
44 47
48 +import java.util.Arrays;
45 import java.util.Collection; 49 import java.util.Collection;
46 -import java.util.Objects; 50 +import java.util.Map;
47 -import java.util.concurrent.ExecutorService; 51 +import java.util.concurrent.CompletableFuture;
48 -import java.util.concurrent.Executors;
49 -import java.util.stream.Collectors;
50 52
51 import static org.slf4j.LoggerFactory.getLogger; 53 import static org.slf4j.LoggerFactory.getLogger;
52 54
...@@ -60,54 +62,37 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD ...@@ -60,54 +62,37 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
60 private Logger log = getLogger(getClass()); 62 private Logger log = getLogger(getClass());
61 63
62 private static final String METERSTORE = "onos-meter-store"; 64 private static final String METERSTORE = "onos-meter-store";
63 - private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
64 -
65 - private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
66 -
67 -
68 - @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
69 - label = "Number of threads in the message handler pool")
70 - private int msgPoolSize;
71 65
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 private StorageService storageService; 67 private StorageService storageService;
74 68
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 - private ClusterCommunicationService clusterCommunicationService;
77 -
78 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 private MastershipService mastershipService; 70 private MastershipService mastershipService;
80 71
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 private ClusterService clusterService; 73 private ClusterService clusterService;
83 74
84 - private ConsistentMap<MeterId, Meter> meters; 75 + private ConsistentMap<MeterId, MeterData> meters;
85 private NodeId local; 76 private NodeId local;
86 - private KryoNamespace kryoNameSpace;
87 77
88 - private Serializer serializer; 78 + private MapEventListener mapListener = new InternalMapEventListener();
79 +
80 + private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
81 + Maps.newConcurrentMap();
89 82
90 @Activate 83 @Activate
91 public void activate() { 84 public void activate() {
92 85
93 local = clusterService.getLocalNode().id(); 86 local = clusterService.getLocalNode().id();
94 87
95 - kryoNameSpace =
96 - KryoNamespace.newBuilder()
97 - .register(DefaultMeter.class)
98 - .register(DefaultBand.class)
99 - .build();
100 -
101 - serializer = Serializer.using(kryoNameSpace);
102 88
103 - meters = storageService.<MeterId, Meter>consistentMapBuilder() 89 + meters = storageService.<MeterId, MeterData>consistentMapBuilder()
104 .withName(METERSTORE) 90 .withName(METERSTORE)
105 - .withSerializer(serializer) 91 + .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
92 + MeterData.class))
106 .build(); 93 .build();
107 94
108 - ExecutorService executors = Executors.newFixedThreadPool( 95 + meters.addListener(mapListener);
109 - msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
110 - registerMessageHandlers(executors);
111 96
112 log.info("Started"); 97 log.info("Started");
113 } 98 }
...@@ -115,159 +100,133 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD ...@@ -115,159 +100,133 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
115 @Deactivate 100 @Deactivate
116 public void deactivate() { 101 public void deactivate() {
117 102
118 - 103 + meters.removeListener(mapListener);
119 log.info("Stopped"); 104 log.info("Stopped");
120 } 105 }
121 106
122 - private void registerMessageHandlers(ExecutorService executor) {
123 - clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
124 - this::notifyDelegate, executor);
125 -
126 - }
127 -
128 107
129 @Override 108 @Override
130 - public void storeMeter(Meter meter) { 109 + public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
131 - NodeId master = mastershipService.getMasterFor(meter.deviceId()); 110 + CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
132 - 111 + futures.put(meter.id(), future);
133 - meters.put(meter.id(), meter); 112 + MeterData data = new MeterData(meter, null, local);
134 - 113 +
135 - MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ, 114 + try {
136 - new MeterOperation(meter, MeterOperation.Type.ADD)); 115 + meters.put(meter.id(), data);
137 - if (Objects.equals(local, master)) { 116 + } catch (StorageException e) {
138 - notifyDelegate(event); 117 + future.completeExceptionally(e);
139 - } else {
140 - clusterCommunicationService.unicast(
141 - event,
142 - UPDATE_METER,
143 - serializer::encode,
144 - master
145 - ).whenComplete((result, error) -> {
146 - if (error != null) {
147 - log.warn("Failed to install meter {} because {} on {}",
148 - meter, error, master);
149 -
150 - // notify app of failure
151 - meter.context().ifPresent(c -> c.onError(
152 - event.subject(), MeterFailReason.UNKNOWN));
153 - }
154 - });
155 } 118 }
156 119
120 + return future;
121 +
157 } 122 }
158 123
159 @Override 124 @Override
160 - public void deleteMeter(Meter meter) { 125 + public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
126 + CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
127 + futures.put(meter.id(), future);
161 128
162 - NodeId master = mastershipService.getMasterFor(meter.deviceId()); 129 + MeterData data = new MeterData(meter, null, local);
163 130
164 // update the state of the meter. It will be pruned by observing 131 // update the state of the meter. It will be pruned by observing
165 // that it has been removed from the dataplane. 132 // that it has been removed from the dataplane.
166 - meters.put(meter.id(), meter); 133 + try {
167 - 134 + meters.put(meter.id(), data);
168 - MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ, 135 + } catch (StorageException e) {
169 - new MeterOperation(meter, MeterOperation.Type.REMOVE)); 136 + future.completeExceptionally(e);
170 - if (Objects.equals(local, master)) {
171 - notifyDelegate(event);
172 - } else {
173 - clusterCommunicationService.unicast(
174 - event,
175 - UPDATE_METER,
176 - serializer::encode,
177 - master
178 - ).whenComplete((result, error) -> {
179 - if (error != null) {
180 - log.warn("Failed to delete meter {} because {} on {}",
181 - meter, error, master);
182 -
183 - // notify app of failure
184 - meter.context().ifPresent(c -> c.onError(
185 - event.subject(), MeterFailReason.UNKNOWN));
186 - }
187 - });
188 } 137 }
189 138
139 +
140 + return future;
190 } 141 }
191 142
192 @Override 143 @Override
193 - public void updateMeter(Meter meter) { 144 + public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
194 - 145 + CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
195 - NodeId master = mastershipService.getMasterFor(meter.deviceId()); 146 + futures.put(meter.id(), future);
196 - 147 +
197 - meters.put(meter.id(), meter); 148 + MeterData data = new MeterData(meter, null, local);
198 - 149 + try {
199 - MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ, 150 + meters.put(meter.id(), data);
200 - new MeterOperation(meter, MeterOperation.Type.MODIFY)); 151 + } catch (StorageException e) {
201 - if (Objects.equals(local, master)) { 152 + future.completeExceptionally(e);
202 - notifyDelegate(event);
203 - } else {
204 - clusterCommunicationService.unicast(
205 - event,
206 - UPDATE_METER,
207 - serializer::encode,
208 - master
209 - ).whenComplete((result, error) -> {
210 - if (error != null) {
211 - log.warn("Failed to update meter {} because {} on {}",
212 - meter, error, master);
213 -
214 - // notify app of failure
215 - meter.context().ifPresent(c -> c.onError(
216 - event.subject(), MeterFailReason.UNKNOWN));
217 - }
218 - });
219 } 153 }
220 - 154 + return future;
221 } 155 }
222 156
223 @Override 157 @Override
224 public void updateMeterState(Meter meter) { 158 public void updateMeterState(Meter meter) {
225 - meters.compute(meter.id(), (id, v) -> { 159 + meters.computeIfPresent(meter.id(), (id, v) -> {
226 - DefaultMeter m = (DefaultMeter) v; 160 + DefaultMeter m = (DefaultMeter) v.meter();
227 m.setState(meter.state()); 161 m.setState(meter.state());
228 m.setProcessedPackets(meter.packetsSeen()); 162 m.setProcessedPackets(meter.packetsSeen());
229 m.setProcessedBytes(meter.bytesSeen()); 163 m.setProcessedBytes(meter.bytesSeen());
230 m.setLife(meter.life()); 164 m.setLife(meter.life());
165 + // TODO: Prune if drops to zero.
231 m.setReferenceCount(meter.referenceCount()); 166 m.setReferenceCount(meter.referenceCount());
232 - return m; 167 + return new MeterData(m, null, v.origin());
233 }); 168 });
234 } 169 }
235 170
236 @Override 171 @Override
237 public Meter getMeter(MeterId meterId) { 172 public Meter getMeter(MeterId meterId) {
238 - return meters.get(meterId).value(); 173 + MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
174 + return data == null ? null : data.meter();
239 } 175 }
240 176
241 @Override 177 @Override
242 public Collection<Meter> getAllMeters() { 178 public Collection<Meter> getAllMeters() {
243 - return meters.values().stream() 179 + return Collections2.transform(meters.asJavaMap().values(),
244 - .map(v -> v.value()).collect(Collectors.toSet()); 180 + MeterData::meter);
245 } 181 }
246 182
247 @Override 183 @Override
248 public void failedMeter(MeterOperation op, MeterFailReason reason) { 184 public void failedMeter(MeterOperation op, MeterFailReason reason) {
249 - NodeId master = mastershipService.getMasterFor(op.meter().deviceId()); 185 + meters.computeIfPresent(op.meter().id(), (k, v) ->
250 - meters.remove(op.meter().id()); 186 + new MeterData(v.meter(), reason, v.origin()));
251 - 187 + }
252 - MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason); 188 +
253 - if (Objects.equals(local, master)) { 189 + private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
254 - notifyDelegate(event); 190 + @Override
255 - } else { 191 + public void event(MapEvent<MeterId, MeterData> event) {
256 - clusterCommunicationService.unicast( 192 + MeterData data = event.value().value();
257 - event, 193 + NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
258 - UPDATE_METER, 194 + switch (event.type()) {
259 - serializer::encode, 195 + case INSERT:
260 - master 196 + case UPDATE:
261 - ).whenComplete((result, error) -> { 197 + switch (data.meter().state()) {
262 - if (error != null) { 198 + case PENDING_ADD:
263 - log.warn("Failed to delete failed meter {} because {} on {}", 199 + case PENDING_REMOVE:
264 - op.meter(), error, master); 200 + if (!data.reason().isPresent() && local.equals(master)) {
265 - 201 + notifyDelegate(
266 - // Can't do any more... 202 + new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
267 - } 203 + MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
268 - }); 204 + data.meter()));
269 - } 205 + } else if (data.reason().isPresent() && local.equals(data.origin())) {
206 + MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
207 + //TODO: No future -> no friend
208 + futures.get(data.meter().id()).complete(msr);
209 + }
210 + break;
211 + case ADDED:
212 + case REMOVED:
213 + if (local.equals(data.origin())) {
214 + futures.get(data.meter().id()).complete(MeterStoreResult.success());
215 + }
216 + break;
217 + default:
218 + log.warn("Unknown meter state type {}", data.meter().state());
219 + }
220 + break;
221 + case REMOVE:
222 + //Only happens at origin so we do not need to care.
223 + break;
224 + default:
225 + log.warn("Unknown Map event type {}", event.type());
226 + }
270 227
228 + }
271 } 229 }
272 230
231 +
273 } 232 }
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.incubator.store.meter.impl;
17 +
18 +import org.onosproject.cluster.NodeId;
19 +import org.onosproject.incubator.net.meter.Meter;
20 +import org.onosproject.incubator.net.meter.MeterFailReason;
21 +
22 +import java.util.Optional;
23 +
24 +/**
25 + * A class representing the meter information stored in the meter store.
26 + */
27 +public class MeterData {
28 +
29 + private final Meter meter;
30 + private final Optional<MeterFailReason> reason;
31 + private final NodeId origin;
32 +
33 + public MeterData(Meter meter, MeterFailReason reason, NodeId origin) {
34 + this.meter = meter;
35 + this.reason = Optional.ofNullable(reason);
36 + this.origin = origin;
37 + }
38 +
39 + public Meter meter() {
40 + return meter;
41 + }
42 +
43 + public Optional<MeterFailReason> reason() {
44 + return this.reason;
45 + }
46 +
47 + public NodeId origin() {
48 + return this.origin;
49 + }
50 +
51 +
52 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onlab.util;
17 +
18 +/**
19 + * A consumer that accepts three arguments.
20 + */
21 +public interface TriConsumer<U, V, W> {
22 +
23 + /**
24 + * Applies the given arguments to the function.
25 + */
26 + void accept(U arg1, V arg2, W arg3);
27 +
28 +}
...\ No newline at end of file ...\ No newline at end of file