Aaron Kruglikov
Committed by Gerrit Code Review

Adding commands for the distributed multimap.

Change-Id: Ieecfea9aaabefa1f95918d4f95c489a0e94b01a9
...@@ -109,6 +109,28 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive { ...@@ -109,6 +109,28 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
109 CompletableFuture<Boolean> remove(K key, V value); 109 CompletableFuture<Boolean> remove(K key, V value);
110 110
111 /** 111 /**
112 + * Removes the key-value pairs with the specified key and values if they
113 + * exist. In implementations that allow duplicates each instance of a key
114 + * will remove one matching entry, which one is not defined. Equivalent to
115 + * repeated calls to {@code remove()} for each key value pair but more
116 + * efficient.
117 + * @param key the key of the pair to be removed
118 + * @param values the set of values to be removed
119 + * @return a future whose value will be true if the map changes because of
120 + * this call, false otherwise.
121 + */
122 + CompletableFuture<Boolean> removeAll(K key, Iterable<? extends V> values);
123 +
124 + /**
125 + * Removes all values associated with the specified key as well as the key
126 + * itself.
127 + * @param key the key whose key-value pairs will be removed
128 + * @return a future whose value is the set of values that were removed,
129 + * which may be empty
130 + */
131 + CompletableFuture<Versioned<Collection<byte[]>>> removeAll(K key);
132 +
133 + /**
112 * Adds the set of key-value pairs of the specified key with each of the 134 * Adds the set of key-value pairs of the specified key with each of the
113 * values in the iterable if each key-value pair does not already exist, 135 * values in the iterable if each key-value pair does not already exist,
114 * if the pair does exist the behavior is implementation specific. 136 * if the pair does exist the behavior is implementation specific.
...@@ -142,15 +164,6 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive { ...@@ -142,15 +164,6 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
142 CompletableFuture<Collection<V>> replaceValues(K key, Iterable<V> values); 164 CompletableFuture<Collection<V>> replaceValues(K key, Iterable<V> values);
143 165
144 /** 166 /**
145 - * Removes all values associated with the specified key as well as the key
146 - * itself.
147 - * @param key the key whose key-value pairs will be removed
148 - * @return a future whose value is the set of values that were removed,
149 - * which may be empty
150 - */
151 - CompletableFuture<Collection<V>> removeAll(K key);
152 -
153 - /**
154 * Removes all key-value pairs, after which it will be empty. 167 * Removes all key-value pairs, after which it will be empty.
155 * @return a future whose value is irrelevant, simply used to determine if 168 * @return a future whose value is irrelevant, simply used to determine if
156 * the call has completed 169 * the call has completed
......
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.resources.impl;
18 +
19 +import com.google.common.base.MoreObjects;
20 +import com.google.common.collect.Multiset;
21 +import io.atomix.catalyst.buffer.BufferInput;
22 +import io.atomix.catalyst.buffer.BufferOutput;
23 +import io.atomix.catalyst.serializer.CatalystSerializable;
24 +import io.atomix.catalyst.serializer.SerializableTypeResolver;
25 +import io.atomix.catalyst.serializer.Serializer;
26 +import io.atomix.catalyst.serializer.SerializerRegistry;
27 +import io.atomix.catalyst.util.Assert;
28 +import io.atomix.copycat.Command;
29 +import io.atomix.copycat.Query;
30 +import org.onlab.util.Match;
31 +
32 +import java.util.Collection;
33 +import java.util.List;
34 +import java.util.Map;
35 +import java.util.Set;
36 +
37 +/**
38 + * AsyncConsistentMultimap state machine commands.
39 + */
40 +public final class AsyncConsistentMultimapCommands {
41 +
42 + private AsyncConsistentMultimapCommands() {
43 + }
44 +
45 + /**
46 + * Abstract multimap command.
47 + */
48 + @SuppressWarnings("serial")
49 + public abstract static class MultimapCommand<V> implements Command<V>,
50 + CatalystSerializable {
51 + @Override
52 + public ConsistencyLevel consistency() {
53 + return ConsistencyLevel.SEQUENTIAL;
54 + }
55 +
56 + @Override
57 + public String toString() {
58 + return MoreObjects.toStringHelper(getClass())
59 + .toString();
60 + }
61 +
62 + @Override
63 + public void writeObject(BufferOutput<?> buffer,
64 + Serializer serializer) {
65 + }
66 +
67 + @Override
68 + public void readObject(BufferInput<?> buffer, Serializer serializer) {
69 + }
70 + }
71 +
72 + /**
73 + * Abstract multimap query.
74 + */
75 + @SuppressWarnings("serial")
76 + public abstract static class MultimapQuery<V> implements Query<V>,
77 + CatalystSerializable {
78 + @Override
79 + public ConsistencyLevel consistency() {
80 + return ConsistencyLevel.SEQUENTIAL;
81 + }
82 +
83 + @Override
84 + public String toString() {
85 + return MoreObjects.toStringHelper(getClass())
86 + .toString();
87 + }
88 +
89 + @Override
90 + public void writeObject(BufferOutput<?> buffer,
91 + Serializer serializer) {
92 + }
93 +
94 + @Override
95 + public void readObject(BufferInput<?> buffer,
96 + Serializer serializer) {
97 + }
98 + }
99 +
100 + /**
101 + * Abstract key-based multimap query.
102 + */
103 + @SuppressWarnings("serial")
104 + public abstract static class KeyQuery<V> extends MultimapQuery<V> {
105 + protected String key;
106 +
107 + public KeyQuery() {
108 + }
109 +
110 + public KeyQuery(String key) {
111 + this.key = Assert.notNull(key, "key");
112 + }
113 +
114 + public String key() {
115 + return key;
116 + }
117 +
118 + @Override
119 + public String toString() {
120 + return MoreObjects.toStringHelper(getClass())
121 + .add("key", key)
122 + .toString();
123 + }
124 +
125 + @Override
126 + public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
127 + super.writeObject(buffer, serializer);
128 + serializer.writeObject(key, buffer);
129 + }
130 +
131 + @Override
132 + public void readObject(BufferInput<?> buffer, Serializer serializer) {
133 + super.readObject(buffer, serializer);
134 + key = serializer.readObject(buffer);
135 + }
136 + }
137 +
138 + /**
139 + * Abstract value-based query.
140 + */
141 + @SuppressWarnings("serial")
142 + public abstract static class ValueQuery<V> extends MultimapQuery<V> {
143 + protected byte[] value;
144 +
145 + public ValueQuery() {
146 + }
147 +
148 + public ValueQuery(byte[] value) {
149 + this.value = Assert.notNull(value, "value");
150 + }
151 +
152 + /**
153 + * Returns the value.
154 + *
155 + * @return value.
156 + */
157 + public byte[] value() {
158 + return value;
159 + }
160 +
161 + @Override
162 + public String toString() {
163 + return MoreObjects.toStringHelper(getClass())
164 + .add("value", value)
165 + .toString();
166 + }
167 +
168 + @Override
169 + public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
170 + super.writeObject(buffer, serializer);
171 + }
172 +
173 + @Override
174 + public void readObject(BufferInput<?> buffer, Serializer serializer) {
175 + super.readObject(buffer, serializer);
176 + }
177 + }
178 +
179 + /**
180 + * Size query.
181 + */
182 + public static class Size extends MultimapQuery<Integer> {
183 + }
184 +
185 + /**
186 + * Is empty query.
187 + */
188 + public static class IsEmpty extends MultimapQuery<Boolean> {
189 + }
190 +
191 + /**
192 + * Contains key query.
193 + */
194 + @SuppressWarnings("serial")
195 + public static class ContainsKey extends KeyQuery<Boolean> {
196 + public ContainsKey() {
197 + }
198 +
199 + public ContainsKey(String key) {
200 + super(key);
201 + }
202 +
203 + }
204 +
205 + /**
206 + * Contains value query.
207 + */
208 + @SuppressWarnings("serial")
209 + public static class ContainsValue extends ValueQuery<Boolean> {
210 + public ContainsValue() {
211 + }
212 +
213 + public ContainsValue(byte[] value) {
214 + super(value);
215 + }
216 + }
217 +
218 + /**
219 + * Contains entry query.
220 + */
221 + @SuppressWarnings("serial")
222 + public static class ContainsEntry extends MultimapQuery<Boolean> {
223 + protected String key;
224 + protected byte[] value;
225 +
226 + public ContainsEntry() {
227 + }
228 +
229 + public ContainsEntry(String key, byte[] value) {
230 + this.key = Assert.notNull(key, "key");
231 + this.value = Assert.notNull(value, "value");
232 + }
233 +
234 + public String key() {
235 + return key;
236 + }
237 +
238 + public byte[] value() {
239 + return value;
240 + }
241 +
242 + @Override
243 + public String toString() {
244 + return MoreObjects.toStringHelper(getClass())
245 + .add("key", key)
246 + .add("value", value)
247 + .toString();
248 + }
249 +
250 + @Override
251 + public void writeObject(BufferOutput<?> buffer,
252 + Serializer serializer) {
253 + super.writeObject(buffer, serializer);
254 + serializer.writeObject(key, buffer);
255 + serializer.writeObject(value, buffer);
256 + }
257 +
258 + @Override
259 + public void readObject(BufferInput<?> buffer, Serializer serializer) {
260 + super.readObject(buffer, serializer);
261 + key = serializer.readObject(buffer);
262 + value = serializer.readObject(buffer);
263 +
264 + }
265 + }
266 +
267 + /**
268 + * Update and get command. Note that corresponding values must have the
269 + * same index in the respective arrays.
270 + */
271 + @SuppressWarnings("serial")
272 + public static class UpdateAndGet extends
273 + MultimapCommand<MapEntryUpdateResult<String, Collection<byte[]>>> {
274 + private String key;
275 + private List<byte[]> values;
276 + private List<Match<byte[]>> valueMatches;
277 + private List<Match<Long>> versionMatches;
278 +
279 + public UpdateAndGet() {
280 + }
281 +
282 + public UpdateAndGet(String key, List<byte[]> values,
283 + List<Match<byte[]>> valueMatches,
284 + List<Match<Long>> versionMatches) {
285 + this.key = key;
286 + this.values = values;
287 + this.valueMatches = valueMatches;
288 + this.versionMatches = versionMatches;
289 + }
290 +
291 + public String key() {
292 + return this.key;
293 + }
294 +
295 + public List<byte[]> values() {
296 + return values;
297 + }
298 +
299 + public List<Match<byte[]>> valueMatches() {
300 + return valueMatches;
301 + }
302 +
303 + public List<Match<Long>> versionMatches() {
304 + return versionMatches;
305 + }
306 +
307 + @Override
308 + public CompactionMode compaction() {
309 + return values == null ? CompactionMode.FULL :
310 + CompactionMode.QUORUM;
311 + }
312 +
313 + @Override
314 + public void writeObject(BufferOutput<?> buffer,
315 + Serializer serializer) {
316 + super.writeObject(buffer, serializer);
317 + serializer.writeObject(key, buffer);
318 + serializer.writeObject(values, buffer);
319 + serializer.writeObject(valueMatches, buffer);
320 + serializer.writeObject(versionMatches, buffer);
321 + }
322 +
323 + @Override
324 + public void readObject(BufferInput<?> buffer, Serializer serializer) {
325 + super.readObject(buffer, serializer);
326 + key = serializer.readObject(buffer);
327 + values = serializer.readObject(buffer);
328 + valueMatches = serializer.readObject(buffer);
329 + versionMatches = serializer.readObject(buffer);
330 + }
331 +
332 + @Override
333 + public String toString() {
334 + return super.toString();
335 + }
336 + }
337 +
338 + /**
339 + * Clear multimap command.
340 + */
341 + @SuppressWarnings("serial")
342 + public static class Clear extends MultimapCommand<Void> {
343 + }
344 +
345 + /**
346 + * Key set query.
347 + */
348 + @SuppressWarnings("serial")
349 + public static class KeySet extends MultimapQuery<Set<String>> {
350 + }
351 +
352 + /**
353 + * Key multiset query.
354 + */
355 + @SuppressWarnings("serial")
356 + public static class Keys extends MultimapQuery<Multiset<String>> {
357 + }
358 +
359 + /**
360 + * Value collection query.
361 + */
362 + @SuppressWarnings("serial")
363 + public static class Values extends MultimapQuery<Collection<byte[]>> {
364 + }
365 +
366 + /**
367 + * Entry set query.
368 + */
369 + @SuppressWarnings("serial")
370 + public static class Entries extends
371 + MultimapQuery<Collection<Map.Entry<String, byte[]>>> {
372 + }
373 +
374 + /**
375 + * Get value query.
376 + */
377 + public static class Get extends KeyQuery<Collection<byte[]>> {
378 + }
379 +
380 + /**
381 + * Multimap command type resolver.
382 + */
383 + @SuppressWarnings("serial")
384 + public static class TypeResolver implements SerializableTypeResolver {
385 + @Override
386 + public void resolve(SerializerRegistry registry) {
387 + registry.register(ContainsKey.class, -1000);
388 + registry.register(ContainsValue.class, -1001);
389 + registry.register(ContainsEntry.class, -1002);
390 + registry.register(UpdateAndGet.class, -1003);
391 + registry.register(Clear.class, -1004);
392 + registry.register(KeySet.class, -1005);
393 + registry.register(Keys.class, -1006);
394 + registry.register(Values.class, -1007);
395 + registry.register(Entries.class, -1008);
396 + registry.register(Size.class, -1009);
397 + registry.register(IsEmpty.class, -1010);
398 + registry.register(Get.class, -1011);
399 + }
400 + }
401 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.resources.impl;
18 +
19 +import com.google.common.collect.Lists;
20 +import com.google.common.collect.Multimap;
21 +import com.google.common.collect.Multiset;
22 +import io.atomix.copycat.client.CopycatClient;
23 +import io.atomix.resource.AbstractResource;
24 +import org.onlab.util.Match;
25 +import org.onosproject.store.service.AsyncConsistentMultimap;
26 +import org.onosproject.store.service.Versioned;
27 +
28 +import java.util.Collection;
29 +import java.util.ConcurrentModificationException;
30 +import java.util.Map;
31 +import java.util.Properties;
32 +import java.util.Set;
33 +import java.util.concurrent.CompletableFuture;
34 +
35 +import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.*;
36 +
37 +/**
38 + * Set based implementation of the {@link AsyncConsistentMultimap}.
39 + * <p>
40 + * Note: this implementation does not allow null entries or duplicate entries.
41 + */
42 +public class AsyncConsistentSetMultimap
43 + extends AbstractResource<AsyncConsistentSetMultimap>
44 + implements AsyncConsistentMultimap<String, byte[]> {
45 +
46 + public AsyncConsistentSetMultimap(CopycatClient client,
47 + Properties properties) {
48 + super(client, properties);
49 + }
50 +
51 + @Override
52 + public CompletableFuture<AsyncConsistentSetMultimap> open() {
53 + return super.open();
54 + //TODO
55 + }
56 +
57 + @Override
58 + public CompletableFuture<Integer> size() {
59 + return submit(new Size());
60 + }
61 +
62 + @Override
63 + public CompletableFuture<Boolean> isEmpty() {
64 + return submit(new IsEmpty());
65 + }
66 +
67 + @Override
68 + public CompletableFuture<Boolean> containsKey(String key) {
69 + return submit(new ContainsKey(key));
70 + }
71 +
72 + @Override
73 + public CompletableFuture<Boolean> containsValue(byte[] value) {
74 + return submit(new ContainsValue(value));
75 + }
76 +
77 + @Override
78 + public CompletableFuture<Boolean> containsEntry(String key, byte[] value) {
79 + return submit(new ContainsEntry(key, value));
80 + }
81 +
82 + @Override
83 + public CompletableFuture<Boolean> put(String key, byte[] value) {
84 + return submit(new UpdateAndGet(key, Lists.newArrayList(value),
85 + Lists.newArrayList(Match.NULL),
86 + Lists.newArrayList(Match.NULL)))
87 + .whenComplete((result, e) -> throwIfLocked(result.status()))
88 + .thenApply(result ->
89 + result.status() == MapEntryUpdateResult.Status.OK);
90 + }
91 +
92 + @Override
93 + public CompletableFuture<Boolean> remove(String key, byte[] value) {
94 + return submit(new UpdateAndGet(key, Lists.newArrayList(value),
95 + Lists.newArrayList(Match.ifValue(value)),
96 + Lists.newArrayList(Match.NULL)))
97 + .whenComplete((result, e) -> throwIfLocked(result.status()))
98 + .thenApply(result ->
99 + result.status() == MapEntryUpdateResult.Status.OK);
100 + }
101 +
102 + @Override
103 + public CompletableFuture<Boolean> removeAll(String key, Iterable<? extends byte[]> values) {
104 +
105 + throw new UnsupportedOperationException("This operation cannot be " +
106 + "used without support for " +
107 + "transactions.");
108 + }
109 +
110 + @Override
111 + public CompletableFuture<Versioned<Collection<byte[]>>> removeAll(String key) {
112 + return submit(new UpdateAndGet(key, null, null, null))
113 + .whenComplete((result, e) -> throwIfLocked(result.status()))
114 + .thenApply(result -> result.oldValue());
115 + }
116 +
117 + @Override
118 + public CompletableFuture<Boolean> putAll(String key, Iterable<? extends byte[]> values) {
119 + throw new UnsupportedOperationException("This operation cannot be " +
120 + "used without support for " +
121 + "transactions.");
122 + }
123 +
124 + @Override
125 + public CompletableFuture<Boolean> putAll(Multimap<? extends String, ? extends byte[]> multiMap) {
126 + throw new UnsupportedOperationException("This operation cannot be " +
127 + "used without support for " +
128 + "transactions.");
129 + }
130 +
131 + @Override
132 + public CompletableFuture<Collection<byte[]>> replaceValues(String key, Iterable<byte[]> values) {
133 + throw new UnsupportedOperationException("This operation cannot be " +
134 + "used without support for " +
135 + "transactions.");
136 + }
137 +
138 + @Override
139 + public CompletableFuture<Void> clear() {
140 + return submit(new AsyncConsistentMultimapCommands.Clear());
141 + }
142 +
143 + @Override
144 + public CompletableFuture<Collection<byte[]>> get(String key) {
145 + return submit(new Get());
146 + }
147 +
148 + @Override
149 + public CompletableFuture<Set<String>> keySet() {
150 + return submit(new KeySet());
151 + }
152 +
153 + @Override
154 + public CompletableFuture<Multiset<String>> keys() {
155 + return submit(new Keys());
156 + }
157 +
158 + @Override
159 + public CompletableFuture<Collection<byte[]>> values() {
160 + return submit(new Values());
161 + }
162 +
163 + @Override
164 + public CompletableFuture<Collection<Map.Entry<String, byte[]>>> entries() {
165 + return submit(new Entries());
166 + }
167 +
168 + @Override
169 + public CompletableFuture<Map<String, Collection<byte[]>>> asMap() {
170 + //TODO
171 + throw new UnsupportedOperationException("Expensive operation.");
172 + }
173 +
174 + @Override
175 + public String name() {
176 + return null;
177 + }
178 +
179 + /**
180 + * Helper to check if there was a lock based issue.
181 + * @param status the status of an update result
182 + */
183 + private void throwIfLocked(MapEntryUpdateResult.Status status) {
184 + if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
185 + throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
186 + }
187 + }
188 +}