alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 80 changed files with 1590 additions and 279 deletions
......@@ -24,6 +24,11 @@
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
......
package org.onlab.onos.foo;
import java.io.IOException;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Message handler that echos the message back to the sender.
*/
public class NettyEchoHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) throws IOException {
//log.info("Received message. Echoing it back to the sender.");
message.respond(message.payload());
}
}
package org.onlab.onos.foo;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A MessageHandler that simply logs the information.
*/
public class NettyLoggingHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) {
//log.info("Received message. Payload has {} bytes", message.payload().length);
}
}
......@@ -2,7 +2,6 @@ package org.onlab.onos.foo;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onlab.metrics.MetricsComponent;
......@@ -11,12 +10,17 @@ import org.onlab.metrics.MetricsManager;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
// FIXME: Should be move out to test or app
public final class SimpleNettyClient {
private SimpleNettyClient() {
private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
private SimpleNettyClient() {
}
public static void main(String[] args)
......@@ -30,33 +34,46 @@ public final class SimpleNettyClient {
System.exit(0);
}
public static void startStandalone(String... args) throws Exception {
public static void startStandalone(String[] args) throws Exception {
String host = args.length > 0 ? args[0] : "localhost";
int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000;
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
Endpoint endpoint = new Endpoint(host, port);
messaging.activate();
metrics.activate();
MetricsFeature feature = new MetricsFeature("timers");
MetricsFeature feature = new MetricsFeature("latency");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
final int warmup = 100;
log.info("connecting " + host + ":" + port + " warmup:" + warmup + " iterations:" + iterations);
for (int i = 0; i < warmup; i++) {
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
Response response = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
}
log.info("measuring async sender");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAsyncTimer.time();
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
final int iterations = 1000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8080), "echo",
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
metrics.deactivate();
}
public static class TestNettyMessagingService extends NettyMessagingService {
......
......@@ -13,33 +13,29 @@ import org.onlab.onos.cli.AbstractShellCommand;
description = "Starts the simple Netty client")
public class SimpleNettyClientCommand extends AbstractShellCommand {
@Argument(index = 0, name = "serverIp", description = "Server IP address",
//FIXME: replace these arguments with proper ones needed for the test.
@Argument(index = 0, name = "hostname", description = "Server Hostname",
required = false, multiValued = false)
String serverIp = "127.0.0.1";
String hostname = "localhost";
@Argument(index = 1, name = "workers", description = "IO workers",
@Argument(index = 1, name = "port", description = "Port",
required = false, multiValued = false)
String workers = "6";
String port = "8081";
@Argument(index = 2, name = "messageCount", description = "Message count",
@Argument(index = 2, name = "warmupCount", description = "Warm-up count",
required = false, multiValued = false)
String messageCount = "1000000";
String warmupCount = "1000";
@Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
@Argument(index = 3, name = "messageCount", description = "Message count",
required = false, multiValued = false)
String messageLength = "128";
@Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
required = false, multiValued = false)
String timeoutSecs = "60";
String messageCount = "100000";
@Override
protected void execute() {
try {
startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
startStandalone(new String[]{hostname, port, warmupCount, messageCount});
} catch (Exception e) {
error("Unable to start client %s", e);
}
}
}
......
package org.onlab.onos.foo;
import org.onlab.netty.EchoHandler;
import org.onlab.netty.NettyMessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -9,7 +8,7 @@ import org.slf4j.LoggerFactory;
* Test to measure Messaging performance.
*/
public final class SimpleNettyServer {
private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
private static Logger log = LoggerFactory.getLogger(SimpleNettyServer.class);
private SimpleNettyServer() {}
......@@ -19,10 +18,10 @@ import org.slf4j.LoggerFactory;
}
public static void startStandalone(String[] args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080);
NettyMessagingService server = new NettyMessagingService(8081);
server.activate();
server.registerHandler("simple", new org.onlab.netty.LoggingHandler());
server.registerHandler("echo", new EchoHandler());
server.registerHandler("simple", new NettyLoggingHandler());
server.registerHandler("echo", new NettyEchoHandler());
}
}
......
......@@ -9,10 +9,11 @@ import org.onlab.onos.cli.AbstractShellCommand;
/**
* Starts the Simple Netty server.
*/
@Command(scope = "onos", name = "test-netty-server",
@Command(scope = "onos", name = "simple-netty-server",
description = "Starts the simple netty server")
public class SimpleNettyServerCommand extends AbstractShellCommand {
//FIXME: Replace these with parameters for
@Argument(index = 0, name = "serverIp", description = "Server IP address",
required = false, multiValued = false)
String serverIp = "127.0.0.1";
......
......@@ -7,6 +7,12 @@
<command>
<action class="org.onlab.onos.foo.TestIOServerCommand"/>
</command>
<command>
<action class="org.onlab.onos.foo.SimpleNettyServerCommand"/>
</command>
<command>
<action class="org.onlab.onos.foo.SimpleNettyClientCommand"/>
</command>
</command-bundle>
</blueprint>
......
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.CoreService;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.FlowRuleService;
......@@ -21,7 +22,8 @@ public class SummaryCommand extends AbstractShellCommand {
protected void execute() {
TopologyService topologyService = get(TopologyService.class);
Topology topology = topologyService.currentTopology();
print("nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
print("version=%s, nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
get(CoreService.class).version().toString(),
get(ClusterService.class).getNodes().size(),
get(DeviceService.class).getDeviceCount(),
get(LinkService.class).getLinkCount(),
......
......@@ -27,7 +27,7 @@ public class AddHostToHostIntentCommand extends AbstractShellCommand {
required = true, multiValued = false)
String two = null;
private static long id = 1;
private static long id = 0x7870001;
@Override
protected void execute() {
......
package org.onlab.onos.cli.net;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.PointToPointIntent;
import org.onlab.packet.Ethernet;
/**
* Installs point-to-point connectivity intents.
*/
@Command(scope = "onos", name = "add-point-intent",
description = "Installs point-to-point connectivity intent")
public class AddPointToPointIntentCommand extends AbstractShellCommand {
@Argument(index = 0, name = "ingressDevice",
description = "Ingress Device/Port Description",
required = true, multiValued = false)
String ingressDeviceString = null;
@Argument(index = 1, name = "egressDevice",
description = "Egress Device/Port Description",
required = true, multiValued = false)
String egressDeviceString = null;
private static long id = 0x7470001;
@Override
protected void execute() {
IntentService service = get(IntentService.class);
DeviceId ingressDeviceId = DeviceId.deviceId(getDeviceId(ingressDeviceString));
PortNumber ingressPortNumber =
PortNumber.portNumber(getPortNumber(ingressDeviceString));
ConnectPoint ingress = new ConnectPoint(ingressDeviceId, ingressPortNumber);
DeviceId egressDeviceId = DeviceId.deviceId(getDeviceId(egressDeviceString));
PortNumber egressPortNumber =
PortNumber.portNumber(getPortNumber(egressDeviceString));
ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
Intent intent =
new PointToPointIntent(new IntentId(id++),
selector,
treatment,
ingress,
egress);
service.submit(intent);
}
/**
* Extracts the port number portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return port number as a string, empty string if the port is not found
*/
private String getPortNumber(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(slash + 1, deviceString.length());
}
/**
* Extracts the device ID portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return device ID string
*/
private String getDeviceId(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(0, slash);
}
}
package org.onlab.onos.cli.net;
import java.util.List;
import java.util.SortedSet;
import org.apache.karaf.shell.console.Completer;
import org.apache.karaf.shell.console.completer.StringsCompleter;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.device.DeviceService;
/**
* ConnectPoint completer.
*/
public class ConnectPointCompleter implements Completer {
@Override
public int complete(String buffer, int cursor, List<String> candidates) {
// Delegate string completer
StringsCompleter delegate = new StringsCompleter();
// Fetch our service and feed it's offerings to the string completer
DeviceService service = AbstractShellCommand.get(DeviceService.class);
// Generate the device ID/port number identifiers
for (Device device : service.getDevices()) {
SortedSet<String> strings = delegate.getStrings();
for (Port port : service.getPorts(device.id())) {
strings.add(device.id().toString() + "/" + port.number());
}
}
// Now let the completer do the work for figuring out what to offer.
return delegate.complete(buffer, cursor, candidates);
}
}
......@@ -13,40 +13,41 @@ import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.IntentState;
/**
* Wipes-out the entire network information base, i.e. devices, links, hosts.
* Wipes-out the entire network information base, i.e. devices, links, hosts, intents.
*/
@Command(scope = "onos", name = "wipe-out",
description = "Wipes-out the entire network information base, i.e. devices, links, hosts")
public class WipeOutCommand extends ClustersListCommand {
private static final String DISCLAIMER = "Yes, I know it will delete everything!";
private static final String DISCLAIMER = "Delete everything please.";
@Argument(index = 0, name = "disclaimer", description = "Device ID",
required = true, multiValued = false)
required = false, multiValued = false)
String disclaimer = null;
@Override
protected void execute() {
if (!disclaimer.equals(DISCLAIMER)) {
print("I'm afraid I can't do that...");
print("You have to acknowledge by: " + DISCLAIMER);
if (disclaimer == null || !disclaimer.equals(DISCLAIMER)) {
print("I'm afraid I can't do that!\nPlease acknowledge with phrase: '%s'",
DISCLAIMER);
return;
}
print("Good bye...");
print("Wiping devices");
DeviceAdminService deviceAdminService = get(DeviceAdminService.class);
DeviceService deviceService = get(DeviceService.class);
for (Device device : deviceService.getDevices()) {
deviceAdminService.removeDevice(device.id());
}
print("Wiping hosts");
HostAdminService hostAdminService = get(HostAdminService.class);
HostService hostService = get(HostService.class);
for (Host host : hostService.getHosts()) {
hostAdminService.removeHost(host.id());
}
print("Wiping intents");
IntentService intentService = get(IntentService.class);
for (Intent intent : intentService.getIntents()) {
if (intentService.getIntentState(intent.id()) == IntentState.INSTALLED) {
......
......@@ -75,6 +75,13 @@
<ref component-id="hostIdCompleter"/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.AddPointToPointIntentCommand"/>
<completers>
<ref component-id="connectPointCompleter"/>
<ref component-id="connectPointCompleter"/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.ClustersListCommand"/>
......@@ -116,5 +123,6 @@
<bean id="hostIdCompleter" class="org.onlab.onos.cli.net.HostIdCompleter"/>
<bean id="intentIdCompleter" class="org.onlab.onos.cli.net.IntentIdCompleter"/>
<bean id="flowRuleStatusCompleter" class="org.onlab.onos.cli.net.FlowRuleStatusCompleter"/>
<bean id="connectPointCompleter" class="org.onlab.onos.cli.net.ConnectPointCompleter"/>
</blueprint>
......
package org.onlab.onos;
/**
* Service for interacting with the core system of the controller.
*/
public interface CoreService {
/**
* Returns the product version.
*
* @return product version
*/
Version version();
}
package org.onlab.onos;
import java.util.Objects;
import static java.lang.Integer.parseInt;
/**
* Representation of the product version.
*/
public final class Version {
public static final String FORMAT = "%d.%d.%d.%s";
private final int major;
private final int minor;
private final int patch;
private final String build;
private final String format;
// Creates a new version descriptor
private Version(int major, int minor, int patch, String build) {
this.major = major;
this.minor = minor;
this.patch = patch;
this.build = build;
this.format = String.format(FORMAT, major, minor, patch, build);
}
/**
* Creates a new version from the specified constituent numbers.
*
* @param major major version number
* @param minor minod version number
* @param patch version patch number
* @param build build string
* @return version descriptor
*/
public static Version version(int major, int minor, int patch, String build) {
return new Version(major, minor, patch, build);
}
/**
* Creates a new version by parsing the specified string.
*
* @param string version string
* @return version descriptor
*/
public static Version version(String string) {
String[] fields = string.split("[.-]");
return new Version(parseInt(fields[0]), parseInt(fields[1]),
parseInt(fields[2]), fields[3]);
}
/**
* Returns the major version number.
*
* @return major version number
*/
public int major() {
return major;
}
/**
* Returns the minor version number.
*
* @return minor version number
*/
public int minor() {
return minor;
}
/**
* Returns the version patch number.
*
* @return patch number
*/
public int patch() {
return patch;
}
/**
* Returns the version build string.
*
* @return build string
*/
public String build() {
return build;
}
@Override
public String toString() {
return format;
}
@Override
public int hashCode() {
return Objects.hash(format);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof Version) {
final Version other = (Version) obj;
return Objects.equals(this.format, other.format);
}
return false;
}
}
......@@ -34,7 +34,7 @@ public interface MastershipService {
/**
* Abandons mastership of the specified device on the local node thus
* forcing selection of a new master. If the local node is not a master
* for this device, no action will be taken.
* for this device, no master selection will occur.
*
* @param deviceId the identifier of the device
*/
......
......@@ -66,12 +66,25 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
MastershipTerm getTermFor(DeviceId deviceId);
/**
* Revokes a controller instance's mastership over a device and hands
* over mastership to another controller instance.
* Sets a controller instance's mastership role to STANDBY for a device.
* If the role is MASTER, another controller instance will be selected
* as a candidate master.
*
* @param nodeId the controller instance identifier
* @param deviceId device to revoke mastership for
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId);
/**
* Allows a controller instance to give up its current role for a device.
* If the role is MASTER, another controller instance will be selected
* as a candidate master.
*
* @param nodeId the controller instance identifier
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId);
}
......
......@@ -3,6 +3,7 @@ package org.onlab.onos.net;
import org.onlab.onos.net.provider.ProviderId;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Default edge link model implementation.
......@@ -52,10 +53,14 @@ public class DefaultEdgeLink extends DefaultLink implements EdgeLink {
* for network-to-host direction
* @return new phantom edge link
*/
public static DefaultEdgeLink createEdgeLink(HostLocation edgePort,
public static DefaultEdgeLink createEdgeLink(ConnectPoint edgePort,
boolean isIngress) {
checkNotNull(edgePort, "Edge port cannot be null");
HostLocation location = (edgePort instanceof HostLocation) ?
(HostLocation) edgePort : new HostLocation(edgePort, 0);
return new DefaultEdgeLink(ProviderId.NONE,
new ConnectPoint(HostId.NONE, PortNumber.P0),
edgePort, isIngress);
location, isIngress);
}
}
......
......@@ -22,6 +22,17 @@ public class HostLocation extends ConnectPoint {
}
/**
* Creates a new host location derived from the supplied connection point.
*
* @param connectPoint connection point
* @param time time when detected, in millis since start of epoch
*/
public HostLocation(ConnectPoint connectPoint, long time) {
super(connectPoint.deviceId(), connectPoint.port());
this.time = time;
}
/**
* Returns the time when the location was established, given in
* milliseconds since start of epoch.
*
......
......@@ -96,4 +96,13 @@ public class DefaultDeviceDescription extends AbstractDescription
.toString();
}
// default constructor for serialization
private DefaultDeviceDescription() {
this.uri = null;
this.type = null;
this.manufacturer = null;
this.hwVersion = null;
this.swVersion = null;
this.serialNumber = null;
}
}
......
......@@ -48,4 +48,9 @@ public class DefaultPortDescription extends AbstractDescription
return isEnabled;
}
// default constructor for serialization
private DefaultPortDescription() {
this.number = null;
this.isEnabled = false;
}
}
......
......@@ -42,6 +42,7 @@ public interface DeviceService {
* @param deviceId device identifier
* @return designated mastership role
*/
//XXX do we want this method here when MastershipService already does?
MastershipRole getRole(DeviceId deviceId);
......
package org.onlab.onos.net.intent;
import java.util.concurrent.Future;
import org.onlab.onos.net.flow.CompletedBatchOperation;
/**
* Abstraction of entity capable of installing intents to the environment.
*/
......@@ -10,7 +14,7 @@ public interface IntentInstaller<T extends InstallableIntent> {
* @param intent intent to be installed
* @throws IntentException if issues are encountered while installing the intent
*/
void install(T intent);
Future<CompletedBatchOperation> install(T intent);
/**
* Uninstalls the specified intent from the environment.
......@@ -18,5 +22,5 @@ public interface IntentInstaller<T extends InstallableIntent> {
* @param intent intent to be uninstalled
* @throws IntentException if issues are encountered while uninstalling the intent
*/
void uninstall(T intent);
Future<CompletedBatchOperation> uninstall(T intent);
}
......
......@@ -12,6 +12,7 @@ public interface ClockProviderService {
/**
* Updates the mastership term for the specified deviceId.
*
* @param deviceId device identifier.
* @param term mastership term.
*/
......
package org.onlab.onos;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.onlab.onos.Version.version;
/**
* Tests of the version descriptor.
*/
public class VersionTest {
@Test
public void fromParts() {
Version v = version(1, 2, 3, "4321");
assertEquals("wrong major", 1, v.major());
assertEquals("wrong minor", 2, v.minor());
assertEquals("wrong patch", 3, v.patch());
assertEquals("wrong build", "4321", v.build());
}
@Test
public void fromString() {
Version v = version("1.2.3.4321");
assertEquals("wrong major", 1, v.major());
assertEquals("wrong minor", 2, v.minor());
assertEquals("wrong patch", 3, v.patch());
assertEquals("wrong build", "4321", v.build());
}
@Test
public void snapshot() {
Version v = version("1.2.3-SNAPSHOT");
assertEquals("wrong major", 1, v.major());
assertEquals("wrong minor", 2, v.minor());
assertEquals("wrong patch", 3, v.patch());
assertEquals("wrong build", "SNAPSHOT", v.build());
}
@Test
public void testEquals() {
new EqualsTester()
.addEqualityGroup(version("1.2.3.4321"), version(1, 2, 3, "4321"))
.addEqualityGroup(version("1.9.3.4321"), version(1, 9, 3, "4321"))
.addEqualityGroup(version("1.2.8.4321"), version(1, 2, 8, "4321"))
.addEqualityGroup(version("1.2.3.x"), version(1, 2, 3, "x"))
.testEquals();
}
}
\ No newline at end of file
......@@ -5,6 +5,7 @@ import org.junit.Test;
import org.onlab.onos.net.provider.ProviderId;
import static org.junit.Assert.assertEquals;
import static org.onlab.onos.net.DefaultEdgeLink.createEdgeLink;
import static org.onlab.onos.net.DefaultLinkTest.cp;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.HostId.hostId;
......@@ -55,4 +56,24 @@ public class DefaultEdgeLinkTest {
assertEquals("incorrect time", 123L, link.hostLocation().time());
}
@Test
public void phantomIngress() {
HostLocation hostLocation = new HostLocation(DID1, P1, 123L);
EdgeLink link = createEdgeLink(hostLocation, true);
assertEquals("incorrect dst", hostLocation, link.dst());
assertEquals("incorrect type", Link.Type.EDGE, link.type());
assertEquals("incorrect connect point", hostLocation, link.hostLocation());
assertEquals("incorrect time", 123L, link.hostLocation().time());
}
@Test
public void phantomEgress() {
ConnectPoint hostLocation = new ConnectPoint(DID1, P1);
EdgeLink link = createEdgeLink(hostLocation, false);
assertEquals("incorrect src", hostLocation, link.src());
assertEquals("incorrect type", Link.Type.EDGE, link.type());
assertEquals("incorrect connect point", hostLocation, link.hostLocation());
assertEquals("incorrect time", 0L, link.hostLocation().time());
}
}
......
package org.onlab.onos.net.intent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.onlab.onos.net.intent.IntentEvent.Type.FAILED;
import static org.onlab.onos.net.intent.IntentEvent.Type.INSTALLED;
import static org.onlab.onos.net.intent.IntentEvent.Type.SUBMITTED;
import static org.onlab.onos.net.intent.IntentEvent.Type.WITHDRAWN;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import static org.junit.Assert.*;
import static org.onlab.onos.net.intent.IntentEvent.Type.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.net.flow.CompletedBatchOperation;
/**
* Suite of tests for the intent service contract.
......@@ -290,17 +298,19 @@ public class IntentServiceTest {
}
@Override
public void install(TestInstallableIntent intent) {
public Future<CompletedBatchOperation> install(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("install failed by design");
}
return null;
}
@Override
public void uninstall(TestInstallableIntent intent) {
public Future<CompletedBatchOperation> uninstall(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("remove failed by design");
}
return null;
}
}
......
package org.onlab.onos.cluster.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.CoreService;
import org.onlab.onos.Version;
import org.onlab.util.Tools;
import java.io.File;
import java.util.List;
/**
* Core service implementation.
*/
@Component
@Service
public class CoreManager implements CoreService {
private static final File VERSION_FILE = new File("../VERSION");
private static Version version = Version.version("1.0.0-SNAPSHOT");
// TODO: work in progress
@Activate
public void activate() {
List<String> versionLines = Tools.slurp(VERSION_FILE);
if (versionLines != null && !versionLines.isEmpty()) {
version = Version.version(versionLines.get(0));
}
}
@Override
public Version version() {
return version;
}
}
......@@ -82,7 +82,7 @@ implements MastershipService, MastershipAdminService {
if (role.equals(MastershipRole.MASTER)) {
event = store.setMaster(nodeId, deviceId);
} else {
event = store.unsetMaster(nodeId, deviceId);
event = store.setStandby(nodeId, deviceId);
}
if (event != null) {
......@@ -98,13 +98,10 @@ implements MastershipService, MastershipAdminService {
@Override
public void relinquishMastership(DeviceId deviceId) {
MastershipRole role = getLocalRole(deviceId);
if (!role.equals(MastershipRole.MASTER)) {
return;
}
MastershipEvent event = store.unsetMaster(
MastershipEvent event = null;
event = store.relinquishRole(
clusterService.getLocalNode().id(), deviceId);
if (event != null) {
post(event);
}
......
......@@ -18,6 +18,7 @@ import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
......@@ -142,8 +143,12 @@ public class DeviceManager
// Applies the specified role to the device; ignores NONE
private void applyRole(DeviceId deviceId, MastershipRole newRole) {
if (newRole != MastershipRole.NONE) {
if (newRole.equals(MastershipRole.NONE)) {
Device device = store.getDevice(deviceId);
// FIXME: Device might not be there yet. (eventual consistent)
if (device == null) {
return;
}
DeviceProvider provider = getProvider(device.providerId());
if (provider != null) {
provider.roleChanged(device, newRole);
......@@ -193,16 +198,50 @@ public class DeviceManager
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
checkValidity();
log.info("Device {} connected", deviceId);
// check my Role
MastershipRole role = mastershipService.requestRoleFor(deviceId);
if (role != MastershipRole.MASTER) {
// TODO: Do we need to explicitly tell the Provider that
// this instance is no longer the MASTER? probably not
return;
}
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(deviceId);
if (!term.master().equals(clusterService.getLocalNode().id())) {
// lost mastership after requestRole told this instance was MASTER.
return;
}
// tell clock provider if this instance is the master
clockProviderService.setMastershipTerm(deviceId, term);
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
// If there was a change of any kind, trigger role selection
// process.
// If there was a change of any kind, tell the provider
// that this instance is the master.
// Note: event can be null, if mastership was lost between
// roleRequest and store update calls.
if (event != null) {
log.info("Device {} connected", deviceId);
mastershipService.requestRoleFor(deviceId);
provider().roleChanged(event.subject(),
mastershipService.requestRoleFor(deviceId));
// TODO: Check switch reconnected case. Is it assured that
// event will never be null?
// Could there be a situation MastershipService told this
// instance is the new Master, but
// event returned from the store is null?
// TODO: Confirm: Mastership could be lost after requestRole
// and createOrUpdateDevice call.
// In that case STANDBY node can
// claim itself to be master against the Device.
// Will the Node, chosen by the MastershipService, retry
// to get the MASTER role when that happen?
// FIXME: 1st argument should be deviceId, to allow setting
// certain roles even if the store returned null.
provider().roleChanged(event.subject(), role);
post(event);
}
}
......@@ -211,12 +250,23 @@ public class DeviceManager
public void deviceDisconnected(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
// FIXME: only the MASTER should be marking off-line in normal cases,
// but if I was the last STANDBY connection, etc. and no one else
// was there to mark the device offline, this instance may need to
// temporarily request for Master Role and mark offline.
if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
log.debug("Device {} disconnected, but I am not the master", deviceId);
//let go of any role anyways
mastershipService.relinquishMastership(deviceId);
return;
}
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of being master or a candidate.
mastershipService.relinquishMastership(deviceId);
//we're no longer capable of mastership.
if (event != null) {
log.info("Device {} disconnected", deviceId);
mastershipService.relinquishMastership(deviceId);
post(event);
}
}
......@@ -256,6 +306,9 @@ public class DeviceManager
// FIXME: implement response to this notification
log.warn("Failed to assert role [{}] onto Device {}", role,
deviceId);
if (role == MastershipRole.MASTER) {
mastershipService.relinquishMastership(deviceId);
}
}
}
......@@ -267,17 +320,29 @@ public class DeviceManager
}
// Intercepts mastership events
private class InternalMastershipListener
implements MastershipListener {
private class InternalMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
if (event.master().equals(clusterService.getLocalNode().id())) {
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(event.subject());
clockProviderService.setMastershipTerm(event.subject(), term);
applyRole(event.subject(), MastershipRole.MASTER);
final DeviceId did = event.subject();
if (isAvailable(did)) {
final NodeId myNodeId = clusterService.getLocalNode().id();
if (myNodeId.equals(event.master())) {
MastershipTerm term = termService.getMastershipTerm(did);
if (term.master().equals(myNodeId)) {
// only set the new term if I am the master
clockProviderService.setMastershipTerm(did, term);
}
applyRole(did, MastershipRole.MASTER);
} else {
applyRole(did, MastershipRole.STANDBY);
}
} else {
applyRole(event.subject(), MastershipRole.STANDBY);
//device dead to node, give up
mastershipService.relinquishMastership(did);
applyRole(did, MastershipRole.STANDBY);
}
}
}
......
......@@ -13,12 +13,14 @@ import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -28,6 +30,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
......@@ -44,7 +47,9 @@ import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* An implementation of Intent Manager.
......@@ -67,7 +72,8 @@ public class IntentManager
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
private final ExecutorService executor = newSingleThreadExecutor(namedThreads("onos-intents"));
private ExecutorService executor;
private ExecutorService monitorExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
......@@ -86,6 +92,8 @@ public class IntentManager
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
executor = newSingleThreadExecutor(namedThreads("onos-intents"));
monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
log.info("Started");
}
......@@ -94,6 +102,8 @@ public class IntentManager
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
executor.shutdown();
monitorExecutor.shutdown();
log.info("Stopped");
}
......@@ -240,14 +250,23 @@ public class IntentManager
}
}
// FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
// TODO: implement compilation traversing tree structure
/**
* Compiles an intent recursively.
*
* @param intent intent
* @return result of compilation
*/
private List<InstallableIntent> compileIntent(Intent intent) {
if (intent instanceof InstallableIntent) {
return ImmutableList.of((InstallableIntent) intent);
}
List<InstallableIntent> installable = new ArrayList<>();
// TODO do we need to registerSubclassCompiler?
for (Intent compiled : getCompiler(intent).compile(intent)) {
InstallableIntent installableIntent = (InstallableIntent) compiled;
installable.add(installableIntent);
installable.addAll(compileIntent(compiled));
}
return installable;
}
......@@ -261,6 +280,7 @@ public class IntentManager
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
......@@ -268,17 +288,20 @@ public class IntentManager
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
getInstaller(installable).install(installable);
Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
installFutures.add(future);
}
}
eventDispatcher.post(store.setState(intent, INSTALLED));
// FIXME we have to wait for the installable intents
//eventDispatcher.post(store.setState(intent, INSTALLED));
monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
uninstallIntent(intent);
uninstallIntent(intent, RECOMPILING);
// If compilation failed, kick off the recompiling phase.
executeRecompilingPhase(intent);
// FIXME
//executeRecompilingPhase(intent);
}
}
......@@ -327,12 +350,14 @@ public class IntentManager
private void executeWithdrawingPhase(Intent intent) {
// Indicate that the intent is being withdrawn.
store.setState(intent, WITHDRAWING);
uninstallIntent(intent);
uninstallIntent(intent, WITHDRAWN);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
store.removeInstalledIntents(intent.id());
eventDispatcher.post(store.setState(intent, WITHDRAWN));
// FIXME need to clean up
//store.removeInstalledIntents(intent.id());
// FIXME
//eventDispatcher.post(store.setState(intent, WITHDRAWN));
}
/**
......@@ -340,14 +365,17 @@ public class IntentManager
*
* @param intent intent to be uninstalled
*/
private void uninstallIntent(Intent intent) {
private void uninstallIntent(Intent intent, IntentState nextState) {
List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
getInstaller(installable).uninstall(installable);
Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
uninstallFutures.add(future);
}
}
monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
......@@ -422,9 +450,10 @@ public class IntentManager
// Attempt recompilation of the specified intents first.
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
uninstallIntent(intent);
uninstallIntent(intent, RECOMPILING);
executeRecompilingPhase(intent);
//FIXME
//executeRecompilingPhase(intent);
}
if (compileAllFailed) {
......@@ -460,4 +489,44 @@ public class IntentManager
}
}
private class IntentInstallMonitor implements Runnable {
private final Intent intent;
private final List<Future<CompletedBatchOperation>> futures;
private final IntentState nextState;
public IntentInstallMonitor(Intent intent,
List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
this.intent = intent;
this.futures = futures;
this.nextState = nextState;
}
private void updateIntent(Intent intent) {
if (nextState == RECOMPILING) {
executor.execute(new IntentTask(nextState, intent));
} else if (nextState == INSTALLED || nextState == WITHDRAWN) {
eventDispatcher.post(store.setState(intent, nextState));
} else {
log.warn("Invalid next intent state {} for intent {}", nextState, intent);
}
}
@Override
public void run() {
for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
Future<CompletedBatchOperation> future = i.next();
if (future.isDone()) {
// TODO: we may want to get the future here
i.remove();
}
}
if (futures.isEmpty()) {
updateIntent(intent);
} else {
// resubmit ourselves if we are not done yet
monitorExecutor.submit(this);
}
}
}
}
......
......@@ -5,7 +5,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -15,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
......@@ -57,8 +58,26 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
intentManager.unregisterInstaller(PathIntent.class);
}
/**
* Apply a list of FlowRules.
*
* @param rules rules to apply
*/
private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
Future<CompletedBatchOperation> future = flowRuleService.applyBatch(batch);
return future;
// try {
// //FIXME don't do this here
// future.get();
// } catch (InterruptedException | ExecutionException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
}
@Override
public void install(PathIntent intent) {
public Future<CompletedBatchOperation> install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
......@@ -74,20 +93,14 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
//flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return applyBatch(rules);
}
@Override
public void uninstall(PathIntent intent) {
public Future<CompletedBatchOperation> uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
......@@ -103,15 +116,131 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
//flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return applyBatch(rules);
}
// TODO refactor below this line... ----------------------------
/**
* Generates the series of MatchActionOperations from the
* {@link FlowBatchOperation}.
* <p>
* FIXME: Currently supporting PacketPathFlow and SingleDstTreeFlow only.
* <p>
* FIXME: MatchActionOperations should have dependency field to the other
* match action operations, and this method should use this.
*
* @param op the {@link FlowBatchOperation} object
* @return the list of {@link MatchActionOperations} objects
*/
/*
private List<MatchActionOperations>
generateMatchActionOperationsList(FlowBatchOperation op) {
// MatchAction operations at head (ingress) switches.
MatchActionOperations headOps = matchActionService.createOperationsList();
// MatchAction operations at rest of the switches.
MatchActionOperations tailOps = matchActionService.createOperationsList();
MatchActionOperations removeOps = matchActionService.createOperationsList();
for (BatchOperationEntry<Operator, ?> e : op.getOperations()) {
if (e.getOperator() == FlowBatchOperation.Operator.ADD) {
generateInstallMatchActionOperations(e, tailOps, headOps);
} else if (e.getOperator() == FlowBatchOperation.Operator.REMOVE) {
generateRemoveMatchActionOperations(e, removeOps);
} else {
throw new UnsupportedOperationException(
"FlowManager supports ADD and REMOVE operations only.");
}
}
return Arrays.asList(tailOps, headOps, removeOps);
}
*/
/**
* Generates MatchActionOperations for an INSTALL FlowBatchOperation.
* <p/>
* FIXME: Currently only supports flows that generate exactly two match
* action operation sets.
*
* @param e Flow BatchOperationEntry
* @param tailOps MatchActionOperation set that the tail
* MatchActionOperations will be placed in
* @param headOps MatchActionOperation set that the head
* MatchActionOperations will be placed in
*/
/*
private void generateInstallMatchActionOperations(
BatchOperationEntry<Operator, ?> e,
MatchActionOperations tailOps,
MatchActionOperations headOps) {
if (!(e.getTarget() instanceof Flow)) {
throw new IllegalStateException(
"The target is not Flow object: " + e.getTarget());
}
// Compile flows to match-actions
Flow flow = (Flow) e.getTarget();
List<MatchActionOperations> maOps = flow.compile(
e.getOperator(), matchActionService);
verifyNotNull(maOps, "Could not compile the flow: " + flow);
verify(maOps.size() == 2,
"The flow generates unspported match-action operations.");
// Map FlowId to MatchActionIds
for (MatchActionOperations maOp : maOps) {
for (MatchActionOperationEntry entry : maOp.getOperations()) {
flowMatchActionsMap.put(
KryoFactory.serialize(flow.getId()),
KryoFactory.serialize(entry.getTarget()));
}
}
// Merge match-action operations
for (MatchActionOperationEntry mae : maOps.get(0).getOperations()) {
verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
tailOps.addOperation(mae);
}
for (MatchActionOperationEntry mae : maOps.get(1).getOperations()) {
verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
headOps.addOperation(mae);
}
}
*/
/**
* Generates MatchActionOperations for a REMOVE FlowBatchOperation.
*
* @param e Flow BatchOperationEntry
* @param removeOps MatchActionOperation set that the remove
* MatchActionOperations will be placed in
*/
/*
private void generateRemoveMatchActionOperations(
BatchOperationEntry<Operator, ?> e,
MatchActionOperations removeOps) {
if (!(e.getTarget() instanceof FlowId)) {
throw new IllegalStateException(
"The target is not a FlowId object: " + e.getTarget());
}
// Compile flows to match-actions
FlowId flowId = (FlowId) e.getTarget();
for (byte[] matchActionIdBytes :
flowMatchActionsMap.remove(KryoFactory.serialize(flowId))) {
MatchActionId matchActionId = KryoFactory.deserialize(matchActionIdBytes);
removeOps.addOperation(new MatchActionOperationEntry(
MatchActionOperations.Operator.REMOVE, matchActionId));
}
}
*/
}
......
package org.onlab.onos.net.intent.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultEdgeLink;
import org.onlab.onos.net.DefaultPath;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.intent.IdGenerator;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
import org.onlab.onos.net.intent.IntentExtensionService;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.PathIntent;
import org.onlab.onos.net.intent.PointToPointIntent;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.PathService;
/**
* A intent compiler for {@link org.onlab.onos.net.intent.HostToHostIntent}.
*/
@Component(immediate = true)
public class PointToPointIntentCompiler
implements IntentCompiler<PointToPointIntent> {
private static final ProviderId PID = new ProviderId("core", "org.onlab.onos.core", true);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentExtensionService intentManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PathService pathService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
private IdGenerator<IntentId> intentIdGenerator;
@Activate
public void activate() {
IdBlockAllocator idBlockAllocator = new DummyIdBlockAllocator();
intentIdGenerator = new IdBlockAllocatorBasedIntentIdGenerator(idBlockAllocator);
intentManager.registerCompiler(PointToPointIntent.class, this);
}
@Deactivate
public void deactivate() {
intentManager.unregisterCompiler(PointToPointIntent.class);
}
@Override
public List<Intent> compile(PointToPointIntent intent) {
Path path = getPath(intent.ingressPoint(), intent.egressPoint());
List<Link> links = new ArrayList<>();
links.add(DefaultEdgeLink.createEdgeLink(intent.ingressPoint(), true));
links.addAll(path.links());
links.add(DefaultEdgeLink.createEdgeLink(intent.egressPoint(), false));
return Arrays.asList(createPathIntent(new DefaultPath(PID, links, path.cost() + 2,
path.annotations()),
intent));
}
/**
* Creates a path intent from the specified path and original
* connectivity intent.
*
* @param path path to create an intent for
* @param intent original intent
*/
private Intent createPathIntent(Path path,
PointToPointIntent intent) {
return new PathIntent(intentIdGenerator.getNewId(),
intent.selector(), intent.treatment(),
path.src(), path.dst(), path);
}
/**
* Computes a path between two ConnectPoints.
*
* @param one start of the path
* @param two end of the path
* @return Path between the two
* @throws PathNotFoundException if a path cannot be found
*/
private Path getPath(ConnectPoint one, ConnectPoint two) {
Set<Path> paths = pathService.getPaths(one.deviceId(), two.deviceId());
if (paths.isEmpty()) {
throw new PathNotFoundException("No path from " + one + " to " + two);
}
// TODO: let's be more intelligent about this eventually
return paths.iterator().next();
}
}
package org.onlab.onos.net.device.impl;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipServiceAdapter;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.Device;
......@@ -27,7 +35,9 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.ClockProviderService;
import org.onlab.onos.store.trivial.impl.SimpleDeviceStore;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
import java.util.Iterator;
......@@ -56,6 +66,8 @@ public class DeviceManagerTest {
private static final PortNumber P1 = PortNumber.portNumber(1);
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
private static final NodeId NID_LOCAL = new NodeId("local");
private static final IpPrefix LOCALHOST = IpPrefix.valueOf("127.0.0.1");
private DeviceManager mgr;
......@@ -75,6 +87,8 @@ public class DeviceManagerTest {
mgr.store = new SimpleDeviceStore();
mgr.eventDispatcher = new TestEventDispatcher();
mgr.mastershipService = new TestMastershipService();
mgr.clusterService = new TestClusterService();
mgr.clockProviderService = new TestClockProviderService();
mgr.activate();
service.addListener(listener);
......@@ -258,7 +272,8 @@ public class DeviceManagerTest {
}
}
private static class TestMastershipService extends MastershipServiceAdapter {
private static class TestMastershipService
extends MastershipServiceAdapter {
@Override
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
......@@ -273,6 +288,59 @@ public class DeviceManagerTest {
public MastershipRole requestRoleFor(DeviceId deviceId) {
return MastershipRole.MASTER;
}
@Override
public MastershipTermService requestTermService() {
return new MastershipTermService() {
@Override
public MastershipTerm getMastershipTerm(DeviceId deviceId) {
// FIXME: just returning something not null
return MastershipTerm.of(NID_LOCAL, 1);
}
};
}
}
// code clone
private final class TestClusterService implements ClusterService {
ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
@Override
public ControllerNode getLocalNode() {
return local;
}
@Override
public Set<ControllerNode> getNodes() {
return null;
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return null;
}
@Override
public State getState(NodeId nodeId) {
return null;
}
@Override
public void addListener(ClusterEventListener listener) {
}
@Override
public void removeListener(ClusterEventListener listener) {
}
}
private final class TestClockProviderService implements
ClockProviderService {
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
// TODO Auto-generated method stub
}
}
}
......
......@@ -2,8 +2,7 @@ package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
// TODO: ClusterMessage should be aware about how to serialize the payload
// TODO: Should payload type be made generic?
// TODO: Should payload type be ByteBuffer?
/**
* Base message for cluster-wide communications.
*/
......@@ -12,7 +11,6 @@ public class ClusterMessage {
private final NodeId sender;
private final MessageSubject subject;
private final byte[] payload;
// TODO: add field specifying Serializer for payload
/**
* Creates a cluster message.
......
......@@ -45,4 +45,9 @@ public class MessageSubject {
MessageSubject that = (MessageSubject) obj;
return Objects.equals(this.value, that.value);
}
// for serializer
protected MessageSubject() {
this.value = "";
}
}
......
......@@ -3,12 +3,9 @@ package org.onlab.onos.store.cluster.messaging.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -26,8 +23,10 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.MessageSubjectSerializer;
import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
......@@ -50,8 +49,6 @@ public class ClusterCommunicationManager
private ClusterService clusterService;
private ClusterNodesDelegate nodesDelegate;
// FIXME: `members` should go away and should be using ClusterService
private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
......@@ -59,11 +56,14 @@ public class ClusterCommunicationManager
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(ClusterMessage.class)
.register(ClusterMessage.class, new ClusterMessageSerializer())
.register(ClusterMembershipEvent.class)
.register(byte[].class)
.register(MessageSubject.class, new MessageSubjectSerializer())
.build()
.populate(1);
}
......@@ -73,7 +73,15 @@ public class ClusterCommunicationManager
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
messagingService = new NettyMessagingService(localNode.tcpPort());
NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
netty.activate();
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("NettyMessagingService#activate", e);
}
messagingService = netty;
log.info("Started");
}
......@@ -86,7 +94,7 @@ public class ClusterCommunicationManager
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
for (ControllerNode node : members.values()) {
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
......@@ -107,11 +115,12 @@ public class ClusterCommunicationManager
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
ControllerNode node = members.get(toNodeId);
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
messagingService.sendAsync(nodeEp,
message.subject().value(), SERIALIZER.encode(message));
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
......@@ -137,7 +146,7 @@ public class ClusterCommunicationManager
@Override
public void addNode(ControllerNode node) {
members.put(node.id(), node);
//members.put(node.id(), node);
}
@Override
......@@ -146,7 +155,7 @@ public class ClusterCommunicationManager
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
members.remove(node.id());
//members.remove(node.id());
}
// Sends a heart beat to all peers.
......@@ -181,7 +190,7 @@ public class ClusterCommunicationManager
}
}
private static class InternalClusterMessageHandler implements MessageHandler {
private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
......@@ -191,8 +200,13 @@ public class ClusterCommunicationManager
@Override
public void handle(Message message) {
ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
handler.handle(clusterMessage);
try {
ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
handler.handle(clusterMessage);
} catch (Exception e) {
log.error("Exception caught during ClusterMessageHandler", e);
throw e;
}
}
}
}
......
......@@ -82,7 +82,7 @@ public final class Timestamped<T> {
// Default constructor for serialization
@Deprecated
protected Timestamped() {
private Timestamped() {
this.value = null;
this.timestamp = null;
}
......
......@@ -42,6 +42,7 @@ import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
......@@ -113,14 +114,18 @@ public class GossipDeviceStore
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(InternalDeviceEvent.class)
.register(InternalPortEvent.class)
.register(InternalPortStatusEvent.class)
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
.register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
.register(InternalDeviceRemovedEvent.class)
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
.register(Timestamp.class)
.register(Timestamped.class)
.register(MastershipBasedTimestamp.class)
.register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
.build()
.populate(1);
}
......@@ -132,6 +137,10 @@ public class GossipDeviceStore
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
......@@ -175,7 +184,7 @@ public class GossipDeviceStore
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event or providerId: "
log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
......@@ -278,7 +287,18 @@ public class GossipDeviceStore
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
return markOfflineInternal(deviceId, timestamp);
DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {}",
deviceId);
try {
notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
deviceId);
}
}
return event;
}
private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
......@@ -566,7 +586,16 @@ public class GossipDeviceStore
public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
// TODO: broadcast removal event
if (event != null) {
log.info("Notifying peers of a device removed topology event for deviceId: {}",
deviceId);
try {
notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
deviceId);
}
}
return event;
}
......@@ -809,6 +838,22 @@ public class GossipDeviceStore
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
......@@ -828,15 +873,46 @@ public class GossipDeviceStore
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
}
}
private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device offline event from peer: {}", message.sender());
InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
markOfflineInternal(deviceId, timestamp);
}
}
private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device removed event from peer: {}", message.sender());
InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
removeDeviceInternal(deviceId, timestamp);
}
}
private class InternalPortEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
......
......@@ -3,13 +3,15 @@ package org.onlab.onos.store.device.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by GossipDeviceStore.
* MessageSubjects used by GossipDeviceStore peer-peer communication.
*/
public final class GossipDeviceStoreMessageSubjects {
private GossipDeviceStoreMessageSubjects() {}
public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
}
......
......@@ -35,4 +35,11 @@ public class InternalDeviceEvent {
public Timestamped<DeviceDescription> deviceDescription() {
return deviceDescription;
}
// for serializer
protected InternalDeviceEvent() {
this.providerId = null;
this.deviceId = null;
this.deviceDescription = null;
}
}
......
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link InternalDeviceEvent}.
*/
public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> {
/**
* Creates a serializer for {@link InternalDeviceEvent}.
*/
public InternalDeviceEventSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, InternalDeviceEvent event) {
kryo.writeClassAndObject(output, event.providerId());
kryo.writeClassAndObject(output, event.deviceId());
kryo.writeClassAndObject(output, event.deviceDescription());
}
@Override
public InternalDeviceEvent read(Kryo kryo, Input input,
Class<InternalDeviceEvent> type) {
ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
Timestamped<DeviceDescription> deviceDescription
= (Timestamped<DeviceDescription>) kryo.readClassAndObject(input);
return new InternalDeviceEvent(providerId, deviceId, deviceDescription);
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
/**
* Information published by GossipDeviceStore to notify peers of a device
* going offline.
*/
public class InternalDeviceOfflineEvent {
private final DeviceId deviceId;
private final Timestamp timestamp;
/**
* Creates a InternalDeviceOfflineEvent.
* @param deviceId identifier of device going offline.
* @param timestamp timestamp of when the device went offline.
*/
public InternalDeviceOfflineEvent(DeviceId deviceId, Timestamp timestamp) {
this.deviceId = deviceId;
this.timestamp = timestamp;
}
public DeviceId deviceId() {
return deviceId;
}
public Timestamp timestamp() {
return timestamp;
}
// for serializer
@SuppressWarnings("unused")
private InternalDeviceOfflineEvent() {
deviceId = null;
timestamp = null;
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link InternalDeviceOfflineEvent}.
*/
public class InternalDeviceOfflineEventSerializer extends Serializer<InternalDeviceOfflineEvent> {
/**
* Creates a serializer for {@link InternalDeviceOfflineEvent}.
*/
public InternalDeviceOfflineEventSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, InternalDeviceOfflineEvent event) {
kryo.writeClassAndObject(output, event.deviceId());
kryo.writeClassAndObject(output, event.timestamp());
}
@Override
public InternalDeviceOfflineEvent read(Kryo kryo, Input input,
Class<InternalDeviceOfflineEvent> type) {
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
Timestamp timestamp = (Timestamp) kryo.readClassAndObject(input);
return new InternalDeviceOfflineEvent(deviceId, timestamp);
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
/**
* Information published by GossipDeviceStore to notify peers of a device
* being administratively removed.
*/
public class InternalDeviceRemovedEvent {
private final DeviceId deviceId;
private final Timestamp timestamp;
/**
* Creates a InternalDeviceRemovedEvent.
* @param deviceId identifier of the removed device.
* @param timestamp timestamp of when the device was administratively removed.
*/
public InternalDeviceRemovedEvent(DeviceId deviceId, Timestamp timestamp) {
this.deviceId = deviceId;
this.timestamp = timestamp;
}
public DeviceId deviceId() {
return deviceId;
}
public Timestamp timestamp() {
return timestamp;
}
// for serializer
@SuppressWarnings("unused")
private InternalDeviceRemovedEvent() {
deviceId = null;
timestamp = null;
}
}
......@@ -37,4 +37,11 @@ public class InternalPortEvent {
public Timestamped<List<PortDescription>> portDescriptions() {
return portDescriptions;
}
// for serializer
protected InternalPortEvent() {
this.providerId = null;
this.deviceId = null;
this.portDescriptions = null;
}
}
......
package org.onlab.onos.store.device.impl;
import java.util.List;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link InternalPortEvent}.
*/
public class InternalPortEventSerializer extends Serializer<InternalPortEvent> {
/**
* Creates a serializer for {@link InternalPortEvent}.
*/
public InternalPortEventSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, InternalPortEvent event) {
kryo.writeClassAndObject(output, event.providerId());
kryo.writeClassAndObject(output, event.deviceId());
kryo.writeClassAndObject(output, event.portDescriptions());
}
@Override
public InternalPortEvent read(Kryo kryo, Input input,
Class<InternalPortEvent> type) {
ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
Timestamped<List<PortDescription>> portDescriptions
= (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input);
return new InternalPortEvent(providerId, deviceId, portDescriptions);
}
}
......@@ -35,4 +35,11 @@ public class InternalPortStatusEvent {
public Timestamped<PortDescription> portDescription() {
return portDescription;
}
// for serializer
protected InternalPortStatusEvent() {
this.providerId = null;
this.deviceId = null;
this.portDescription = null;
}
}
......
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link InternalPortStatusEvent}.
*/
public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> {
/**
* Creates a serializer for {@link InternalPortStatusEvent}.
*/
public InternalPortStatusEventSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, InternalPortStatusEvent event) {
kryo.writeClassAndObject(output, event.providerId());
kryo.writeClassAndObject(output, event.deviceId());
kryo.writeClassAndObject(output, event.portDescription());
}
@Override
public InternalPortStatusEvent read(Kryo kryo, Input input,
Class<InternalPortStatusEvent> type) {
ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input);
return new InternalPortStatusEvent(providerId, deviceId, portDescription);
}
}
......@@ -3,7 +3,6 @@ package org.onlab.onos.store.serializers;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
......@@ -11,6 +10,9 @@ import com.esotericsoftware.kryo.io.Output;
public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
/**
* Creates a serializer for {@link ClusterMessage}.
*/
public ClusterMessageSerializer() {
// does not accept null
super(false);
......
......@@ -14,7 +14,7 @@ import com.esotericsoftware.kryo.io.Output;
public class MastershipBasedTimestampSerializer extends Serializer<MastershipBasedTimestamp> {
/**
* Default constructor.
* Creates a serializer for {@link MastershipBasedTimestamp}.
*/
public MastershipBasedTimestampSerializer() {
// non-null, immutable
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
/**
* Creates a serializer for {@link MessageSubject}.
*/
public MessageSubjectSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, MessageSubject object) {
output.writeString(object.value());
}
@Override
public MessageSubject read(Kryo kryo, Input input,
Class<MessageSubject> type) {
return new MessageSubject(input.readString());
}
}
......@@ -16,7 +16,7 @@ import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.Serializer;
import org.onlab.onos.store.serializers.StoreSerializer;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -34,7 +34,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
protected Serializer serializer;
protected StoreSerializer serializer;
protected HazelcastInstance theInstance;
......
......@@ -2,7 +2,7 @@ package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.serializers.Serializer;
import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
......@@ -18,7 +18,7 @@ import com.hazelcast.core.IMap;
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private final Serializer serializer;
private final StoreSerializer serializer;
private IMap<byte[], byte[]> rawMap;
/**
......@@ -27,7 +27,7 @@ public final class OptionalCacheLoader<K, V> extends
* @param serializer to use for serialization
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(Serializer serializer, IMap<byte[], byte[]> rawMap) {
public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
this.serializer = checkNotNull(serializer);
this.rawMap = checkNotNull(rawMap);
}
......
......@@ -7,9 +7,9 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
/**
* Serializer implementation using Kryo.
* StoreSerializer implementation using Kryo.
*/
public class KryoSerializer implements Serializer {
public class KryoSerializer implements StoreSerializer {
private final Logger log = LoggerFactory.getLogger(getClass());
protected KryoPool serializerPool;
......
......@@ -6,7 +6,7 @@ import java.nio.ByteBuffer;
/**
* Service to serialize Objects into byte array.
*/
public interface Serializer {
public interface StoreSerializer {
/**
* Serializes the specified object into bytes.
......
......@@ -174,7 +174,7 @@ public class SimpleMastershipStore
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
synchronized (this) {
switch (role) {
......@@ -214,4 +214,9 @@ public class SimpleMastershipStore
return backup;
}
@Override
public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
return setStandby(nodeId, deviceId);
}
}
......
......@@ -129,22 +129,22 @@ public class SimpleMastershipStoreTest {
public void unsetMaster() {
//NONE - record backup but take no other action
put(DID1, N1, false, false);
sms.unsetMaster(N1, DID1);
sms.setStandby(N1, DID1);
assertTrue("not backed up", sms.backups.contains(N1));
sms.termMap.clear();
sms.unsetMaster(N1, DID1);
sms.setStandby(N1, DID1);
assertTrue("term not set", sms.termMap.containsKey(DID1));
//no backup, MASTER
put(DID1, N1, true, true);
assertNull("wrong event", sms.unsetMaster(N1, DID1));
assertNull("wrong event", sms.setStandby(N1, DID1));
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
put(DID2, N2, true, true);
assertEquals("wrong event", MASTER_CHANGED, sms.unsetMaster(N1, DID1).type());
assertEquals("wrong event", MASTER_CHANGED, sms.setStandby(N1, DID1).type());
}
//helper to populate master/backup structures
......
......@@ -153,6 +153,7 @@
description="ONOS sample playground application">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onlab-netty/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-config" version="1.0.0"
......
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
~
~ This program and the accompanying materials are made available under the
~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
~ and is available at http://www.eclipse.org/legal/epl-v10.html
-->
<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
name="net.onrc.onos-1.0.0">
<repository>mvn:net.onrc.onos/onos-features/1.0.0-SNAPSHOT/xml/features</repository>
<feature name="thirdparty" version="1.0.0"
description="ONOS 3rd party dependencies">
<bundle>mvn:com.google.code.findbugs/annotations/2.0.2</bundle>
<bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
<bundle>mvn:com.google.guava/guava/17.0</bundle>
<bundle>mvn:com.google.guava/guava/15.0</bundle>
</feature>
<feature name="base" version="1.0.0"
description="ONOS Base">
<feature>scr</feature>
<feature>thirdparty</feature>
<bundle>mvn:net.onrc.onos.sb/onos-sb/0.0.1</bundle>
<bundle>mvn:org.projectfloodlight/openflowj/0.3.6-SNAPSHOT</bundle>
</feature>
</features>
......@@ -17,14 +17,11 @@
<description>ONOS OpenFlow controller subsystem API</description>
<repositories>
<!-- FIXME: for Loxigen. Decide how to use Loxigen before release. -->
<!-- FIXME: for Loxigen + optical experimenter. Decide how to use Loxigen before release. -->
<repository>
<id>sonatype-oss-snapshot</id>
<name>Sonatype OSS snapshot repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<id>onlab-temp</id>
<name>ON.lab temporary repository</name>
<url>http://mavenrepo.onlab.us:8081/nexus/content/repositories/releases</url>
</repository>
</repositories>
......@@ -32,7 +29,8 @@
<dependency>
<groupId>org.projectfloodlight</groupId>
<artifactId>openflowj</artifactId>
<version>0.3.8-SNAPSHOT</version>
<!-- FIXME once experimenter gets merged to upstream -->
<version>0.3.8-optical_experimenter</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
......
......@@ -981,11 +981,13 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
// switch was a duplicate-dpid, calling the method below would clear
// all state for the original switch (with the same dpid),
// which we obviously don't want.
log.info("{}:removal called");
sw.removeConnectedSwitch();
} else {
// A duplicate was disconnected on this ChannelHandler,
// this is the same switch reconnecting, but the original state was
// not cleaned up - XXX check liveness of original ChannelHandler
log.info("{}:duplicate found");
duplicateDpidFound = Boolean.FALSE;
}
} else {
......
......@@ -307,9 +307,11 @@ public class OpenFlowControllerImpl implements OpenFlowController {
connectedSwitches.remove(dpid);
OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
if (sw == null) {
log.warn("sw was null for {}", dpid);
sw = activeEqualSwitches.remove(dpid);
}
for (OpenFlowSwitchListener l : ofSwitchListener) {
log.warn("removal for {}", dpid);
l.switchRemoved(dpid);
}
}
......
......@@ -9,10 +9,14 @@ export KARAF_ZIP=${KARAF_ZIP:-~/Downloads/apache-karaf-3.0.1.zip}
export KARAF_TAR=${KARAF_TAR:-~/Downloads/apache-karaf-3.0.1.tar.gz}
export KARAF_DIST=$(basename $KARAF_ZIP .zip)
# Fallback build number us derived from from the user name & time
export BUILD_NUMBER=${BUILD_NUMBER:-$(id -un)~$(date +'%Y/%m/%d@%H:%M')}
# ONOS Version and onos.tar.gz staging environment
export ONOS_VERSION=${ONOS_VERSION:-1.0.0-SNAPSHOT}
export ONOS_POM_VERSION="1.0.0-SNAPSHOT"
export ONOS_VERSION=${ONOS_VERSION:-1.0.0.$BUILD_NUMBER}
export ONOS_BITS=onos-${ONOS_VERSION%~*}
export ONOS_STAGE_ROOT=${ONOS_STAGE_ROOT:-/tmp}
export ONOS_BITS=onos-$ONOS_VERSION
export ONOS_STAGE=$ONOS_STAGE_ROOT/$ONOS_BITS
export ONOS_TAR=$ONOS_STAGE.tar.gz
......
......@@ -49,7 +49,7 @@ export ONOS_FEATURES="${ONOS_FEATURES:-webconsole,onos-api,onos-core,onos-cli,on
# ONOS Patching ----------------------------------------------------------------
# Patch the Apache Karaf distribution file to add ONOS features repository
perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-features/$ONOS_VERSION/xml/features|" \
perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-features/$ONOS_POM_VERSION/xml/features|" \
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution file to load ONOS features
......@@ -57,10 +57,14 @@ perl -pi.old -e "s|^(featuresBoot=.*)|\1,$ONOS_FEATURES|" \
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution with ONOS branding bundle
cp $M2_REPO/org/onlab/onos/onos-branding/$ONOS_VERSION/onos-branding-*.jar \
cp $M2_REPO/org/onlab/onos/onos-branding/$ONOS_POM_VERSION/onos-branding-*.jar \
$ONOS_STAGE/$KARAF_DIST/lib
# Patch in the ONOS version file
echo $ONOS_VERSION > $ONOS_STAGE/VERSION
# Now package up the ONOS tar file
cd $ONOS_STAGE_ROOT
COPYFILE_DISABLE=1 tar zcf $ONOS_TAR $ONOS_BITS
ls -l $ONOS_TAR >&2
rm -r $ONOS_STAGE
......
......@@ -33,6 +33,7 @@ alias obs='onos-build-selective'
alias op='onos-package'
alias ot='onos-test'
alias ol='onos-log'
alias go='ob && ot && onos -w'
alias pub='onos-push-update-bundle'
# Short-hand for tailing the ONOS (karaf) log
......
......@@ -4,6 +4,7 @@
#-------------------------------------------------------------------------------
export JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/java-7-openjdk-amd64/}
export JAVA_OPTS="-Xms256M -Xmx2048M"
cd /opt/onos
/opt/onos/apache-karaf-3.0.1/bin/karaf "$@"
......
......@@ -15,7 +15,7 @@ bundle=$(echo $(basename $jar .jar) | sed 's/-[0-9].*//g')
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
for node in $nodes; do
scp -q $jar $ONOS_USER@$node:.m2/repository/$jar
scp -q $jar $ONOS_USER@$node:$ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar
ssh $ONOS_USER@$node "ls -l $ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar"
ssh $ONOS_USER@$node "$ONOS_INSTALL_DIR/bin/onos \"bundle:update -f $bundle\"" 2>/dev/null
done
......
#!/usr/bin/python
# Launches mininet with Tower topology configuration.
import sys, tower
net = tower.Tower(cip=sys.argv[1])
net.run()
package org.onlab.metrics;
import java.io.File;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -10,9 +8,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
......@@ -56,6 +56,7 @@ import com.codahale.metrics.Timer;
@Component(immediate = true)
public final class MetricsManager implements MetricsService {
private final Logger log = LoggerFactory.getLogger(getClass());
/**
* Registry to hold the Components defined in the system.
*/
......@@ -69,15 +70,20 @@ public final class MetricsManager implements MetricsService {
/**
* Default Reporter for this metrics manager.
*/
private final CsvReporter reporter;
//private final Slf4jReporter reporter;
private final ConsoleReporter reporter;
public MetricsManager() {
this.metricsRegistry = new MetricRegistry();
this.reporter = CsvReporter.forRegistry(metricsRegistry)
.formatFor(Locale.US)
// this.reporter = Slf4jReporter.forRegistry(this.metricsRegistry)
// .outputTo(log)
// .convertRatesTo(TimeUnit.SECONDS)
// .convertDurationsTo(TimeUnit.MICROSECONDS)
// .build();
this.reporter = ConsoleReporter.forRegistry(this.metricsRegistry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MICROSECONDS)
.build(new File("/var/onos/log/metrics/"));
.build();
}
@Activate
......
......@@ -4,6 +4,12 @@ import com.google.common.base.Strings;
import com.google.common.primitives.UnsignedLongs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
public abstract class Tools {
......@@ -66,4 +72,24 @@ public abstract class Tools {
}
}
/**
* Slurps the contents of a file into a list of strings, one per line.
*
* @param path file path
* @return file contents
*/
public static List<String> slurp(File path) {
try (BufferedReader br = new BufferedReader(new FileReader(path))) {
List<String> lines = new ArrayList<>();
String line;
while ((line = br.readLine()) != null) {
lines.add(line);
}
return lines;
} catch (IOException e) {
return null;
}
}
}
......
......@@ -8,11 +8,16 @@ import io.netty.handler.codec.ReplayingDecoder;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Decoder for inbound messages.
*/
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final NettyMessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
......@@ -57,4 +62,10 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
checkState(false, "Must not be here");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
......
package org.onlab.netty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
......@@ -11,6 +14,8 @@ import io.netty.handler.codec.MessageToByteEncoder;
@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
private final Logger log = LoggerFactory.getLogger(getClass());
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
public static final int HEADER_VERSION = 1;
......@@ -31,11 +36,6 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write preamble
out.writeBytes(PREAMBLE);
try {
SERIALIZER.encode(message);
} catch (Exception e) {
e.printStackTrace();
}
byte[] payload = SERIALIZER.encode(message);
// write payload length
......@@ -47,4 +47,10 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write payload.
out.writeBytes(payload);
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
......
......@@ -248,6 +248,7 @@ public class NettyMessagingService implements MessagingService {
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
......
package org.onlab.netty;
import java.util.concurrent.TimeUnit;
import org.onlab.metrics.MetricsComponent;
import org.onlab.metrics.MetricsFeature;
import org.onlab.metrics.MetricsManager;
import com.codahale.metrics.Timer;
// FIXME: Should be move out to test or app
public final class SimpleClient {
private SimpleClient() {
}
public static void main(String... args) throws Exception {
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
messaging.activate();
metrics.activate();
MetricsFeature feature = new MetricsFeature("timers");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
final int warmup = 100;
for (int i = 0; i < warmup; i++) {
Timer.Context context = sendAsyncTimer.time();
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
final int iterations = 1000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8080), "echo",
"Hello World".getBytes());
System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
}
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
}
}
}
package org.onlab.netty;
//FIXME: Should be move out to test or app
public final class SimpleServer {
private SimpleServer() {}
public static void main(String... args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
}