Praseed Balakrishnan

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

......@@ -157,7 +157,9 @@ public class FooComponent {
final String someTable = "admin";
final String someKey = "long";
if (!dbAdminService.listTables().contains(someTable)) {
dbAdminService.createTable(someTable);
}
VersionedValue vv = dbService.get(someTable, someKey);
if (vv == null) {
......
......@@ -223,11 +223,6 @@ public class BgpSessionManager {
synchronized void routeUpdates(BgpSession bgpSession,
Collection<BgpRouteEntry> addedBgpRouteEntries,
Collection<BgpRouteEntry> deletedBgpRouteEntries) {
//
// TODO: Merge the updates from different BGP Peers,
// by choosing the best route.
//
// Process the deleted route entries
for (BgpRouteEntry bgpRouteEntry : deletedBgpRouteEntries) {
processDeletedRoute(bgpSession, bgpRouteEntry);
......
......@@ -22,6 +22,7 @@ public class ReadResult {
/**
* Returns the status of the read operation.
* @return read operation status
*/
public ReadStatus status() {
return status;
......
......@@ -269,7 +269,7 @@ implements MastershipService, MastershipAdminService {
@Override
public void notify(MastershipEvent event) {
log.info("dispatching mastership event {}", event);
log.trace("dispatching mastership event {}", event);
eventDispatcher.post(event);
}
......
......@@ -174,7 +174,7 @@ public class ClusterMessagingProtocol
public ProtocolClient createClient(TcpMember member) {
ControllerNode remoteNode = getControllerNode(member.host(), member.port());
checkNotNull(remoteNode,
"A valid controller node is expected for %s:%s",
"No matching ONOS Node for %s:%s",
member.host(), member.port());
return new ClusterMessagingProtocolClient(
clusterCommunicator, clusterService.getLocalNode(), remoteNode);
......
......@@ -20,11 +20,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
//import net.jodah.expiringmap.ExpiringMap;
//import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
//import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap;
import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
......@@ -34,19 +35,22 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Database update event handler.
* Plugs into the database update stream and track the TTL of entries added to
* the database. For tables with pre-configured finite TTL, this class has
* mechanisms for expiring (deleting) old, expired entries from the database.
*/
public class DatabaseUpdateEventHandler implements
public class DatabaseEntryExpirationTracker implements
DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
private final Logger log = LoggerFactory.getLogger(getClass());
public static final MessageSubject DATABASE_UPDATES =
new MessageSubject("database-update-event");
public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
"database-update-event");
private DatabaseService databaseService;
private ClusterService cluster;
......@@ -54,29 +58,32 @@ public class DatabaseUpdateEventHandler implements
private final Member localMember;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
//private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
DatabaseUpdateEventHandler(Member localMember) {
private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(Member localMember) {
this.localMember = localMember;
}
@Override
public void tableModified(TableModificationEvent event) {
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(event.tableName());
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(event.tableName());
switch (event.type()) {
case ROW_DELETED:
if (isLocalMemberLeader.get()) {
try {
clusterCommunicator.broadcast(
new ClusterMessage(
cluster.getLocalNode().id(),
DATABASE_UPDATES,
clusterCommunicator.broadcast(new ClusterMessage(cluster
.getLocalNode().id(), DATABASE_UPDATES,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast a database table modification event.", e);
log.error(
"Failed to broadcast a database table modification event.",
e);
}
}
break;
......@@ -94,15 +101,11 @@ public class DatabaseUpdateEventHandler implements
// make this explicit instead of relying on a negative value
// to indicate no expiration.
if (expirationTimeMillis > 0) {
tableEntryExpirationMap.put(tableName, null);
/*
ExpiringMap.builder()
tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
.expiration(expirationTimeMillis, TimeUnit.SECONDS)
.expirationListener(expirationObserver)
// FIXME: make the expiration policy configurable.
.expirationPolicy(ExpirationPolicy.CREATED)
.build());
*/
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
......@@ -111,27 +114,40 @@ public class DatabaseUpdateEventHandler implements
tableEntryExpirationMap.remove(tableName);
}
/*
private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
private class ExpirationObserver implements
ExpirationListener<DatabaseRow, VersionedValue> {
@Override
public void expired(DatabaseRow key, Void value) {
public void expired(DatabaseRow key, VersionedValue value) {
try {
// TODO: The safety of this check needs to be verified.
// Couple of issues:
// 1. It is very likely that only one member should attempt deletion of the entry from database.
// 2. A potential race condition exists where the entry expires, but before its can be deleted
// from the database, a new entry is added or existing entry is updated.
// That means ttl and expiration should be for a given version.
if (isLocalMemberLeader.get()) {
databaseService.remove(key.tableName, key.key);
if (!databaseService.removeIfVersionMatches(key.tableName,
key.key, value.version())) {
log.info("Entry in the database changed before right its TTL expiration.");
}
} else {
// If this node is not the current leader, we should never
// let the expiring entries drop off
// Under stable conditions (i.e no leadership switch) the
// current leader will initiate
// a database remove and this instance will get notified
// of a tableModification event causing it to remove from
// the map.
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(key.tableName);
if (map != null) {
map.put(key, value);
}
}
} catch (Exception e) {
log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
tableEntryExpirationMap.get(key.tableName).put(new DatabaseRow(key.tableName, key.key), null);
log.warn(
"Failed to delete entry from the database after ttl expiration. Will retry eviction",
e);
tableEntryExpirationMap.get(key.tableName).put(
new DatabaseRow(key.tableName, key.key), value);
}
}
}
*/
@Override
public void handle(LeaderElectEvent event) {
......@@ -140,6 +156,9 @@ public class DatabaseUpdateEventHandler implements
}
}
/**
* Wrapper class for a database row identifier.
*/
private class DatabaseRow {
String tableName;
......@@ -160,8 +179,8 @@ public class DatabaseUpdateEventHandler implements
}
DatabaseRow that = (DatabaseRow) obj;
return Objects.equals(this.tableName, that.tableName) &&
Objects.equals(this.key, that.key);
return Objects.equals(this.tableName, that.tableName)
&& Objects.equals(this.key, that.key);
}
@Override
......
......@@ -67,6 +67,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseProtocolService copycatMessagingProtocol;
// FIXME: point to appropriate path
public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
// Current working dir seems to be /opt/onos/apache-karaf-3.0.2
......
......@@ -237,8 +237,8 @@ public class DatabaseStateMachine implements StateMachine {
WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
results.add(putResult);
tableModificationEvent = (previousValue == null) ?
TableModificationEvent.rowAdded(request.tableName(), request.key()) :
TableModificationEvent.rowUpdated(request.tableName(), request.key());
TableModificationEvent.rowAdded(request.tableName(), request.key(), newValue) :
TableModificationEvent.rowUpdated(request.tableName(), request.key(), newValue);
break;
case REMOVE:
......@@ -249,7 +249,7 @@ public class DatabaseStateMachine implements StateMachine {
results.add(removeResult);
if (removedValue != null) {
tableModificationEvent =
TableModificationEvent.rowDeleted(request.tableName(), request.key());
TableModificationEvent.rowDeleted(request.tableName(), request.key(), removedValue);
}
break;
......
......@@ -29,15 +29,14 @@ public interface DatabaseUpdateEventListener {
/**
* Notifies listeners of a table created event.
* @param tableName
* @param expirationTimeMillis
* @param tableName name of the table created
* @param expirationTimeMillis TTL for entries added to the table (measured since last update time)
*/
public void tableCreated(String tableName, int expirationTimeMillis);
/**
* Notifies listeners of a table deleted event.
* @param tableName
* @param tableName name of the table deleted
*/
public void tableDeleted(String tableName);
}
\ No newline at end of file
......
......@@ -33,7 +33,8 @@ public class DistributedLockManager implements LockService {
public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
.create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
......@@ -61,11 +62,7 @@ public class DistributedLockManager implements LockService {
@Override
public Lock create(String path) {
return new DistributedLock(
path,
databaseService,
clusterService,
this);
return new DistributedLock(path, databaseService, clusterService, this);
}
@Override
......@@ -80,21 +77,19 @@ public class DistributedLockManager implements LockService {
throw new UnsupportedOperationException();
}
protected CompletableFuture<Void> lockIfAvailable(
Lock lock,
long waitTimeMillis,
int leaseDurationMillis) {
protected CompletableFuture<Void> lockIfAvailable(Lock lock,
long waitTimeMillis, int leaseDurationMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
locksToAcquire.put(
lock.path(),
new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
leaseDurationMillis, future));
return future;
}
private class LockEventMessageListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
TableModificationEvent event = DatabaseStateMachine.SERIALIZER
.decode(message.payload());
if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
return;
}
......@@ -110,15 +105,20 @@ public class DistributedLockManager implements LockService {
return;
}
Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
synchronized (existingRequests) {
Iterator<LockRequest> existingRequestIterator = existingRequests
.iterator();
while (existingRequestIterator.hasNext()) {
LockRequest request = existingRequestIterator.next();
if (request.expirationTime().isAfter(DateTime.now())) {
existingRequestIterator.remove();
} else {
if (request.lock().tryLock(request.leaseDurationMillis())) {
if (request.lock().tryLock(
request.leaseDurationMillis())) {
request.future().complete(null);
existingRequests.remove(0);
existingRequestIterator.remove();
}
}
}
}
......@@ -133,14 +133,12 @@ public class DistributedLockManager implements LockService {
private final int leaseDurationMillis;
private final CompletableFuture<Void> future;
public LockRequest(
Lock lock,
long waitTimeMillis,
int leaseDurationMillis,
CompletableFuture<Void> future) {
public LockRequest(Lock lock, long waitTimeMillis,
int leaseDurationMillis, CompletableFuture<Void> future) {
this.lock = lock;
this.expirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
this.expirationTime = DateTime.now().plusMillis(
(int) waitTimeMillis);
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}
......
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.VersionedValue;
/**
* A table modification event.
*/
......@@ -17,41 +19,46 @@ public final class TableModificationEvent {
private final String tableName;
private final String key;
private final VersionedValue value;
private final Type type;
/**
* Creates a new row deleted table modification event.
* @param tableName table name.
* @param key row key
* @param value value associated with the key when it was deleted.
* @return table modification event.
*/
public static TableModificationEvent rowDeleted(String tableName, String key) {
return new TableModificationEvent(tableName, key, Type.ROW_DELETED);
public static TableModificationEvent rowDeleted(String tableName, String key, VersionedValue value) {
return new TableModificationEvent(tableName, key, value, Type.ROW_DELETED);
}
/**
* Creates a new row added table modification event.
* @param tableName table name.
* @param key row key
* @param value value associated with the key
* @return table modification event.
*/
public static TableModificationEvent rowAdded(String tableName, String key) {
return new TableModificationEvent(tableName, key, Type.ROW_ADDED);
public static TableModificationEvent rowAdded(String tableName, String key, VersionedValue value) {
return new TableModificationEvent(tableName, key, value, Type.ROW_ADDED);
}
/**
* Creates a new row updated table modification event.
* @param tableName table name.
* @param key row key
* @param newValue value
* @return table modification event.
*/
public static TableModificationEvent rowUpdated(String tableName, String key) {
return new TableModificationEvent(tableName, key, Type.ROW_UPDATED);
public static TableModificationEvent rowUpdated(String tableName, String key, VersionedValue newValue) {
return new TableModificationEvent(tableName, key, newValue, Type.ROW_UPDATED);
}
private TableModificationEvent(String tableName, String key, Type type) {
private TableModificationEvent(String tableName, String key, VersionedValue value, Type type) {
this.tableName = tableName;
this.key = key;
this.value = value;
this.type = type;
}
......@@ -72,6 +79,15 @@ public final class TableModificationEvent {
}
/**
* Returns the value associated with the key. If the event for a deletion, this
* method returns value that was deleted.
* @return row value
*/
public VersionedValue value() {
return value;
}
/**
* Returns the type of table modification event.
* @return event type.
*/
......
......@@ -176,7 +176,7 @@
com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
-->
<merge-policy>com.hazelcast.map.merge.PassThroughMergePolicy</merge-policy>
<merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
</map>
......
......@@ -13,6 +13,8 @@ jar=$(find org/onlab -type f -name '*.jar' | grep -e $1 | grep -v -e -tests | he
bundle=$(echo $(basename $jar .jar) | sed 's/-[0-9].*//g')
echo "pushing bundle: $bundle"
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
for node in $nodes; do
scp -q $jar $ONOS_USER@$node:.m2/repository/$jar
......
......@@ -39,30 +39,23 @@
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat</artifactId>
<version>${copycat.version}</version>
<groupId>net.jodah</groupId>
<artifactId>expiringmap</artifactId>
<version>0.3.1</version>
</dependency>
<!-- Commented out due to Chronicle + OSGi issue
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-chronicle</artifactId>
<artifactId>copycat</artifactId>
<version>${copycat.version}</version>
</dependency>
-->
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-tcp</artifactId>
<version>${copycat.version}</version>
</dependency>
<!-- chronicle transitive dependency
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.1.0</version>
</dependency>
-->
</dependencies>
<build>
......@@ -89,20 +82,19 @@
</filter>
<filter>
<artifact>net.kuujo.copycat:*</artifact>
<artifact>net.jodah.expiringmap:*</artifact>
<includes>
<include>net/kuujo/copycat/**</include>
<include>net/jodah/expiringmap/**</include>
</includes>
</filter>
<!-- chronicle transitive dependency
<filter>
<artifact>net.java.dev.jna:*</artifact>
<artifact>net.kuujo.copycat:*</artifact>
<includes>
<include>com/sun/jna/**</include>
<include>net/kuujo/copycat/**</include>
</includes>
</filter>
-->
</filters>
</configuration>
<executions>
......@@ -120,7 +112,7 @@
<configuration>
<instructions>
<Export-Package>
com.googlecode.concurrenttrees.*;net.kuujo.copycat.*
com.googlecode.concurrenttrees.*;net.kuujo.copycat.*;net.jodah.expiringmap.*
</Export-Package>
</instructions>
</configuration>
......
......@@ -146,3 +146,24 @@
border: 0;
}
/* Web Socket Closed Mask (starts hidden) */
#topo-mask {
display: none;
position: absolute;
top: 0;
left: 0;
width: 10000px;
height: 8000px;
z-index: 5000;
background-color: rgba(0,0,0,0.75);
padding: 60px;
}
#topo-mask p {
margin: 8px 20px;
color: #ddd;
font-size: 14pt;
font-style: italic;
}
......
......@@ -151,6 +151,7 @@
debug: false
},
webSock,
sid = 0,
deviceLabelIndex = 0,
hostLabelIndex = 0,
detailPane,
......@@ -169,7 +170,8 @@
nodeG,
linkG,
node,
link;
link,
mask;
// ==============================
// For Debugging / Development
......@@ -193,10 +195,6 @@
function testMe(view) {
view.alert('test');
detailPane.show();
setTimeout(function () {
detailPane.hide();
}, 3000);
}
function abortIfLive() {
......@@ -1059,6 +1057,7 @@
webSock.ws = new WebSocket(webSockUrl());
webSock.ws.onopen = function() {
noWebSock(false);
};
webSock.ws.onmessage = function(m) {
......@@ -1070,6 +1069,7 @@
webSock.ws.onclose = function(m) {
webSock.ws = null;
noWebSock(true);
};
},
......@@ -1089,7 +1089,9 @@
};
var sid = 0;
function noWebSock(b) {
mask.style('display',b ? 'block' : 'none');
}
// TODO: use cache of pending messages (key = sid) to reconcile responses
......@@ -1273,6 +1275,11 @@
}
function para(sel, text) {
sel.append('p').text(text);
}
// ==============================
// View life-cycle callbacks
......@@ -1367,6 +1374,12 @@
.on('tick', tick);
network.drag = d3u.createDragBehavior(network.force, selectCb, atDragEnd);
// create mask layer for when we lose connection to server.
mask = view.$div.append('div').attr('id','topo-mask');
para(mask, 'Oops!');
para(mask, 'Web-socket connection to server closed...');
para(mask, 'Try refreshing the page.');
}
function load(view, ctx, flags) {
......