Yuta HIGUCHI
Committed by Yuta Higuchi

DistributedIntentStore: CopyCat version of Distributed intent store

- old DistributedIntentStore renamed to Hazelcast~ and is by default disabled

Change-Id: I386eaf6c136f8a2fbebb4268d20b1395249e77ea
......@@ -34,6 +34,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
// FIXME This is not distributed yet.
@Component(immediate = true)
@Service
public class DistributedIntentBatchQueue implements IntentBatchService {
......
......@@ -16,15 +16,12 @@
package org.onlab.onos.store.intent.impl;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentEvent;
......@@ -32,10 +29,13 @@ import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.impl.CMap;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
......@@ -52,7 +52,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class DistributedIntentStore
extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
extends AbstractStore<IntentEvent, IntentStoreDelegate>
implements IntentStore {
/** Valid parking state, which can transition to INSTALLED. */
......@@ -64,22 +64,29 @@ public class DistributedIntentStore
private final Logger log = getLogger(getClass());
// Assumption: IntentId will not have synonyms
private SMap<IntentId, Intent> intents;
private SMap<IntentId, IntentState> states;
private CMap<IntentId, Intent> intents;
private CMap<IntentId, IntentState> states;
// TODO left behind transient state issue: ONOS-103
// Map to store instance local intermediate state transition
private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
private SMap<IntentId, List<Intent>> installable;
private CMap<IntentId, List<Intent>> installable;
private StoreSerializer serializer;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseAdminService dbAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseService dbService;
@Override
@Activate
public void activate() {
// FIXME: We need a way to add serializer for intents which has been plugged-in.
// As a short term workaround, relax Kryo config to
// registrationRequired=false
super.activate();
super.serializer = new KryoSerializer() {
serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
......@@ -89,24 +96,15 @@ public class DistributedIntentStore
.build()
.populate(1);
}
};
// TODO: enable near cache, allow read from backup for this IMap
IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
intents = new SMap<>(rawIntents , super.serializer);
intents = new CMap<>(dbAdminService, dbService, "intents", serializer);
// TODO: disable near cache, disable read from backup for this IMap
IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
states = new SMap<>(rawStates , super.serializer);
EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
states.addEntryListener(listener , false);
states = new CMap<>(dbAdminService, dbService, "intent-states", serializer);
transientStates.clear();
// TODO: disable near cache, disable read from backup for this IMap
IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
installable = new SMap<>(rawInstallables , super.serializer);
installable = new CMap<>(dbAdminService, dbService, "installable-intents", serializer);
log.info("Started");
}
......@@ -118,8 +116,8 @@ public class DistributedIntentStore
@Override
public IntentEvent createIntent(Intent intent) {
Intent existing = intents.putIfAbsent(intent.id(), intent);
if (existing != null) {
boolean absent = intents.putIfAbsent(intent.id(), intent);
if (!absent) {
// duplicate, ignore
return null;
} else {
......@@ -173,34 +171,47 @@ public class DistributedIntentStore
IntentEvent.Type type = null;
final IntentState prevParking;
boolean transientStateChangeOnly = false;
boolean updated;
// parking state transition
switch (state) {
case SUBMITTED:
prevParking = states.putIfAbsent(id, SUBMITTED);
prevParking = states.get(id);
verify(prevParking == null,
"Illegal state transition attempted from %s to SUBMITTED",
prevParking);
updated = states.putIfAbsent(id, SUBMITTED);
verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
type = IntentEvent.Type.SUBMITTED;
break;
case INSTALLED:
prevParking = states.replace(id, INSTALLED);
prevParking = states.get(id);
verify(PRE_INSTALLED.contains(prevParking),
"Illegal state transition attempted from %s to INSTALLED",
prevParking);
updated = states.replace(id, prevParking, INSTALLED);
verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALLED);
type = IntentEvent.Type.INSTALLED;
break;
case FAILED:
prevParking = states.replace(id, FAILED);
prevParking = states.get(id);
updated = states.replace(id, prevParking, FAILED);
verify(updated, "Conditional replace %s => %s failed", prevParking, FAILED);
type = IntentEvent.Type.FAILED;
break;
case WITHDRAWN:
prevParking = states.replace(id, WITHDRAWN);
prevParking = states.get(id);
verify(PRE_WITHDRAWN.contains(prevParking),
"Illegal state transition attempted from %s to WITHDRAWN",
prevParking);
updated = states.replace(id, prevParking, WITHDRAWN);
verify(updated, "Conditional replace %s => %s failed", prevParking, WITHDRAWN);
type = IntentEvent.Type.WITHDRAWN;
break;
default:
transientStateChangeOnly = true;
prevParking = null;
......@@ -233,22 +244,4 @@ public class DistributedIntentStore
public void removeInstalledIntents(IntentId intentId) {
installable.remove(intentId);
}
public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
@Override
public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
final Member myself = theInstance.getCluster().getLocalMember();
if (!myself.equals(event.getMember())) {
// When Intent state was modified by remote node,
// clear local transient state.
final IntentId intentId = event.getKey();
IntentState oldState = transientStates.remove(intentId);
if (oldState != null) {
log.debug("{} state updated remotely, removing transient state {}",
intentId, oldState);
}
}
}
}
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.intent.impl;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentEvent;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Verify.verify;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = false, enabled = false)
@Service
public class HazelcastIntentStore
extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
implements IntentStore {
/** Valid parking state, which can transition to INSTALLED. */
private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(SUBMITTED, FAILED);
/** Valid parking state, which can transition to WITHDRAWN. */
private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
private final Logger log = getLogger(getClass());
// Assumption: IntentId will not have synonyms
private SMap<IntentId, Intent> intents;
private SMap<IntentId, IntentState> states;
// Map to store instance local intermediate state transition
private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
private SMap<IntentId, List<Intent>> installable;
@Override
@Activate
public void activate() {
// FIXME: We need a way to add serializer for intents which has been plugged-in.
// As a short term workaround, relax Kryo config to
// registrationRequired=false
super.activate();
super.serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
.build()
.populate(1);
}
};
// TODO: enable near cache, allow read from backup for this IMap
IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
intents = new SMap<>(rawIntents , super.serializer);
// TODO: disable near cache, disable read from backup for this IMap
IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
states = new SMap<>(rawStates , super.serializer);
EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
states.addEntryListener(listener , false);
transientStates.clear();
// TODO: disable near cache, disable read from backup for this IMap
IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
installable = new SMap<>(rawInstallables , super.serializer);
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public IntentEvent createIntent(Intent intent) {
Intent existing = intents.putIfAbsent(intent.id(), intent);
if (existing != null) {
// duplicate, ignore
return null;
} else {
return this.setState(intent, IntentState.SUBMITTED);
}
}
@Override
public IntentEvent removeIntent(IntentId intentId) {
Intent intent = intents.remove(intentId);
installable.remove(intentId);
if (intent == null) {
// was already removed
return null;
}
IntentEvent event = this.setState(intent, WITHDRAWN);
states.remove(intentId);
transientStates.remove(intentId);
// TODO: Should we callremoveInstalledIntents if this Intent was
return event;
}
@Override
public long getIntentCount() {
return intents.size();
}
@Override
public Iterable<Intent> getIntents() {
return ImmutableSet.copyOf(intents.values());
}
@Override
public Intent getIntent(IntentId intentId) {
return intents.get(intentId);
}
@Override
public IntentState getIntentState(IntentId id) {
final IntentState localState = transientStates.get(id);
if (localState != null) {
return localState;
}
return states.get(id);
}
@Override
public IntentEvent setState(Intent intent, IntentState state) {
final IntentId id = intent.id();
IntentEvent.Type type = null;
final IntentState prevParking;
boolean transientStateChangeOnly = false;
// parking state transition
switch (state) {
case SUBMITTED:
prevParking = states.putIfAbsent(id, SUBMITTED);
verify(prevParking == null,
"Illegal state transition attempted from %s to SUBMITTED",
prevParking);
type = IntentEvent.Type.SUBMITTED;
break;
case INSTALLED:
prevParking = states.replace(id, INSTALLED);
verify(PRE_INSTALLED.contains(prevParking),
"Illegal state transition attempted from %s to INSTALLED",
prevParking);
type = IntentEvent.Type.INSTALLED;
break;
case FAILED:
prevParking = states.replace(id, FAILED);
type = IntentEvent.Type.FAILED;
break;
case WITHDRAWN:
prevParking = states.replace(id, WITHDRAWN);
verify(PRE_WITHDRAWN.contains(prevParking),
"Illegal state transition attempted from %s to WITHDRAWN",
prevParking);
type = IntentEvent.Type.WITHDRAWN;
break;
default:
transientStateChangeOnly = true;
prevParking = null;
break;
}
if (!transientStateChangeOnly) {
log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
}
// Update instance local state, which includes non-parking state transition
final IntentState prevTransient = transientStates.put(id, state);
log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
if (type == null) {
return null;
}
return new IntentEvent(type, intent);
}
@Override
public void setInstallableIntents(IntentId intentId, List<Intent> result) {
installable.put(intentId, result);
}
@Override
public List<Intent> getInstallableIntents(IntentId intentId) {
return installable.get(intentId);
}
@Override
public void removeInstalledIntents(IntentId intentId) {
installable.remove(intentId);
}
public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
@Override
public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
final Member myself = theInstance.getCluster().getLocalMember();
if (!myself.equals(event.getMember())) {
// When Intent state was modified by remote node,
// clear local transient state.
final IntentId intentId = event.getKey();
IntentState oldState = transientStates.remove(intentId);
if (oldState != null) {
log.debug("{} state updated remotely, removing transient state {}",
intentId, oldState);
}
}
}
}
}
......@@ -321,7 +321,7 @@ public class DatabaseStateMachine implements StateMachine {
private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
private long versionCounter = 1;
public Map<String, VersionedValue> getTable(String tableName) {
Map<String, VersionedValue> getTable(String tableName) {
return tableData.get(tableName);
}
......
......@@ -55,6 +55,7 @@ public class MapDBLog implements Log {
.mmapFileEnableIfSupported()
.cacheSize(cacheSize)
.makeTxMaker();
log.info("Raft log file: {}", dbFile.getCanonicalPath());
}
@Override
......