Madan Jampani
Committed by Gerrit Code Review

Updates to ConsistentMap and LeaderElector state machines

Change-Id: I7734b253a56fef7300a8a094a3cfc8c1b45c2453
......@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
......@@ -35,6 +36,7 @@ import java.util.function.Supplier;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.PartitionInfo;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
......@@ -44,6 +46,8 @@ import com.google.common.collect.Sets;
*/
public class StoragePartitionServer implements Managed<StoragePartitionServer> {
private final Logger log = getLogger(getClass());
private static final int MAX_ENTRIES_PER_LOG_SEGMENT = 32768;
private final StoragePartition partition;
private final Address localAddress;
......@@ -81,7 +85,13 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
} else {
serverOpenFuture = CompletableFuture.completedFuture(null);
}
return serverOpenFuture.thenApply(v -> null);
return serverOpenFuture.whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully started server for partition {}", partition.getId());
} else {
log.info("Failed to start server for partition {}", partition.getId(), e);
}
}).thenApply(v -> null);
}
@Override
......@@ -105,7 +115,6 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
.withStorage(Storage.builder()
// FIXME: StorageLevel should be DISK
.withStorageLevel(StorageLevel.MEMORY)
.withSerializer(serializer.clone())
.withDirectory(dataFolder)
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
.build())
......
......@@ -17,12 +17,12 @@ package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.Consistency;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -49,7 +49,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
private static final String CHANGE_SUBJECT = "change";
public static final String CHANGE_SUBJECT = "changeEvents";
public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
super(client, options);
......@@ -68,14 +68,8 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.
});
}
private void handleEvent(MapEvent<String, byte[]> event) {
mapEventListeners.forEach(listener -> listener.event(event));
}
@Override
public AtomixConsistentMap with(Consistency consistency) {
super.with(consistency);
return this;
private void handleEvent(List<MapEvent<String, byte[]>> events) {
events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
}
@Override
......
......@@ -18,7 +18,9 @@ package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.Query;
......@@ -514,4 +516,28 @@ public final class AtomixConsistentMapCommands {
.toString();
}
}
/**
* Map command type resolver.
*/
public static class TypeResolver implements SerializableTypeResolver {
@Override
public void resolve(SerializerRegistry registry) {
registry.register(ContainsKey.class, -761);
registry.register(ContainsValue.class, -762);
registry.register(Get.class, -763);
registry.register(EntrySet.class, -764);
registry.register(Values.class, -765);
registry.register(KeySet.class, -766);
registry.register(Clear.class, -767);
registry.register(IsEmpty.class, -768);
registry.register(Size.class, -769);
registry.register(Listen.class, -770);
registry.register(Unlisten.class, -771);
registry.register(TransactionPrepare.class, -772);
registry.register(TransactionCommit.class, -773);
registry.register(TransactionRollback.class, -774);
registry.register(UpdateAndGet.class, -775);
}
}
}
......
......@@ -30,6 +30,7 @@ import io.atomix.resource.ResourceStateMachine;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
......@@ -39,11 +40,26 @@ import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -52,13 +68,11 @@ import static com.google.common.base.Preconditions.checkState;
/**
* State Machine for {@link AtomixConsistentMap} resource.
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements
SessionListener, Snapshottable {
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
.newHashMap();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
private AtomicLong versionCounter = new AtomicLong(0);
@Override
......@@ -74,36 +88,23 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
@Override
protected void configure(StateMachineExecutor executor) {
// Listeners
executor.register(AtomixConsistentMapCommands.Listen.class,
this::listen);
executor.register(AtomixConsistentMapCommands.Unlisten.class,
this::unlisten);
executor.register(Listen.class, this::listen);
executor.register(Unlisten.class, this::unlisten);
// Queries
executor.register(AtomixConsistentMapCommands.ContainsKey.class,
this::containsKey);
executor.register(AtomixConsistentMapCommands.ContainsValue.class,
this::containsValue);
executor.register(AtomixConsistentMapCommands.EntrySet.class,
this::entrySet);
executor.register(AtomixConsistentMapCommands.Get.class, this::get);
executor.register(AtomixConsistentMapCommands.IsEmpty.class,
this::isEmpty);
executor.register(AtomixConsistentMapCommands.KeySet.class,
this::keySet);
executor.register(AtomixConsistentMapCommands.Size.class, this::size);
executor.register(AtomixConsistentMapCommands.Values.class,
this::values);
executor.register(ContainsKey.class, this::containsKey);
executor.register(ContainsValue.class, this::containsValue);
executor.register(EntrySet.class, this::entrySet);
executor.register(Get.class, this::get);
executor.register(IsEmpty.class, this::isEmpty);
executor.register(KeySet.class, this::keySet);
executor.register(Size.class, this::size);
executor.register(Values.class, this::values);
// Commands
executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
this::updateAndGet);
executor.register(UpdateAndGet.class, this::updateAndGet);
executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
this::prepare);
executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
this::commit);
executor.register(
AtomixConsistentMapCommands.TransactionRollback.class,
this::rollback);
executor.register(TransactionPrepare.class, this::prepare);
executor.register(TransactionCommit.class, this::commit);
executor.register(TransactionRollback.class, this::rollback);
}
@Override
......@@ -120,12 +121,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a contains key commit.
*
* @param commit
* containsKey commit
* @param commit containsKey commit
* @return {@code true} if map contains key
*/
protected boolean containsKey(
Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
protected boolean containsKey(Commit<? extends ContainsKey> commit) {
try {
return toVersioned(mapEntries.get(commit.operation().key())) != null;
} finally {
......@@ -136,12 +135,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a contains value commit.
*
* @param commit
* containsValue commit
* @param commit containsValue commit
* @return {@code true} if map contains value
*/
protected boolean containsValue(
Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
try {
Match<byte[]> valueMatch = Match
.ifValue(commit.operation().value());
......@@ -159,8 +156,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
* get commit
* @return value mapped to key
*/
protected Versioned<byte[]> get(
Commit<? extends AtomixConsistentMapCommands.Get> commit) {
protected Versioned<byte[]> get(Commit<? extends Get> commit) {
try {
return toVersioned(mapEntries.get(commit.operation().key()));
} finally {
......@@ -171,11 +167,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a count commit.
*
* @param commit
* size commit
* @param commit size commit
* @return number of entries in map
*/
protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
protected int size(Commit<? extends Size> commit) {
try {
return mapEntries.size();
} finally {
......@@ -186,12 +181,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles an is empty commit.
*
* @param commit
* isEmpty commit
* @param commit isEmpty commit
* @return {@code true} if map is empty
*/
protected boolean isEmpty(
Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
try {
return mapEntries.isEmpty();
} finally {
......@@ -202,14 +195,12 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a keySet commit.
*
* @param commit
* keySet commit
* @param commit keySet commit
* @return set of keys in map
*/
protected Set<String> keySet(
Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
protected Set<String> keySet(Commit<? extends KeySet> commit) {
try {
return mapEntries.keySet();
return mapEntries.keySet().stream().collect(Collectors.toSet());
} finally {
commit.close();
}
......@@ -218,15 +209,12 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a values commit.
*
* @param commit
* values commit
* @param commit values commit
* @return collection of values in map
*/
protected Collection<Versioned<byte[]>> values(
Commit<? extends AtomixConsistentMapCommands.Values> commit) {
protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
try {
return mapEntries.values().stream().map(this::toVersioned)
.collect(Collectors.toList());
return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
} finally {
commit.close();
}
......@@ -239,8 +227,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
* entrySet commit
* @return set of map entries
*/
protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
try {
return mapEntries
.entrySet()
......@@ -256,12 +243,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a update and get commit.
*
* @param commit
* updateAndGet commit
* @param commit updateAndGet commit
* @return update result
*/
protected MapEntryUpdateResult<String, byte[]> updateAndGet(
Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
String key = commit.operation().key();
MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
......@@ -286,8 +271,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
}
if (updateType == INSERT || updateType == UPDATE) {
mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
} else {
commit.close();
}
notify(new MapEvent<>("", key, newMapValue, oldMapValue));
publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
newMapValue);
}
......@@ -295,12 +282,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a clear commit.
*
* @param commit
* clear commit
* @param commit clear commit
* @return clear result
*/
protected MapEntryUpdateResult.Status clear(
Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
try {
Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
.entrySet().iterator();
......@@ -310,7 +295,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
MapEntryValue value = entry.getValue();
Versioned<byte[]> removedValue = new Versioned<>(value.value(),
value.version());
notify(new MapEvent<>("", key, null, removedValue));
publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
value.discard();
iterator.remove();
}
......@@ -323,11 +308,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles a listen commit.
*
* @param commit
* listen commit
* @param commit listen commit
*/
protected void listen(
Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
protected void listen(Commit<? extends Listen> commit) {
Long sessionId = commit.session().id();
listeners.put(sessionId, commit);
commit.session()
......@@ -335,8 +318,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
state -> {
if (state == Session.State.CLOSED
|| state == Session.State.EXPIRED) {
Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
.remove(sessionId);
Commit<? extends Listen> listener = listeners.remove(sessionId);
if (listener != null) {
listener.close();
}
......@@ -347,14 +329,12 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
/**
* Handles an unlisten commit.
*
* @param commit
* unlisten commit
* @param commit unlisten commit
*/
protected void unlisten(
Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
Commit<? extends Unlisten> commit) {
try {
Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
.remove(commit.session());
Commit<? extends Listen> listener = listeners.remove(commit.session());
if (listener != null) {
listener.close();
}
......@@ -364,25 +344,12 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
}
/**
* Triggers a change event.
*
* @param value
* map event
*/
private void notify(MapEvent<String, byte[]> value) {
listeners.values().forEach(
commit -> commit.session().publish("change", value));
}
/**
* Handles an prepare commit.
*
* @param commit
* transaction prepare commit
* @param commit transaction prepare commit
* @return prepare result
*/
protected PrepareResult prepare(
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
boolean ok = false;
try {
MapTransaction<String, byte[]> transaction = commit.operation().transaction();
......@@ -403,8 +370,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
}
}
// No violations detected. Add to pendingTranctions and mark
// modified keys as
// currently locked to updates.
// modified keys as locked for updates.
pendingTransactions.put(transaction.transactionId(), commit);
transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
......@@ -422,11 +388,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
* @param commit transaction commit commit
* @return commit result
*/
protected CommitResult commit(
Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
.remove(transactionId);
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
......@@ -437,8 +402,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
.stream()
.filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.count();
CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
MapEntryValue previousValue = mapEntries.remove(key);
......@@ -448,11 +414,15 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
newValue = new TransactionalCommit(key,
versionCounter.incrementAndGet(), completer);
}
mapEntries.put(key, newValue);
// Notify map listeners
notify(new MapEvent<>("", key, toVersioned(newValue),
toVersioned(previousValue)));
eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
if (newValue != null) {
mapEntries.put(key, newValue);
}
if (previousValue != null) {
previousValue.discard();
}
}
publish(eventsToPublish);
return CommitResult.OK;
} finally {
commit.close();
......@@ -465,12 +435,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
* @param commit transaction rollback commit
* @return rollback result
*/
protected RollbackResult rollback(
Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
.remove(transactionId);
Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
if (prepareCommit == null) {
return RollbackResult.UNKNOWN_TRANSACTION_ID;
} else {
......@@ -486,8 +454,14 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
}
}
private MapEntryUpdateResult.Status validate(
AtomixConsistentMapCommands.UpdateAndGet update) {
/**
* Computes the update status that would result if the specified update were to applied to
* the state machine.
*
* @param update update
* @return status
*/
private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
MapEntryValue existingValue = mapEntries.get(update.key());
if (existingValue == null && update.value() == null) {
return MapEntryUpdateResult.Status.NOOP;
......@@ -504,9 +478,22 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
: MapEntryUpdateResult.Status.PRECONDITION_FAILED;
}
/**
* Utility for turning a {@code MapEntryValue} to {@code Versioned}.
* @param value map entry value
* @return versioned instance
*/
private Versioned<byte[]> toVersioned(MapEntryValue value) {
return value == null ? null : new Versioned<>(value.value(),
value.version());
return value == null ? null : new Versioned<>(value.value(), value.version());
}
/**
* Publishes events to listeners.
*
* @param events list of map event to publish
*/
private void publish(List<MapEvent<String, byte[]>> events) {
listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
}
@Override
......@@ -529,8 +516,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
}
private void closeListener(Long sessionId) {
Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
.remove(sessionId);
Commit<? extends Listen> commit = listeners.remove(sessionId);
if (commit != null) {
commit.close();
}
......@@ -566,11 +552,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
*/
private class NonTransactionalCommit implements MapEntryValue {
private final long version;
private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
private final Commit<? extends UpdateAndGet> commit;
public NonTransactionalCommit(
long version,
Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
this.version = version;
this.commit = commit;
}
......@@ -598,12 +582,12 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
private class TransactionalCommit implements MapEntryValue {
private final String key;
private final long version;
private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
public TransactionalCommit(
String key,
long version,
CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
this.key = key;
this.version = version;
this.completer = commit;
......
......@@ -17,7 +17,6 @@ package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.Consistency;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
......@@ -26,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.onlab.util.SharedExecutors;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
......@@ -37,8 +37,8 @@ import com.google.common.collect.Sets;
* Distributed resource providing the {@link AsyncLeaderElector} primitive.
*/
@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
public class AtomixLeaderElector
extends Resource<AtomixLeaderElector, Resource.Options> implements AsyncLeaderElector {
public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.Options>
implements AsyncLeaderElector {
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
......@@ -62,13 +62,8 @@ public class AtomixLeaderElector
}
private void handleEvent(Change<Leadership> change) {
leadershipChangeListeners.forEach(l -> l.accept(change));
}
@Override
public AtomixLeaderElector with(Consistency consistency) {
super.with(consistency);
return this;
SharedExecutors.getSingleThreadExecutor().execute(() ->
leadershipChangeListeners.forEach(l -> l.accept(change)));
}
@Override
......
......@@ -21,13 +21,16 @@ import java.util.Set;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.Query;
......@@ -232,6 +235,18 @@ public final class AtomixLeaderElectorCommands {
.add("nodeId", nodeId)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeString(topic);
buffer.writeString(nodeId.toString());
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
topic = buffer.readString();
nodeId = new NodeId(buffer.readString());
}
}
/**
......@@ -263,6 +278,16 @@ public final class AtomixLeaderElectorCommands {
.add("topic", topic)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeString(topic);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
topic = buffer.readString();
}
}
/**
......@@ -306,5 +331,34 @@ public final class AtomixLeaderElectorCommands {
.add("nodeId", nodeId)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeString(topic);
buffer.writeString(nodeId.toString());
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
topic = buffer.readString();
nodeId = new NodeId(buffer.readString());
}
}
/**
* Map command type resolver.
*/
public static class TypeResolver implements SerializableTypeResolver {
@Override
public void resolve(SerializerRegistry registry) {
registry.register(Run.class, -861);
registry.register(Withdraw.class, -862);
registry.register(Anoint.class, -863);
registry.register(GetAllLeaderships.class, -864);
registry.register(GetElectedTopics.class, -865);
registry.register(GetLeadership.class, -866);
registry.register(Listen.class, -867);
registry.register(Unlisten.class, -868);
}
}
}
......
......@@ -41,6 +41,14 @@ import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
......@@ -59,7 +67,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
private final Logger log = getLogger(getClass());
private Map<String, AtomicLong> termCounters = new HashMap<>();
private Map<String, ElectionState> elections = new HashMap<>();
private final Map<Long, Commit<? extends AtomixLeaderElectorCommands.Listen>> listeners = new LinkedHashMap<>();
private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
ElectionState.class,
Registration.class);
......@@ -67,16 +75,16 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
@Override
protected void configure(StateMachineExecutor executor) {
// Notification
executor.register(AtomixLeaderElectorCommands.Listen.class, this::listen);
executor.register(AtomixLeaderElectorCommands.Unlisten.class, this::unlisten);
executor.register(Listen.class, this::listen);
executor.register(Unlisten.class, this::unlisten);
// Commands
executor.register(AtomixLeaderElectorCommands.Run.class, this::run);
executor.register(AtomixLeaderElectorCommands.Withdraw.class, this::withdraw);
executor.register(AtomixLeaderElectorCommands.Anoint.class, this::anoint);
executor.register(Run.class, this::run);
executor.register(Withdraw.class, this::withdraw);
executor.register(Anoint.class, this::anoint);
// Queries
executor.register(AtomixLeaderElectorCommands.GetLeadership.class, this::leadership);
executor.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, this::allLeaderships);
executor.register(AtomixLeaderElectorCommands.GetElectedTopics.class, this::electedTopics);
executor.register(GetLeadership.class, this::leadership);
executor.register(GetAllLeaderships.class, this::allLeaderships);
executor.register(GetElectedTopics.class, this::electedTopics);
}
private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
......@@ -96,7 +104,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
*
* @param commit listen commit
*/
public void listen(Commit<? extends AtomixLeaderElectorCommands.Listen> commit) {
public void listen(Commit<? extends Listen> commit) {
if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
commit.close();
}
......@@ -107,9 +115,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
*
* @param commit unlisten commit
*/
public void unlisten(Commit<? extends AtomixLeaderElectorCommands.Unlisten> commit) {
public void unlisten(Commit<? extends Unlisten> commit) {
try {
Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(commit.session().id());
Commit<? extends Listen> listener = listeners.remove(commit.session().id());
if (listener != null) {
listener.close();
}
......@@ -123,7 +131,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
* @param commit commit entry
* @return topic leader. If no previous leader existed this is the node that just entered the race.
*/
public Leadership run(Commit<? extends AtomixLeaderElectorCommands.Run> commit) {
public Leadership run(Commit<? extends Run> commit) {
try {
String topic = commit.operation().topic();
Leadership oldLeadership = leadership(topic);
......@@ -154,7 +162,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
* Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
* @param commit withdraw commit
*/
public void withdraw(Commit<? extends AtomixLeaderElectorCommands.Withdraw> commit) {
public void withdraw(Commit<? extends Withdraw> commit) {
try {
String topic = commit.operation().topic();
Leadership oldLeadership = leadership(topic);
......@@ -174,7 +182,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
* @param commit anoint commit
* @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
*/
public boolean anoint(Commit<? extends AtomixLeaderElectorCommands.Anoint> commit) {
public boolean anoint(Commit<? extends Anoint> commit) {
try {
String topic = commit.operation().topic();
Leadership oldLeadership = leadership(topic);
......@@ -197,7 +205,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
* @param commit GetLeadership commit
* @return leader
*/
public Leadership leadership(Commit<? extends AtomixLeaderElectorCommands.GetLeadership> commit) {
public Leadership leadership(Commit<? extends GetLeadership> commit) {
String topic = commit.operation().topic();
try {
return leadership(topic);
......@@ -211,7 +219,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
* @param commit commit entry
* @return set of topics for which the node is the leader
*/
public Set<String> electedTopics(Commit<? extends AtomixLeaderElectorCommands.GetElectedTopics> commit) {
public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
try {
NodeId nodeId = commit.operation().nodeId();
return Maps.filterEntries(elections, e -> {
......@@ -228,8 +236,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
* @param commit GetAllLeaderships commit
* @return topic to leader mapping
*/
public Map<String, Leadership> allLeaderships(
Commit<? extends AtomixLeaderElectorCommands.GetAllLeaderships> commit) {
public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
try {
return Maps.transformEntries(elections, (k, v) -> leadership(k));
} finally {
......