Madan Jampani
Committed by Gerrit Code Review

Support for reacting to underlying copycat client session state changes

Change-Id: If8af43f81963653da3584167d7a9813456ce3773
......@@ -18,9 +18,22 @@ package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.copycat.client.ConnectionStrategies;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.RecoveryStrategies;
import io.atomix.copycat.client.RetryStrategies;
import io.atomix.copycat.client.ServerSelectionStrategies;
import io.atomix.manager.ResourceClient;
import io.atomix.manager.state.ResourceManagerException;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import io.atomix.resource.ResourceType;
import io.atomix.resource.util.ResourceRegistry;
import io.atomix.variables.DistributedLong;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -53,6 +66,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
private Atomix client;
private CopycatClient copycatClient;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
......@@ -72,11 +86,12 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
return CompletableFuture.completedFuture(null);
}
synchronized (StoragePartitionClient.this) {
client = AtomixClient.builder(partition.getMemberAddresses())
.withResourceTypes(StoragePartition.RESOURCE_TYPES)
.withSerializer(serializer.clone())
.withTransport(transport)
.build();
copycatClient = newCopycatClient(partition.getMemberAddresses(),
transport,
serializer.clone(),
StoragePartition.RESOURCE_TYPES);
copycatClient.onStateChange(state -> log.info("Client state {}", state));
client = new AtomixClient(new ResourceClient(copycatClient));
}
return client.open().whenComplete((r, e) -> {
if (e == null) {
......@@ -154,4 +169,30 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
public boolean isOpen() {
return client.isOpen();
}
private CopycatClient newCopycatClient(Collection<Address> members,
Transport transport,
io.atomix.catalyst.serializer.Serializer serializer,
Collection<ResourceType> resourceTypes) {
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
CopycatClient client = CopycatClient.builder(members)
.withServerSelectionStrategy(ServerSelectionStrategies.ANY)
.withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
.withRecoveryStrategy(RecoveryStrategies.RECOVER)
.withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF)
.withTransport(transport)
.withSerializer(serializer)
.withThreadFactory(new CatalystThreadFactory(String.format("copycat-client-%s", partition.getId())))
.build();
client.serializer().resolve(new ResourceManagerTypeResolver());
for (ResourceType type : registry.types()) {
try {
type.factory().newInstance().createSerializableTypeResolver().resolve(client.serializer().registry());
} catch (InstantiationException | IllegalAccessException e) {
throw new ResourceManagerException(e);
}
}
return client;
}
}
......