Yuta HIGUCHI

DatabaseService-related fixes.

- Note: This patch does not fix the issue when running as a single node.

Change-Id: Iabfa548ca4e40e4ec5c9e76ae936300437e53d22
......@@ -39,6 +39,7 @@ import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.BatchWriteRequest.Builder;
import org.onlab.onos.store.service.BatchWriteResult;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
......@@ -108,7 +109,21 @@ public class DistributedLinkResourceStore implements LinkResourceStore {
serializer = new KryoSerializer();
Set<String> tables = databaseAdminService.listTables();
Set<String> tables = null;
int retries = 0;
do {
try {
tables = databaseAdminService.listTables();
} catch (DatabaseException e) {
log.debug("DatabaseException", e);
retries++;
if (retries > 10) {
log.error("Failed to list tables, moving on", e);
tables = new HashSet<>();
}
}
} while (tables == null);
if (!tables.contains(LINK_RESOURCE_ALLOCATIONS)) {
databaseAdminService.createTable(LINK_RESOURCE_ALLOCATIONS);
}
......@@ -116,6 +131,7 @@ public class DistributedLinkResourceStore implements LinkResourceStore {
databaseAdminService.createTable(INTENT_ALLOCATIONS);
}
log.info("Started");
}
......
......@@ -112,7 +112,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
clusterService.addListener(listener);
// wait for specified controller node to come up
return null;
return appeared;
}
@Override
......
......@@ -81,7 +81,7 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
}
}
if (handler == null) {
log.error("There was no handler for registered!");
log.error("There was no handler registered!");
return;
}
}
......
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Arrays;
......@@ -12,7 +13,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
......@@ -40,23 +40,28 @@ public class DatabaseClient implements EventHandler<LeaderElectEvent> {
private final Logger log = getLogger(getClass());
private final DatabaseProtocolService protocol;
private volatile ProtocolClient copycat = null;
private volatile Member currentLeader = null;
private volatile ProtocolClient client = null;
private volatile TcpMember currentLeader = null;
public DatabaseClient(DatabaseProtocolService protocol) {
this.protocol = protocol;
this.protocol = checkNotNull(protocol);
}
// FIXME This handler relies on a fact that local node is part of Raft cluster
@Override
public void handle(LeaderElectEvent event) {
Member newLeader = event.leader();
final TcpMember newLeader = event.leader();
if (newLeader != null && !newLeader.equals(currentLeader)) {
log.info("{} became the new leader", newLeader);
ProtocolClient prevClient = client;
ProtocolClient newclient = protocol.createClient(newLeader);
newclient.connect();
client = newclient;
currentLeader = newLeader;
if (copycat != null) {
copycat.close();
if (prevClient != null) {
prevClient.close();
}
copycat = protocol.createClient((TcpMember) currentLeader);
copycat.connect();
}
}
......@@ -92,7 +97,7 @@ public class DatabaseClient implements EventHandler<LeaderElectEvent> {
SubmitRequest request =
new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
CompletableFuture<SubmitResponse> submitResponse = copycat.submit(request);
CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
log.debug("Sent {} to {}", request, currentLeader);
......
......@@ -173,9 +173,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
client = new DatabaseClient(copycatMessagingProtocol);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.event(LeaderElectEvent.class).registerHandler(client);
copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
......