Aaron Kruglikov
Committed by Gerrit Code Review

Moving from local to netty transport.

Change-Id: Id37af6fa4d0971fd34ed18951196dde47bc4a12d
......@@ -9,6 +9,12 @@ COMPILE_DEPS = [
TEST_DEPS = [
'//lib:TEST',
'//core/api:onos-api-tests',
'//lib:netty-transport',
'//lib:catalyst-transport',
'//lib:netty-handler',
'//lib:netty-buffer',
'//lib:netty-codec',
]
osgi_jar_with_tests (
......
......@@ -73,5 +73,22 @@
<artifactId>atomix</artifactId>
<version>1.0.0-rc8</version>
</dependency>
<dependency>
<groupId>io.atomix.catalyst</groupId>
<artifactId>catalyst-netty</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>io.atomix.catalyst</groupId>
<artifactId>catalyst-transport</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-junit</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......
......@@ -166,11 +166,13 @@ public final class AsyncConsistentMultimapCommands {
public void writeObject(BufferOutput<?> buffer,
Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(value, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
value = serializer.readObject(buffer);
}
}
......@@ -548,6 +550,9 @@ public final class AsyncConsistentMultimapCommands {
*/
public static class Get extends
KeyQuery<Versioned<Collection<? extends byte[]>>> {
public Get() {
}
public Get(String key) {
super(key);
}
......
......@@ -19,6 +19,7 @@ package org.onosproject.store.primitives.resources.impl;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
......@@ -170,16 +171,19 @@ public class AsyncConsistentSetMultimapState extends ResourceStateMachine
*/
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
try {
if (backingMap.values().isEmpty()) {
return false;
}
Match<byte[]> match = Match.ifValue(commit.operation().value());
return backingMap
.values()
.stream()
.anyMatch(valueList ->
valueList
.values()
.stream()
.anyMatch(byteValue ->
match.matches(byteValue)));
valueList
.values()
.stream()
.anyMatch(byteValue ->
match.matches(byteValue)));
} finally {
commit.close();
}
......@@ -230,7 +234,7 @@ public class AsyncConsistentSetMultimapState extends ResourceStateMachine
*/
protected Set<String> keySet(Commit<? extends KeySet> commit) {
try {
return backingMap.keySet();
return ImmutableSet.copyOf(backingMap.keySet());
} finally {
commit.close();
}
......@@ -444,7 +448,7 @@ public class AsyncConsistentSetMultimapState extends ResourceStateMachine
@Override
public Collection<? extends byte[]> values() {
return valueCountdownMap.keySet();
return ImmutableSet.copyOf(valueCountdownMap.keySet());
}
@Override
......@@ -747,4 +751,4 @@ public class AsyncConsistentSetMultimapState extends ResourceStateMachine
}
}
}
}
}
......
......@@ -182,6 +182,18 @@ public final class AtomixLeaderElectorCommands {
.add("nodeId", nodeId)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(nodeId, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
nodeId = serializer.readObject(buffer);
}
}
/**
......
......@@ -16,6 +16,8 @@
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.ImmutableSet;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
......@@ -287,10 +289,10 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
try {
NodeId nodeId = commit.operation().nodeId();
return Maps.filterEntries(elections, e -> {
return ImmutableSet.copyOf(Maps.filterEntries(elections, e -> {
Leader leader = leadership(e.getKey()).leader();
return leader != null && leader.nodeId().equals(nodeId);
}).keySet();
}).keySet());
} finally {
commit.close();
}
......
......@@ -20,27 +20,18 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import com.google.common.io.Files;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.local.LocalTransport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.internal.ResourceManagerState;
import io.atomix.resource.ResourceType;
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
......@@ -49,7 +40,6 @@ import static org.junit.Assert.assertTrue;
/**
* Tests the {@link AsyncConsistentSetMultimap}.
*/
@Ignore
public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
private final File testDir = Files.createTempDir();
private final String keyOne = "hello";
......@@ -66,6 +56,7 @@ public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
valueTwo,
valueThree,
valueFour);
private final AtomicInteger port = new AtomicInteger(49200);
@Override
protected ResourceType resourceType() {
......@@ -411,7 +402,6 @@ public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
clearTests();
}
private AsyncConsistentSetMultimap createResource(int clusterSize) {
try {
createCopycatServers(clusterSize);
......@@ -424,24 +414,6 @@ public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
}
}
@Override
protected CopycatServer createCopycatServer(Address address) {
CopycatServer server = CopycatServer.builder(address)
.withTransport(new LocalTransport(registry))
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory(testDir + "/" + address.port())
.build())
.withStateMachine(ResourceManagerState::new)
.withSerializer(serializer.clone())
.withHeartbeatInterval(Duration.ofMillis(25))
.withElectionTimeout(Duration.ofMillis(50))
.withSessionTimeout(Duration.ofMillis(100))
.build();
copycatServers.add(server);
return server;
}
/**
* Returns two arrays contain the same set of elements,
* regardless of order.
......
......@@ -15,19 +15,9 @@
*/
package org.onosproject.store.primitives.resources.impl;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import io.atomix.resource.ResourceType;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
......@@ -37,13 +27,27 @@ import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Unit tests for {@link AtomixConsistentMap}.
*/
@Ignore
public class AtomixConsistentMapTest extends AtomixTestBase {
@Override
......
......@@ -20,7 +20,6 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
......@@ -36,7 +35,6 @@ import io.atomix.resource.ResourceType;
/**
* Unit tests for {@link AtomixLeaderElector}.
*/
@Ignore
public class AtomixLeaderElectorTest extends AtomixTestBase {
NodeId node1 = new NodeId("node1");
......
......@@ -15,19 +15,18 @@
*/
package org.onosproject.store.primitives.resources.impl;
import static org.junit.Assert.*;
import org.junit.Ignore;
import org.junit.Test;
import io.atomix.Atomix;
import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
/**git s
* Unit tests for {@link AtomixCounter}.
*/
@Ignore
public class AtomixLongTest extends AtomixTestBase {
@Override
......
......@@ -15,17 +15,22 @@
*/
package org.onosproject.store.primitives.resources.impl;
import com.google.common.util.concurrent.Uninterruptibles;
import io.atomix.AtomixClient;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.local.LocalServerRegistry;
import io.atomix.catalyst.transport.local.LocalTransport;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.internal.ResourceManagerState;
import io.atomix.resource.ResourceType;
import org.junit.After;
import org.junit.Before;
import org.onlab.junit.TestTools;
import org.onosproject.store.primitives.impl.CatalystSerializers;
import java.io.File;
import java.io.IOException;
......@@ -35,12 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.onosproject.store.primitives.impl.CatalystSerializers;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Base class for various Atomix* tests.
......@@ -48,7 +48,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
public abstract class AtomixTestBase {
private static final File TEST_DIR = new File("target/test-logs");
protected LocalServerRegistry registry;
protected int port;
protected final AtomicInteger port = new AtomicInteger(49200);
protected List<Address> members;
protected List<CopycatClient> copycatClients = new ArrayList<>();
protected List<CopycatServer> copycatServers = new ArrayList<>();
......@@ -69,7 +69,8 @@ public abstract class AtomixTestBase {
* @return The next server address.
*/
private Address nextAddress() {
Address address = new Address("localhost", port++);
Address address = new Address("127.0.0.1",
TestTools.findAvailablePort(port.getAndIncrement()));
members.add(address);
return address;
}
......@@ -82,13 +83,16 @@ public abstract class AtomixTestBase {
List<CopycatServer> servers = new ArrayList<>();
List<Address> members = new ArrayList<>();
for (int i = 0; i < nodes; i++) {
members.add(nextAddress());
}
for (int i = 0; i < nodes; i++) {
CopycatServer server = createCopycatServer(members.get(i));
server.bootstrap(members).thenRun(latch::countDown);
Address address = nextAddress();
members.add(address);
CopycatServer server = createCopycatServer(address);
if (members.size() <= 1) {
server.bootstrap().thenRun(latch::countDown).join();
} else {
server.join(members).thenRun(latch::countDown);
}
servers.add(server);
}
......@@ -102,11 +106,10 @@ public abstract class AtomixTestBase {
*/
protected CopycatServer createCopycatServer(Address address) {
CopycatServer server = CopycatServer.builder(address)
.withTransport(new LocalTransport(registry))
.withTransport(NettyTransport.builder().withThreads(1).build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
.withDirectory(TEST_DIR + "/" + address.port())
.build())
.withStorageLevel(StorageLevel.MEMORY)
.build())
.withStateMachine(ResourceManagerState::new)
.withSerializer(serializer.clone())
.withHeartbeatInterval(Duration.ofMillis(25))
......@@ -122,7 +125,6 @@ public abstract class AtomixTestBase {
public void clearTests() throws Exception {
registry = new LocalServerRegistry();
members = new ArrayList<>();
port = 5000;
CompletableFuture<Void> closeClients =
CompletableFuture.allOf(atomixClients.stream()
......@@ -165,7 +167,7 @@ public abstract class AtomixTestBase {
protected AtomixClient createAtomixClient() {
CountDownLatch latch = new CountDownLatch(1);
AtomixClient client = AtomixClient.builder()
.withTransport(new LocalTransport(registry))
.withTransport(NettyTransport.builder().withThreads(1).build())
.withSerializer(serializer.clone())
.build();
client.connect(members).thenRun(latch::countDown);
......
......@@ -762,6 +762,15 @@ remote_jar (
)
remote_jar (
name = 'catalyst-concurrent',
out = 'catalyst-concurrent-1.1.1.jar',
url = 'mvn:io.atomix.catalyst:catalyst-concurrent:jar:1.1.1',
sha1 = 'a7f3499b9815d83f65137abe0146238e447514c7',
maven_coords = 'io.atomix.catalyst:catalyst-concurrent:1.1.1',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'netty-transport-native-epoll',
out = 'netty-transport-native-epoll-4.0.36.Final.jar',
url = 'mvn:io.netty:netty-transport-native-epoll:jar:4.0.36.Final',
......@@ -771,6 +780,24 @@ remote_jar (
)
remote_jar (
name = 'catalyst-netty',
out = 'catalyst-netty-1.1.1.jar',
url = 'mvn:io.atomix.catalyst:catalyst-netty:jar:1.1.1',
sha1 = '8e9e5e6d8fdf01be26aa8a8eb07f762f5f4d4eb4',
maven_coords = 'io.atomix.catalyst:catalyst-netty:1.1.1',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'catalyst-transport',
out = 'catalyst-transport-1.1.1.jar',
url = 'mvn:io.atomix.catalyst:catalyst-transport:jar:1.1.1',
sha1 = '2b38cb9ee3b5817b017072a886006461824d00c6',
maven_coords = 'io.atomix.catalyst:catalyst-transport:1.1.1',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'objenesis',
out = 'objenesis-2.2.jar',
url = 'mvn:org.objenesis:objenesis:jar:2.2',
......
......@@ -156,7 +156,10 @@
"netty-common": "mvn:io.netty:netty-common:4.0.36.Final",
"netty-handler": "mvn:io.netty:netty-handler:4.0.36.Final",
"netty-transport": "mvn:io.netty:netty-transport:4.0.36.Final",
"catalyst-concurrent": "mvn:io.atomix.catalyst:catalyst-concurrent:1.1.1",
"netty-transport-native-epoll": "mvn:io.netty:netty-transport-native-epoll:4.0.36.Final",
"catalyst-netty": "mvn:io.atomix.catalyst:catalyst-netty:1.1.1",
"catalyst-transport": "mvn:io.atomix.catalyst:catalyst-transport:1.1.1",
"objenesis": "mvn:org.objenesis:objenesis:2.2",
"openflowj": "mvn:org.onosproject:openflowj:0.9.4.onos",
"org.apache.felix.scr": "mvn:org.apache.felix:org.apache.felix.scr:1.8.2",
......