Jonathan Hart

Modified GossipIntentStore to use EventuallyConsistentMaps.

All IntentStore operations are now implemented.

ONOS-858

Change-Id: I5081805b61c7e25e28707b90093cae12b5a4374b
......@@ -151,7 +151,7 @@ public interface EventuallyConsistentMap<K, V> {
*
* @param listener listener to register for events
*/
public void addListener(EventuallyConsistentMapListener listener);
public void addListener(EventuallyConsistentMapListener<K, V> listener);
/**
* Removes the specified listener from the map such that it will no longer
......@@ -159,7 +159,7 @@ public interface EventuallyConsistentMap<K, V> {
*
* @param listener listener to deregister for events
*/
public void removeListener(EventuallyConsistentMapListener listener);
public void removeListener(EventuallyConsistentMapListener<K, V> listener);
/**
* Shuts down the map and breaks communication between different instances.
......
......@@ -78,7 +78,7 @@ public class EventuallyConsistentMapImpl<K, V>
private final ScheduledExecutorService backgroundExecutor;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " is already destroyed";
private static final String ERROR_DESTROYED = " map is already destroyed";
// TODO: Make these anti-entropy params configurable
private long initialDelaySec = 5;
......@@ -154,6 +154,7 @@ public class EventuallyConsistentMapImpl<K, V>
serializerPool = builder
.register(WallClockTimestamp.class)
.register(PutEntry.class)
.register(RemoveEntry.class)
.register(ArrayList.class)
.register(InternalPutEvent.class)
.register(InternalRemoveEvent.class)
......@@ -166,25 +167,25 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public int size() {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.size();
}
@Override
public boolean isEmpty() {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.isEmpty();
}
@Override
public boolean containsKey(K key) {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.containsKey(key);
}
@Override
public boolean containsValue(V value) {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.values().stream()
.anyMatch(timestamped -> timestamped.value().equals(value));
......@@ -192,7 +193,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public V get(K key) {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamped<V> value = items.get(key);
if (value != null) {
......@@ -203,7 +204,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void put(K key, V value) {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamp timestamp = clockService.getTimestamp(key);
if (putInternal(key, value, timestamp)) {
......@@ -235,7 +236,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void remove(K key) {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamp timestamp = clockService.getTimestamp(key);
if (removeInternal(key, timestamp)) {
......@@ -261,7 +262,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void putAll(Map<? extends K, ? extends V> m) {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
......@@ -287,7 +288,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void clear() {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
......@@ -311,14 +312,14 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public Set<K> keySet() {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.keySet();
}
@Override
public Collection<V> values() {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.values().stream()
.map(Timestamped::value)
......@@ -327,7 +328,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public Set<Map.Entry<K, V>> entrySet() {
checkState(destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.entrySet().stream()
.map(e -> new Entry(e.getKey(), e.getValue().value()))
......@@ -335,15 +336,15 @@ public class EventuallyConsistentMapImpl<K, V>
}
@Override
public void addListener(EventuallyConsistentMapListener listener) {
checkState(destroyed, mapName + ERROR_DESTROYED);
public void addListener(EventuallyConsistentMapListener<K, V> listener) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
listeners.add(checkNotNull(listener));
}
@Override
public void removeListener(EventuallyConsistentMapListener listener) {
checkState(destroyed, mapName + ERROR_DESTROYED);
public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
listeners.remove(checkNotNull(listener));
}
......
......@@ -19,12 +19,12 @@ package org.onosproject.store.impl;
* Listener interested in receiving modification events for an
* EventuallyConsistentMap.
*/
public interface EventuallyConsistentMapListener {
public interface EventuallyConsistentMapListener<K, V> {
/**
* Reacts to the specified event.
*
* @param event the event
*/
public void event(EventuallyConsistentMapEvent event);
public void event(EventuallyConsistentMapEvent<K, V> event);
}
......
......@@ -16,8 +16,6 @@
package org.onosproject.store.intent.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -26,46 +24,29 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentClockService;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.onosproject.store.impl.EventuallyConsistentMap;
import org.onosproject.store.impl.EventuallyConsistentMapEvent;
import org.onosproject.store.impl.EventuallyConsistentMapImpl;
import org.onosproject.store.impl.EventuallyConsistentMapListener;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -80,20 +61,11 @@ public class GossipIntentStore
private final Logger log = getLogger(getClass());
private final ConcurrentMap<IntentId, Intent> intents =
new ConcurrentHashMap<>();
private EventuallyConsistentMap<IntentId, Intent> intents;
private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
= new ConcurrentHashMap<>();
private EventuallyConsistentMap<IntentId, IntentState> intentStates;
private final Set<IntentId> withdrawRequestedIntents
= Sets.newConcurrentHashSet();
private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
= new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentClockService intentClockService;
private EventuallyConsistentMap<IntentId, List<Intent>> installables;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
......@@ -101,64 +73,39 @@ public class GossipIntentStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalIntentEvent.class)
.register(InternalSetInstallablesEvent.class)
.register(Collections.emptyList().getClass())
//.register(InternalIntentAntiEntropyEvent.class)
//.register(IntentAntiEntropyAdvertisement.class)
.build();
}
};
private ExecutorService executor;
private ScheduledExecutorService backgroundExecutor;
// TODO: Make these anti-entropy params configurable
private long initialDelaySec = 5;
private long periodSec = 5;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
new InternalIntentCreateOrUpdateEventListener());
clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
new InternalIntentSetInstallablesListener());
clusterCommunicator.addSubscriber(
INTENT_ANTI_ENTROPY_ADVERTISEMENT,
new InternalIntentAntiEntropyAdvertisementListener());
executor = Executors.newCachedThreadPool(namedThreads("onos-intent-fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-intent-bg-%d")));
// start anti-entropy thread
//backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
//initialDelaySec, periodSec, TimeUnit.SECONDS);
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
// TODO this should be in BASIC namespace
.register(Collections.emptyList().getClass());
intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
clusterCommunicator,
intentSerializer,
new WallclockClockManager<>());
intentStates = new EventuallyConsistentMapImpl<>("intent-states",
clusterService,
clusterCommunicator,
intentSerializer,
new WallclockClockManager<>());
installables = new EventuallyConsistentMapImpl<>("intent-installables",
clusterService,
clusterCommunicator,
intentSerializer,
new WallclockClockManager<>());
intentStates.addListener(new InternalIntentStatesListener());
log.info("Started");
}
@Deactivate
public void deactivate() {
executor.shutdownNow();
backgroundExecutor.shutdownNow();
try {
if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
log.error("Timeout during executor shutdown");
}
} catch (InterruptedException e) {
log.error("Error during executor shutdown", e);
}
intents.clear();
intents.destroy();
intentStates.destroy();
installables.destroy();
log.info("Stopped");
}
......@@ -181,76 +128,19 @@ public class GossipIntentStore
@Override
public IntentState getIntentState(IntentId intentId) {
Timestamped<IntentState> state = intentStates.get(intentId);
if (state != null) {
return state.value();
}
return null;
}
private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
switch (newState) {
case WITHDRAW_REQ:
withdrawRequestedIntents.add(intentId);
break;
case INSTALL_REQ:
case COMPILING:
case INSTALLING:
case INSTALLED:
case RECOMPILING:
case WITHDRAWING:
case WITHDRAWN:
case FAILED:
synchronized (intentStates) {
Timestamped<IntentState> existing = intentStates.get(intentId);
if (existing == null || !existing.isNewer(timestamp)) {
intentStates.put(intentId, new Timestamped<>(newState, timestamp));
}
}
break;
default:
log.warn("Unknown intent state {}", newState);
break;
}
try {
// TODO make sure it's OK if the intent is null
return IntentEvent.getEvent(newState, intents.get(intentId));
} catch (IllegalArgumentException e) {
// Transient states can't be used for events, so don't send one
return null;
}
}
private void setInstallableIntentsInternal(IntentId intentId,
List<Intent> installableIntents,
Timestamp timestamp) {
synchronized (installables) {
Timestamped<List<Intent>> existing = installables.get(intentId);
if (existing == null || !existing.isNewer(timestamp)) {
installables.put(intentId,
new Timestamped<>(installableIntents, timestamp));
}
}
return intentStates.get(intentId);
}
@Override
public List<Intent> getInstallableIntents(IntentId intentId) {
Timestamped<List<Intent>> tInstallables = installables.get(intentId);
if (tInstallables != null) {
return tInstallables.value();
}
return null;
return installables.get(intentId);
}
@Override
public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
List<IntentEvent> events = Lists.newArrayList();
List<BatchWrite.Operation> failed = new ArrayList<>();
Timestamp timestamp = null;
for (BatchWrite.Operation op : batch.operations()) {
switch (op.type()) {
case CREATE_INTENT:
......@@ -258,19 +148,18 @@ public class GossipIntentStore
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = op.arg(0);
timestamp = intentClockService.getTimestamp(intent.id());
if (createIntentInternal(intent)) {
events.add(setStateInternal(intent.id(), INSTALL_REQ, timestamp));
notifyPeers(new InternalIntentEvent(intent.id(), intent,
INSTALL_REQ, timestamp));
}
intents.put(intent.id(), intent);
intentStates.put(intent.id(), INSTALL_REQ);
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.arg(0);
// TODO implement
IntentId intentId = op.arg(0);
intents.remove(intentId);
intentStates.remove(intentId);
installables.remove(intentId);
break;
case SET_STATE:
......@@ -279,10 +168,7 @@ public class GossipIntentStore
intent = op.arg(0);
IntentState newState = op.arg(1);
timestamp = intentClockService.getTimestamp(intent.id());
IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
events.add(externalEvent);
notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
intentStates.put(intent.id(), newState);
break;
case SET_INSTALLABLE:
......@@ -291,18 +177,14 @@ public class GossipIntentStore
intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
timestamp = intentClockService.getTimestamp(intentId);
setInstallableIntentsInternal(
intentId, installableIntents, timestamp);
notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp));
installables.put(intentId, installableIntents);
break;
case REMOVE_INSTALLED:
checkArgument(op.args().size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = op.arg(0);
// TODO implement
installables.remove(intentId);
break;
default:
log.warn("Unknown Operation encountered: {}", op);
......@@ -311,121 +193,34 @@ public class GossipIntentStore
}
}
notifyDelegate(events);
return failed;
}
private boolean createIntentInternal(Intent intent) {
Intent oldValue = intents.putIfAbsent(intent.id(), intent);
if (oldValue == null) {
return true;
}
log.warn("Intent ID {} already in store, throwing new update away",
intent.id());
return false;
}
private void notifyPeers(InternalIntentEvent event) {
broadcastMessage(INTENT_UPDATED_MSG, event);
}
private void notifyPeers(InternalSetInstallablesEvent event) {
broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
}
private void broadcastMessage(MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
private void unicastMessage(NodeId peer,
MessageSubject subject,
Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.unicast(message, peer);
}
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
private final class InternalIntentCreateOrUpdateEventListener
implements ClusterMessageHandler {
private final class InternalIntentStatesListener implements
EventuallyConsistentMapListener<IntentId, IntentState> {
@Override
public void handle(ClusterMessage message) {
log.debug("Received intent update event from peer: {}", message.sender());
InternalIntentEvent event = SERIALIZER.decode(message.payload());
public void event(
EventuallyConsistentMapEvent<IntentId, IntentState> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
IntentEvent externalEvent;
Intent intent = intents.get(event.key()); // TODO OK if this is null?
IntentId intentId = event.intentId();
Intent intent = event.intent();
IntentState state = event.state();
Timestamp timestamp = event.timestamp();
executor.submit(() -> {
try {
switch (state) {
case INSTALL_REQ:
createIntentInternal(intent);
// Fallthrough to setStateInternal for INSTALL_REQ
default:
notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
break;
}
} catch (Exception e) {
log.warn("Exception thrown handling intent create or update", e);
}
});
}
externalEvent = IntentEvent.getEvent(event.value(), intent);
} catch (IllegalArgumentException e) {
externalEvent = null;
}
private final class InternalIntentSetInstallablesListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received intent set installables event from peer: {}", message.sender());
InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
IntentId intentId = event.intentId();
List<Intent> installables = event.installables();
Timestamp timestamp = event.timestamp();
executor.submit(() -> {
try {
setInstallableIntentsInternal(intentId, installables, timestamp);
} catch (Exception e) {
log.warn("Exception thrown handling intent set installables", e);
notifyDelegateIfNotNull(externalEvent);
}
});
}
}
private final class InternalIntentAntiEntropyAdvertisementListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
// TODO implement
//IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
backgroundExecutor.submit(() -> {
try {
log.debug("something");
//handleAntiEntropyAdvertisement(advertisement);
} catch (Exception e) {
log.warn("Exception thrown handling intent advertisements", e);
}
});
}
}
}
......