Madan Jampani

AtomixDocumentTree support for filtering notifications by DocumentPath

Change-Id: I3f4f616bc4f2e488e5433e44f72bcd121b564b0d
...@@ -17,10 +17,14 @@ ...@@ -17,10 +17,14 @@
17 package org.onosproject.store.service; 17 package org.onosproject.store.service;
18 18
19 import java.util.Arrays; 19 import java.util.Arrays;
20 +import java.util.Collection;
20 import java.util.Iterator; 21 import java.util.Iterator;
21 import java.util.List; 22 import java.util.List;
22 import java.util.Objects; 23 import java.util.Objects;
23 24
25 +import org.apache.commons.collections.CollectionUtils;
26 +import org.apache.commons.lang.StringUtils;
27 +
24 import com.google.common.base.Preconditions; 28 import com.google.common.base.Preconditions;
25 import com.google.common.collect.ImmutableList; 29 import com.google.common.collect.ImmutableList;
26 import com.google.common.collect.Lists; 30 import com.google.common.collect.Lists;
...@@ -103,6 +107,46 @@ public class DocumentPath implements Comparable<DocumentPath> { ...@@ -103,6 +107,46 @@ public class DocumentPath implements Comparable<DocumentPath> {
103 return ImmutableList.copyOf(pathElements); 107 return ImmutableList.copyOf(pathElements);
104 } 108 }
105 109
110 + /**
111 + * Returns if the specified path belongs to a direct ancestor of the node pointed at by this path.
112 + * <p>
113 + * Example: {@code root.a} is a direct ancestor of {@code r.a.b.c}; while {@code r.a.x} is not.
114 + *
115 + * @param other other path
116 + * @return {@code true} is yes; {@code false} otherwise.
117 + */
118 + public boolean isAncestorOf(DocumentPath other) {
119 + return !other.equals(this) && other.toString().startsWith(toString());
120 + }
121 +
122 + /**
123 + * Returns if the specified path is belongs to a subtree rooted this path.
124 + * <p>
125 + * Example: {@code root.a.b} and {@code root.a.b.c.d.e} are descendants of {@code r.a.b};
126 + * while {@code r.a.x.c} is not.
127 + *
128 + * @param other other path
129 + * @return {@code true} is yes; {@code false} otherwise.
130 + */
131 + public boolean isDescendentOf(DocumentPath other) {
132 + return other.equals(this) || other.isAncestorOf(this);
133 + }
134 +
135 + /**
136 + * Returns the path that points to the least common ancestor of the specified
137 + * collection of paths.
138 + * @param paths collection of path
139 + * @return path to least common ancestor
140 + */
141 + public static DocumentPath leastCommonAncestor(Collection<DocumentPath> paths) {
142 + if (CollectionUtils.isEmpty(paths)) {
143 + return null;
144 + }
145 + return DocumentPath.from(StringUtils.getCommonPrefix(paths.stream()
146 + .map(DocumentPath::toString)
147 + .toArray(String[]::new)));
148 + }
149 +
106 @Override 150 @Override
107 public int hashCode() { 151 public int hashCode() {
108 return Objects.hash(pathElements); 152 return Objects.hash(pathElements);
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.service;
18 +
19 +import static org.junit.Assert.assertEquals;
20 +import static org.junit.Assert.assertFalse;
21 +import static org.junit.Assert.assertTrue;
22 +
23 +import java.util.Arrays;
24 +
25 +import org.junit.Test;
26 +
27 +/**
28 + * Unit tests for {@link DocumentPath}.
29 + */
30 +public class DocumentPathTest {
31 +
32 + @Test
33 + public void testConstruction() {
34 + DocumentPath path = DocumentPath.from("root.a.b");
35 + assertEquals(path.pathElements(), Arrays.asList("root", "a", "b"));
36 + assertEquals(DocumentPath.from("root.a"), path.parent());
37 + }
38 +
39 + @Test
40 + public void testAncestry() {
41 + DocumentPath path1 = DocumentPath.from("root.a.b");
42 + DocumentPath path2 = DocumentPath.from("root.a.d");
43 + DocumentPath path3 = DocumentPath.from("root.a.b.c");
44 + DocumentPath lca = DocumentPath.leastCommonAncestor(Arrays.asList(path1, path2, path3));
45 + assertEquals(DocumentPath.from("root.a"), lca);
46 + assertTrue(path1.isAncestorOf(path3));
47 + assertFalse(path1.isAncestorOf(path2));
48 + assertTrue(path3.isDescendentOf(path3));
49 + assertTrue(path3.isDescendentOf(path1));
50 + assertFalse(path3.isDescendentOf(path2));
51 + }
52 +}
...@@ -33,11 +33,11 @@ import java.util.concurrent.Executor; ...@@ -33,11 +33,11 @@ import java.util.concurrent.Executor;
33 33
34 import org.onlab.util.Match; 34 import org.onlab.util.Match;
35 import org.onlab.util.Tools; 35 import org.onlab.util.Tools;
36 -import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
37 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear; 36 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
38 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get; 37 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
39 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren; 38 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
40 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen; 39 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
40 +import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
41 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update; 41 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
42 import org.onosproject.store.service.AsyncDocumentTree; 42 import org.onosproject.store.service.AsyncDocumentTree;
43 import org.onosproject.store.service.DocumentPath; 43 import org.onosproject.store.service.DocumentPath;
...@@ -56,7 +56,7 @@ import com.google.common.util.concurrent.MoreExecutors; ...@@ -56,7 +56,7 @@ import com.google.common.util.concurrent.MoreExecutors;
56 public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree> 56 public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
57 implements AsyncDocumentTree<byte[]> { 57 implements AsyncDocumentTree<byte[]> {
58 58
59 - private final Map<DocumentTreeListener<byte[]>, Executor> eventListeners = new HashMap<>(); 59 + private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
60 public static final String CHANGE_SUBJECT = "changeEvents"; 60 public static final String CHANGE_SUBJECT = "changeEvents";
61 61
62 protected AtomixDocumentTree(CopycatClient client, Properties options) { 62 protected AtomixDocumentTree(CopycatClient client, Properties options) {
...@@ -184,21 +184,21 @@ public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree> ...@@ -184,21 +184,21 @@ public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
184 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) { 184 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
185 checkNotNull(path); 185 checkNotNull(path);
186 checkNotNull(listener); 186 checkNotNull(listener);
187 + InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
187 // TODO: Support API that takes an executor 188 // TODO: Support API that takes an executor
188 - if (isListening()) { 189 + if (!eventListeners.containsKey(listener)) {
189 - eventListeners.putIfAbsent(listener, MoreExecutors.directExecutor());
190 - return CompletableFuture.completedFuture(null);
191 - } else {
192 return client.submit(new Listen(path)) 190 return client.submit(new Listen(path))
193 - .thenRun(() -> eventListeners.put(listener, MoreExecutors.directExecutor())); 191 + .thenRun(() -> eventListeners.put(listener, internalListener));
194 } 192 }
193 + return CompletableFuture.completedFuture(null);
195 } 194 }
196 195
197 @Override 196 @Override
198 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) { 197 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
199 checkNotNull(listener); 198 checkNotNull(listener);
200 - if (eventListeners.remove(listener) != null && eventListeners.isEmpty()) { 199 + InternalListener internalListener = eventListeners.remove(listener);
201 - return client.submit(new Unlisten()).thenApply(v -> null); 200 + if (internalListener != null && eventListeners.isEmpty()) {
201 + return client.submit(new Unlisten(internalListener.path)).thenApply(v -> null);
202 } 202 }
203 return CompletableFuture.completedFuture(null); 203 return CompletableFuture.completedFuture(null);
204 } 204 }
...@@ -213,7 +213,26 @@ public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree> ...@@ -213,7 +213,26 @@ public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
213 } 213 }
214 214
215 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) { 215 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
216 - events.forEach(event -> 216 + events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
217 - eventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event)))); 217 + }
218 +
219 + private class InternalListener implements DocumentTreeListener<byte[]> {
220 +
221 + private final DocumentPath path;
222 + private final DocumentTreeListener<byte[]> listener;
223 + private final Executor executor;
224 +
225 + public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
226 + this.path = path;
227 + this.listener = listener;
228 + this.executor = executor;
229 + }
230 +
231 + @Override
232 + public void event(DocumentTreeEvent<byte[]> event) {
233 + if (event.path().isDescendentOf(path)) {
234 + executor.execute(() -> listener.event(event));
235 + }
236 + }
218 } 237 }
219 } 238 }
......
...@@ -225,11 +225,10 @@ public class AtomixDocumentTreeCommands { ...@@ -225,11 +225,10 @@ public class AtomixDocumentTreeCommands {
225 } 225 }
226 226
227 @Override 227 @Override
228 - public void writeObject(BufferOutput<?> buffer, Serializer serializer) { 228 + public String toString() {
229 - } 229 + return MoreObjects.toStringHelper(getClass())
230 - 230 + .add("path", path())
231 - @Override 231 + .toString();
232 - public void readObject(BufferInput<?> buffer, Serializer serializer) {
233 } 232 }
234 } 233 }
235 234
...@@ -248,11 +247,10 @@ public class AtomixDocumentTreeCommands { ...@@ -248,11 +247,10 @@ public class AtomixDocumentTreeCommands {
248 } 247 }
249 248
250 @Override 249 @Override
251 - public void writeObject(BufferOutput<?> buffer, Serializer serializer) { 250 + public String toString() {
252 - } 251 + return MoreObjects.toStringHelper(getClass())
253 - 252 + .add("path", path())
254 - @Override 253 + .toString();
255 - public void readObject(BufferInput<?> buffer, Serializer serializer) {
256 } 254 }
257 } 255 }
258 256
......
...@@ -26,7 +26,10 @@ import io.atomix.copycat.server.storage.snapshot.SnapshotReader; ...@@ -26,7 +26,10 @@ import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
26 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter; 26 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
27 import io.atomix.resource.ResourceStateMachine; 27 import io.atomix.resource.ResourceStateMachine;
28 28
29 +import java.util.Arrays;
29 import java.util.HashMap; 30 import java.util.HashMap;
31 +import java.util.Iterator;
32 +import java.util.List;
30 import java.util.Map; 33 import java.util.Map;
31 import java.util.Optional; 34 import java.util.Optional;
32 import java.util.Properties; 35 import java.util.Properties;
...@@ -52,7 +55,7 @@ import org.onosproject.store.service.Versioned; ...@@ -52,7 +55,7 @@ import org.onosproject.store.service.Versioned;
52 import org.slf4j.Logger; 55 import org.slf4j.Logger;
53 56
54 import com.google.common.base.Throwables; 57 import com.google.common.base.Throwables;
55 -import com.google.common.collect.ImmutableList; 58 +import com.google.common.collect.Lists;
56 import com.google.common.collect.Maps; 59 import com.google.common.collect.Maps;
57 import com.google.common.collect.Queues; 60 import com.google.common.collect.Queues;
58 61
...@@ -64,7 +67,7 @@ public class AtomixDocumentTreeState ...@@ -64,7 +67,7 @@ public class AtomixDocumentTreeState
64 implements SessionListener, Snapshottable { 67 implements SessionListener, Snapshottable {
65 68
66 private final Logger log = getLogger(getClass()); 69 private final Logger log = getLogger(getClass());
67 - private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>(); 70 + private final Map<Long, SessionListenCommits> listeners = new HashMap<>();
68 private AtomicLong versionCounter = new AtomicLong(0); 71 private AtomicLong versionCounter = new AtomicLong(0);
69 private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet); 72 private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
70 73
...@@ -97,25 +100,23 @@ public class AtomixDocumentTreeState ...@@ -97,25 +100,23 @@ public class AtomixDocumentTreeState
97 100
98 protected void listen(Commit<? extends Listen> commit) { 101 protected void listen(Commit<? extends Listen> commit) {
99 Long sessionId = commit.session().id(); 102 Long sessionId = commit.session().id();
100 - if (listeners.putIfAbsent(sessionId, commit) != null) { 103 + listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits()).add(commit);
101 - commit.close();
102 - return;
103 - }
104 commit.session().onStateChange( 104 commit.session().onStateChange(
105 state -> { 105 state -> {
106 if (state == ServerSession.State.CLOSED 106 if (state == ServerSession.State.CLOSED
107 || state == ServerSession.State.EXPIRED) { 107 || state == ServerSession.State.EXPIRED) {
108 - Commit<? extends Listen> listener = listeners.remove(sessionId); 108 + closeListener(commit.session().id());
109 - if (listener != null) {
110 - listener.close();
111 - }
112 } 109 }
113 }); 110 });
114 } 111 }
115 112
116 protected void unlisten(Commit<? extends Unlisten> commit) { 113 protected void unlisten(Commit<? extends Unlisten> commit) {
114 + Long sessionId = commit.session().id();
117 try { 115 try {
118 - closeListener(commit.session().id()); 116 + SessionListenCommits listenCommits = listeners.get(sessionId);
117 + if (listenCommits != null) {
118 + listenCommits.remove(commit);
119 + }
119 } finally { 120 } finally {
120 commit.close(); 121 commit.close();
121 } 122 }
...@@ -261,10 +262,11 @@ public class AtomixDocumentTreeState ...@@ -261,10 +262,11 @@ public class AtomixDocumentTreeState
261 result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED, 262 result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
262 Optional.ofNullable(result.newValue()), 263 Optional.ofNullable(result.newValue()),
263 Optional.ofNullable(result.oldValue())); 264 Optional.ofNullable(result.oldValue()));
265 +
264 listeners.values() 266 listeners.values()
265 - .forEach(commit -> commit.session() 267 + .stream()
266 - .publish(AtomixDocumentTree.CHANGE_SUBJECT, 268 + .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
267 - ImmutableList.of(event))); 269 + .forEach(listener -> listener.publish(AtomixDocumentTree.CHANGE_SUBJECT, Arrays.asList(event)));
268 } 270 }
269 271
270 @Override 272 @Override
...@@ -287,9 +289,52 @@ public class AtomixDocumentTreeState ...@@ -287,9 +289,52 @@ public class AtomixDocumentTreeState
287 } 289 }
288 290
289 private void closeListener(Long sessionId) { 291 private void closeListener(Long sessionId) {
290 - Commit<? extends Listen> commit = listeners.remove(sessionId); 292 + SessionListenCommits listenCommits = listeners.remove(sessionId);
291 - if (commit != null) { 293 + if (listenCommits != null) {
292 - commit.close(); 294 + listenCommits.close();
295 + }
296 + }
297 +
298 + private class SessionListenCommits {
299 + private final List<Commit<? extends Listen>> commits = Lists.newArrayList();
300 + private DocumentPath leastCommonAncestorPath;
301 +
302 + public void add(Commit<? extends Listen> commit) {
303 + commits.add(commit);
304 + recomputeLeastCommonAncestor();
305 + }
306 +
307 + public void remove(Commit<? extends Unlisten> commit) {
308 + // Remove the first listen commit with path matching path in unlisten commit
309 + Iterator<Commit<? extends Listen>> iterator = commits.iterator();
310 + while (iterator.hasNext()) {
311 + Commit<? extends Listen> listenCommit = iterator.next();
312 + if (listenCommit.operation().path().equals(commit.operation().path())) {
313 + iterator.remove();
314 + listenCommit.close();
315 + }
316 + }
317 + recomputeLeastCommonAncestor();
318 + }
319 +
320 + public DocumentPath leastCommonAncestorPath() {
321 + return leastCommonAncestorPath;
322 + }
323 +
324 + public <M> void publish(String topic, M message) {
325 + commits.stream().findAny().ifPresent(commit -> commit.session().publish(topic, message));
326 + }
327 +
328 + public void close() {
329 + commits.forEach(Commit::close);
330 + commits.clear();
331 + leastCommonAncestorPath = null;
332 + }
333 +
334 + private void recomputeLeastCommonAncestor() {
335 + this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(commits.stream()
336 + .map(c -> c.operation().path())
337 + .collect(Collectors.toList()));
293 } 338 }
294 } 339 }
295 } 340 }
......
...@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; ...@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertNull; 22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue; 23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail; 24 import static org.junit.Assert.fail;
25 +import io.atomix.AtomixClient;
25 import io.atomix.resource.ResourceType; 26 import io.atomix.resource.ResourceType;
26 27
27 import java.util.Map; 28 import java.util.Map;
...@@ -343,12 +344,53 @@ public class AtomixDocumentTreeTest extends AtomixTestBase { ...@@ -343,12 +344,53 @@ public class AtomixDocumentTreeTest extends AtomixTestBase {
343 assertArrayEquals("xy".getBytes(), event.newValue().get().value()); 344 assertArrayEquals("xy".getBytes(), event.newValue().get().value());
344 } 345 }
345 346
347 + @Test
348 + public void testFilteredNotifications() throws Throwable {
349 + AtomixClient client1 = createAtomixClient();
350 + AtomixClient client2 = createAtomixClient();
351 +
352 + String treeName = UUID.randomUUID().toString();
353 + AtomixDocumentTree tree1 = client1.getResource(treeName, AtomixDocumentTree.class).join();
354 + AtomixDocumentTree tree2 = client2.getResource(treeName, AtomixDocumentTree.class).join();
355 +
356 + TestEventListener listener1a = new TestEventListener(3);
357 + TestEventListener listener1ab = new TestEventListener(2);
358 + TestEventListener listener2abc = new TestEventListener(1);
359 +
360 + tree1.addListener(DocumentPath.from("root.a"), listener1a).join();
361 + tree1.addListener(DocumentPath.from("root.a.b"), listener1ab).join();
362 + tree2.addListener(DocumentPath.from("root.a.b.c"), listener2abc).join();
363 +
364 + tree1.createRecursive(DocumentPath.from("root.a.b.c"), "abc".getBytes()).join();
365 + DocumentTreeEvent<byte[]> event = listener1a.event();
366 + assertEquals(DocumentPath.from("root.a"), event.path());
367 + event = listener1a.event();
368 + assertEquals(DocumentPath.from("root.a.b"), event.path());
369 + event = listener1a.event();
370 + assertEquals(DocumentPath.from("root.a.b.c"), event.path());
371 + event = listener1ab.event();
372 + assertEquals(DocumentPath.from("root.a.b"), event.path());
373 + event = listener1ab.event();
374 + assertEquals(DocumentPath.from("root.a.b.c"), event.path());
375 + event = listener2abc.event();
376 + assertEquals(DocumentPath.from("root.a.b.c"), event.path());
377 + }
378 +
346 private static class TestEventListener implements DocumentTreeListener<byte[]> { 379 private static class TestEventListener implements DocumentTreeListener<byte[]> {
347 380
348 - private final BlockingQueue<DocumentTreeEvent<byte[]>> queue = new ArrayBlockingQueue<>(1); 381 + private final BlockingQueue<DocumentTreeEvent<byte[]>> queue;
382 +
383 + public TestEventListener() {
384 + this(1);
385 + }
386 +
387 + public TestEventListener(int maxEvents) {
388 + queue = new ArrayBlockingQueue<>(maxEvents);
389 + }
349 390
350 @Override 391 @Override
351 public void event(DocumentTreeEvent<byte[]> event) { 392 public void event(DocumentTreeEvent<byte[]> event) {
393 +
352 try { 394 try {
353 queue.put(event); 395 queue.put(event);
354 } catch (InterruptedException e) { 396 } catch (InterruptedException e) {
......