Shravan Ambati
Committed by Gerrit Code Review

CodeReview - Initial Commit for Kafka Integration Application

1. Partial REST API implementation
2. Partial Event Manager backend implementation

Change-Id: Ieaf703f7a3f6e296aea8ffcf155c7a1b603236ca
Showing 20 changed files with 1109 additions and 0 deletions
<?sxml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-apps</artifactId>
<version>1.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-kafka</artifactId>
<packaging>bundle</packaging>
<description>
ONOS Kafka Integration Application.
This will export ONOS events to an external Kafka Server
</description>
<url>http://onosproject.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<onos.version>1.6.0-SNAPSHOT</onos.version>
<onos.app.name>org.onosproject.kafkaintegration</onos.app.name>
<onos.app.title>Kafka Integration Application</onos.app.title>
<onos.app.origin>Calix, Inc.</onos.app.origin>
<web.context>/onos/kafka</web.context>
<api.version>1.0.0</api.version>
<api.package>org.onosproject.kafkaintegration.rest</api.package>
<api.title>Kafka Integration Application REST API</api.title>
<api.description>
APIs for subscribing to Events generated by onos
</api.description>
<onos.app.category>Utility</onos.app.category>
<onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
<onos.app.readme>Export onos events to a Northbound Kafka server</onos.app.readme>
</properties>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<version>${onos.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${onos.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-rest</artifactId>
<version>${onos.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<version>${onos.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
<version>2.22.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${onos.version}</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
<version>1.9.12</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.0.1</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>
${project.groupId}.${project.artifactId}
</Bundle-SymbolicName>
<_wab>src/main/webapp/</_wab>
<Include-Resource>
WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
{maven-resources}
</Include-Resource>
<Import-Package>
org.slf4j,
org.osgi.framework,
javax.ws.rs,
javax.ws.rs.core,
org.glassfish.jersey.servlet,
com.fasterxml.jackson.databind,
com.fasterxml.jackson.databind.node,
com.fasterxml.jackson.core,
org.onlab.packet.*,
org.onosproject.*,
com.google.common.*
</Import-Package>
<Web-ContextPath>${web.context}</Web-ContextPath>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<version>1.21.0</version>
<executions>
<execution>
<id>generate-scr-srcdescriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
<configuration>
<supportedProjectTypes>
<supportedProjectType>bundle</supportedProjectType>
<supportedProjectType>war</supportedProjectType>
</supportedProjectTypes>
</configuration>
</plugin>
<plugin>
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
<version>1.9</version>
<executions>
<execution>
<id>cfg</id>
<phase>generate-resources</phase>
<goals>
<goal>cfg</goal>
</goals>
</execution>
<execution>
<id>swagger</id>
<phase>generate-sources</phase>
<goals>
<goal>swagger</goal>
</goals>
</execution>
<execution>
<id>app</id>
<phase>package</phase>
<goals>
<goal>app</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import java.util.UUID;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kafkaintegration.api.EventExporterService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
import org.onosproject.kafkaintegration.errors.UnsupportedEventException;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.link.LinkService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of Event Exporter Service.
*
*/
@Component(immediate = true)
@Service
public class EventExporterManager implements EventExporterService {
private final Logger log = LoggerFactory.getLogger(getClass());
// Stores the currently registered applications for event export service.
// Map of Appname to groupId
private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
private static final String REGISTERED_APPS = "registered-applications";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private static final String NOT_YET_SUPPORTED = "Not yet supported.";
private ApplicationId appId;
@Activate
protected void activate() {
appId = coreService
.registerApplication("org.onosproject.kafkaintegration");
registeredApps = storageService
.<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
.withName(REGISTERED_APPS)
.withSerializer(Serializer.using(KryoNamespaces.API,
EventSubscriberGroupId.class,
UUID.class))
.build().asJavaMap();
log.info("Started");
}
@Deactivate
protected void deactivate() {
log.info("Stopped");
}
@Override
public EventSubscriberGroupId registerListener(String appName) {
// TODO: Remove it once ONOS provides a mechanism for external apps
// to register with the core service. See Jira - 4409
ApplicationId externalAppId = coreService.registerApplication(appName);
return registeredApps.computeIfAbsent(externalAppId,
(key) -> new EventSubscriberGroupId(UUID
.randomUUID()));
}
@Override
public void unregisterListener(String appName) {
ApplicationId externalAppId =
checkNotNull(coreService.getAppId(appName));
registeredApps.remove(externalAppId);
}
@Override
public void subscribe(EventSubscriber subscriber)
throws UnsupportedEventException, InvalidGroupIdException,
InvalidApplicationException {
throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public void unsubscribe(EventSubscriber subscriber)
throws InvalidGroupIdException, InvalidApplicationException {
throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
}
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
import org.onosproject.kafkaintegration.errors.UnsupportedEventException;
/**
* APIs for subscribing to Onos Event Messages.
*/
public interface EventExporterService {
/**
* Registers the external application to receive events generated in ONOS.
*
* @param appName Application Name
* @return unique consumer group identifier
*/
EventSubscriberGroupId registerListener(String appName);
/**
* Removes the Registered Listener.
*
* @param appName Application Name
*/
void unregisterListener(String appName);
/**
* Allows registered listener to subscribe for a specific event type.
*
* @param subscriber Subscription data containing the event type
* @throws UnsupportedEventException
* @throws InvalidGroupIdException
* @throws InvalidApplicationException
*/
void subscribe(EventSubscriber subscriber)
throws UnsupportedEventException, InvalidGroupIdException,
InvalidApplicationException;
/**
* Allows the registered listener to unsubscribe for a specific event.
*
* @param subscriber Subscription data containing the event type
* @throws InvalidGroupIdException
* @throws InvalidApplicationException
*/
void unsubscribe(EventSubscriber subscriber)
throws InvalidGroupIdException, InvalidApplicationException;
}
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api.dto;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
/**
* Representation of a subscription to an event type.
*
*/
public final class EventSubscriber {
private final String appName;
private final EventSubscriberGroupId subscriberGroupId;
private final Type eventType;
/**
* Creates a new Event Subscriber.
*
* @param name Application Name
* @param groupId Subscriber group id of the application
* @param eventType ONOS event type
*/
public EventSubscriber(String name, EventSubscriberGroupId groupId,
Type eventType) {
this.appName = checkNotNull(name);
this.subscriberGroupId = checkNotNull(groupId);
this.eventType = checkNotNull(eventType);
}
/**
* Returns the Application Name.
*
* @return application name
*/
public String appName() {
return appName;
}
/**
* Returns the Subscriber Group Id.
*
* @return Subscriber Group Id
*/
public EventSubscriberGroupId subscriberGroupId() {
return subscriberGroupId;
}
/**
* Returns the Event type.
*
* @return ONOS Event Type
*/
public Type eventType() {
return eventType;
}
@Override
public int hashCode() {
return Objects.hash(appName, subscriberGroupId, eventType);
}
@Override
public boolean equals(Object o) {
if (o instanceof EventSubscriber) {
EventSubscriber sub = (EventSubscriber) o;
if (sub.appName.equals(appName)
&& sub.subscriberGroupId.equals(subscriberGroupId)
&& sub.eventType.equals(eventType)) {
return true;
}
}
return false;
}
@Override
public String toString() {
return toStringHelper(this).add("appName", appName)
.addValue(subscriberGroupId.toString())
.add("eventType", eventType).toString();
}
}
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api.dto;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import java.util.UUID;
/**
* Wrapper Object for storing the consumer group id. Group ids are used by
* external applications when consuming events from Kafka server.
*
*/
public final class EventSubscriberGroupId {
private final UUID id;
/**
* Creates a new Subscriber Group Id.
*
* @param uuid representing the group id.
*/
public EventSubscriberGroupId(UUID uuid) {
id = checkNotNull(uuid);
}
/**
* Returns the Group Id of the subscriber.
*
* @return uuid representing the group id.
*/
public UUID getId() {
return id;
}
@Override
public boolean equals(Object o) {
if (o instanceof EventSubscriberGroupId) {
EventSubscriberGroupId sub = (EventSubscriberGroupId) o;
if (sub.id.equals(id)) {
return true;
}
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
@Override
public String toString() {
return toStringHelper(this).add("subscriberGroupId", id).toString();
}
}
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api.dto;
import org.onosproject.event.AbstractEvent;
import com.google.protobuf.GeneratedMessage;
/**
* Represents the converted Onos Event data into GPB format.
*
*/
public class OnosEvent extends AbstractEvent<OnosEvent.Type, GeneratedMessage> {
/**
* Creates a new Onos Event.
*
* @param type The Type of Onos Event
* @param subject Protobuf message corresponding to the Onos Event
*/
public OnosEvent(Type type, GeneratedMessage subject) {
super(type, subject);
}
/**
* List of Event Types supported.
*/
public enum Type {
DEVICE, LINK;
}
}
/**
* Copyright 2016-present Open Networking Laboratory Licensed under the Apache
* License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* Immutable Data Transfer Objects.
*/
package org.onosproject.kafkaintegration.api.dto;
/**
* Copyright 2016-present Open Networking Laboratory Licensed under the Apache
* License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* API definitions for the Application.
*/
package org.onosproject.kafkaintegration.api;
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.errors;
/**
* Represents that an unregistered application trying to subscribe to ONOS
* events.
*
*/
public class InvalidApplicationException extends RuntimeException {
private static final long serialVersionUID = 1L;
public InvalidApplicationException(String message) {
super(message);
}
}
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.errors;
/**
* The groupId given by the external application is already in use by another
* application or groupId does not exist.
*
*/
public class InvalidGroupIdException extends RuntimeException {
private static final long serialVersionUID = 1L;
public InvalidGroupIdException(String message) {
super(message);
}
}
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.errors;
/**
* Event Type requested for subscription is not supported/available for export.
*
*/
public class UnsupportedEventException extends RuntimeException {
private static final long serialVersionUID = 1L;
public UnsupportedEventException(String message) {
super(message);
}
}
/**
* Copyright 2016-present Open Networking Laboratory Licensed under the Apache
* License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* Application specific Exception classes.
*/
package org.onosproject.kafkaintegration.errors;
/**
* Copyright 2016-present Open Networking Laboratory Licensed under the Apache
* License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* API implementation classes.
*/
package org.onosproject.kafkaintegration;
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.rest;
import static com.google.common.base.Preconditions.checkNotNull;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import java.io.IOException;
import java.io.InputStream;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.onosproject.codec.JsonCodec;
import org.onosproject.kafkaintegration.api.EventExporterService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Rest Interfaces for subscribing/unsubscribing to event notifications.
*/
@Path("kafkaService")
public class EventExporterWebResource extends AbstractWebResource {
private final Logger log = LoggerFactory.getLogger(getClass());
public static final String JSON_NOT_NULL = "Registration Data cannot be empty";
public static final String REGISTRATION_SUCCESSFUL = "Registered Listener successfully";
public static final String DEREGISTRATION_SUCCESSFUL = "De-Registered Listener successfully";
public static final String EVENT_SUBSCRIPTION_SUCCESSFUL = "Event Registration successfull";
public static final String EVENT_SUBSCRIPTION_REMOVED = "Event De-Registration successfull";
/**
* Registers a listener for Onos Events.
*
* @param appName The application trying to register
* @return 200 OK with UUID string which should be used as Kafka Consumer
* Group Id
* @onos.rsModel KafkaRegistration
*/
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("register")
public Response registerKafkaListener(String appName) {
EventExporterService service = get(EventExporterService.class);
EventSubscriberGroupId groupId = service.registerListener(appName);
log.info("Registered app {}", appName);
// TODO: Should also return Kafka server information.
// Will glue this in when we have the config and Kafka modules ready
return ok(groupId.getId().toString()).build();
}
/**
* Unregisters a listener for Onos Events.
*
* @param appName The application trying to unregister
* @return 200 OK
* @onos.rsModel KafkaRegistration
*/
@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("unregister")
public Response removeKafkaListener(String appName) {
EventExporterService service = get(EventExporterService.class);
service.unregisterListener(appName);
return ok(DEREGISTRATION_SUCCESSFUL).build();
}
/**
* Creates subscription to a specific Onos event.
*
* @param input Subscription Data in Json format
* @return 200 OK if successful or 400 BAD REQUEST
* @onos.rsModel KafkaSubscription
*/
@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("subscribe")
public Response subscribe(InputStream input) {
EventExporterService service = get(EventExporterService.class);
try {
EventSubscriber sub = parseSubscriptionData(input);
service.subscribe(sub);
} catch (Exception e) {
log.error(e.getMessage());
return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
}
return ok(EVENT_SUBSCRIPTION_SUCCESSFUL).build();
}
/**
* Parses Json Subscription Data from the external application.
*
* @param node node within the parsed json tree.
* @return parsed DTO object
* @throws IOException
*/
private EventSubscriber parseSubscriptionData(InputStream input)
throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode node = (ObjectNode) mapper.readTree(input);
checkNotNull(node, JSON_NOT_NULL);
JsonCodec<EventSubscriber> codec = codec(EventSubscriber.class);
return codec.decode(node, this);
}
/**
* Deletes subscription from a specific Onos event.
*
* @param input data in json format
* @return 200 OK if successful or 400 BAD REQUEST
* @onos.rsModel KafkaSubscription
*/
@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Path("unsubscribe")
public Response unsubscribe(InputStream input) {
EventExporterService service = get(EventExporterService.class);
try {
EventSubscriber sub = parseSubscriptionData(input);
service.subscribe(sub);
} catch (Exception e) {
log.error(e.getMessage());
return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
}
return ok(EVENT_SUBSCRIPTION_REMOVED).build();
}
}
/**
* Copyright 2016-present Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.rest;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.UUID;
import org.onosproject.codec.CodecContext;
import org.onosproject.codec.JsonCodec;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Codec for encoding/decoding a Subscriber object to/from JSON.
*
*/
public final class SubscriberCodec extends JsonCodec<EventSubscriber> {
// JSON field names
private static final String NAME = "appName";
private static final String GROUP_ID = "groupId";
private static final String EVENT_TYPE = "eventType";
@Override
public ObjectNode encode(EventSubscriber data, CodecContext context) {
checkNotNull(data, "Subscriber cannot be null");
return context.mapper().createObjectNode().put(NAME, data.appName())
.put(GROUP_ID, data.subscriberGroupId().getId().toString())
.put(EVENT_TYPE, data.eventType().toString());
}
@Override
public EventSubscriber decode(ObjectNode json, CodecContext context) {
String name = json.path(NAME).asText();
String groupId = json.path(GROUP_ID).asText();
EventSubscriberGroupId subscriberGroupId = new EventSubscriberGroupId(UUID
.fromString(groupId));
String eventType = json.path(EVENT_TYPE).asText();
return new EventSubscriber(name, subscriberGroupId,
Type.valueOf(eventType));
}
}
/**
* Copyright 2016-present Open Networking Laboratory Licensed under the Apache
* License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* REST API Definitions.
*/
package org.onosproject.kafkaintegration.rest;
{
"type": "string",
"title": "KafkaRegistration",
"example": "forwardingApp"
}
\ No newline at end of file
{
"type": "object",
"title": "KafkaSubscription",
"required": [
"appName",
"groupId",
"eventType"
],
"properties": {
"appName": {
"type": "string",
"example": "forwardingApp"
},
"groupId": {
"type": "string",
"example": "18285435-2c62-4684-96dd-fb03b7cd0c83"
},
"eventType": {
"type": "string",
"example": "DEVICE"
}
}
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
id="ONOS" version="2.5">
<display-name>Event Exporter REST API</display-name>
<servlet>
<servlet-name>JAX-RS Service</servlet-name>
<servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
<init-param>
<param-name>jersey.config.server.provider.classnames</param-name>
<param-value>org.onosproject.kafkaintegration.rest.EventExporterWebResource</param-value>
</init-param>
<load-on-startup>10</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>JAX-RS Service</servlet-name>
<url-pattern>/*</url-pattern>
</servlet-mapping>
</web-app>
......@@ -46,6 +46,7 @@
<module>test</module>
<module>segmentrouting</module>
<module>xos-integration</module>
<module>kafka-integration</module>
<module>pcep-api</module>
<module>iptopology-api</module>
<module>pce</module>
......