alshabib
Committed by Gerrit Code Review

implemented a distributed default flow registration mechanism to avoid duplicate…

… requests from other onos instances

Change-Id: Ib2abb483456538e3e08e9790c4b4b0d50db8b384

implemented a distributed default flow registration mechanism to avoid
duplicate requests from other onos instances

Change-Id: I620cc51ac29cddaffa73cdbb20e9a9acbdd9ea69
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.packet;
import com.google.common.base.MoreObjects;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.TrafficSelector;
/**
* Default implementation of a packet request.
*/
public final class DefaultPacketRequest implements PacketRequest {
private final TrafficSelector selector;
private final PacketPriority priority;
private final ApplicationId appId;
private final FlowRule.Type tableType;
public DefaultPacketRequest(TrafficSelector selector, PacketPriority priority,
ApplicationId appId, FlowRule.Type tableType) {
this.selector = selector;
this.priority = priority;
this.appId = appId;
this.tableType = tableType;
}
public TrafficSelector selector() {
return selector;
}
public PacketPriority priority() {
return priority;
}
public ApplicationId appId() {
return appId;
}
public FlowRule.Type tableType() {
return tableType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultPacketRequest that = (DefaultPacketRequest) o;
if (priority != that.priority) {
return false;
}
if (!selector.equals(that.selector)) {
return false;
}
if (!tableType.equals(that.tableType)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = selector.hashCode();
result = 31 * result + priority.hashCode();
return result;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this.getClass())
.add("selector", selector)
.add("priority", priority)
.add("appId", appId)
.add("table-type", tableType).toString();
}
}
\ No newline at end of file
......@@ -46,4 +46,8 @@ public enum PacketPriority {
public int priorityValue() {
return priorityValue;
}
public String toString() {
return String.valueOf(priorityValue);
}
}
\ No newline at end of file
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.packet;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.TrafficSelector;
/**
* Represents a packet request made to devices.
*/
public interface PacketRequest {
/**
* Obtain the traffic selector.
* @return a traffic selector
*/
public TrafficSelector selector();
/**
* Obtain the priority.
* @return a PacketPriority
*/
public PacketPriority priority();
/**
* Obtain the application id.
* @return an application id
*/
public ApplicationId appId();
/**
* Obtain the table type.
* @return a table type
*/
public FlowRule.Type tableType();
}
......@@ -17,6 +17,8 @@ package org.onosproject.net.packet;
import org.onosproject.store.Store;
import java.util.Set;
/**
* Manages routing of outbound packets.
*/
......@@ -31,4 +33,21 @@ public interface PacketStore extends Store<PacketEvent, PacketStoreDelegate> {
*/
void emit(OutboundPacket packet);
/**
* Register a request for packets. If the registration
* is successful the manager can proceed, otherwise it should
* consider these packet already available in the system.
*
* @param request a packet request
* @return a boolean indicating registration state.
*/
boolean requestPackets(PacketRequest request);
/**
* Obtains all existing requests in the system.
*
* @return a set of packet requests
*/
Set<PacketRequest> existingRequests();
}
......
......@@ -23,7 +23,6 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.Device;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
......@@ -33,6 +32,7 @@ import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.packet.DefaultPacketRequest;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketEvent;
......@@ -41,6 +41,7 @@ import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketProvider;
import org.onosproject.net.packet.PacketProviderRegistry;
import org.onosproject.net.packet.PacketProviderService;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
......@@ -48,9 +49,7 @@ import org.onosproject.net.provider.AbstractProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -82,68 +81,6 @@ implements PacketService, PacketProviderRegistry {
private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
private Set<PacketRequest> packetRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final class PacketRequest {
private final TrafficSelector selector;
private final PacketPriority priority;
private final ApplicationId appId;
private final FlowRule.Type tableType;
public PacketRequest(TrafficSelector selector, PacketPriority priority,
ApplicationId appId, FlowRule.Type tableType) {
this.selector = selector;
this.priority = priority;
this.appId = appId;
this.tableType = tableType;
}
public TrafficSelector selector() {
return selector;
}
public PacketPriority priority() {
return priority;
}
public ApplicationId appId() {
return appId;
}
public FlowRule.Type tableType() {
return tableType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PacketRequest that = (PacketRequest) o;
if (priority != that.priority) {
return false;
}
if (!selector.equals(that.selector)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = selector.hashCode();
result = 31 * result + priority.hashCode();
return result;
}
}
@Activate
public void activate() {
store.setDelegate(delegate);
......@@ -177,10 +114,11 @@ implements PacketService, PacketProviderRegistry {
checkNotNull(appId, "Application ID cannot be null");
PacketRequest request =
new PacketRequest(selector, priority, appId, FlowRule.Type.DEFAULT);
new DefaultPacketRequest(selector, priority, appId, FlowRule.Type.DEFAULT);
packetRequests.add(request);
pushToAllDevices(request);
if (store.requestPackets(request)) {
pushToAllDevices(request);
}
}
@Override
......@@ -192,11 +130,12 @@ implements PacketService, PacketProviderRegistry {
+ "without table hints, use other methods in the packetService API");
PacketRequest request =
new PacketRequest(selector, priority, appId, tableType);
new DefaultPacketRequest(selector, priority, appId, tableType);
if (packetRequests.add(request)) {
if (store.requestPackets(request)) {
pushToAllDevices(request);
}
}
/**
......@@ -206,9 +145,7 @@ implements PacketService, PacketProviderRegistry {
*/
private void pushToAllDevices(PacketRequest request) {
for (Device device : deviceService.getDevices()) {
if (deviceService.getRole(device.id()) == MastershipRole.MASTER) {
pushRule(device, request);
}
pushRule(device, request);
}
}
......@@ -303,7 +240,7 @@ implements PacketService, PacketProviderRegistry {
public void event(DeviceEvent event) {
Device device = event.subject();
if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
for (PacketRequest request : packetRequests) {
for (PacketRequest request : store.existingRequests()) {
pushRule(device, request);
}
}
......
......@@ -28,6 +28,7 @@ import org.onosproject.mastership.MastershipService;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
......@@ -37,8 +38,12 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -69,6 +74,11 @@ public class DistributedPacketStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService communicationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private PacketRequestTracker tracker;
private static final MessageSubject PACKET_OUT_SUBJECT =
new MessageSubject("packet-out");
......@@ -94,6 +104,8 @@ public class DistributedPacketStore
new InternalClusterMessageHandler(),
messageHandlingExecutor);
tracker = new PacketRequestTracker();
log.info("Started");
}
......@@ -125,6 +137,16 @@ public class DistributedPacketStore
// error log: log.warn("Failed to send packet-out to {}", master);
}
@Override
public boolean requestPackets(PacketRequest request) {
return tracker.add(request);
}
@Override
public Set<PacketRequest> existingRequests() {
return tracker.requests();
}
/**
* Handles incoming cluster messages.
*/
......@@ -140,4 +162,46 @@ public class DistributedPacketStore
}
}
private class PacketRequestTracker {
private ConsistentMap<PacketRequest, Boolean> requests;
public PacketRequestTracker() {
requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
.withName("packet-requests")
.withSerializer(new Serializer() {
KryoNamespace kryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.build();
@Override
public <T> byte[] encode(T object) {
return kryo.serialize(object);
}
@Override
public <T> T decode(byte[] bytes) {
return kryo.deserialize(bytes);
}
}).build();
}
public boolean add(PacketRequest request) {
if (requests.putIfAbsent(request, true) == null) {
return true;
}
return false;
}
public boolean remove(PacketRequest request) {
if (requests.remove(request) == null) {
return false;
}
return true;
}
public Set<PacketRequest> requests() {
return requests.keySet();
}
}
}
......
......@@ -106,6 +106,8 @@ import org.onosproject.net.intent.constraint.ObstacleConstraint;
import org.onosproject.net.intent.constraint.WaypointConstraint;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.DefaultPacketRequest;
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.resource.Bandwidth;
import org.onosproject.net.resource.BandwidthResourceAllocation;
......@@ -225,6 +227,8 @@ public final class KryoNamespaces {
FlowRule.Type.class,
DefaultFlowRule.class,
DefaultFlowEntry.class,
DefaultPacketRequest.class,
PacketPriority.class,
FlowEntry.FlowEntryState.class,
FlowId.class,
DefaultTrafficSelector.class,
......
......@@ -15,15 +15,21 @@
*/
package org.onosproject.store.trivial.impl;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import java.util.Collections;
import java.util.Set;
/**
* Simple single instance implementation of the packet store.
*/
......@@ -33,9 +39,21 @@ public class SimplePacketStore
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
private Set<PacketRequest> requests = Sets.newConcurrentHashSet();
@Override
public void emit(OutboundPacket packet) {
notifyDelegate(new PacketEvent(Type.EMIT, packet));
}
@Override
public boolean requestPackets(PacketRequest request) {
return requests.add(request);
}
@Override
public Set<PacketRequest> existingRequests() {
return Collections.unmodifiableSet(requests);
}
}
......