tom

Merge remote-tracking branch 'origin/master'

......@@ -30,6 +30,10 @@ public final class SimpleNettyClient {
System.exit(0);
}
public static void startStandalone(String... args) throws Exception {
String host = args.length > 0 ? args[0] : "localhost";
int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000;
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
messaging.activate();
......@@ -37,28 +41,26 @@ public final class SimpleNettyClient {
MetricsFeature feature = new MetricsFeature("latency");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
final int warmup = 10000;
for (int i = 0; i < warmup; i++) {
messaging.sendAsync(new Endpoint("localhost", 8081), "simple", "Hello World".getBytes());
messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes());
Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8081), "echo",
.sendAndReceive(new Endpoint(host, port), "echo",
"Hello World".getBytes());
}
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
final int iterations = 10000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAsyncTimer.time();
messaging.sendAsync(new Endpoint("localhost", 8081), "simple", "Hello World".getBytes());
messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes());
context.stop();
}
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8081), "echo",
.sendAndReceive(new Endpoint(host, port), "echo",
"Hello World".getBytes());
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
......
......@@ -14,30 +14,26 @@ import org.onlab.onos.cli.AbstractShellCommand;
public class SimpleNettyClientCommand extends AbstractShellCommand {
//FIXME: replace these arguments with proper ones needed for the test.
@Argument(index = 0, name = "serverIp", description = "Server IP address",
@Argument(index = 0, name = "hostname", description = "Server Hostname",
required = false, multiValued = false)
String serverIp = "127.0.0.1";
String host = "localhost";
@Argument(index = 1, name = "workers", description = "IO workers",
@Argument(index = 3, name = "port", description = "Port",
required = false, multiValued = false)
String workers = "6";
String port = "8081";
@Argument(index = 2, name = "messageCount", description = "Message count",
required = false, multiValued = false)
String messageCount = "1000000";
@Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
@Argument(index = 1, name = "warmupCount", description = "Warm-up count",
required = false, multiValued = false)
String messageLength = "128";
String warmup = "10000";
@Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
@Argument(index = 2, name = "messageCount", description = "Message count",
required = false, multiValued = false)
String timeoutSecs = "60";
String messageCount = "5000000";
@Override
protected void execute() {
try {
startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
startStandalone(new String[]{host, port, warmup, messageCount});
} catch (Exception e) {
error("Unable to start client %s", e);
}
......
......@@ -42,6 +42,7 @@ import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
......@@ -117,11 +118,11 @@ public class GossipDeviceStore
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(InternalDeviceEvent.class)
.register(InternalPortEvent.class)
.register(InternalPortStatusEvent.class)
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
.register(Timestamped.class)
.register(MastershipBasedTimestamp.class)
.register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
.build()
.populate(1);
}
......
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link InternalDeviceEvent}.
*/
public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> {
/**
* Creates a serializer for {@link InternalDeviceEvent}.
*/
public InternalDeviceEventSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, InternalDeviceEvent event) {
kryo.writeClassAndObject(output, event.providerId());
kryo.writeClassAndObject(output, event.deviceId());
kryo.writeClassAndObject(output, event.deviceDescription());
}
@Override
public InternalDeviceEvent read(Kryo kryo, Input input,
Class<InternalDeviceEvent> type) {
ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
Timestamped<DeviceDescription> deviceDescription = (Timestamped<DeviceDescription>) kryo.readClassAndObject(input);
return new InternalDeviceEvent(providerId, deviceId, deviceDescription);
}
}
package org.onlab.onos.store.device.impl;
import java.util.List;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link InternalPortEvent}.
*/
public class InternalPortEventSerializer extends Serializer<InternalPortEvent> {
/**
* Creates a serializer for {@link InternalPortEvent}.
*/
public InternalPortEventSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, InternalPortEvent event) {
kryo.writeClassAndObject(output, event.providerId());
kryo.writeClassAndObject(output, event.deviceId());
kryo.writeClassAndObject(output, event.portDescriptions());
}
@Override
public InternalPortEvent read(Kryo kryo, Input input,
Class<InternalPortEvent> type) {
ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
Timestamped<List<PortDescription>> portDescriptions = (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input);
return new InternalPortEvent(providerId, deviceId, portDescriptions);
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link InternalPortStatusEvent}.
*/
public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> {
/**
* Creates a serializer for {@link InternalPortStatusEvent}.
*/
public InternalPortStatusEventSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, InternalPortStatusEvent event) {
kryo.writeClassAndObject(output, event.providerId());
kryo.writeClassAndObject(output, event.deviceId());
kryo.writeClassAndObject(output, event.portDescription());
}
@Override
public InternalPortStatusEvent read(Kryo kryo, Input input,
Class<InternalPortStatusEvent> type) {
ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input);
return new InternalPortStatusEvent(providerId, deviceId, portDescription);
}
}