Madan Jampani
Committed by Gerrit Code Review

Method name refactor in DistributedPrimitive + Builder for AsyncLeaderElector

Change-Id: I59be6e66665c0b12d02106bd5c722e9fa38dd7a1
Showing 28 changed files with 276 additions and 41 deletions
......@@ -21,6 +21,7 @@ import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.StorageService;
......@@ -62,4 +63,9 @@ public class VtnStorageServiceAdapter implements StorageService {
public TransactionContextBuilder transactionContextBuilder() {
return null;
}
@Override
public LeaderElectorBuilder leaderElectorBuilder() {
return null;
}
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Synchronous;
/**
* Default implementation for a {@code LeaderElector} backed by a {@link AsyncLeaderElector}.
*/
public class DefaultLeaderElector extends Synchronous<AsyncLeaderElector> implements LeaderElector {
private final AsyncLeaderElector asyncElector;
private final long operationTimeoutMillis;
public DefaultLeaderElector(AsyncLeaderElector asyncElector, long operationTimeoutMillis) {
super(asyncElector);
this.asyncElector = asyncElector;
this.operationTimeoutMillis = operationTimeoutMillis;
}
@Override
public Leadership run(String topic, NodeId nodeId) {
return complete(asyncElector.run(topic, nodeId));
}
@Override
public void withdraw(String topic) {
complete(asyncElector.withdraw(topic));
}
@Override
public boolean anoint(String topic, NodeId nodeId) {
return complete(asyncElector.anoint(topic, nodeId));
}
@Override
public Leadership getLeadership(String topic) {
return complete(asyncElector.getLeadership(topic));
}
@Override
public Map<String, Leadership> getLeaderships() {
return complete(asyncElector.getLeaderships());
}
@Override
public void addChangeListener(Consumer<Change<Leadership>> consumer) {
complete(asyncElector.addChangeListener(consumer));
}
@Override
public void removeChangeListener(Consumer<Change<Leadership>> consumer) {
complete(asyncElector.removeChangeListener(consumer));
}
private <T> T complete(CompletableFuture<T> future) {
try {
return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageException.Interrupted();
} catch (TimeoutException e) {
throw new StorageException.Timeout();
} catch (ExecutionException e) {
throw new StorageException(e.getCause());
}
}
}
......@@ -25,7 +25,7 @@ import org.onosproject.store.primitives.DefaultAtomicCounter;
public interface AsyncAtomicCounter extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.COUNTER;
}
......
......@@ -32,7 +32,7 @@ import org.onosproject.store.primitives.DefaultAtomicValue;
public interface AsyncAtomicValue<V> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.VALUE;
}
......
......@@ -55,7 +55,7 @@ import org.onosproject.store.primitives.TransactionId;
public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.CONSISTENT_MAP;
}
......
......@@ -33,7 +33,7 @@ import org.onosproject.store.primitives.DefaultDistributedSet;
public interface AsyncDistributedSet<E> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.SET;
}
......
......@@ -22,6 +22,7 @@ import java.util.function.Consumer;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.DefaultLeaderElector;
/**
* Distributed mutual exclusion primitive.
......@@ -45,7 +46,7 @@ import org.onosproject.event.Change;
public interface AsyncLeaderElector extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.LEADER_ELECTOR;
}
......@@ -102,4 +103,23 @@ public interface AsyncLeaderElector extends DistributedPrimitive {
* @return CompletableFuture that is completed when the operation completes
*/
CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer);
/**
* Returns a new {@link LeaderElector} that is backed by this instance.
*
* @param timeoutMillis timeout duration for the returned LeaderElector operations
* @return new {@code LeaderElector} instance
*/
default LeaderElector asLeaderElector(long timeoutMillis) {
return new DefaultLeaderElector(this, timeoutMillis);
}
/**
* Returns a new {@link LeaderElector} that is backed by this instance and with a default operation timeout.
*
* @return new {@code LeaderElector} instance
*/
default LeaderElector asLeaderElector() {
return asLeaderElector(DEFAULT_OPERTATION_TIMEOUT_MILLIS);
}
}
......
......@@ -21,7 +21,7 @@ package org.onosproject.store.service;
public interface AtomicCounter extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.COUNTER;
}
......
......@@ -81,7 +81,7 @@ public interface DistributedPrimitive {
* Returns the type of primitive.
* @return primitive type
*/
Type type();
Type primitiveType();
/**
* Returns the application owning this primitive.
......
......@@ -42,7 +42,7 @@ import java.util.function.BiFunction;
public interface EventuallyConsistentMap<K, V> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.EVENTUALLY_CONSISTENT_MAP;
}
......
......@@ -29,7 +29,7 @@ import org.onosproject.event.Change;
public interface LeaderElector extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.LEADER_ELECTOR;
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for constructing new {@link AsyncLeaderElector} instances.
*/
public abstract class LeaderElectorBuilder
extends DistributedPrimitiveBuilder<LeaderElectorBuilder, AsyncLeaderElector> {
public LeaderElectorBuilder() {
super(DistributedPrimitive.Type.LEADER_ELECTOR);
}
}
......@@ -21,6 +21,7 @@ import java.util.function.Function;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
......@@ -58,6 +59,14 @@ public class MapTransaction<K, V> {
return updates;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("transactionId", transactionId)
.add("updates", updates)
.toString();
}
/**
* Maps this instance to another {@code MapTransaction} with different key and value types.
*
......
......@@ -75,6 +75,13 @@ public interface StorageService {
<V> AtomicValueBuilder<V> atomicValueBuilder();
/**
* Creates a new LeaderElectorBuilder.
*
* @return leader elector builder
*/
LeaderElectorBuilder leaderElectorBuilder();
/**
* Creates a new transaction context builder.
*
* @return a builder for a transaction context.
......
......@@ -37,8 +37,8 @@ public abstract class Synchronous<T extends DistributedPrimitive> implements Dis
}
@Override
public Type type() {
return primitive.type();
public Type primitiveType() {
return primitive.primitiveType();
}
@Override
......
......@@ -36,7 +36,7 @@ import org.onosproject.store.primitives.TransactionId;
public interface TransactionContext extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.TRANSACTION_CONTEXT;
}
......
......@@ -19,6 +19,7 @@ package org.onosproject.store.service;
import java.util.function.Function;
import org.joda.time.DateTime;
import org.onlab.util.ByteArraySizeHashPrinter;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
......@@ -140,7 +141,7 @@ public class Versioned<V> {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("value", value)
.add("value", value instanceof byte[] ? new ByteArraySizeHashPrinter((byte[]) value) : value)
.add("version", version)
.add("creationTime", new DateTime(creationTime))
.toString();
......
......@@ -33,7 +33,7 @@ public class ConsistentMapAdapter<K, V> implements ConsistentMap<K, V> {
}
@Override
public DistributedPrimitive.Type type() {
public DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.CONSISTENT_MAP;
}
......
......@@ -34,7 +34,7 @@ public class EventuallyConsistentMapAdapter<K, V> implements EventuallyConsisten
}
@Override
public Type type() {
public Type primitiveType() {
return Type.EVENTUALLY_CONSISTENT_MAP;
}
......
......@@ -53,4 +53,9 @@ public class StorageServiceAdapter implements StorageService {
public TransactionContextBuilder transactionContextBuilder() {
return null;
}
@Override
public LeaderElectorBuilder leaderElectorBuilder() {
return null;
}
}
......
......@@ -22,6 +22,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -32,6 +33,7 @@ import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
......@@ -71,18 +73,23 @@ public class CopycatTransportServer implements Server {
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
int senderPort = sender.port();
Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
AtomicBoolean newConnection = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
newConnection.set(true);
return new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
senderAddress,
messagingService,
getOrCreateContext(context));
try {
InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
int senderPort = sender.port();
Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
return new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
senderAddress,
messagingService,
getOrCreateContext(context));
} catch (UnknownHostException e) {
Throwables.propagate(e);
return null;
}
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
......
......@@ -76,6 +76,7 @@ import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
......@@ -360,6 +361,11 @@ public class DatabaseManager implements StorageService, StorageAdminService {
}
@Override
public LeaderElectorBuilder leaderElectorBuilder() {
throw new UnsupportedOperationException();
}
@Override
public List<MapInfo> getMapInfo() {
List<MapInfo> maps = Lists.newArrayList();
maps.addAll(getMapInfo(inMemoryDatabase));
......
......@@ -54,29 +54,29 @@ public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFacto
}
@Override
public T read(Class<T> clazz, BufferInput<?> input,
io.atomix.catalyst.serializer.Serializer serializer) {
int size = input.readInt();
byte[] payload = new byte[size];
input.read(payload);
public void write(T object, BufferOutput<?> buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
try {
return this.serializer.decode(payload);
byte[] payload = this.serializer.encode(object);
buffer.writeInt(payload.length);
buffer.write(payload);
} catch (Exception e) {
log.warn("Failed to deserialize as type {}", clazz, e);
Throwables.propagate(e);
return null;
log.warn("Failed to serialize {}", object, e);
}
}
@Override
public void write(T object, BufferOutput<?> output,
io.atomix.catalyst.serializer.Serializer serializer) {
public T read(Class<T> type, BufferInput<?> buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
int size = buffer.readInt();
try {
byte[] payload = this.serializer.encode(object);
output.writeInt(payload.length);
output.write(payload);
byte[] payload = new byte[size];
buffer.read(payload);
return this.serializer.decode(payload);
} catch (Exception e) {
log.warn("Failed to serialize {}", object, e);
log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
Throwables.propagate(e);
return null;
}
}
}
......
......@@ -117,7 +117,7 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
}
@Override
public DistributedPrimitive.Type type() {
public DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.QUEUE;
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.LeaderElectorBuilder;
/**
* Default implementation of {@code LeaderElectorBuilder}.
*/
public class DefaultLeaderElectorBuilder extends LeaderElectorBuilder {
private final DistributedPrimitiveCreator base;
private final DistributedPrimitiveCreator federated;
public DefaultLeaderElectorBuilder(DistributedPrimitiveCreator base, DistributedPrimitiveCreator federated) {
this.base = base;
this.federated = federated;
}
@Override
public AsyncLeaderElector build() {
DistributedPrimitiveCreator creator = partitionsDisabled() ? base : federated;
return creator.newAsyncLeaderElector(name());
}
}
......@@ -33,6 +33,7 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.service.PartitionInfo;
import com.google.common.collect.Collections2;
......@@ -51,6 +52,7 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
private final File logFolder;
private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
new ResourceType(DistributedLong.class),
new ResourceType(AtomixLeaderElector.class),
new ResourceType(AtomixConsistentMap.class));
private NodeId localNodeId;
......
......@@ -29,6 +29,7 @@ import org.onlab.util.HexString;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
......@@ -133,7 +134,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
throw new UnsupportedOperationException();
return client.get(name, AtomixLeaderElector.class).join();
}
@Override
......
......@@ -40,6 +40,7 @@ import org.onlab.util.Match;
import org.onosproject.app.ApplicationState;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.NodeId;
......@@ -49,6 +50,7 @@ import org.onosproject.core.DefaultApplication;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.Version;
import org.onosproject.event.Change;
import org.onosproject.incubator.net.domain.IntentDomainId;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.Annotations;
......@@ -204,8 +206,10 @@ import org.onosproject.net.resource.link.MplsLabelResourceAllocation;
import org.onosproject.net.resource.link.MplsLabelResourceRequest;
import org.onosproject.security.Permission;
import org.onosproject.store.Timestamp;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Versioned;
......@@ -327,6 +331,8 @@ public final class KryoNamespaces {
Link.Type.class,
Link.State.class,
Timestamp.class,
Change.class,
Leader.class,
Leadership.class,
LeadershipEvent.class,
LeadershipEvent.Type.class,
......@@ -493,6 +499,9 @@ public final class KryoNamespaces {
.register(ExtensionSelectorType.class)
.register(ExtensionTreatmentType.class)
.register(TransactionId.class)
.register(MapTransaction.class)
.register(MapUpdate.class)
.register(MapUpdate.Type.class)
.register(Versioned.class)
.register(MapEvent.class)
.register(MapEvent.Type.class)
......