Committed by
Gerrit Code Review
Added PartitionedAsyncLeaderElector that federates leader election contents acro…
…ss a collection of AsyncLeaderElectors Change-Id: I6ae220d4e4d2ed8ae1cd9060482f66f418ae0551
Showing
2 changed files
with
127 additions
and
1 deletions
... | @@ -34,6 +34,7 @@ import org.onosproject.store.service.AsyncLeaderElector; | ... | @@ -34,6 +34,7 @@ import org.onosproject.store.service.AsyncLeaderElector; |
34 | import org.onosproject.store.service.DistributedQueue; | 34 | import org.onosproject.store.service.DistributedQueue; |
35 | import org.onosproject.store.service.Serializer; | 35 | import org.onosproject.store.service.Serializer; |
36 | 36 | ||
37 | +import com.google.common.base.Charsets; | ||
37 | import com.google.common.collect.ImmutableSet; | 38 | import com.google.common.collect.ImmutableSet; |
38 | import com.google.common.collect.Lists; | 39 | import com.google.common.collect.Lists; |
39 | import com.google.common.collect.Maps; | 40 | import com.google.common.collect.Maps; |
... | @@ -93,7 +94,15 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv | ... | @@ -93,7 +94,15 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv |
93 | 94 | ||
94 | @Override | 95 | @Override |
95 | public AsyncLeaderElector newAsyncLeaderElector(String name) { | 96 | public AsyncLeaderElector newAsyncLeaderElector(String name) { |
96 | - return getCreator(name).newAsyncLeaderElector(name); | 97 | + checkNotNull(name); |
98 | + Map<PartitionId, AsyncLeaderElector> leaderElectors = | ||
99 | + Maps.transformValues(members, | ||
100 | + partition -> partition.newAsyncLeaderElector(name)); | ||
101 | + Hasher<String> hasher = topic -> { | ||
102 | + long hashCode = HashCode.fromBytes(topic.getBytes(Charsets.UTF_8)).asLong(); | ||
103 | + return sortedMemberPartitionIds.get(Hashing.consistentHash(hashCode, members.size())); | ||
104 | + }; | ||
105 | + return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher); | ||
97 | } | 106 | } |
98 | 107 | ||
99 | @Override | 108 | @Override | ... | ... |
1 | +/* | ||
2 | + * Copyright 2016 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.primitives.impl; | ||
17 | + | ||
18 | +import static com.google.common.base.Preconditions.checkNotNull; | ||
19 | + | ||
20 | +import java.util.Collection; | ||
21 | +import java.util.Map; | ||
22 | +import java.util.TreeMap; | ||
23 | +import java.util.concurrent.CompletableFuture; | ||
24 | +import java.util.function.Consumer; | ||
25 | + | ||
26 | +import org.onosproject.cluster.Leadership; | ||
27 | +import org.onosproject.cluster.NodeId; | ||
28 | +import org.onosproject.cluster.PartitionId; | ||
29 | +import org.onosproject.event.Change; | ||
30 | +import org.onosproject.store.service.AsyncLeaderElector; | ||
31 | + | ||
32 | +import com.google.common.collect.Maps; | ||
33 | + | ||
34 | +/** | ||
35 | + * {@link AsyncLeaderElector} that has its topics partitioned horizontally across | ||
36 | + * several {@link AsyncLeaderElector leader electors}. | ||
37 | + */ | ||
38 | +public class PartitionedAsyncLeaderElector implements AsyncLeaderElector { | ||
39 | + | ||
40 | + private final String name; | ||
41 | + private final TreeMap<PartitionId, AsyncLeaderElector> partitions = Maps.newTreeMap(); | ||
42 | + private final Hasher<String> topicHasher; | ||
43 | + | ||
44 | + public PartitionedAsyncLeaderElector(String name, | ||
45 | + Map<PartitionId, AsyncLeaderElector> partitions, | ||
46 | + Hasher<String> topicHasher) { | ||
47 | + this.name = name; | ||
48 | + this.partitions.putAll(checkNotNull(partitions)); | ||
49 | + this.topicHasher = checkNotNull(topicHasher); | ||
50 | + } | ||
51 | + | ||
52 | + @Override | ||
53 | + public String name() { | ||
54 | + return name; | ||
55 | + } | ||
56 | + | ||
57 | + @Override | ||
58 | + public CompletableFuture<Leadership> run(String topic, NodeId nodeId) { | ||
59 | + return getLeaderElector(topic).run(topic, nodeId); | ||
60 | + } | ||
61 | + | ||
62 | + @Override | ||
63 | + public CompletableFuture<Void> withdraw(String topic) { | ||
64 | + return getLeaderElector(topic).withdraw(topic); | ||
65 | + } | ||
66 | + | ||
67 | + @Override | ||
68 | + public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) { | ||
69 | + return getLeaderElector(topic).anoint(topic, nodeId); | ||
70 | + } | ||
71 | + | ||
72 | + @Override | ||
73 | + public CompletableFuture<Leadership> getLeadership(String topic) { | ||
74 | + return getLeaderElector(topic).getLeadership(topic); | ||
75 | + } | ||
76 | + | ||
77 | + @Override | ||
78 | + public CompletableFuture<Map<String, Leadership>> getLeaderships() { | ||
79 | + Map<String, Leadership> leaderships = Maps.newConcurrentMap(); | ||
80 | + return CompletableFuture.allOf(getLeaderElectors().stream() | ||
81 | + .map(le -> le.getLeaderships() | ||
82 | + .thenAccept(m -> leaderships.putAll(m))) | ||
83 | + .toArray(CompletableFuture[]::new)) | ||
84 | + .thenApply(v -> leaderships); | ||
85 | + } | ||
86 | + | ||
87 | + @Override | ||
88 | + public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) { | ||
89 | + return CompletableFuture.allOf(getLeaderElectors().stream() | ||
90 | + .map(map -> map.addChangeListener(listener)) | ||
91 | + .toArray(CompletableFuture[]::new)); | ||
92 | + } | ||
93 | + | ||
94 | + @Override | ||
95 | + public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) { | ||
96 | + return CompletableFuture.allOf(getLeaderElectors().stream() | ||
97 | + .map(map -> map.removeChangeListener(listener)) | ||
98 | + .toArray(CompletableFuture[]::new)); | ||
99 | + } | ||
100 | + | ||
101 | + /** | ||
102 | + * Returns the leaderElector (partition) to which the specified topic maps. | ||
103 | + * @param topic topic name | ||
104 | + * @return AsyncLeaderElector to which topic maps | ||
105 | + */ | ||
106 | + private AsyncLeaderElector getLeaderElector(String topic) { | ||
107 | + return partitions.get(topicHasher.hash(topic)); | ||
108 | + } | ||
109 | + | ||
110 | + /** | ||
111 | + * Returns all the constituent leader electors. | ||
112 | + * @return collection of leader electors. | ||
113 | + */ | ||
114 | + private Collection<AsyncLeaderElector> getLeaderElectors() { | ||
115 | + return partitions.values(); | ||
116 | + } | ||
117 | +} | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
-
Please register or login to post a comment