Madan Jampani
Committed by Gerrit Code Review

PartitionManager + StoragePartition{Service,Client} implementation classes

Change-Id: I2125c5678c760e9ed9fc856d1f9ba2ac4e4a3496
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import java.util.concurrent.CompletableFuture;
19 +
20 +/**
21 + * Interface for types that can be asynchronously opened and closed.
22 + */
23 +public interface Managed<T> {
24 +
25 + /**
26 + * Opens the managed object.
27 + *
28 + * @return A completable future to be completed once the object has been opened.
29 + */
30 + CompletableFuture<Void> open();
31 +
32 + /**
33 + * Closes the managed object.
34 + *
35 + * @return A completable future to be completed once the object has been closed.
36 + */
37 + CompletableFuture<Void> close();
38 +
39 + /**
40 + * Return {@code true} if the managed object is open.
41 + * @return {@code true} if open
42 + */
43 + boolean isOpen();
44 +
45 + /**
46 + * Return {@code true} if the managed object is closed.
47 + * @return {@code true} if closed
48 + */
49 + boolean isClosed();
50 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.impl;
18 +
19 +import static org.slf4j.LoggerFactory.getLogger;
20 +
21 +import java.io.File;
22 +import java.util.Map;
23 +import java.util.Set;
24 +import java.util.concurrent.CompletableFuture;
25 +
26 +import org.apache.felix.scr.annotations.Activate;
27 +import org.apache.felix.scr.annotations.Component;
28 +import org.apache.felix.scr.annotations.Reference;
29 +import org.apache.felix.scr.annotations.ReferenceCardinality;
30 +import org.apache.felix.scr.annotations.Service;
31 +import org.onlab.util.Tools;
32 +import org.onosproject.cluster.ClusterMetadataService;
33 +import org.onosproject.cluster.ClusterService;
34 +import org.onosproject.cluster.NodeId;
35 +import org.onosproject.cluster.PartitionId;
36 +import org.onosproject.event.AbstractListenerManager;
37 +import org.onosproject.store.cluster.messaging.MessagingService;
38 +import org.onosproject.store.primitives.DistributedPrimitiveCreator;
39 +import org.onosproject.store.primitives.PartitionAdminService;
40 +import org.onosproject.store.primitives.PartitionEvent;
41 +import org.onosproject.store.primitives.PartitionEventListener;
42 +import org.onosproject.store.primitives.PartitionService;
43 +import org.slf4j.Logger;
44 +
45 +import com.google.common.collect.ImmutableSet;
46 +import com.google.common.collect.Maps;
47 +
48 +/**
49 + * Implementation of {@code PartitionService} and {@code PartitionAdminService}.
50 + */
51 +@Component
52 +@Service
53 +public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
54 + implements PartitionService, PartitionAdminService {
55 +
56 + private final Logger log = getLogger(getClass());
57 +
58 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 + protected MessagingService messagingService;
60 +
61 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 + protected ClusterMetadataService metadataService;
63 +
64 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 + protected ClusterService clusterService;
66 +
67 + Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
68 +
69 + @Activate
70 + public void activate() {
71 + eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
72 +
73 + metadataService.getClusterMetadata()
74 + .getPartitions()
75 + .stream()
76 + .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
77 + messagingService,
78 + clusterService,
79 + CatalystSerializers.getSerializer(),
80 + new File(System.getProperty("karaf.data") + "/data/" + partition.getId()))));
81 +
82 + CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
83 + .stream()
84 + .map(StoragePartition::open)
85 + .toArray(CompletableFuture[]::new));
86 + openFuture.join();
87 + log.info("Started");
88 + }
89 +
90 + public void deactivate() {
91 + eventDispatcher.removeSink(PartitionEvent.class);
92 +
93 + CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
94 + .stream()
95 + .map(StoragePartition::close)
96 + .toArray(CompletableFuture[]::new));
97 + closeFuture.join();
98 + log.info("Stopped");
99 + }
100 +
101 + @Override
102 + public CompletableFuture<Void> leave(PartitionId partitionId) {
103 + // TODO: Implement
104 + return Tools.exceptionalFuture(new UnsupportedOperationException());
105 + }
106 +
107 + @Override
108 + public CompletableFuture<Void> join(PartitionId partitionId) {
109 + // TODO: Implement
110 + return Tools.exceptionalFuture(new UnsupportedOperationException());
111 + }
112 +
113 + @Override
114 + public int getNumberOfPartitions() {
115 + return partitions.size();
116 + }
117 +
118 + @Override
119 + public Set<PartitionId> getAllPartitionIds() {
120 + return partitions.keySet();
121 + }
122 +
123 + @Override
124 + public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
125 + return partitions.get(partitionId).client();
126 + }
127 +
128 + @Override
129 + public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
130 + StoragePartition partition = partitions.get(partitionId);
131 + return ImmutableSet.copyOf(partition.getMembers());
132 + }
133 +
134 + @Override
135 + public Set<NodeId> getActiveMembersMembers(PartitionId partitionId) {
136 + // TODO: This needs to query metadata to determine currently active
137 + // members of partition
138 + return getConfiguredMembers(partitionId);
139 + }
140 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import io.atomix.catalyst.serializer.Serializer;
19 +import io.atomix.catalyst.transport.Address;
20 +import io.atomix.resource.ResourceType;
21 +import io.atomix.variables.DistributedLong;
22 +
23 +import java.io.File;
24 +import java.util.Collection;
25 +import java.util.Optional;
26 +import java.util.concurrent.CompletableFuture;
27 +import java.util.concurrent.atomic.AtomicBoolean;
28 +
29 +import org.onosproject.cluster.ClusterService;
30 +import org.onosproject.cluster.ControllerNode;
31 +import org.onosproject.cluster.DefaultPartition;
32 +import org.onosproject.cluster.NodeId;
33 +import org.onosproject.cluster.Partition;
34 +import org.onosproject.store.cluster.messaging.MessagingService;
35 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
36 +
37 +import com.google.common.collect.Collections2;
38 +import com.google.common.collect.ImmutableSet;
39 +
40 +/**
41 + * Storage partition.
42 + */
43 +public class StoragePartition extends DefaultPartition implements Managed<StoragePartition> {
44 +
45 + private final AtomicBoolean isOpened = new AtomicBoolean(false);
46 + private final AtomicBoolean isClosed = new AtomicBoolean(false);
47 + private final Serializer serializer;
48 + private final MessagingService messagingService;
49 + private final ClusterService clusterService;
50 + private final File logFolder;
51 + private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
52 + new ResourceType(DistributedLong.class),
53 + new ResourceType(AtomixConsistentMap.class));
54 +
55 + private NodeId localNodeId;
56 + private Optional<StoragePartitionServer> server = Optional.empty();
57 + private StoragePartitionClient client;
58 +
59 + public StoragePartition(Partition partition,
60 + MessagingService messagingService,
61 + ClusterService clusterService,
62 + Serializer serializer,
63 + File logFolder) {
64 + super(partition);
65 + this.messagingService = messagingService;
66 + this.clusterService = clusterService;
67 + this.localNodeId = clusterService.getLocalNode().id();
68 + this.serializer = serializer;
69 + this.logFolder = logFolder;
70 + }
71 +
72 + public StoragePartitionClient client() {
73 + return client;
74 + }
75 +
76 + @Override
77 + public CompletableFuture<Void> open() {
78 + return openServer().thenAccept(s -> server = Optional.of(s))
79 + .thenCompose(v-> openClient())
80 + .thenAccept(v -> isOpened.set(true))
81 + .thenApply(v -> null);
82 + }
83 +
84 + @Override
85 + public CompletableFuture<Void> close() {
86 + return closeClient().thenCompose(v -> closeServer())
87 + .thenAccept(v -> isClosed.set(true))
88 + .thenApply(v -> null);
89 + }
90 +
91 + public Collection<Address> getMemberAddresses() {
92 + return Collections2.transform(getMembers(), this::toAddress);
93 + }
94 +
95 + private CompletableFuture<StoragePartitionServer> openServer() {
96 + if (!getMembers().contains(localNodeId)) {
97 + return CompletableFuture.completedFuture(null);
98 + }
99 + StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
100 + this,
101 + serializer,
102 + () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
103 + getId(),
104 + messagingService),
105 + RESOURCE_TYPES,
106 + logFolder);
107 + return server.open().thenApply(v -> server);
108 + }
109 +
110 + private CompletableFuture<StoragePartitionClient> openClient() {
111 + client = new StoragePartitionClient(this,
112 + serializer,
113 + new CopycatTransport(CopycatTransport.Mode.CLIENT,
114 + getId(),
115 + messagingService),
116 + RESOURCE_TYPES);
117 + return client.open().thenApply(v -> client);
118 + }
119 +
120 + private CompletableFuture<Void> closeServer() {
121 + if (server.isPresent()) {
122 + return server.get().close();
123 + } else {
124 + return CompletableFuture.completedFuture(null);
125 + }
126 + }
127 +
128 + private CompletableFuture<Void> closeClient() {
129 + if (client != null) {
130 + return client.close();
131 + }
132 + return CompletableFuture.completedFuture(null);
133 + }
134 +
135 + private Address toAddress(NodeId nodeId) {
136 + ControllerNode node = clusterService.getNode(nodeId);
137 + return new Address(node.ip().toString(), node.tcpPort());
138 + }
139 +
140 + @Override
141 + public boolean isOpen() {
142 + return !isClosed.get() && isOpened.get();
143 + }
144 +
145 + @Override
146 + public boolean isClosed() {
147 + return isOpened.get() && isClosed.get();
148 + }
149 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import io.atomix.Atomix;
19 +import io.atomix.AtomixClient;
20 +import io.atomix.catalyst.transport.Transport;
21 +import io.atomix.resource.ResourceType;
22 +import io.atomix.variables.DistributedLong;
23 +
24 +import java.util.Collection;
25 +import java.util.concurrent.CompletableFuture;
26 +
27 +import org.onlab.util.HexString;
28 +import org.onosproject.store.primitives.DistributedPrimitiveCreator;
29 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
30 +import org.onosproject.store.primitives.resources.impl.AtomixCounter;
31 +import org.onosproject.store.serializers.KryoNamespaces;
32 +import org.onosproject.store.service.AsyncAtomicCounter;
33 +import org.onosproject.store.service.AsyncAtomicValue;
34 +import org.onosproject.store.service.AsyncConsistentMap;
35 +import org.onosproject.store.service.AsyncDistributedSet;
36 +import org.onosproject.store.service.AsyncLeaderElector;
37 +import org.onosproject.store.service.DistributedQueue;
38 +import org.onosproject.store.service.Serializer;
39 +
40 +import com.google.common.base.Supplier;
41 +import com.google.common.base.Suppliers;
42 +import com.google.common.collect.ImmutableSet;
43 +
44 +/**
45 + * StoragePartition client.
46 + */
47 +public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
48 +
49 + private final StoragePartition partition;
50 + private final Transport transport;
51 + private final io.atomix.catalyst.serializer.Serializer serializer;
52 + private final Collection<ResourceType> resourceTypes;
53 + private Atomix client;
54 + private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
55 + private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
56 + Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
57 + Serializer.using(KryoNamespaces.BASIC)));
58 +
59 + public StoragePartitionClient(StoragePartition partition,
60 + io.atomix.catalyst.serializer.Serializer serializer,
61 + Transport transport,
62 + Collection<ResourceType> resourceTypes) {
63 + this.partition = partition;
64 + this.serializer = serializer;
65 + this.transport = transport;
66 + this.resourceTypes = ImmutableSet.copyOf(resourceTypes);
67 + }
68 +
69 + @Override
70 + public CompletableFuture<Void> open() {
71 + if (client != null && client.isOpen()) {
72 + return CompletableFuture.completedFuture(null);
73 + }
74 + synchronized (StoragePartitionClient.this) {
75 + client = AtomixClient.builder(partition.getMemberAddresses())
76 + .withSerializer(serializer.clone())
77 + .withResourceResolver(r -> {
78 + resourceTypes.forEach(r::register);
79 + })
80 + .withTransport(transport)
81 + .build();
82 + }
83 + return client.open().thenApply(v -> null);
84 + }
85 +
86 + @Override
87 + public CompletableFuture<Void> close() {
88 + return client != null ? client.close() : CompletableFuture.completedFuture(null);
89 + }
90 +
91 + @Override
92 + public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
93 + AsyncConsistentMap<String, byte[]> rawMap =
94 + new DelegatingAsyncConsistentMap<String, byte[]>(client.get(name, AtomixConsistentMap.class).join()) {
95 + @Override
96 + public String name() {
97 + return name;
98 + }
99 + };
100 + AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.<K, V, String, byte[]>newTranscodingMap(rawMap,
101 + key -> HexString.toHexString(serializer.encode(key)),
102 + string -> serializer.decode(HexString.fromHexString(string)),
103 + value -> value == null ? null : serializer.encode(value),
104 + bytes -> serializer.decode(bytes));
105 +
106 + return DistributedPrimitives.newCachingMap(transcodedMap);
107 + }
108 +
109 + @Override
110 + public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
111 + return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
112 + }
113 +
114 + @Override
115 + public AsyncAtomicCounter newAsyncCounter(String name) {
116 + DistributedLong distributedLong = client.get(name, DistributedLong.class).join();
117 + return new AtomixCounter(name, distributedLong);
118 + }
119 +
120 + @Override
121 + public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
122 + return new DefaultAsyncAtomicValue<>(name,
123 + serializer,
124 + onosAtomicValuesMap.get());
125 + }
126 +
127 + @Override
128 + public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
129 + // TODO: Implement
130 + throw new UnsupportedOperationException();
131 + }
132 +
133 + @Override
134 + public AsyncLeaderElector newAsyncLeaderElector(String name) {
135 + throw new UnsupportedOperationException();
136 + }
137 +
138 + @Override
139 + public boolean isOpen() {
140 + return client.isOpen();
141 + }
142 +
143 + @Override
144 + public boolean isClosed() {
145 + return client.isClosed();
146 + }
147 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import io.atomix.catalyst.serializer.Serializer;
19 +import io.atomix.catalyst.transport.Address;
20 +import io.atomix.catalyst.transport.Transport;
21 +import io.atomix.copycat.server.CopycatServer;
22 +import io.atomix.copycat.server.storage.Storage;
23 +import io.atomix.copycat.server.storage.StorageLevel;
24 +import io.atomix.manager.state.ResourceManagerState;
25 +import io.atomix.resource.ResourceRegistry;
26 +import io.atomix.resource.ResourceType;
27 +import io.atomix.resource.ResourceTypeResolver;
28 +import io.atomix.resource.ServiceLoaderResourceResolver;
29 +
30 +import java.io.File;
31 +import java.util.Collection;
32 +import java.util.Set;
33 +import java.util.concurrent.CompletableFuture;
34 +import java.util.function.Supplier;
35 +
36 +import org.onosproject.cluster.NodeId;
37 +
38 +import com.google.common.collect.ImmutableSet;
39 +import com.google.common.collect.Sets;
40 +
41 +/**
42 + * {@link StoragePartition} server.
43 + */
44 +public class StoragePartitionServer implements Managed<StoragePartitionServer> {
45 +
46 + private static final int MAX_ENTRIES_PER_LOG_SEGMENT = 32768;
47 + private final StoragePartition partition;
48 + private final Address localAddress;
49 + private final Supplier<Transport> transport;
50 + private final Serializer serializer;
51 + private final File dataFolder;
52 + private final Collection<ResourceType> resourceTypes;
53 + private CopycatServer server;
54 +
55 + public StoragePartitionServer(Address localAddress,
56 + StoragePartition partition,
57 + Serializer serializer,
58 + Supplier<Transport> transport,
59 + Collection<ResourceType> resourceTypes,
60 + File dataFolder) {
61 + this.partition = partition;
62 + this.localAddress = localAddress;
63 + this.serializer = serializer;
64 + this.transport = transport;
65 + this.resourceTypes = ImmutableSet.copyOf(resourceTypes);
66 + this.dataFolder = dataFolder;
67 + }
68 +
69 + @Override
70 + public CompletableFuture<Void> open() {
71 + CompletableFuture<CopycatServer> serverOpenFuture;
72 + if (partition.getMemberAddresses().contains(localAddress)) {
73 + if (server != null && server.isOpen()) {
74 + return CompletableFuture.completedFuture(null);
75 + }
76 + synchronized (this) {
77 + server = server();
78 + }
79 + serverOpenFuture = server.open();
80 + } else {
81 + serverOpenFuture = CompletableFuture.completedFuture(null);
82 + }
83 + return serverOpenFuture.thenApply(v -> null);
84 + }
85 +
86 + @Override
87 + public CompletableFuture<Void> close() {
88 + // We do not close the server because doing so is equivalent to this node
89 + // leaving the cluster and we don't want that here.
90 + // The Raft protocol should take care of servers leaving unannounced.
91 + return CompletableFuture.completedFuture(null);
92 + }
93 +
94 + private CopycatServer server() {
95 + ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
96 + ResourceRegistry registry = new ResourceRegistry();
97 + resourceTypes.forEach(registry::register);
98 + resourceResolver.resolve(registry);
99 + return CopycatServer.builder(localAddress, partition.getMemberAddresses())
100 + .withName("partition-" + partition.getId())
101 + .withSerializer(serializer.clone())
102 + .withTransport(transport.get())
103 + .withStateMachine(() -> new ResourceManagerState(registry))
104 + .withStorage(Storage.builder()
105 + // FIXME: StorageLevel should be DISK
106 + .withStorageLevel(StorageLevel.MEMORY)
107 + .withSerializer(serializer.clone())
108 + .withDirectory(dataFolder)
109 + .withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
110 + .build())
111 + .build();
112 + }
113 +
114 + public Set<NodeId> configuredMembers() {
115 + return Sets.newHashSet(partition.getMembers());
116 + }
117 +
118 + @Override
119 + public boolean isOpen() {
120 + return server.isOpen();
121 + }
122 +
123 + @Override
124 + public boolean isClosed() {
125 + return server.isClosed();
126 + }
127 +}