Madan Jampani

StorageManager with associated new primitive builders

Change-Id: Ic5d7048cda5ea5e7e86df2d2390c16eea850cd83
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.cluster.impl;
17 +
18 +import static org.slf4j.LoggerFactory.getLogger;
19 +
20 +import java.util.Map;
21 +import java.util.Objects;
22 +import java.util.function.Consumer;
23 +
24 +import org.apache.felix.scr.annotations.Activate;
25 +import org.apache.felix.scr.annotations.Component;
26 +import org.apache.felix.scr.annotations.Deactivate;
27 +import org.apache.felix.scr.annotations.Reference;
28 +import org.apache.felix.scr.annotations.ReferenceCardinality;
29 +import org.apache.felix.scr.annotations.Service;
30 +import org.onlab.util.Tools;
31 +import org.onosproject.cluster.ClusterService;
32 +import org.onosproject.cluster.Leadership;
33 +import org.onosproject.cluster.LeadershipEvent;
34 +import org.onosproject.cluster.LeadershipStore;
35 +import org.onosproject.cluster.LeadershipStoreDelegate;
36 +import org.onosproject.cluster.NodeId;
37 +import org.onosproject.event.Change;
38 +import org.onosproject.store.AbstractStore;
39 +import org.onosproject.store.service.LeaderElector;
40 +import org.onosproject.store.service.StorageException;
41 +import org.onosproject.store.service.StorageService;
42 +import org.slf4j.Logger;
43 +
44 +import com.google.common.collect.ImmutableMap;
45 +import com.google.common.collect.Maps;
46 +
47 +/**
48 + * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
49 + * primitive.
50 + */
51 +@Service
52 +@Component(immediate = true, enabled = false)
53 +public class NewDistributedLeadershipStore
54 + extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
55 + implements LeadershipStore {
56 +
57 + private final Logger log = getLogger(getClass());
58 +
59 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 + protected ClusterService clusterService;
61 +
62 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 + protected StorageService storageService;
64 +
65 + private NodeId localNodeId;
66 + private LeaderElector leaderElector;
67 + private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
68 +
69 + private static final int MAX_RETRIES = 10;
70 + private static final int MAX_DELAY_MILLIS_BETWEEN_RETRIES = 100;
71 +
72 + private final Consumer<Change<Leadership>> leadershipChangeListener =
73 + change -> {
74 + Leadership oldValue = change.oldValue();
75 + Leadership newValue = change.newValue();
76 + leaderBoard.put(newValue.topic(), newValue);
77 + boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
78 + boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
79 + LeadershipEvent.Type eventType = null;
80 + if (leaderChanged && candidatesChanged) {
81 + eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
82 + }
83 + if (leaderChanged && !candidatesChanged) {
84 + eventType = LeadershipEvent.Type.LEADER_CHANGED;
85 + }
86 + if (!leaderChanged && candidatesChanged) {
87 + eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
88 + }
89 + notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
90 + };
91 +
92 + @Activate
93 + public void activate() {
94 + localNodeId = clusterService.getLocalNode().id();
95 + leaderElector = storageService.leaderElectorBuilder()
96 + .withName("onos-leadership-elections")
97 + .build()
98 + .asLeaderElector();
99 + leaderElector.addChangeListener(leadershipChangeListener);
100 + leaderBoard.putAll(getLeaderships());
101 + log.info("Started");
102 + }
103 +
104 + @Deactivate
105 + public void deactivate() {
106 + leaderElector.removeChangeListener(leadershipChangeListener);
107 + log.info("Stopped");
108 + }
109 +
110 + @Override
111 + public Leadership addRegistration(String topic) {
112 + return Tools.retryable(() -> leaderElector.run(topic, localNodeId),
113 + StorageException.class,
114 + MAX_RETRIES,
115 + MAX_DELAY_MILLIS_BETWEEN_RETRIES).get();
116 + }
117 +
118 + @Override
119 + public void removeRegistration(String topic) {
120 + Tools.retryable(() -> {
121 + leaderElector.withdraw(topic);
122 + return null;
123 + },
124 + StorageException.class,
125 + MAX_RETRIES,
126 + MAX_DELAY_MILLIS_BETWEEN_RETRIES).get();
127 + }
128 +
129 + @Override
130 + public void removeRegistration(NodeId nodeId) {
131 + // TODO
132 + throw new UnsupportedOperationException();
133 + }
134 +
135 + @Override
136 + public boolean moveLeadership(String topic, NodeId toNodeId) {
137 + return leaderElector.anoint(topic, toNodeId);
138 + }
139 +
140 + @Override
141 + public boolean makeTopCandidate(String topic, NodeId nodeId) {
142 + // TODO
143 + throw new UnsupportedOperationException();
144 + }
145 +
146 + @Override
147 + public Leadership getLeadership(String topic) {
148 + return leaderBoard.get(topic);
149 + }
150 +
151 + @Override
152 + public Map<String, Leadership> getLeaderships() {
153 + return ImmutableMap.copyOf(leaderBoard);
154 + }
155 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import org.onosproject.store.primitives.DistributedPrimitiveCreator;
19 +import org.onosproject.store.service.AsyncAtomicCounter;
20 +import org.onosproject.store.service.AtomicCounterBuilder;
21 +
22 +/**
23 + * Default implementation of AtomicCounterBuilder.
24 + */
25 +public class NewDefaultAtomicCounterBuilder extends AtomicCounterBuilder {
26 +
27 + private final DistributedPrimitiveCreator base;
28 + private final DistributedPrimitiveCreator federated;
29 +
30 + public NewDefaultAtomicCounterBuilder(DistributedPrimitiveCreator base, DistributedPrimitiveCreator federated) {
31 + this.base = base;
32 + this.federated = federated;
33 + }
34 +
35 + @Override
36 + public AsyncAtomicCounter build() {
37 + DistributedPrimitiveCreator creator = partitionsDisabled() ? base : federated;
38 + return creator.newAsyncCounter(name());
39 + }
40 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import org.onosproject.store.primitives.DistributedPrimitiveCreator;
19 +import org.onosproject.store.service.AsyncConsistentMap;
20 +import org.onosproject.store.service.ConsistentMap;
21 +import org.onosproject.store.service.ConsistentMapBuilder;
22 +
23 +/**
24 + * Default {@link AsyncConsistentMap} builder.
25 + *
26 + * @param <K> type for map key
27 + * @param <V> type for map value
28 + */
29 +public class NewDefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
30 +
31 + private final DistributedPrimitiveCreator base;
32 + private final DistributedPrimitiveCreator federated;
33 +
34 + public NewDefaultConsistentMapBuilder(DistributedPrimitiveCreator base, DistributedPrimitiveCreator federated) {
35 + this.base = base;
36 + this.federated = federated;
37 + }
38 +
39 + @Override
40 + public ConsistentMap<K, V> build() {
41 + return buildAsyncMap().asConsistentMap();
42 + }
43 +
44 + @Override
45 + public AsyncConsistentMap<K, V> buildAsyncMap() {
46 + DistributedPrimitiveCreator creator = partitionsDisabled() ? base : federated;
47 + AsyncConsistentMap<K, V> map = creator.newAsyncConsistentMap(name(), serializer());
48 + map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
49 + map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
50 + return meteringEnabled() ? DistributedPrimitives.newMeteredMap(map) : map;
51 + }
52 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import java.util.Set;
19 +import java.util.concurrent.atomic.AtomicBoolean;
20 +
21 +import org.onosproject.store.primitives.DistributedPrimitiveCreator;
22 +import org.onosproject.store.primitives.TransactionId;
23 +import org.onosproject.store.service.Serializer;
24 +import org.onosproject.store.service.TransactionContext;
25 +import org.onosproject.store.service.TransactionalMap;
26 +
27 +import com.google.common.collect.Sets;
28 +
29 +/**
30 + * Default implementation of transaction context.
31 + */
32 +public class NewDefaultTransactionContext implements TransactionContext {
33 +
34 + private final AtomicBoolean isOpen = new AtomicBoolean(false);
35 + private final DistributedPrimitiveCreator creator;
36 + private final TransactionId transactionId;
37 + private final TransactionCoordinator transactionCoordinator;
38 + private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
39 +
40 + public NewDefaultTransactionContext(TransactionId transactionId,
41 + DistributedPrimitiveCreator creator,
42 + TransactionCoordinator transactionCoordinator) {
43 + this.transactionId = transactionId;
44 + this.creator = creator;
45 + this.transactionCoordinator = transactionCoordinator;
46 + }
47 +
48 + @Override
49 + public String name() {
50 + return transactionId.toString();
51 + }
52 +
53 + @Override
54 + public TransactionId transactionId() {
55 + return transactionId;
56 + }
57 +
58 + @Override
59 + public boolean isOpen() {
60 + return isOpen.get();
61 + }
62 +
63 + @Override
64 + public void begin() {
65 + if (!isOpen.compareAndSet(false, true)) {
66 + throw new IllegalStateException("TransactionContext is already open");
67 + }
68 + }
69 +
70 + @Override
71 + public boolean commit() {
72 + transactionCoordinator.commit(transactionId, txParticipants).getNow(null);
73 + return true;
74 + }
75 +
76 + @Override
77 + public void abort() {
78 + isOpen.set(false);
79 + }
80 +
81 + @Override
82 + public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
83 + Serializer serializer) {
84 + // FIXME: Do not create duplicates.
85 + DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName,
86 + creator.<K, V>newAsyncConsistentMap(mapName, serializer),
87 + this,
88 + serializer);
89 + txParticipants.add(txMap);
90 + return txMap;
91 + }
92 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import org.onosproject.store.primitives.DistributedPrimitiveCreator;
19 +import org.onosproject.store.primitives.TransactionId;
20 +import org.onosproject.store.service.TransactionContext;
21 +import org.onosproject.store.service.TransactionContextBuilder;
22 +/**
23 + * Default Transaction Context Builder.
24 + */
25 +public class NewDefaultTransactionContextBuilder extends TransactionContextBuilder {
26 +
27 + private final TransactionId transactionId;
28 + private final DistributedPrimitiveCreator base;
29 + private final DistributedPrimitiveCreator federated;
30 + private final TransactionCoordinator transactionCoordinator;
31 +
32 + public NewDefaultTransactionContextBuilder(TransactionId transactionId,
33 + DistributedPrimitiveCreator base,
34 + DistributedPrimitiveCreator federated,
35 + TransactionCoordinator transactionCoordinator) {
36 + this.transactionId = transactionId;
37 + this.base = base;
38 + this.federated = federated;
39 + this.transactionCoordinator = transactionCoordinator;
40 + }
41 +
42 + @Override
43 + public TransactionContext build() {
44 + return new NewDefaultTransactionContext(transactionId,
45 + this.partitionsDisabled() ? base : federated,
46 + transactionCoordinator);
47 + }
48 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import static org.slf4j.LoggerFactory.getLogger;
19 +
20 +import java.util.Collection;
21 +import java.util.List;
22 +import java.util.Map;
23 +import java.util.UUID;
24 +import java.util.function.Supplier;
25 +import java.util.stream.Collectors;
26 +
27 +import org.apache.commons.collections.ListUtils;
28 +import org.apache.felix.scr.annotations.Activate;
29 +import org.apache.felix.scr.annotations.Component;
30 +import org.apache.felix.scr.annotations.Deactivate;
31 +import org.apache.felix.scr.annotations.Reference;
32 +import org.apache.felix.scr.annotations.ReferenceCardinality;
33 +import org.apache.felix.scr.annotations.Service;
34 +import org.onosproject.cluster.ClusterService;
35 +import org.onosproject.cluster.PartitionId;
36 +import org.onosproject.persistence.PersistenceService;
37 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38 +import org.onosproject.store.primitives.DistributedPrimitiveCreator;
39 +import org.onosproject.store.primitives.MapUpdate;
40 +import org.onosproject.store.primitives.PartitionService;
41 +import org.onosproject.store.primitives.TransactionId;
42 +import org.onosproject.store.serializers.KryoNamespaces;
43 +import org.onosproject.store.service.AsyncConsistentMap;
44 +import org.onosproject.store.service.AtomicCounterBuilder;
45 +import org.onosproject.store.service.AtomicValueBuilder;
46 +import org.onosproject.store.service.ConsistentMap;
47 +import org.onosproject.store.service.ConsistentMapBuilder;
48 +import org.onosproject.store.service.DistributedQueueBuilder;
49 +import org.onosproject.store.service.DistributedSetBuilder;
50 +import org.onosproject.store.service.EventuallyConsistentMapBuilder;
51 +import org.onosproject.store.service.LeaderElectorBuilder;
52 +import org.onosproject.store.service.MapInfo;
53 +import org.onosproject.store.service.PartitionInfo;
54 +import org.onosproject.store.service.Serializer;
55 +import org.onosproject.store.service.StorageAdminService;
56 +import org.onosproject.store.service.StorageService;
57 +import org.onosproject.store.service.TransactionContextBuilder;
58 +import org.slf4j.Logger;
59 +
60 +import com.google.common.collect.Lists;
61 +import com.google.common.collect.Maps;
62 +import com.google.common.util.concurrent.Futures;
63 +
64 +/**
65 + * Implementation for {@code StorageService} and {@code StorageAdminService}.
66 + */
67 +@Service
68 +@Component(immediate = true, enabled = false)
69 +public class StorageManager implements StorageService, StorageAdminService {
70 +
71 + private final Logger log = getLogger(getClass());
72 +
73 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 + protected ClusterService clusterService;
75 +
76 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 + protected ClusterCommunicationService clusterCommunicator;
78 +
79 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 + protected PersistenceService persistenceService;
81 +
82 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 + protected PartitionService partitionService;
84 +
85 + private final Supplier<TransactionId> transactionIdGenerator =
86 + () -> TransactionId.from(UUID.randomUUID().toString());
87 + private DistributedPrimitiveCreator basePrimitiveCreator;
88 + private DistributedPrimitiveCreator federatedPrimitiveCreator;
89 + private AsyncConsistentMap<TransactionId, Transaction.State> transactions;
90 + private TransactionCoordinator transactionCoordinator;
91 +
92 + @Activate
93 + public void actiavte() {
94 + basePrimitiveCreator = partitionService.getDistributedPrimitiveCreator(PartitionId.from(0));
95 + Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
96 + partitionService.getAllPartitionIds().stream()
97 + .filter(id -> !id.equals(PartitionId.from(0)))
98 + .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
99 + federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
100 + transactions = this.<TransactionId, Transaction.State>consistentMapBuilder()
101 + .withName("onos-transactions")
102 + .withSerializer(Serializer.using(KryoNamespaces.API,
103 + MapUpdate.class,
104 + MapUpdate.Type.class,
105 + Transaction.class,
106 + Transaction.State.class))
107 + .buildAsyncMap();
108 + transactionCoordinator = new TransactionCoordinator(transactions);
109 + log.info("Started");
110 + }
111 +
112 + @Deactivate
113 + public void deactivate() {
114 + log.info("Stopped");
115 + }
116 +
117 + @Override
118 + public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
119 + return new EventuallyConsistentMapBuilderImpl<>(clusterService,
120 + clusterCommunicator,
121 + persistenceService);
122 + }
123 +
124 + @Override
125 + public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
126 + return new NewDefaultConsistentMapBuilder<>(basePrimitiveCreator, federatedPrimitiveCreator);
127 + }
128 +
129 + @Override
130 + public <E> DistributedSetBuilder<E> setBuilder() {
131 + return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
132 + }
133 +
134 + @Override
135 + public <E> DistributedQueueBuilder<E> queueBuilder() {
136 + // TODO: implement
137 + throw new UnsupportedOperationException();
138 + }
139 +
140 + @Override
141 + public AtomicCounterBuilder atomicCounterBuilder() {
142 + return new NewDefaultAtomicCounterBuilder(basePrimitiveCreator, federatedPrimitiveCreator);
143 + }
144 +
145 + @Override
146 + public <V> AtomicValueBuilder<V> atomicValueBuilder() {
147 + Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
148 + () -> this.<String, byte[]>consistentMapBuilder()
149 + .withName("onos-atomic-values")
150 + .withMeteringDisabled()
151 + .withSerializer(Serializer.using(KryoNamespaces.BASIC));
152 + return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
153 + }
154 +
155 + @Override
156 + public TransactionContextBuilder transactionContextBuilder() {
157 + return new NewDefaultTransactionContextBuilder(transactionIdGenerator.get(),
158 + basePrimitiveCreator,
159 + federatedPrimitiveCreator,
160 + transactionCoordinator);
161 + }
162 +
163 + @Override
164 + public LeaderElectorBuilder leaderElectorBuilder() {
165 + return new DefaultLeaderElectorBuilder(basePrimitiveCreator,
166 + federatedPrimitiveCreator);
167 + }
168 +
169 + @Override
170 + public List<MapInfo> getMapInfo() {
171 + return ListUtils.union(listMapInfo(basePrimitiveCreator), listMapInfo(federatedPrimitiveCreator));
172 + }
173 +
174 + @Override
175 + public Map<String, Long> getCounters() {
176 + Map<String, Long> result = Maps.newHashMap();
177 + result.putAll(getInMemoryDatabaseCounters());
178 + result.putAll(getPartitionedDatabaseCounters());
179 + return result;
180 + }
181 +
182 + @Override
183 + public Map<String, Long> getInMemoryDatabaseCounters() {
184 + return getCounters(basePrimitiveCreator);
185 + }
186 +
187 + @Override
188 + public Map<String, Long> getPartitionedDatabaseCounters() {
189 + return getCounters(federatedPrimitiveCreator);
190 + }
191 +
192 + public Map<String, Long> getCounters(DistributedPrimitiveCreator creator) {
193 + Map<String, Long> counters = Maps.newConcurrentMap();
194 + creator.getAsyncAtomicCounterNames()
195 + .forEach(name -> counters.put(name, creator.newAsyncCounter(name).asAtomicCounter().get()));
196 + return counters;
197 + }
198 +
199 + @Override
200 + public List<PartitionInfo> getPartitionInfo() {
201 + return Lists.newArrayList();
202 + }
203 +
204 + @Override
205 + public Collection<TransactionId> getPendingTransactions() {
206 + return Futures.getUnchecked(transactions.keySet());
207 + }
208 +
209 + private List<MapInfo> listMapInfo(DistributedPrimitiveCreator creator) {
210 + Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
211 + return creator.getAsyncConsistentMapNames()
212 + .stream()
213 + .map(name -> {
214 + ConsistentMap<String, byte[]> map =
215 + creator.<String, byte[]>newAsyncConsistentMap(name, serializer)
216 + .asConsistentMap();
217 + return new MapInfo(name, map.size());
218 + }).collect(Collectors.toList());
219 + }
220 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import java.util.Set;
19 +import java.util.concurrent.CompletableFuture;
20 +import java.util.stream.Collectors;
21 +
22 +import org.onlab.util.Tools;
23 +import org.onosproject.store.primitives.TransactionId;
24 +import org.onosproject.store.service.AsyncConsistentMap;
25 +
26 +/**
27 + * Coordinator for a two-phase commit protocol.
28 + */
29 +public class TransactionCoordinator {
30 +
31 + private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
32 +
33 + public TransactionCoordinator(AsyncConsistentMap<TransactionId, Transaction.State> transactions) {
34 + this.transactions = transactions;
35 + }
36 +
37 + /**
38 + * Commits a transaction.
39 + * @param transactionId transaction
40 + * @return future for commit result
41 + */
42 + CompletableFuture<Void> commit(TransactionId transactionId, Set<TransactionParticipant> transactionParticipants) {
43 + if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
44 + return CompletableFuture.completedFuture(null);
45 + }
46 +
47 + return transactions.put(transactionId, Transaction.State.PREPARING)
48 + .thenCompose(v -> this.doPrepare(transactionParticipants))
49 + .thenCompose(result -> result
50 + ? transactions.put(transactionId, Transaction.State.COMMITTING)
51 + .thenCompose(v -> doCommit(transactionParticipants))
52 + .thenApply(v -> null)
53 + : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
54 + .thenCompose(v -> doRollback(transactionParticipants))
55 + .thenApply(v -> null))
56 + .thenCompose(v -> transactions.remove(transactionId).thenApply(u -> null))
57 + .thenApply(v -> null);
58 + }
59 +
60 + private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
61 + return Tools.allOf(transactionParticipants
62 + .stream()
63 + .map(TransactionParticipant::prepare)
64 + .collect(Collectors.toList()))
65 + .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
66 + }
67 +
68 + private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
69 + return CompletableFuture.allOf(transactionParticipants.stream()
70 + .map(p -> p.commit())
71 + .toArray(CompletableFuture[]::new));
72 + }
73 +
74 + private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
75 + return CompletableFuture.allOf(transactionParticipants.stream()
76 + .map(p -> p.rollback())
77 + .toArray(CompletableFuture[]::new));
78 + }
79 +}
...\ No newline at end of file ...\ No newline at end of file