Madan Jampani
Committed by Gerrit Code Review

Misc bug fixes in preparation for enabling StorageManager

Change-Id: I953414891c901e5d1f92844ca8c4eaa8c042dd53
......@@ -24,12 +24,14 @@ import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.base.MoreObjects;
......@@ -41,6 +43,7 @@ import io.atomix.catalyst.serializer.SerializationException;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
......@@ -66,10 +69,6 @@ public class CopycatTransportConnection implements Connection {
private final String inboundMessageSubject;
private final ThreadContext context;
private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
private final AtomicInteger messagesSent = new AtomicInteger(0);
private final AtomicInteger sendFailures = new AtomicInteger(0);
private final AtomicInteger messagesReceived = new AtomicInteger(0);
private final AtomicInteger receiveFailures = new AtomicInteger(0);
CopycatTransportConnection(long connectionId,
CopycatTransport.Mode mode,
......@@ -120,12 +119,14 @@ public class CopycatTransportConnection implements Connection {
baos.toByteArray(),
context.executor())
.whenComplete((r, e) -> {
if (e == null) {
messagesSent.incrementAndGet();
} else {
sendFailures.incrementAndGet();
Throwable wrappedError = e;
if (e != null) {
Throwable rootCause = Throwables.getRootCause(e);
if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
wrappedError = new TransportException(e);
}
}
handleResponse(r, e, result, context);
handleResponse(r, wrappedError, result, context);
});
} catch (SerializationException | IOException e) {
result.completeExceptionally(e);
......@@ -172,11 +173,6 @@ public class CopycatTransportConnection implements Connection {
"No handler registered for " + request.getClass()));
}
return handler.handle(request).handle((result, error) -> {
if (error == null) {
messagesReceived.incrementAndGet();
} else {
receiveFailures.incrementAndGet();
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
baos.write(error != null ? FAILURE : SUCCESS);
context.serializer().writeObject(error != null ? error : result, baos);
......@@ -220,7 +216,6 @@ public class CopycatTransportConnection implements Connection {
if (!(other instanceof CopycatTransportConnection)) {
return false;
}
return connectionId == ((CopycatTransportConnection) other).connectionId;
}
......@@ -228,10 +223,6 @@ public class CopycatTransportConnection implements Connection {
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", connectionId)
.add("sent", messagesSent.get())
.add("received", messagesReceived.get())
.add("sendFailures", sendFailures.get())
.add("receiveFailures", receiveFailures.get())
.toString();
}
......
......@@ -18,7 +18,6 @@ package org.onosproject.store.primitives.impl;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;
import java.io.File;
import java.util.Collection;
......@@ -57,7 +56,6 @@ public class StoragePartition implements Managed<StoragePartition> {
private StoragePartitionClient client;
public static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
new ResourceType(DistributedLong.class),
new ResourceType(AtomixLeaderElector.class),
new ResourceType(AtomixConsistentMap.class));
......
......@@ -52,7 +52,7 @@ public final class AtomixConsistentMapCommands {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.SEQUENTIAL;
return ConsistencyLevel.LINEARIZABLE;
}
@Override
......@@ -78,7 +78,7 @@ public final class AtomixConsistentMapCommands {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.SEQUENTIAL;
return ConsistencyLevel.BOUNDED_LINEARIZABLE;
}
@Override
......