Madan Jampani

Bugfixes for DistributedLockManager functionality

Added a method called broadcastIncludeSelf to ClusterCommunicationService.
Cosmetic improvements: added toString methods

Change-Id: I1d58720c29e6f8642f950670c3a6d95a7019a491
......@@ -21,7 +21,9 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
//import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -43,6 +45,8 @@ import org.onlab.onos.net.intent.IntentListener;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
import org.onlab.onos.store.service.LockService;
import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
......@@ -72,6 +76,9 @@ public class FooComponent {
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
protected DatabaseService dbService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
protected LockService lockService;
private final ClusterEventListener clusterListener = new InnerClusterListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
private final IntentListener intentListener = new InnerIntentListener();
......@@ -92,9 +99,10 @@ public class FooComponent {
log.info("Couldn't find DB service");
} else {
log.info("Found DB service");
longIncrementor();
executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
//longIncrementor();
//lockUnlock();
//executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
//executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
}
log.info("Started");
}
......@@ -152,6 +160,31 @@ public class FooComponent {
}
}
private void lockUnlock() {
try {
final String locksTable = "onos-locks";
if (!dbAdminService.listTables().contains(locksTable)) {
dbAdminService.createTable(locksTable, 10000);
}
Lock lock = lockService.create("foo-bar");
log.info("Requesting lock");
lock.lock(10000);
//try {
//Thread.sleep(5000);
//} catch (InterruptedException e) {
// TODO Auto-generated catch block
//e.printStackTrace();
//}
log.info("Acquired Lock");
log.info("Do I have the lock: {} ", lock.isLocked());
//lock.unlock();
log.info("Do I have the lock: {} ", lock.isLocked());
} finally {
log.info("Done");
}
}
private void longIncrementor() {
try {
final String someTable = "admin";
......
......@@ -38,6 +38,15 @@ public interface ClusterCommunicationService {
boolean broadcast(ClusterMessage message) throws IOException;
/**
* Broadcast a message to all controller nodes including self.
*
* @param message message to send
* @return true if the message was sent successfully to all nodes; false otherwise.
* @throws IOException when I/O exception of some sort has occurred
*/
boolean broadcastIncludeSelf(ClusterMessage message) throws IOException;
/**
* Sends a message to the specified controller node.
*
* @param message message to send
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.service;
import java.util.List;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
......@@ -41,6 +42,13 @@ public final class BatchReadRequest {
return readRequests;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("readRequests", readRequests)
.toString();
}
/**
* Builder for BatchReadRequest.
*/
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.service;
import java.util.List;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
/**
......@@ -30,4 +31,11 @@ public class BatchReadResult {
public int batchSize() {
return readResults.size();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("readResults", readResults)
.toString();
}
}
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.service;
import java.util.List;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
......@@ -41,6 +42,13 @@ public final class BatchWriteRequest {
return writeRequests.size();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("writeRequests", writeRequests)
.toString();
}
/**
* Builder for BatchWriteRequest.
*/
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.service;
import java.util.List;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
/**
......@@ -43,4 +44,11 @@ public class BatchWriteResult {
public int batchSize() {
return writeResults.size();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("writeResults", writeResults)
.toString();
}
}
......
package org.onlab.onos.store.service;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.onlab.onos.cluster.ControllerNode;
......@@ -31,9 +31,9 @@ public interface DatabaseAdminService {
/**
* Lists all the tables in the database.
* @return list of table names.
* @return set of table names.
*/
public List<String> listTables();
public Set<String> listTables();
/**
* Deletes a table from the database.
......
......@@ -64,6 +64,7 @@ public class ReadResult {
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("status", status)
.add("tableName", tableName)
.add("key", key)
.add("value", value)
......
......@@ -114,7 +114,7 @@ public class WriteRequest {
*/
public static WriteRequest removeIfVersionMatches(String tableName, String key,
long previousVersion) {
return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
return new WriteRequest(REMOVE_IF_VERSION, tableName, key,
null, previousVersion, null);
}
......@@ -129,7 +129,7 @@ public class WriteRequest {
*/
public static WriteRequest removeIfValueMatches(String tableName, String key,
byte[] oldValue) {
return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key,
return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
null, ANY_VERSION, checkNotNull(oldValue));
}
......
......@@ -116,6 +116,15 @@ public class ClusterCommunicationManager
}
@Override
public boolean broadcastIncludeSelf(ClusterMessage message) throws IOException {
boolean ok = true;
for (ControllerNode node : clusterService.getNodes()) {
ok = unicast(message, node.id()) && ok;
}
return ok;
}
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
......@@ -209,4 +218,4 @@ public class ClusterCommunicationManager
rawMessage.respond(response);
}
}
}
\ No newline at end of file
}
......
......@@ -3,6 +3,7 @@ package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
......@@ -37,7 +38,7 @@ public class DatabaseClient {
public boolean createTable(String tableName, int ttlMillis) {
CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
......@@ -65,9 +66,9 @@ public class DatabaseClient {
}
}
public List<String> listTables() {
public Set<String> listTables() {
CompletableFuture<List<String>> future = copycat.submit("listTables");
CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
......
......@@ -33,7 +33,6 @@ import net.kuujo.copycat.event.LeaderElectEvent;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
......@@ -51,9 +50,6 @@ public class DatabaseEntryExpirationTracker implements
private final Logger log = LoggerFactory.getLogger(getClass());
public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
"database-update-event");
private final DatabaseService databaseService;
private final ClusterCommunicationService clusterCommunicator;
......@@ -61,9 +57,9 @@ public class DatabaseEntryExpirationTracker implements
private final ControllerNode localNode;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(
Member localMember,
......@@ -78,31 +74,38 @@ public class DatabaseEntryExpirationTracker implements
@Override
public void tableModified(TableModificationEvent event) {
log.debug("Received a table modification event {}", event);
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
}
Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(event.tableName());
Long eventVersion = event.value().version();
switch (event.type()) {
case ROW_DELETED:
map.remove(row, eventVersion);
if (isLocalMemberLeader.get()) {
try {
// FIXME: The broadcast message should be sent to self.
clusterCommunicator.broadcast(new ClusterMessage(
localNode.id(), DATABASE_UPDATES,
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
log.error(
"Failed to broadcast a database table modification event.",
e);
log.error("Failed to broadcast a database row deleted event.", e);
}
}
break;
case ROW_ADDED:
case ROW_UPDATED:
map.put(row, null);
// To account for potential reordering of notifications,
// check to make sure we are replacing an old version with a new version
Long currentVersion = map.get(row);
if (currentVersion == null || currentVersion < eventVersion) {
map.put(row, eventVersion);
}
break;
default:
break;
......@@ -111,60 +114,56 @@ public class DatabaseEntryExpirationTracker implements
@Override
public void tableCreated(TableMetadata metadata) {
log.debug("Received a table created event {}", metadata);
if (metadata.expireOldEntries()) {
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.SECONDS)
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
// FIXME: make the expiration policy configurable.
// TODO: make the expiration policy configurable.
// Do we need to support expiration based on last access time?
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
@Override
public void tableDeleted(String tableName) {
log.debug("Received a table deleted event for table ({})", tableName);
tableEntryExpirationMap.remove(tableName);
}
private class ExpirationObserver implements
ExpirationListener<DatabaseRow, VersionedValue> {
ExpirationListener<DatabaseRow, Long> {
@Override
public void expired(DatabaseRow key, VersionedValue value) {
public void expired(DatabaseRow row, Long version) {
Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
try {
if (isLocalMemberLeader.get()) {
if (!databaseService.removeIfVersionMatches(key.tableName,
key.key, value.version())) {
log.info("Entry in the database changed before right its TTL expiration.");
if (!databaseService.removeIfVersionMatches(row.tableName,
row.key, version)) {
log.info("Entry in database was updated right before its expiration.");
} else {
log.info("Successfully expired old entry with key ({}) from table ({})",
row.key, row.tableName);
}
} else {
// If this node is not the current leader, we should never
// let the expiring entries drop off
// Under stable conditions (i.e no leadership switch) the
// current leader will initiate
// a database remove and this instance will get notified
// of a tableModification event causing it to remove from
// the map.
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(key.tableName);
// Only the current leader will expire keys from database.
// Everyone else function as standby just in case they need to take over
if (map != null) {
map.put(key, value);
map.putIfAbsent(row, version);
}
}
} catch (Exception e) {
log.warn(
"Failed to delete entry from the database after ttl expiration. Will retry eviction",
e);
tableEntryExpirationMap.get(key.tableName).put(
new DatabaseRow(key.tableName, key.key), value);
log.warn("Failed to delete entry from the database after ttl "
+ "expiration. Operation will be retried.", e);
map.putIfAbsent(row, version);
}
}
}
@Override
public void handle(LeaderElectEvent event) {
if (localMember.equals(event.leader())) {
isLocalMemberLeader.set(true);
}
isLocalMemberLeader.set(localMember.equals(event.leader()));
}
/**
......@@ -212,12 +211,12 @@ public class DatabaseEntryExpirationTracker implements
continue;
}
Map<DatabaseRow, VersionedValue> tableExpirationMap = ExpiringMap.builder()
Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
.expirationPolicy(ExpirationPolicy.CREATED).build();
for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue());
tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
}
tableEntryExpirationMap.put(tableName, tableExpirationMap);
......
......@@ -7,7 +7,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
......@@ -19,6 +18,7 @@ import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
......@@ -160,18 +160,22 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
log.info("Starting cluster: {}", cluster);
DatabaseEntryExpirationTracker expirationTracker =
new DatabaseEntryExpirationTracker(
clusterConfig.getLocalMember(),
clusterService.getLocalNode(),
clusterCommunicator,
this);
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
stateMachine.addEventListener(
new DatabaseEntryExpirationTracker(
clusterConfig.getLocalMember(),
clusterService.getLocalNode(),
clusterCommunicator,
this));
stateMachine.addEventListener(expirationTracker);
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
copycat.start();
client = new DatabaseClient(copycat);
......@@ -207,7 +211,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
@Override
public List<String> listTables() {
public Set<String> listTables() {
return client.listTables();
}
......
......@@ -30,6 +30,7 @@ import org.onlab.onos.store.service.WriteStatus;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
......@@ -65,6 +66,7 @@ public class DatabaseStateMachine implements StateMachine {
.register(WriteStatus.class)
// TODO: Move this out ?
.register(TableModificationEvent.class)
.register(TableModificationEvent.Type.class)
.register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
......@@ -85,7 +87,8 @@ public class DatabaseStateMachine implements StateMachine {
}
@Command
public boolean createTable(String tableName, int ttlMillis) {
public boolean createTableWithExpiration(String tableName) {
int ttlMillis = 10000;
TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
return createTable(metadata);
}
......@@ -266,6 +269,7 @@ public class DatabaseStateMachine implements StateMachine {
// notify listeners of table mod events.
for (DatabaseUpdateEventListener listener : listeners) {
for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
log.info("Publishing table modification event: {}", tableModificationEvent);
listener.tableModified(tableModificationEvent);
}
}
......@@ -345,6 +349,15 @@ public class DatabaseStateMachine implements StateMachine {
public int ttlMillis() {
return ttlMillis;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("tableName", tableName)
.add("expireOldEntries", expireOldEntries)
.add("ttlMillis", ttlMillis)
.toString();
}
}
@Override
......
......@@ -80,6 +80,7 @@ public class DistributedLock implements Lock {
return false;
}
}
isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
}
......@@ -95,9 +96,11 @@ public class DistributedLock implements Lock {
if (DateTime.now().isAfter(lockExpirationTime)) {
isLocked.set(false);
return false;
} else {
return true;
}
}
return true;
return false;
}
@Override
......@@ -105,6 +108,7 @@ public class DistributedLock implements Lock {
if (!isLocked()) {
return;
} else {
isLocked.set(false);
databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
}
}
......
......@@ -94,6 +94,8 @@ public class DistributedLockManager implements LockService {
return;
}
log.info("Received a lock available event for path: {}", event.key());
String path = event.key();
if (!locksToAcquire.containsKey(path)) {
return;
......@@ -159,4 +161,4 @@ public class DistributedLockManager implements LockService {
return future;
}
}
}
\ No newline at end of file
}
......
......@@ -2,6 +2,8 @@ package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.VersionedValue;
import com.google.common.base.MoreObjects;
/**
* A table modification event.
*/
......@@ -9,7 +11,6 @@ public final class TableModificationEvent {
/**
* Type of table modification event.
*
*/
public enum Type {
ROW_ADDED,
......@@ -94,4 +95,14 @@ public final class TableModificationEvent {
public Type type() {
return type;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("type", type)
.add("tableName", tableName)
.add("key", key)
.add("version", value.version())
.toString();
}
}
......