Yuta HIGUCHI

MapDBLog: minor fixes

- minimize accesses to the Atomic.Long size counter
- use an Iterator to reduce BTree lookups
- avoid mutating the original Map inside a stream

Change-Id: Ic99ea8769c379e77f1f9478c640d5e5a1bede058
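
A rough stand-alone illustration of the second and third bullets, using java.util.concurrent.ConcurrentSkipListMap and java.util.concurrent.atomic.AtomicLong as stand-ins for MapDB's BTreeMap and Atomic.Long (a sketch of the pattern only, not the patched class):

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class IteratorRemovalSketch {
    public static void main(String[] args) {
        ConcurrentNavigableMap<Long, byte[]> log = new ConcurrentSkipListMap<>();
        AtomicLong size = new AtomicLong();
        for (long i = 1; i <= 5; i++) {
            byte[] bytes = new byte[8];
            log.put(i, bytes);
            size.addAndGet(bytes.length);
        }

        // Anti-pattern (the shape of the old compaction code): stream over a
        // view's keys while removing from the backing map, one extra lookup per key:
        //   long deleted = log.headMap(3L).keySet().stream()
        //           .mapToLong(i -> log.remove(i).length).sum();

        // Pattern used by the patch: walk the view's entry iterator, remove
        // through the iterator, and update the shared counter once at the end.
        long deletedBytes = 0;
        Iterator<Map.Entry<Long, byte[]>> it = log.headMap(3L).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, byte[]> e = it.next();
            deletedBytes += e.getValue().length;
            it.remove();
        }
        size.addAndGet(-deletedBytes);

        System.out.println("entries left: " + log.size() + ", tracked bytes: " + size.get());
    }
}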
@@ -2,13 +2,16 @@ package org.onlab.onos.store.service.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verifyNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentNavigableMap;
 
 import net.kuujo.copycat.log.Entry;
@@ -25,8 +28,6 @@ import org.mapdb.TxMaker;
 import org.onlab.onos.store.serializers.StoreSerializer;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Lists;
-
 /**
  * MapDB based log implementation.
  */
@@ -84,7 +85,7 @@ public class MapDBLog implements Log {
     public List<Long> appendEntries(List<Entry> entries) {
         assertIsOpen();
         checkArgument(entries != null, "expecting non-null entries");
-        final List<Long> indices = Lists.newArrayList();
+        final List<Long> indices = new ArrayList<>(entries.size());
 
         txMaker.execute(new TxBlock() {
             @Override
@@ -92,13 +93,15 @@ public class MapDBLog implements Log {
                 BTreeMap<Long, byte[]> log = getLogMap(db);
                 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
                 long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
+                long addedBytes = 0;
                 for (Entry entry : entries) {
                     byte[] entryBytes = serializer.encode(entry);
                     log.put(nextIndex, entryBytes);
-                    size.addAndGet(entryBytes.length);
+                    addedBytes += entryBytes.length;
                     indices.add(nextIndex);
                     nextIndex++;
                 }
+                size.addAndGet(addedBytes);
             }
         });
 
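
The appendEntries hunk above accumulates the encoded sizes locally and issues a single size.addAndGet per transaction instead of one per entry; MapDB's Atomic.Long is backed by a store record, so each update is more than an in-memory CAS. A minimal sketch of the accumulate-then-update shape, with a plain AtomicLong standing in for the store-backed counter:

import java.util.concurrent.atomic.AtomicLong;

public class BatchedSizeUpdateSketch {
    public static void main(String[] args) {
        AtomicLong size = new AtomicLong();   // stand-in for the store-backed counter
        byte[][] encodedEntries = {new byte[4], new byte[16], new byte[9]};

        long addedBytes = 0;
        for (byte[] entryBytes : encodedEntries) {
            // ... the real code also writes entryBytes into the log here ...
            addedBytes += entryBytes.length;   // accumulate locally
        }
        size.addAndGet(addedBytes);            // one shared-counter update per batch

        System.out.println("tracked bytes: " + size.get());  // 29
    }
}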
@@ -236,12 +239,15 @@ public class MapDBLog implements Log {
             public void tx(DB db) {
                 BTreeMap<Long, byte[]> log = getLogMap(db);
                 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
-                long startIndex = index + 1;
-                long endIndex = log.lastKey();
-                for (long i = startIndex; i <= endIndex; ++i) {
-                    byte[] entryBytes = log.remove(i);
-                    size.addAndGet(-1L * entryBytes.length);
+                long removedBytes = 0;
+                ConcurrentNavigableMap<Long, byte[]> tailMap = log.tailMap(index, false);
+                Iterator<Map.Entry<Long, byte[]>> it = tailMap.entrySet().iterator();
+                while (it.hasNext()) {
+                    Map.Entry<Long, byte[]> entry = it.next();
+                    removedBytes += entry.getValue().length;
+                    it.remove();
                 }
+                size.addAndGet(-removedBytes);
             }
         });
     }
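
In the removeAfter hunk above, log.tailMap(index, false) is a live view of every entry with a key strictly greater than index, so one pass over its entry iterator replaces a remove-by-key BTree lookup per index, and it does not assume the key range is dense. A small sketch of the view semantics on a ConcurrentSkipListMap stand-in (the patched method additionally walks the iterator to accumulate the removed byte lengths, as shown in the diff):

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class TailMapViewSketch {
    public static void main(String[] args) {
        ConcurrentNavigableMap<Long, byte[]> log = new ConcurrentSkipListMap<>();
        log.put(1L, new byte[3]);
        log.put(2L, new byte[5]);
        log.put(4L, new byte[7]);   // note the gap at key 3

        long index = 1L;
        // Exclusive tail view: keys > index. Clearing the view removes the
        // entries from the backing map in one traversal, gaps and all.
        log.tailMap(index, false).clear();

        System.out.println(log.keySet());   // [1]
    }
}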
@@ -273,9 +279,16 @@ public class MapDBLog implements Log {
                 BTreeMap<Long, byte[]> log = getLogMap(db);
                 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
                 ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
-                long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
-                size.addAndGet(-1 * deletedBytes);
-                byte[] entryBytes = serializer.encode(entry);
+                Iterator<Map.Entry<Long, byte[]>> it = headMap.entrySet().iterator();
+
+                long deletedBytes = 0;
+                while (it.hasNext()) {
+                    Map.Entry<Long, byte[]> e = it.next();
+                    deletedBytes += e.getValue().length;
+                    it.remove();
+                }
+                size.addAndGet(-deletedBytes);
+                byte[] entryBytes = verifyNotNull(serializer.encode(entry));
                 byte[] existingEntry = log.put(index, entryBytes);
                 if (existingEntry != null) {
                     size.addAndGet(entryBytes.length - existingEntry.length);
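
The hunk above also wraps the encoder result in Guava's Verify.verifyNotNull, so a null encoding fails immediately with a VerifyException instead of surfacing later as a NullPointerException at log.put or the length arithmetic. A tiny sketch of that guard, assuming Guava on the classpath (which the patched class already uses); the encode helper here is hypothetical, standing in for StoreSerializer.encode:

import static com.google.common.base.Verify.verifyNotNull;

public class VerifyNotNullSketch {
    // Hypothetical stand-in for StoreSerializer.encode(entry).
    static byte[] encode(Object entry) {
        return entry == null ? null : entry.toString().getBytes();
    }

    public static void main(String[] args) {
        // verifyNotNull returns its argument when non-null and throws
        // com.google.common.base.VerifyException otherwise.
        byte[] entryBytes = verifyNotNull(encode("entry-1"));
        System.out.println(entryBytes.length);  // 7
    }
}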