Aaron Kruglikov
Committed by Gerrit Code Review

Modifying eventually consistent map and tests to make use of the persistence service.

Change-Id: I44ffcabb9d765a1c70c2790366c6d7381416dac6
......@@ -30,7 +30,8 @@ import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.*;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
/**
* Testing version of an Eventually Consistent Map.
......
......@@ -69,6 +69,12 @@
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-persistence</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.8</version>
......@@ -110,5 +116,4 @@
<artifactId>onlab-thirdparty</artifactId>
</dependency>
</dependencies>
</project>
......
......@@ -55,6 +55,7 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.IdGenerator;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
......@@ -128,6 +129,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
protected String nodeIdToUri(NodeId nodeId) {
ControllerNode node = clusterService.getNode(nodeId);
return String.format("onos://%s:%d", node.ip(), node.tcpPort());
......@@ -312,7 +316,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Override
public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
return new EventuallyConsistentMapBuilderImpl<>(clusterService,
clusterCommunicator);
clusterCommunicator,
persistenceService);
}
@Override
......
......@@ -18,6 +18,7 @@ package org.onosproject.store.ecmap;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.EventuallyConsistentMap;
......@@ -52,6 +53,8 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
private boolean convergeFaster = false;
private boolean persistent = false;
private boolean persistentMap = false;
private final PersistenceService persistenceService;
/**
* Creates a new eventually consistent map builder.
......@@ -60,7 +63,9 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
* @param clusterCommunicator cluster communication service
*/
public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
ClusterCommunicationService clusterCommunicator,
PersistenceService persistenceService) {
this.persistenceService = persistenceService;
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
}
......@@ -133,6 +138,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
@Override
public EventuallyConsistentMapBuilder<K, V> withPersistence() {
checkNotNull(this.persistenceService);
persistent = true;
return this;
}
......@@ -156,6 +162,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
antiEntropyPeriod,
antiEntropyTimeUnit,
convergeFaster,
persistent);
persistent,
persistenceService);
}
}
......
......@@ -28,6 +28,7 @@ import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
......@@ -37,6 +38,7 @@ import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -81,6 +83,7 @@ public class EventuallyConsistentMapImpl<K, V>
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
private final BiFunction<K, V, Timestamp> timestampProvider;
......@@ -116,7 +119,9 @@ public class EventuallyConsistentMapImpl<K, V>
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
private final boolean persistent;
private final PersistentStore<K, V> persistentStore;
private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
/**
* Creates a new eventually consistent map shared amongst multiple instances.
......@@ -158,9 +163,32 @@ public class EventuallyConsistentMapImpl<K, V>
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
boolean persistent) {
boolean persistent,
PersistenceService persistenceService) {
this.mapName = mapName;
items = Maps.newConcurrentMap();
this.serializer = createSerializer(serializerBuilder);
this.persistenceService = persistenceService;
this.persistent =
persistent;
if (persistent) {
items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
.withName(PERSISTENT_LOCAL_MAP_NAME)
.withSerializer(new Serializer() {
@Override
public <T> byte[] encode(T object) {
return EventuallyConsistentMapImpl.this.serializer.encode(object);
}
@Override
public <T> T decode(byte[] bytes) {
return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
}
})
.build();
} else {
items = Maps.newConcurrentMap();
}
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
......@@ -168,8 +196,6 @@ public class EventuallyConsistentMapImpl<K, V>
this.clusterCommunicator = clusterCommunicator;
this.localNodeId = clusterService.getLocalNode().id();
this.serializer = createSerializer(serializerBuilder);
this.timestampProvider = timestampProvider;
if (peerUpdateFunction != null) {
......@@ -198,20 +224,6 @@ public class EventuallyConsistentMapImpl<K, V>
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
}
this.persistent = persistent;
if (this.persistent) {
String dataDirectory = System.getProperty("karaf.data", "./data");
String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
ExecutorService dbExecutor =
newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
persistentStore.readInto(items);
} else {
this.persistentStore = null;
}
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
......@@ -373,15 +385,6 @@ public class EventuallyConsistentMapImpl<K, V>
return existing;
}
});
if (updated.get()) {
if (persistent) {
if (tombstone.isPresent()) {
persistentStore.update(key, tombstone.get());
} else {
persistentStore.remove(key);
}
}
}
return previousValue.get();
}
......@@ -455,6 +458,7 @@ public class EventuallyConsistentMapImpl<K, V>
/**
* Returns true if newValue was accepted i.e. map is updated.
*
* @param key key
* @param newValue proposed new value
* @return true if update happened; false if map already contains a more recent value for the key
......@@ -473,9 +477,6 @@ public class EventuallyConsistentMapImpl<K, V>
}
return existing;
});
if (updated.get() && persistent) {
persistentStore.update(key, newValue);
}
return updated.get();
}
......
......@@ -42,6 +42,7 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractEvent;
import org.onosproject.persistence.impl.PersistenceManager;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
......@@ -81,6 +82,7 @@ public class EventuallyConsistentMapImplTest {
private EventuallyConsistentMap<String, String> ecMap;
private PersistenceManager persistenceService;
private ClusterService clusterService;
private ClusterCommunicationService clusterCommunicator;
private SequentialClockService<String, String> clockService;
......@@ -136,6 +138,8 @@ public class EventuallyConsistentMapImplTest {
clusterCommunicator = createMock(ClusterCommunicationService.class);
persistenceService = new PersistenceManager();
persistenceService.activate();
// Add expectation for adding cluster message subscribers which
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
......@@ -153,11 +157,12 @@ public class EventuallyConsistentMapImplTest {
.register(TestTimestamp.class);
ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
clusterService, clusterCommunicator)
clusterService, clusterCommunicator, persistenceService)
.withName(MAP_NAME)
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
.withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
.withPersistence()
.build();
// Reset ready for tests to add their own expectations
......