Yuta HIGUCHI
Committed by Yuta Higuchi

IntentStore: add batch write API

Change-Id: I9d397e9dc3dc6e9ccd21ac6ddacaece79214c470
......@@ -15,8 +15,17 @@
*/
package org.onlab.onos.net.intent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
import org.onlab.onos.store.Store;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
......@@ -110,4 +119,165 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
*/
void removeInstalledIntents(IntentId intentId);
/**
* Returns a new empty batch write operation buider.
*
* @return BatchWrite
*/
default BatchWrite newBatchWrite() {
return new BatchWrite();
}
// default implementation simply executes them sequentially.
// Store implementation should override and implement actual batch write.
/**
* Execute writes in a batch.
*
* @param batch BatchWrite to execute
* @return failed operations
*/
default List<Operation> batchWrite(BatchWrite batch) {
List<Operation> failed = new ArrayList<>();
for (Operation op : batch.operations) {
switch (op.type) {
case CREATE_INTENT:
checkArgument(op.args.size() == 1,
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = (Intent) op.args.get(0);
if (createIntent(intent) == null) {
failed.add(op);
}
break;
case REMOVE_INTENT:
checkArgument(op.args.size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.args.get(0);
removeIntent(intentId);
break;
case REMOVE_INSTALLED:
checkArgument(op.args.size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = (IntentId) op.args.get(0);
removeInstalledIntents(intentId);
break;
case SET_INSTALLABLE:
checkArgument(op.args.size() == 2,
"SET_INSTALLABLE takes 2 arguments. %s", op);
intentId = (IntentId) op.args.get(0);
@SuppressWarnings("unchecked")
List<Intent> installableIntents = (List<Intent>) op.args.get(1);
setInstallableIntents(intentId, installableIntents);
break;
case SET_STATE:
checkArgument(op.args.size() == 2,
"SET_STATE takes 2 arguments. %s", op);
intent = (Intent) op.args.get(0);
IntentState newState = (IntentState) op.args.get(1);
setState(intent, newState);
break;
default:
break;
}
}
return failed;
}
public static class BatchWrite {
public enum OpType {
CREATE_INTENT,
REMOVE_INTENT,
SET_STATE,
SET_INSTALLABLE,
REMOVE_INSTALLED
}
List<Operation> operations = new ArrayList<>();
public List<Operation> operations() {
return Collections.unmodifiableList(operations);
}
public boolean isEmpty() {
return operations.isEmpty();
}
public BatchWrite createIntent(Intent intent) {
operations.add(Operation.of(OpType.CREATE_INTENT,
ImmutableList.of(intent)));
return this;
}
public BatchWrite removeIntent(IntentId intentId) {
operations.add(Operation.of(OpType.REMOVE_INTENT,
ImmutableList.of(intentId)));
return this;
}
public BatchWrite setState(Intent intent, IntentState newState) {
operations.add(Operation.of(OpType.SET_STATE,
ImmutableList.of(intent, newState)));
return this;
}
public BatchWrite setInstallableIntents(IntentId intentId, List<Intent> installableIntents) {
operations.add(Operation.of(OpType.SET_INSTALLABLE,
ImmutableList.of(intentId, installableIntents)));
return this;
}
public BatchWrite removeInstalledIntents(IntentId intentId) {
operations.add(Operation.of(OpType.REMOVE_INSTALLED,
ImmutableList.of(intentId)));
return this;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("operations", operations)
.toString();
}
public static class Operation {
final OpType type;
final ImmutableList<Object> args;
public static Operation of(OpType type, List<Object> args) {
return new Operation(type, args);
}
public Operation(OpType type, List<Object> args) {
this.type = checkNotNull(type);
this.args = ImmutableList.copyOf(args);
}
public OpType type() {
return type;
}
public ImmutableList<Object> args() {
return args;
}
@SuppressWarnings("unchecked")
public <T> T arg(int i) {
return (T) args.get(i);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("type", type)
.add("args", args)
.toString();
}
}
}
}
......
......@@ -18,6 +18,9 @@ package org.onlab.onos.store.intent.impl;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.google.common.base.Verify;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
......@@ -34,22 +37,28 @@ import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.BatchWriteRequest.Builder;
import org.onlab.onos.store.service.BatchWriteResult;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.impl.CMap;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -70,15 +79,21 @@ public class DistributedIntentStore
private final Logger log = getLogger(getClass());
// Assumption: IntentId will not have synonyms
private static final String INTENTS_TABLE = "intents";
private CMap<IntentId, Intent> intents;
private static final String STATES_TABLE = "intent-states";
private CMap<IntentId, IntentState> states;
// TODO left behind transient state issue: ONOS-103
// Map to store instance local intermediate state transition
private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
private static final String INSTALLABLE_TABLE = "installable-intents";
private CMap<IntentId, List<Intent>> installable;
private LoadingCache<IntentId, String> keyCache;
private StoreSerializer serializer;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -137,13 +152,23 @@ public class DistributedIntentStore
}
};
intents = new CMap<>(dbAdminService, dbService, "intents", serializer);
keyCache = CacheBuilder.newBuilder()
.softValues()
.build(new CacheLoader<IntentId, String>() {
@Override
public String load(IntentId key) {
return key.toString();
}
});
intents = new IntentIdMap<>(dbAdminService, dbService, INTENTS_TABLE, serializer);
states = new CMap<>(dbAdminService, dbService, "intent-states", serializer);
states = new IntentIdMap<>(dbAdminService, dbService, STATES_TABLE, serializer);
transientStates.clear();
installable = new CMap<>(dbAdminService, dbService, "installable-intents", serializer);
installable = new IntentIdMap<>(dbAdminService, dbService, INSTALLABLE_TABLE, serializer);
log.info("Started");
}
......@@ -351,4 +376,101 @@ public class DistributedIntentStore
stopTimer(timer);
}
}
protected String strIntentId(IntentId key) {
return keyCache.getUnchecked(key);
}
/**
* Distributed Map from IntentId to some value.
*
* @param <V> Map value type
*/
final class IntentIdMap<V> extends CMap<IntentId, V> {
/**
* Creates a IntentIdMap instance.
*
* @param dbAdminService DatabaseAdminService to use for this instance
* @param dbService DatabaseService to use for this instance
* @param tableName table which this Map corresponds to
* @param serializer Value serializer
*/
public IntentIdMap(DatabaseAdminService dbAdminService,
DatabaseService dbService,
String tableName,
StoreSerializer serializer) {
super(dbAdminService, dbService, tableName, serializer);
}
@Override
protected String sK(IntentId key) {
return strIntentId(key);
}
}
@Override
public List<Operation> batchWrite(BatchWrite batch) {
List<Operation> failed = new ArrayList<>();
final Builder builder = BatchWriteRequest.newBuilder();
for (Operation op : batch.operations()) {
switch (op.type()) {
case CREATE_INTENT:
checkArgument(op.args().size() == 1,
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = op.arg(0);
builder.putIfAbsent(INTENTS_TABLE, strIntentId(intent.id()), serializer.encode(intent));
builder.putIfAbsent(STATES_TABLE, strIntentId(intent.id()), serializer.encode(SUBMITTED));
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.arg(0);
builder.remove(INTENTS_TABLE, strIntentId(intentId));
builder.remove(STATES_TABLE, strIntentId(intentId));
builder.remove(INSTALLABLE_TABLE, strIntentId(intentId));
break;
case SET_STATE:
checkArgument(op.args().size() == 2,
"SET_STATE takes 2 arguments. %s", op);
intent = op.arg(0);
IntentState newState = op.arg(1);
builder.put(STATES_TABLE, strIntentId(intent.id()), serializer.encode(newState));
break;
case SET_INSTALLABLE:
checkArgument(op.args().size() == 2,
"SET_INSTALLABLE takes 2 arguments. %s", op);
intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
builder.put(INSTALLABLE_TABLE, strIntentId(intentId), serializer.encode(installableIntents));
break;
case REMOVE_INSTALLED:
checkArgument(op.args().size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = op.arg(0);
builder.remove(INSTALLABLE_TABLE, strIntentId(intentId));
break;
default:
log.warn("Unknown Operation encountered: {}", op);
failed.add(op);
break;
}
}
BatchWriteResult batchWriteResult = dbService.batchWrite(builder.build());
if (batchWriteResult.isSuccessful()) {
// no-failure (except for invalid input)
return failed;
} else {
// everything failed
return batch.operations();
}
}
}
......
......@@ -18,6 +18,7 @@ package org.onlab.onos.store.intent.impl;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
......@@ -25,6 +26,7 @@ import com.hazelcast.core.EntryListener;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -38,6 +40,7 @@ import org.onlab.onos.net.intent.IntentEvent;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
......@@ -46,12 +49,16 @@ import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -341,6 +348,207 @@ public class HazelcastIntentStore
}
}
@Override
public List<Operation> batchWrite(BatchWrite batch) {
// Hazelcast version will never fail for conditional failure now.
List<Operation> failed = new ArrayList<>();
List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
for (Operation op : batch.operations()) {
switch (op.type()) {
case CREATE_INTENT:
checkArgument(op.args().size() == 1,
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = op.arg(0);
futures.add(Pair.of(op,
ImmutableList.of(intents.putAsync(intent.id(), intent),
states.putAsync(intent.id(), SUBMITTED))));
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.arg(0);
futures.add(Pair.of(op,
ImmutableList.of(intents.removeAsync(intentId),
states.removeAsync(intentId),
installable.removeAsync(intentId))));
break;
case SET_STATE:
checkArgument(op.args().size() == 2,
"SET_STATE takes 2 arguments. %s", op);
intent = op.arg(0);
IntentState newState = op.arg(1);
futures.add(Pair.of(op,
ImmutableList.of(states.putAsync(intent.id(), newState))));
break;
case SET_INSTALLABLE:
checkArgument(op.args().size() == 2,
"SET_INSTALLABLE takes 2 arguments. %s", op);
intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
futures.add(Pair.of(op,
ImmutableList.of(installable.putAsync(intentId, installableIntents))));
break;
case REMOVE_INSTALLED:
checkArgument(op.args().size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = op.arg(0);
futures.add(Pair.of(op,
ImmutableList.of(installable.removeAsync(intentId))));
break;
default:
log.warn("Unknown Operation encountered: {}", op);
failed.add(op);
break;
}
}
// verify result
for (Pair<Operation, List<Future<?>>> future : futures) {
final Operation op = future.getLeft();
final List<Future<?>> subops = future.getRight();
switch (op.type()) {
case CREATE_INTENT:
{
Intent intent = op.arg(0);
IntentState newIntentState = SUBMITTED;
try {
Intent prevIntent = (Intent) subops.get(0).get();
IntentState prevIntentState = (IntentState) subops.get(1).get();
if (prevIntent != null || prevIntentState != null) {
log.warn("Overwriting existing Intent: {}@{} with {}@{}",
prevIntent, prevIntentState,
intent, newIntentState);
}
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case REMOVE_INTENT:
{
IntentId intentId = op.arg(0);
try {
Intent prevIntent = (Intent) subops.get(0).get();
IntentState prevIntentState = (IntentState) subops.get(1).get();
@SuppressWarnings("unchecked")
List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
if (prevIntent == null) {
log.warn("Intent {} was already removed.", intentId);
}
if (prevIntentState == null) {
log.warn("Intent {} state was already removed", intentId);
}
if (prevInstallable == null) {
log.info("Intent {} installable was already removed", intentId);
}
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case SET_STATE:
{
Intent intent = op.arg(0);
IntentId intentId = intent.id();
IntentState newState = op.arg(1);
try {
IntentState prevIntentState = (IntentState) subops.get(0).get();
log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
// TODO sanity check and log?
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case SET_INSTALLABLE:
{
IntentId intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
try {
@SuppressWarnings("unchecked")
List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
if (prevInstallable != null) {
log.warn("Overwriting Intent {} installable {} -> {}",
intentId, prevInstallable, installableIntents);
}
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case REMOVE_INSTALLED:
{
IntentId intentId = op.arg(0);
try {
@SuppressWarnings("unchecked")
List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
if (prevInstallable == null) {
log.warn("Intent {} installable was already removed", intentId);
}
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
default:
log.warn("Unknown Operation encountered: {}", op);
if (!failed.contains(op)) {
failed.add(op);
}
break;
}
}
return failed;
}
public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
@Override
......