ADARA Networks
Committed by Gerrit Code Review

RabbitMQ Integration - Updates changeset 11110 - Review comments incorporated

Change-Id: I0bfd7838b87d55769165b21dc735e1ba4468b611
......@@ -76,6 +76,7 @@
<module>scalablegateway</module>
<module>bmv2-demo</module>
<module>yms</module>
<module>rabbitmq</module>
</modules>
......
COMPILE_DEPS = [
'//lib:CORE_DEPS',
'//incubator/api:onos-incubator-api',
'//lib:guava',
'//lib:gson',
'//lib:amqp-client',
]
BUNDLES = [
'//apps/rabbitmq:onos-apps-rabbitmq',
]
osgi_jar (
deps = COMPILE_DEPS,
)
onos_app (
title = 'Rabbit MQ APP',
category = 'Traffic Steering',
url = 'http://onosproject.org',
description = 'Rabbit MQ application.',
required_apps = [ 'org.onosproject.proxyarp' ],
included_bundles = BUNDLES,
)
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016-present Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<app name="org.onosproject.rabbitmq" origin="ON.Lab" version="${project.version}"
category="Traffic Steering" url="http://onosproject.org" title="Rabbit MQ App"
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
features="${project.artifactId}">
<description>${project.description}</description>
<artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
<!-- <artifact>mvn:${project.groupId}/onos-app-routing-api/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-app-routing/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-app-proxyarp/${project.version}</artifact> -->
</app>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Copyright 2016-present Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
<feature name="${project.artifactId}" version="${project.version}"
description="${project.description}">
<feature>onos-api</feature>
<bundle>mvn:${project.groupId}/onos-app-rabbitmq/${project.version}</bundle>
<bundle>mvn:com.rabbitmq/amqp-client/3.6.1</bundle>
<bundle>mvn:com.google.code.gson/gson/2.6.2</bundle>
</feature>
</features>
<?xml version="1.0" encoding="UTF-8"?>
<!-- ~ Copyright 2016-present Open Networking Laboratory ~ ~ Licensed under
the Apache License, Version 2.0 (the "License"); ~ you may not use this file
except in compliance with the License. ~ You may obtain a copy of the License
at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by
applicable law or agreed to in writing, software ~ distributed under the
License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. ~ See the License for the specific
language governing permissions and ~ limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-apps</artifactId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>onos-app-rabbitmq</artifactId>
<packaging>bundle</packaging>
<description>Rabbit MQ application</description>
<properties>
<rabbitmq.version>3.6.1</rabbitmq.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-app-routing</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-app-routing-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-app-proxyarp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-net</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.api;
/**
* Declares the constants used in this module.
*/
public final class MQConstants {
// No instantiation
private MQConstants() {
}
/**
* MQ correlation id.
*/
public static final String CORRELATION_ID = "correlation_id";
/**
* MQ exchange name.
*/
public static final String EXCHANGE_NAME_PROPERTY = "EXCHANGE_NAME_PROPERTY";
/**
* MQ routing key.
*/
public static final String ROUTING_KEY_PROPERTY = "ROUTING_KEY_PROPERTY";
/**
* MQ queue name.
*/
public static final String QUEUE_NAME_PROPERTY = "QUEUE_NAME_PROPERTY";
/**
* Switch id connected to onos controller published via json.
*/
public static final String SWITCH_ID = "switch_id";
/**
* Switch's infrastructure device name published via json.
*/
public static final String INFRA_DEVICE_NAME = "infra_device_name";
/**
* Captured event type published via json.
*/
public static final String EVENT_TYPE = "event_type";
/**
* Signifies device event in json.
*/
public static final String DEVICE_EVENT = "DEVICE_EVENT";
/**
* Port connect via switch.
*/
public static final String PORT_NUMBER = "port_number";
/**
* Describes port status enabled or disabled.
*/
public static final String PORT_ENABLED = "port_enabled";
/**
* Specifies port speed.
*/
public static final String PORT_SPEED = "port_speed";
/**
* Specifies sub event types like device added, device updated etc.
*/
public static final String SUB_EVENT_TYPE = "sub_event_type";
/**
* Specifies hardware version of the switch.
*/
public static final String HW_VERSION = "hw_version";
/**
* Specifies switch's manufacturer.
*/
public static final String MFR = "mfr";
/**
* Specifies the serial number of the connected switch.
*/
public static final String SERIAL = "serial";
/**
* Specifies software version of the switch.
*/
public static final String SW_VERSION = "sw_version";
/**
* Specifies chassis id of the switch.
*/
public static final String CHASIS_ID = "chassis_id";
/**
* Specifies event occurence time.
*/
public static final String OCC_TIME = "occurrence_time";
/**
* Specifies switch's available time.
*/
public static final String AVAILABLE = "available_time";
/**
* Specifies packet_in port details.
*/
public static final String IN_PORT = "in_port";
/**
* Specifies port is logical or not.
*/
public static final String LOGICAL = "logical";
/**
* Specifies packet recieved time.
*/
public static final String RECIEVED = "received";
/**
* Specifies message type.
*/
public static final String MSG_TYPE = "msg_type";
/**
* Specifies packet type.
*/
public static final String PKT_TYPE = "PACKET_IN";
/**
* Specifies sub message type under msg_type.
*/
public static final String SUB_MSG_TYPE = "sub_msg_type";
/**
* Specifies Ethernet type of the packet.
*/
public static final String ETH_TYPE = "eth_type";
/**
* Source MAC address of the packet.
*/
public static final String SRC_MAC_ADDR = "src_mac_address";
/**
* Destination MAC address of the packet.
*/
public static final String DEST_MAC_ADDR = "dest_mac_address";
/**
* Specifies VLAN ID of the packet.
*/
public static final String VLAN_ID = "vlan_id";
/**
* Specifies if the packet is a Broadcast or not.
*/
public static final String B_CAST = "is_bcast";
/**
* Specifies if the packet is a Multicast or not.
*/
public static final String M_CAST = "is_mcast";
/**
* Specifies if the packet is padded or not.
*/
public static final String PAD = "pad";
/**
* Specifies priority of the packet.
*/
public static final String PRIORITY_CODE = "priority_code";
/**
* Specifies length of the payload.
*/
public static final String DATA_LEN = "data_length";
/**
* Packet payload(raw) in unicode format.
*/
public static final String PAYLOAD = "payload";
/**
* Network topology type TopologyEvent.Type.
*/
public static final String TOPO_TYPE = "topology_type";
/**
* Represents number of strongly connected components in the topology.
*/
public static final String CLUSTER_COUNT = "cluster_count";
/**
* Cost for doing topology computation.
*/
public static final String COMPUTE_COST = "compute_cost";
/**
* Represents topology creation time.
*/
public static final String CREATE_TIME = "creation_time";
/**
* Represents number of infrastructure devices in the topology.
*/
public static final String DEVICE_COUNT = "device_count";
/**
* Represents number of links in the topology.
*/
public static final String LINK_COUNT = "link_count";
/**
* Represents links destination DeviceId.
*/
public static final String DEST = "dst";
/**
* Represents links source DeviceId.
*/
public static final String SRC = "src";
/**
* True if the link is expected, false otherwise.
*/
public static final String EXPECTED = "expected";
/**
* Represents link state ACTIVE or INACTIVE.
*/
public static final String STATE = "state";
/**
* Represents link type like LINK_ADDED, LINK_UPDATE, LINK_REMOVED.
*/
public static final String LINK_TYPE = "link_type";
/**
* Represents the rabbit mq server properties stored in resources directory.
*/
public static final String MQ_PROP_NAME = "rabbitmq.properties";
/**
* Represents rabbit mq module name for app initialization.
*/
public static final String ONOS_APP_NAME = "org.onosproject.rabbitmq";
/**
* Represents rabbit mq publisher correlation identifier.
*/
public static final String SENDER_COR_ID = "rmq.sender.correlation.id";
/**
* Represents rabbit mq server protocol.
*/
public static final String SERVER_PROTO = "rmq.server.protocol";
/**
* Represents rabbit mq server user name.
*/
public static final String SERVER_UNAME = "rmq.server.username";
/**
* Represents rabbit mq server password.
*/
public static final String SERVER_PWD = "rmq.server.password";
/**
* Represents rabbit mq server address.
*/
public static final String SERVER_ADDR = "rmq.server.ip.address";
/**
* Represents rabbit mq server port.
*/
public static final String SERVER_PORT = "rmq.server.port";
/**
* Represents rabbit mq server vhost.
*/
public static final String SERVER_VHOST = "rmq.server.vhost";
/**
* Represents rabbit mq server exchange.
*/
public static final String SENDER_EXCHG = "rmq.sender.exchange";
/**
* Represents rabbit mq server routing key binds exchange and queue.
*/
public static final String ROUTE_KEY = "rmq.sender.routing.key";
/**
* Represents rabbit mq server queue for message delivery.
*/
public static final String SENDER_QUEUE = "rmq.sender.queue";
/**
* Represents rabbit mq server topic.
*/
public static final String TOPIC = "topic";
/**
* Represents correlation ID of the sender.
*/
public static final String COR_ID = "onos->rmqserver";
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.api;
import org.onosproject.event.Event;
import org.onosproject.net.packet.PacketContext;
/**
* Service apis for publishing device and packet events.
*/
public interface MQService {
/**
* Publishes device/link/topology events to MQ server.
*
* @param event the event type
*/
void publish(Event<? extends Enum, ?> event);
/**
* Publishes packet context message to MQ server.
*
* @param context for processing an inbound packet
*/
void publish(PacketContext context);
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.api;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.onosproject.rabbitmq.impl.BrokerHost;
import org.onosproject.rabbitmq.impl.MessageContext;
/**
* API for registering producer with server.
*/
public interface MQTransport {
/**
* Registers MQ client with the server.
*
* @param host the broker host
* @param channelConf the mq channel configurations
* @param queue the message context
* @return the sender handle
*/
Manageable registerProducer(BrokerHost host, Map<String, String> channelConf,
BlockingQueue<MessageContext> queue);
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.api;
/**
* Interface for declaring a start, publish and stop api's for mq transactions.
*/
public interface Manageable {
/**
* Establishes connection with MQ server.
*/
void start();
/**
* Publishes onos events on to MQ server.
*/
void publish();
/**
* Releases connection and channels.
*/
void stop();
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* package for api declarations.
*/
package org.onosproject.rabbitmq.api;
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.impl;
/**
* Represents the URL pointing to MQ Server. Used to connect to MQ Server.
*/
public class BrokerHost {
private final String url;
/**
* Sets the MQ Server URL.
*
* @param url represents url of the MQ Server
*/
public BrokerHost(String url) {
this.url = url;
}
/**
* Returns the MQ Server URL.
*
* @return url of the MQ Server
*/
public String getUrl() {
return url;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BrokerHost that = (BrokerHost) o;
return url != null ? url.equals(that.url) : that.url == null;
}
@Override
public int hashCode() {
return url != null ? url.hashCode() : 0;
}
@Override
public String toString() {
return url;
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.impl;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import org.onosproject.rabbitmq.api.Manageable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import static org.onosproject.rabbitmq.api.MQConstants.*;
/**
* Connects client with server using start API, publish the messages received
* from onos events and disconnect the client from server using stop API.
*/
public class MQSender implements Manageable {
private static final String E_CREATE_CHAN =
"Error creating the RabbitMQ channel";
private static final String E_PUBLISH_CHAN =
"Error in publishing to the RabbitMQ channel";
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
private static final int RECOVERY_INTERVAL = 15000;
private final BlockingQueue<MessageContext> outQueue;
private final String exchangeName;
private final String routingKey;
private final String queueName;
private final String url;
private ExecutorService executorService;
private Connection conn;
private Channel channel;
/**
* Creates a MQSender initialized with the specified parameters.
*
* @param outQueue represents message context
* @param exchangeName represents mq exchange name
* @param routingKey represents bound routing key
* @param queueName represents mq queue name
* @param url represents the mq server url
*/
public MQSender(BlockingQueue<MessageContext> outQueue, String exchangeName,
String routingKey, String queueName, String url) {
this.outQueue = outQueue;
this.exchangeName = exchangeName;
this.routingKey = routingKey;
this.queueName = queueName;
this.url = url;
}
/**
* Sets the executor service.
*
* @param executorService the executor service to use
*/
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void start() {
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
try {
factory.setUri(url);
if (executorService != null) {
conn = factory.newConnection(executorService);
} else {
conn = factory.newConnection();
}
channel = conn.createChannel();
channel.exchangeDeclare(exchangeName, TOPIC, true);
/*
* Setting the following parameters to queue
* durable - true
* exclusive - false
* autoDelete - false
* arguments - null
*/
channel.queueDeclare(this.queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
} catch (Exception e) {
log.error(E_CREATE_CHAN, e);
}
}
@Override
public void publish() {
try {
MessageContext input = outQueue.poll();
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.correlationId(COR_ID).build(),
input.getBody());
String message1 = new String(input.getBody(), "UTF-8");
log.debug(" [x] Sent: '{}'", message1);
} catch (Exception e) {
log.error(E_PUBLISH_CHAN, e);
}
}
@Override
public void stop() {
try {
channel.close();
conn.close();
} catch (IOException e) {
log.error("Error closing the rabbit MQ connection", e);
} catch (TimeoutException e) {
log.error("Timeout exception in closing the rabbit MQ connection",
e);
}
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.impl;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.exception.ExceptionUtils;
import static org.onosproject.rabbitmq.api.MQConstants.*;
import org.onosproject.event.Event;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.rabbitmq.api.MQService;
import org.onosproject.rabbitmq.api.Manageable;
import org.onosproject.rabbitmq.util.MQUtil;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonObject;
import com.google.common.collect.Maps;
/**
* Default implementation of {@link MQService}.
*/
public class MQServiceImpl implements MQService {
private static final Logger log = LoggerFactory.getLogger(
MQServiceImpl.class);
private final BlockingQueue<MessageContext> msgOutQueue =
new LinkedBlockingQueue<>(10);
private Manageable manageSender;
private String correlationId;
/**
* Initializes using ComponentContext.
*
* @param context ComponentContext from OSGI
*/
public MQServiceImpl(ComponentContext context) {
initializeProducers(context);
}
/**
* Initializes MQ sender and receiver with RMQ server.
*
* @param context ComponentContext from OSGI
*/
private void initializeProducers(ComponentContext context) {
BrokerHost rfHost;
Properties prop = MQUtil.getProp(context);
if (prop == null) {
log.error("RabbitMQ configuration file not found...");
return;
}
try {
correlationId = prop.getProperty(SENDER_COR_ID);
rfHost = new BrokerHost(MQUtil.getMqUrl(
prop.getProperty(SERVER_PROTO),
prop.getProperty(SERVER_UNAME),
prop.getProperty(SERVER_PWD),
prop.getProperty(SERVER_ADDR),
prop.getProperty(SERVER_PORT),
prop.getProperty(SERVER_VHOST)));
manageSender = registerProducer(rfHost,
MQUtil.rfProducerChannelConf(
prop.getProperty(SENDER_EXCHG),
prop.getProperty(ROUTE_KEY),
prop.getProperty(SENDER_QUEUE)),
msgOutQueue);
} catch (Exception e) {
throw new RuntimeException(e);
}
manageSender.start();
}
/**
* Returns the handle to call an api for publishing messages to RMQ server.
*/
private Manageable registerProducer(BrokerHost host, Map<String, String> channelConf,
BlockingQueue<MessageContext> msgOutQueue) {
return new MQTransportImpl().registerProducer(host, channelConf, msgOutQueue);
}
private byte[] bytesOf(JsonObject jo) {
return jo.toString().getBytes();
}
/**
* Publishes Device, Topology &amp; Link event message to MQ server.
*
* @param event Event received from the corresponding service like topology, device etc
*/
@Override
public void publish(Event<? extends Enum, ?> event) {
byte[] body = null;
if (null == event) {
log.error("Captured event is null...");
return;
}
if (event instanceof DeviceEvent) {
body = bytesOf(MQUtil.json((DeviceEvent) event));
} else if (event instanceof TopologyEvent) {
body = bytesOf(MQUtil.json((TopologyEvent) event));
} else if (event instanceof LinkEvent) {
body = bytesOf(MQUtil.json((LinkEvent) event));
} else {
log.error("Invalid event: '{}'", event);
}
processAndPublishMessage(body);
}
/**
* Publishes packet message to MQ server.
*
* @param context Context of the packet recieved including details like mac, length etc
*/
@Override
public void publish(PacketContext context) {
byte[] body = bytesOf(MQUtil.json(context));
processAndPublishMessage(body);
}
/*
* Constructs message context and publish it to rabbit mq server.
*
* @param body Byte stream of the event's JSON data
*/
private void processAndPublishMessage(byte[] body) {
Map<String, Object> props = Maps.newHashMap();
props.put(CORRELATION_ID, correlationId);
MessageContext mc = new MessageContext(body, props);
try {
msgOutQueue.put(mc);
String message = new String(body, "UTF-8");
log.debug(" [x] Sent '{}'", message);
} catch (InterruptedException | UnsupportedEncodingException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
manageSender.publish();
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.impl;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.onosproject.rabbitmq.api.MQTransport;
import org.onosproject.rabbitmq.api.Manageable;
import static org.onosproject.rabbitmq.api.MQConstants.*;
/**
* Provides handle to call MQSender for message delivery.
*/
public class MQTransportImpl implements MQTransport {
@Override
public Manageable registerProducer(BrokerHost host,
Map<String, String> channelConf,
BlockingQueue<MessageContext> queue) {
String exchangeName = channelConf.get(EXCHANGE_NAME_PROPERTY);
String routingKey = channelConf.get(ROUTING_KEY_PROPERTY);
String queueName = channelConf.get(QUEUE_NAME_PROPERTY);
return new MQSender(queue, exchangeName, routingKey, queueName,
host.getUrl());
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.impl;
import java.io.Serializable;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Represents message context like data in byte stream and mq properties for
* message delivery.
*/
public class MessageContext implements Serializable {
private static final long serialVersionUID = -4174900539976805047L;
private static final String NULL_ERR =
"The body and properties should be present";
private final Map<String, Object> properties;
private final byte[] body;
/**
* Initializes MessageContext class.
*
* @param body Byte stream of the event's JSON data
* @param properties Map of the Message Queue properties
*/
public MessageContext(byte[] body, Map<String, Object> properties) {
this.body = checkNotNull(body, NULL_ERR);
this.properties = checkNotNull(properties, NULL_ERR);
}
/**
* Returns the Message Properties Map.
*
* @return Map of the Message Queue properties
*/
public Map<String, Object> getProperties() {
return properties;
}
/**
* Returns the Message Properties Map.
*
* @return Byte stream of the event's JSON data
*/
public byte[] getBody() {
return body;
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package for mq implementation.
*/
package org.onosproject.rabbitmq.impl;
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.listener;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import java.util.concurrent.ExecutorService;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.link.ProbedLinkProvider;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyService;
import org.onosproject.rabbitmq.api.MQConstants;
import org.onosproject.rabbitmq.api.MQService;
import org.onosproject.rabbitmq.impl.MQServiceImpl;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Listens to events generated from Device Event/PKT_IN/Topology/Link.
* Then publishes events to rabbitmq server via publish() api.
*/
@Component(immediate = true)
public class MQEventHandler extends AbstractProvider
implements ProbedLinkProvider {
private static final Logger log = LoggerFactory.getLogger(
MQEventHandler.class);
private static final String PROVIDER_NAME = MQConstants.ONOS_APP_NAME;
private static final int PKT_PROC_PRIO = 1;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
private MQService mqService;
private DeviceListener deviceListener;
protected ExecutorService eventExecutor;
private final InternalPacketProcessor packetProcessor =
new InternalPacketProcessor();
private final LinkListener linkListener = new InternalLinkListener();
private final TopologyListener topologyListener =
new InternalTopologyListener();
/**
* Initialize parent class with provider.
*/
public MQEventHandler() {
super(new ProviderId("rabbitmq", PROVIDER_NAME));
}
@Activate
protected void activate(ComponentContext context) {
mqService = new MQServiceImpl(context);
eventExecutor = newSingleThreadScheduledExecutor(
groupedThreads("onos/deviceevents", "events-%d", log));
deviceListener = new InternalDeviceListener();
deviceService.addListener(deviceListener);
packetService.addProcessor(packetProcessor,
PacketProcessor.advisor(PKT_PROC_PRIO));
linkService.addListener(linkListener);
topologyService.addListener(topologyListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
deviceService.removeListener(deviceListener);
packetService.removeProcessor(packetProcessor);
eventExecutor.shutdownNow();
eventExecutor = null;
linkService.removeListener(linkListener);
topologyService.removeListener(topologyListener);
log.info("Stopped");
}
/**
* Captures incoming device events.
*/
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
if (event == null) {
log.error("Device event is null.");
return;
}
mqService.publish(event);
}
}
/**
* Captures incoming packets from switches connected to ONOS
* controller..
*/
private class InternalPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
if (context == null) {
log.error("Packet context is null.");
return;
}
mqService.publish(context);
}
}
/**
* Listens to link events and processes the link additions.
*/
private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
if (event == null) {
log.error("Link event is null.");
return;
}
mqService.publish(event);
}
}
/**
* Listens to topology events and processes the topology changes.
*/
private class InternalTopologyListener implements TopologyListener {
@Override
public void event(TopologyEvent event) {
if (event == null) {
log.error("Topology event is null.");
return;
}
mqService.publish(event);
}
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* RabbitMQ module used for publishing device and packet events to MQ server.
*/
package org.onosproject.rabbitmq.listener;
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rabbitmq.util;
import java.io.File;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.onlab.packet.EthType;
import org.onosproject.net.Link;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.topology.Topology;
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.packet.InboundPacket;
import org.osgi.service.component.ComponentContext;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onosproject.rabbitmq.api.MQConstants.*;
/**
* MQ utility class for constructing server url, packet message, device message,
* topology message and link message.
*/
public final class MQUtil {
private static final String COLON = ":";
private static final String AT = "@";
private static final String CDFS = "://";
private static final String FS = "/";
private static final String UTF8 = "UTF-8";
private static final Logger log = LoggerFactory.getLogger(MQUtil.class);
private MQUtil() {
}
/**
* Returns the MQ server url.
*
* @param proto mq server protocol
* @param userName mq server username
* @param password mq server password
* @param ipAddr server ip address
* @param port server port
* @param vhost server vhost
* @return server url
*/
public static String getMqUrl(String proto, String userName,
String password, String ipAddr, String port,
String vhost) {
StringBuilder urlBuilder = new StringBuilder();
try {
urlBuilder.append(proto).append(CDFS).append(userName).append(COLON)
.append(password).append(AT)
.append(ipAddr).append(COLON).append(port).append(FS)
.append(URLEncoder.encode(vhost, UTF8));
} catch (UnsupportedEncodingException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
return urlBuilder.toString().replaceAll("\\s+", "");
}
/**
* Initializes and returns publisher channel configuration.
*
* @param exchange the configured mq exchange name
* @param routingKey the configured mq routing key
* @param queueName the configured mq queue name
* @return the server url
*/
public static Map<String, String> rfProducerChannelConf(String exchange,
String routingKey, String queueName) {
Map<String, String> channelConf = new HashMap<>();
channelConf.put(EXCHANGE_NAME_PROPERTY, exchange);
channelConf.put(ROUTING_KEY_PROPERTY, routingKey);
channelConf.put(QUEUE_NAME_PROPERTY, queueName);
return channelConf;
}
/**
* Returns a JSON representation of the given device event.
*
* @param event the device event
* @return the device event json message
*/
public static JsonObject json(DeviceEvent event) {
JsonObject jo = new JsonObject();
jo.addProperty(SWITCH_ID, event.subject().id().toString());
jo.addProperty(INFRA_DEVICE_NAME, event.subject().type().name());
jo.addProperty(EVENT_TYPE, DEVICE_EVENT);
if (event.port() != null) {
jo.addProperty(PORT_NUMBER, event.port().number().toLong());
jo.addProperty(PORT_ENABLED, event.port().isEnabled());
jo.addProperty(PORT_SPEED, event.port().portSpeed());
jo.addProperty(SUB_EVENT_TYPE,
event.type().name() != null ? event.type().name() : null);
} else {
jo.addProperty(SUB_EVENT_TYPE,
event.type().name() != null ? event.type().name() : null);
}
jo.addProperty(HW_VERSION, event.subject().hwVersion());
jo.addProperty(MFR, event.subject().manufacturer());
jo.addProperty(SERIAL, event.subject().serialNumber());
jo.addProperty(SW_VERSION, event.subject().swVersion());
jo.addProperty(CHASIS_ID, event.subject().chassisId().id());
jo.addProperty(OCC_TIME, new Date(event.time()).toString());
return jo;
}
/**
* Returns a JSON representation of the given packet context.
*
* @param context the packet context
* @return the inbound packetjson message
*/
public static JsonObject json(PacketContext context) {
JsonObject jo = new JsonObject();
InboundPacket pkt = context.inPacket();
// parse connection host
jo.addProperty(SWITCH_ID, pkt.receivedFrom().deviceId().toString());
jo.addProperty(IN_PORT, pkt.receivedFrom().port().name());
jo.addProperty(LOGICAL, pkt.receivedFrom().port().isLogical());
jo.addProperty(RECIEVED, new Date(context.time()).toString());
jo.addProperty(MSG_TYPE, PKT_TYPE);
// parse ethernet
jo.addProperty(SUB_MSG_TYPE,
EthType.EtherType.lookup(pkt.parsed().getEtherType()).name());
jo.addProperty(ETH_TYPE, pkt.parsed().getEtherType());
jo.addProperty(SRC_MAC_ADDR, pkt.parsed().getSourceMAC().toString());
jo.addProperty(DEST_MAC_ADDR, pkt.parsed().getDestinationMAC().toString());
jo.addProperty(VLAN_ID, pkt.parsed().getVlanID());
jo.addProperty(B_CAST, pkt.parsed().isBroadcast());
jo.addProperty(M_CAST, pkt.parsed().isMulticast());
jo.addProperty(PAD, pkt.parsed().isPad());
jo.addProperty(PRIORITY_CODE, pkt.parsed().getPriorityCode());
// parse bytebuffer
jo.addProperty(DATA_LEN, pkt.unparsed().array().length);
jo.addProperty(PAYLOAD, pkt.unparsed().asCharBuffer().toString());
return jo;
}
/**
* Returns a JSON representation of the given topology event.
*
* @param event the topology event
* @return the topology event json message
*/
public static JsonObject json(TopologyEvent event) {
Topology topology = event.subject();
JsonObject jo = new JsonObject();
jo.addProperty(TOPO_TYPE, TopologyEvent.Type.TOPOLOGY_CHANGED.name());
jo.addProperty(CLUSTER_COUNT, topology.clusterCount());
jo.addProperty(COMPUTE_COST, topology.computeCost());
jo.addProperty(CREATE_TIME, new Date(topology.creationTime()).toString());
jo.addProperty(DEVICE_COUNT, topology.deviceCount());
jo.addProperty(LINK_COUNT, topology.linkCount());
jo.addProperty(AVAILABLE, new Date(topology.time()).toString());
return jo;
}
/**
* Returns a JSON representation of the given link event.
*
* @param event the link event
* @return the link event json message
*/
public static JsonObject json(LinkEvent event) {
Link link = event.subject();
JsonObject jo = new JsonObject();
jo.addProperty(EVENT_TYPE, event.type().name());
jo.addProperty(DEST, link.dst().deviceId().toString());
jo.addProperty(SRC, link.src().deviceId().toString());
jo.addProperty(EXPECTED, link.isExpected());
jo.addProperty(STATE, link.state().name());
jo.addProperty(LINK_TYPE, link.type().name());
return jo;
}
/**
* Handles load mq property file from resources and returns Properties.
*
* @param context the component context
* @return the mq server properties
* @throws RuntimeException if property file not found.
*/
public static Properties getProp(ComponentContext context) {
URL configUrl;
try {
configUrl = context.getBundleContext().getBundle()
.getResource(MQ_PROP_NAME);
} catch (Exception ex) {
// This will be used only during junit test case since bundle
// context will be available during runtime only.
File file = new File(
MQUtil.class.getClassLoader().getResource(MQ_PROP_NAME)
.getFile());
try {
configUrl = file.toURL();
} catch (MalformedURLException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
throw new RuntimeException(e);
}
}
Properties properties;
try {
InputStream is = configUrl.openStream();
properties = new Properties();
properties.load(is);
} catch (Exception e) {
log.error(ExceptionUtils.getFullStackTrace(e));
throw new RuntimeException(e);
}
return properties;
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Packet for mq utility.
*/
package org.onosproject.rabbitmq.util;
#Modified the below properties as per your requirements.
rmq.server.protocol = amqp
rmq.server.username = onosrmq
rmq.server.password = onosrocks
rmq.server.port = 5672
rmq.server.ip.address = 127.0.0.1
rmq.server.vhost = /
rmq.sender.type = topic
rmq.sender.correlation.id = onos->rmqserver
rmq.sender.exchange = onos_exchg_wr_to_rmqs
rmq.sender.routing.key = onos.rkey.rmqs
rmq.sender.queue = onos_send_queue
......@@ -33,6 +33,8 @@ osgi_feature_group(
':org.apache.karaf.features.core',
':org.apache.karaf.system.core',
':jsr305',
':amqp-client',
':gson',
],
)
......@@ -1102,3 +1104,21 @@ remote_jar (
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'amqp-client',
out = 'amqp-client-3.6.1.jar',
url = 'mvn:com.rabbitmq:amqp-client:jar:3.6.1',
sha1 = '089be4acfa8a0fa48a775a82d20632f90aecf10b',
maven_coords = 'com.rabbitmq:amqp-client:3.6.1',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'gson',
out = 'gson-2.6.2.jar',
url = 'mvn:com.google.code.gson:gson:jar:2.6.2',
sha1 = 'f1bc476cc167b18e66c297df599b2377131a8947',
maven_coords = 'com.google.code.gson:gson:2.6.2',
visibility = [ 'PUBLIC' ],
)
......