Madan Jampani
Committed by Gerrit Code Review

Implementation for StorageAdmin and PartitionAdmin APIs

Change-Id: I48cbfae6f410425294196884cdb2ce4705fa2c3d
......@@ -15,6 +15,8 @@
*/
package org.onosproject.store.primitives;
import java.util.Set;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
......@@ -84,4 +86,16 @@ public interface DistributedPrimitiveCreator {
* @return leader elector
*/
AsyncLeaderElector newAsyncLeaderElector(String name);
/**
* Returns the names of all created {@code AsyncConsistentMap} instances.
* @return set of {@code AsyncConsistentMap} names
*/
Set<String> getAsyncConsistentMapNames();
/**
* Returns the names of all created {@code AsyncAtomicCounter} instances.
* @return set of {@code AsyncAtomicCounter} names
*/
Set<String> getAsyncAtomicCounterNames();
}
\ No newline at end of file
......
......@@ -15,9 +15,11 @@
*/
package org.onosproject.store.primitives;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.service.PartitionInfo;
/**
* Administrative interface for partition membership changes.
......@@ -25,6 +27,12 @@ import org.onosproject.cluster.PartitionId;
public interface PartitionAdminService {
/**
* Returns the {@link PartitionInfo information} for existing partitions.
* @return list of {@code PartitionInfo}
*/
List<PartitionInfo> partitionInfo();
/**
* Leaves a partition.
*
* @param partitionId partition identifier
......
......@@ -31,6 +31,7 @@ public interface StorageAdminService {
*
* @return list of partition information
*/
@Deprecated
List<PartitionInfo> getPartitionInfo();
/**
......@@ -38,6 +39,7 @@ public interface StorageAdminService {
*
* @return list of map information
*/
@Deprecated
List<MapInfo> getMapInfo();
/**
......@@ -47,6 +49,7 @@ public interface StorageAdminService {
*
* @return mapping from counter name to that counter's next value
*/
@Deprecated
Map<String, Long> getCounters();
/**
......@@ -54,6 +57,7 @@ public interface StorageAdminService {
*
* @return mapping from counter name to that counter's next value
*/
@Deprecated
Map<String, Long> getPartitionedDatabaseCounters();
/**
......@@ -61,6 +65,7 @@ public interface StorageAdminService {
*
* @return mapping from counter name to that counter's next value
*/
@Deprecated
Map<String, Long> getInMemoryDatabaseCounters();
/**
......
......@@ -22,6 +22,7 @@ import io.atomix.copycat.client.Query;
import io.atomix.manager.state.GetResource;
import io.atomix.manager.state.GetResourceKeys;
import io.atomix.resource.ResourceQuery;
import io.atomix.variables.state.ValueCommands;
import java.io.IOException;
import java.net.URL;
......@@ -93,6 +94,8 @@ public final class CatalystSerializers {
GetResource.class,
GetResourceKeys.class,
ResourceQuery.class,
ValueCommands.Get.class,
ValueCommands.Set.class,
Query.ConsistencyLevel.class));
// ONOS classes
serializer.register(Change.class, factory);
......@@ -115,6 +118,8 @@ public final class CatalystSerializers {
serializer.register(ResourceQuery.class, factory);
serializer.register(GetResource.class, factory);
serializer.register(GetResourceKeys.class, factory);
serializer.register(ValueCommands.Get.class, factory);
serializer.register(ValueCommands.Set.class, factory);
// ConsistentMap
serializer.register(AtomixConsistentMapCommands.UpdateAndGet.class, factory);
......
......@@ -15,7 +15,12 @@
*/
package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import com.google.common.base.Throwables;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
......@@ -28,6 +33,7 @@ import io.atomix.catalyst.serializer.TypeSerializerFactory;
*/
public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFactory {
private final Logger log = getLogger(getClass());
private final TypeSerializer<?> typeSerializer;
public DefaultCatalystTypeSerializerFactory(Serializer serializer) {
......@@ -53,15 +59,25 @@ public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFacto
int size = input.readInt();
byte[] payload = new byte[size];
input.read(payload);
return this.serializer.decode(payload);
try {
return this.serializer.decode(payload);
} catch (Exception e) {
log.warn("Failed to deserialize as type {}", clazz, e);
Throwables.propagate(e);
return null;
}
}
@Override
public void write(T object, BufferOutput<?> output,
io.atomix.catalyst.serializer.Serializer serializer) {
byte[] payload = this.serializer.encode(object);
output.writeInt(payload.length);
output.write(payload);
try {
byte[] payload = this.serializer.encode(object);
output.writeInt(payload.length);
output.write(payload);
} catch (Exception e) {
log.warn("Failed to serialize {}", object, e);
}
}
}
}
......
......@@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
......@@ -33,8 +34,10 @@ import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Bytes;
......@@ -93,6 +96,24 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv
return getCreator(name).newAsyncLeaderElector(name);
}
@Override
public Set<String> getAsyncConsistentMapNames() {
return members.values()
.stream()
.map(DistributedPrimitiveCreator::getAsyncConsistentMapNames)
.reduce(Sets::union)
.orElse(ImmutableSet.of());
}
@Override
public Set<String> getAsyncAtomicCounterNames() {
return members.values()
.stream()
.map(DistributedPrimitiveCreator::getAsyncAtomicCounterNames)
.reduce(Sets::union)
.orElse(ImmutableSet.of());
}
/**
* Returns the {@code DistributedPrimitiveCreator} to use for hosting a primitive.
* @param name primitive name
......
......@@ -19,9 +19,12 @@ package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -40,6 +43,7 @@ import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionEvent;
import org.onosproject.store.primitives.PartitionEventListener;
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.service.PartitionInfo;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
......@@ -137,4 +141,14 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
// members of partition
return getConfiguredMembers(partitionId);
}
@Override
public List<PartitionInfo> partitionInfo() {
return partitions.values()
.stream()
.map(StoragePartition::info)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
}
\ No newline at end of file
......
......@@ -33,6 +33,7 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.service.PartitionInfo;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
......@@ -146,4 +147,13 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
public boolean isClosed() {
return isOpened.get() && isClosed.get();
}
/**
* Returns the partition information if this partition is locally managed i.e.
* this node is a active member of the partition.
* @return partition info
*/
public Optional<PartitionInfo> info() {
return server.map(StoragePartitionServer::info);
}
}
......
......@@ -22,6 +22,7 @@ import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onlab.util.HexString;
......@@ -136,6 +137,16 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
}
@Override
public Set<String> getAsyncConsistentMapNames() {
return client.keys(AtomixConsistentMap.class).join();
}
@Override
public Set<String> getAsyncAtomicCounterNames() {
return client.keys(DistributedLong.class).join();
}
@Override
public boolean isOpen() {
return client.isOpen();
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import io.atomix.copycat.server.cluster.Member;
import java.util.Collection;
import java.util.Set;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.service.PartitionInfo;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
/**
* Operational details for a {@code StoragePartition}.
*/
public class StoragePartitionDetails {
private final PartitionId partitionId;
private final Set<Member> activeMembers;
private final Set<Member> configuredMembers;
private final Member leader;
private final long leaderTerm;
public StoragePartitionDetails(PartitionId partitionId,
Collection<Member> activeMembers,
Collection<Member> configuredMembers,
Member leader,
long leaderTerm) {
this.partitionId = partitionId;
this.activeMembers = ImmutableSet.copyOf(activeMembers);
this.configuredMembers = ImmutableSet.copyOf(configuredMembers);
this.leader = leader;
this.leaderTerm = leaderTerm;
}
/**
* Returns the set of active members.
* @return active members
*/
public Set<Member> activeMembers() {
return activeMembers;
}
/**
* Returns the set of configured members.
* @return configured members
*/
public Set<Member> configuredMembers() {
return configuredMembers;
}
/**
* Returns the partition leader.
* @return leader
*/
public Member leader() {
return leader;
}
/**
* Returns the partition leader term.
* @return leader term
*/
public long leaderTerm() {
return leaderTerm;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("activeMembers", activeMembers)
.add("configuredMembers", configuredMembers)
.add("leader", leader)
.add("leaderTerm", leaderTerm)
.toString();
}
/**
* Returns the details as an instance of {@code PartitionInfo}.
* @return partition info
*/
public PartitionInfo toPartitionInfo() {
return new PartitionInfo(partitionId.toString(),
leaderTerm,
Lists.transform(ImmutableList.copyOf(activeMembers), m -> m.address().toString()),
leader == null ? "none" : leader.address().toString());
}
}
......@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.PartitionInfo;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
......@@ -124,4 +125,16 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
public boolean isClosed() {
return server.isClosed();
}
/**
* Returns the partition information.
* @return partition info
*/
public PartitionInfo info() {
return new StoragePartitionDetails(partition.getId(),
server.cluster().members(),
server.cluster().members(),
server.cluster().leader(),
server.cluster().term()).toPartitionInfo();
}
}
......
......@@ -71,8 +71,7 @@ public class AtomixCounter implements AsyncAtomicCounter {
}
@Override
public CompletableFuture<Boolean> compareAndSet(long expectedValue,
long updateValue) {
public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
return distLong.compareAndSet(expectedValue, updateValue);
}
}
\ No newline at end of file
......