Madan Jampani

MapDB-backed Copycat log implementation

......@@ -64,6 +64,12 @@
-->
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
......
......@@ -3,12 +3,17 @@ package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
......@@ -57,37 +62,37 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
handler.ping((PingRequest) request).whenComplete((response, error) -> {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to ping request", e);
}
});
handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
} else if (request.getClass().equals(PollRequest.class)) {
handler.poll((PollRequest) request).whenComplete((response, error) -> {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to poll request", e);
}
});
handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
} else if (request.getClass().equals(SyncRequest.class)) {
handler.sync((SyncRequest) request).whenComplete((response, error) -> {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to sync request", e);
}
});
handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
} else if (request.getClass().equals(SubmitRequest.class)) {
handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
} else {
throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
}
}
private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
private final ClusterMessage message;
public PostExecutionTask(ClusterMessage message) {
this.message = message;
}
@Override
public void accept(R response, Throwable t) {
if (t != null) {
log.error("Processing for " + message.subject() + " failed.", t);
} else {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to submit request", e);
log.error("Failed to respond to " + response.getClass().getName(), e);
}
}
});
}
}
}
......
......@@ -172,8 +172,8 @@ public class DatabaseStateMachine implements StateMachine {
try {
return SERIALIZER.encode(state);
} catch (Exception e) {
log.error("Snapshot serialization error", e);
return null;
log.error("Failed to take snapshot", e);
throw new SnapshotException(e);
}
}
......@@ -182,7 +182,8 @@ public class DatabaseStateMachine implements StateMachine {
try {
this.state = SERIALIZER.decode(data);
} catch (Exception e) {
log.error("Snapshot deserialization error", e);
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);
}
}
}
......
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import net.kuujo.copycat.log.Entry;
import net.kuujo.copycat.log.Log;
import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
import org.mapdb.Atomic;
import org.mapdb.BTreeMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.TxBlock;
import org.mapdb.TxMaker;
import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.collect.Lists;
/**
* MapDB based log implementation.
*/
public class MapDBLog implements Log {
private final File dbFile;
private TxMaker txMaker;
private final StoreSerializer serializer;
private static final String LOG_NAME = "log";
private static final String SIZE_FIELD_NAME = "size";
public MapDBLog(File dbFile, StoreSerializer serializer) {
this.dbFile = dbFile;
this.serializer = serializer;
}
@Override
public void open() throws IOException {
txMaker = DBMaker
.newFileDB(dbFile)
.makeTxMaker();
}
@Override
public void close() throws IOException {
assertIsOpen();
txMaker.close();
txMaker = null;
}
@Override
public boolean isOpen() {
return txMaker != null;
}
protected void assertIsOpen() {
checkState(isOpen(), "The log is not currently open.");
}
@Override
public long appendEntry(Entry entry) {
checkArgument(entry != null, "expecting non-null entry");
return appendEntries(entry).get(0);
}
@Override
public List<Long> appendEntries(Entry... entries) {
checkArgument(entries != null, "expecting non-null entries");
return appendEntries(Arrays.asList(entries));
}
@Override
public List<Long> appendEntries(List<Entry> entries) {
assertIsOpen();
checkArgument(entries != null, "expecting non-null entries");
final List<Long> indices = Lists.newArrayList();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
for (Entry entry : entries) {
byte[] entryBytes = serializer.encode(entry);
log.put(nextIndex, entryBytes);
size.addAndGet(entryBytes.length);
indices.add(nextIndex);
nextIndex++;
}
}
});
return indices;
}
@Override
public boolean containsEntry(long index) {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.containsKey(index);
} finally {
db.close();
}
}
@Override
public void delete() throws IOException {
assertIsOpen();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
log.clear();
size.set(0);
}
});
}
@Override
public <T extends Entry> T firstEntry() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
} finally {
db.close();
}
}
@Override
public long firstIndex() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? 0 : log.firstKey();
} finally {
db.close();
}
}
@Override
public <T extends Entry> List<T> getEntries(long from, long to) {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
if (log.isEmpty()) {
throw new LogIndexOutOfBoundsException("Log is empty");
} else if (from < log.firstKey()) {
throw new LogIndexOutOfBoundsException("From index out of bounds.");
} else if (to > log.lastKey()) {
throw new LogIndexOutOfBoundsException("To index out of bounds.");
}
List<T> entries = new ArrayList<>((int) (to - from + 1));
for (long i = from; i <= to; i++) {
T entry = serializer.decode(log.get(i));
entries.add(entry);
}
return entries;
} finally {
db.close();
}
}
@Override
public <T extends Entry> T getEntry(long index) {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
byte[] entryBytes = log.get(index);
return entryBytes == null ? null : serializer.decode(entryBytes);
} finally {
db.close();
}
}
@Override
public boolean isEmpty() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty();
} finally {
db.close();
}
}
@Override
public <T extends Entry> T lastEntry() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
} finally {
db.close();
}
}
@Override
public long lastIndex() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? 0 : log.lastKey();
} finally {
db.close();
}
}
@Override
public void removeAfter(long index) {
assertIsOpen();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
long startIndex = index + 1;
long endIndex = log.lastKey();
for (long i = startIndex; i <= endIndex; ++i) {
byte[] entryBytes = log.remove(i);
size.addAndGet(-1L * entryBytes.length);
}
}
});
}
@Override
public long size() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
return size.get();
} finally {
db.close();
}
}
@Override
public void sync() throws IOException {
assertIsOpen();
}
@Override
public void compact(long index, Entry entry) throws IOException {
assertIsOpen();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
size.addAndGet(-1 * deletedBytes);
byte[] entryBytes = serializer.encode(entry);
byte[] existingEntry = log.put(index, entryBytes);
size.addAndGet(entryBytes.length - existingEntry.length);
db.compact();
}
});
}
}
\ No newline at end of file
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.DatabaseException;
/**
 * Exception that indicates a problem with the state machine snapshotting.
 */
@SuppressWarnings("serial")
public class SnapshotException extends DatabaseException {

    /**
     * Creates a new snapshot exception wrapping the given cause.
     *
     * @param cause the underlying failure raised while taking or
     *              installing a snapshot
     */
    public SnapshotException(Throwable cause) {
        super(cause);
    }
}
package org.onlab.onos.store.service.impl;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import net.kuujo.copycat.internal.log.OperationEntry;
import net.kuujo.copycat.log.Entry;
import net.kuujo.copycat.log.Log;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.testing.EqualsTester;
/**
 * Unit tests for the MapDBLog implementation.
 */
public class MapDBLogTest {

    private static final String DB_FILE_NAME = "mapdbTest";
    private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;

    // Entries with distinct serialized sizes so size accounting is observable.
    private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
    private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
    private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
    private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234");

    private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot");

    private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length;
    private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length;
    private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length;
    private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length;
    private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length;

    @Before
    public void setUp() throws Exception {
    }

    @After
    public void tearDown() throws Exception {
        // Remove the backing database file so each test starts fresh.
        Files.deleteIfExists(new File(DB_FILE_NAME).toPath());
    }

    @Test(expected = IllegalStateException.class)
    public void testAssertOpen() {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.size();
    }

    @Test
    public void testAppendEntry() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntry(TEST_ENTRY1);
        OperationEntry first = log.firstEntry();
        OperationEntry last = log.lastEntry();
        new EqualsTester()
            .addEqualityGroup(first, last, TEST_ENTRY1)
            .testEquals();
        Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
        Assert.assertEquals(1, log.firstIndex());
        Assert.assertEquals(1, log.lastIndex());
        log.close();
    }

    @Test
    public void testAppendEntries() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3);
        OperationEntry first = log.firstEntry();
        OperationEntry last = log.lastEntry();
        new EqualsTester()
            .addEqualityGroup(first, TEST_ENTRY1)
            .addEqualityGroup(last, TEST_ENTRY3)
            .testEquals();
        // Fixed: the original passed TEST_ENTRY3_SIZE as a separate argument
        // (comma instead of +), so the three-entry total was never checked.
        Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE + TEST_ENTRY3_SIZE, log.size());
        Assert.assertEquals(1, log.firstIndex());
        Assert.assertEquals(3, log.lastIndex());
        Assert.assertTrue(log.containsEntry(1));
        Assert.assertTrue(log.containsEntry(2));
        log.close();
    }

    @Test
    public void testDelete() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2);
        log.delete();
        Assert.assertEquals(0, log.size());
        Assert.assertTrue(log.isEmpty());
        Assert.assertEquals(0, log.firstIndex());
        Assert.assertNull(log.firstEntry());
        Assert.assertEquals(0, log.lastIndex());
        Assert.assertNull(log.lastEntry());
        log.close();
    }

    @Test
    public void testGetEntries() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
        Assert.assertEquals(
                TEST_ENTRY1_SIZE +
                TEST_ENTRY2_SIZE +
                TEST_ENTRY3_SIZE +
                TEST_ENTRY4_SIZE, log.size());

        List<Entry> entries = log.getEntries(2, 3);
        new EqualsTester()
            .addEqualityGroup(log.getEntry(4), TEST_ENTRY4)
            .addEqualityGroup(entries.get(0), TEST_ENTRY2)
            .addEqualityGroup(entries.get(1), TEST_ENTRY3)
            .testEquals();
        log.close();
    }

    @Test
    public void testRemoveAfter() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
        log.removeAfter(1);
        Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
        new EqualsTester()
            .addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1)
            .testEquals();
        log.close();
    }

    @Test
    public void testAddAfterRemove() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
        log.removeAfter(1);
        log.appendEntry(TEST_ENTRY4);
        Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size());
        new EqualsTester()
            .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
            .addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE)
            .testEquals();
        log.close();
    }

    @Test
    public void testClose() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        Assert.assertFalse(log.isOpen());
        log.open();
        Assert.assertTrue(log.isOpen());
        log.close();
        Assert.assertFalse(log.isOpen());
    }

    @Test
    public void testReopen() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
        log.close();
        // Contents and size must survive a close/open cycle.
        log.open();
        new EqualsTester()
            .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
            .addEqualityGroup(log.getEntry(2), TEST_ENTRY2)
            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
            .addEqualityGroup(log.size(),
                    TEST_ENTRY1_SIZE +
                    TEST_ENTRY2_SIZE +
                    TEST_ENTRY3_SIZE +
                    TEST_ENTRY4_SIZE)
            .testEquals();
        log.close();
    }

    @Test
    public void testCompact() throws IOException {
        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
        log.open();
        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
        // Compacting at index 3 drops entries 1-2 and replaces entry 3
        // with the snapshot entry.
        log.compact(3, TEST_SNAPSHOT_ENTRY);
        new EqualsTester()
            .addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY)
            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
            .addEqualityGroup(log.size(),
                    TEST_SNAPSHOT_ENTRY_SIZE +
                    TEST_ENTRY4_SIZE)
            .testEquals();
        log.close();
    }
}