Jonathan Hart

Implemented multi-instance packet out.

We've defined a PacketStore abstraction through which outbound packets are
sent. The packet store has a simple implementation (basically a no-op) and a
distributed implementation on top of the cluster messaging service.

Change-Id: Ib32753314fe518ef1fd67c858db744b004539938
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.packet;
import org.onlab.onos.event.AbstractEvent;
/**
* Describes a packet event.
*/
public class PacketEvent extends AbstractEvent<PacketEvent.Type, OutboundPacket> {
/**
* Type of packet events.
*/
public enum Type {
/**
* Signifies that the packet should be emitted out a local port.
*/
EMIT
}
/**
* Creates an event of the given type for the specified packet.
*
* @param type the type of the event
* @param packet the packet the event is about
*/
public PacketEvent(Type type, OutboundPacket packet) {
super(type, packet);
}
/**
* Creates an event of the given type for the specified packet at the given
* time.
*
* @param type the type of the event
* @param packet the packet the event is about
* @param time the time of the event
*/
public PacketEvent(Type type, OutboundPacket packet, long time) {
super(type, packet, time);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.packet;
import org.onlab.onos.store.Store;
/**
* Manages routing of outbound packets.
*/
public interface PacketStore extends Store<PacketEvent, PacketStoreDelegate> {
/**
* Decides which instance should emit the packet and forwards the packet to
* that instance. The relevant PacketManager is notified via the
* PacketStoreDelegate that it should emit the packet.
*
* @param packet the packet to emit
*/
void emit(OutboundPacket packet);
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.packet;
import org.onlab.onos.store.StoreDelegate;
/**
* Packet store delegate abstraction.
*/
public interface PacketStoreDelegate extends StoreDelegate<PacketEvent> {
}
......@@ -19,7 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -31,11 +31,14 @@ import org.onlab.onos.net.Device;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.packet.OutboundPacket;
import org.onlab.onos.net.packet.PacketContext;
import org.onlab.onos.net.packet.PacketEvent;
import org.onlab.onos.net.packet.PacketProcessor;
import org.onlab.onos.net.packet.PacketProvider;
import org.onlab.onos.net.packet.PacketProviderRegistry;
import org.onlab.onos.net.packet.PacketProviderService;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.packet.PacketStore;
import org.onlab.onos.net.packet.PacketStoreDelegate;
import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
......@@ -51,18 +54,25 @@ implements PacketService, PacketProviderRegistry {
private final Logger log = getLogger(getClass());
private final PacketStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DeviceService deviceService;
private final Map<Integer, PacketProcessor> processors = new TreeMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private PacketStore store;
private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
@Activate
public void activate() {
store.setDelegate(delegate);
log.info("Started");
}
@Deactivate
public void deactivate() {
store.unsetDelegate(delegate);
log.info("Stopped");
}
......@@ -81,6 +91,11 @@ implements PacketService, PacketProviderRegistry {
@Override
public void emit(OutboundPacket packet) {
checkNotNull(packet, "Packet cannot be null");
store.emit(packet);
}
private void localEmit(OutboundPacket packet) {
final Device device = deviceService.getDevice(packet.sendThrough());
final PacketProvider packetProvider = getProvider(device.providerId());
if (packetProvider != null) {
......@@ -110,4 +125,16 @@ implements PacketService, PacketProviderRegistry {
}
}
/**
* Internal callback from the packet store.
*/
private class InternalStoreDelegate
implements PacketStoreDelegate {
@Override
public void notify(PacketEvent event) {
localEmit(event.subject());
}
}
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.packet.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.packet.OutboundPacket;
import org.onlab.onos.net.packet.PacketEvent;
import org.onlab.onos.net.packet.PacketEvent.Type;
import org.onlab.onos.net.packet.PacketStore;
import org.onlab.onos.net.packet.PacketStoreDelegate;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
/**
* Distributed packet store implementation allowing packets to be sent to
* remote instances.
*/
@Component(immediate = true)
@Service
public class DistributedPacketStore
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService communicationService;
private static final MessageSubject PACKET_OUT_SUBJECT =
new MessageSubject("packet-out");
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
}
};
@Activate
public void activate() {
log.info("Started");
communicationService.addSubscriber(
PACKET_OUT_SUBJECT, new InternalClusterMessageHandler());
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public void emit(OutboundPacket packet) {
NodeId myId = clusterService.getLocalNode().id();
NodeId master = mastershipService.getMasterFor(packet.sendThrough());
if (myId.equals(master)) {
notifyDelegate(new PacketEvent(Type.EMIT, packet));
return;
}
try {
communicationService.unicast(new ClusterMessage(
myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
} catch (IOException e) {
log.warn("Failed to send packet-out to {}", master);
}
}
/**
* Handles incoming cluster messages.
*/
private class InternalClusterMessageHandler implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
log.warn("Received message with wrong subject: {}", message);
}
OutboundPacket packet = SERIALIZER.decode(message.payload());
notifyDelegate(new PacketEvent(Type.EMIT, packet));
}
}
}
/**
* Implementation of distributed packet store.
*/
package org.onlab.onos.store.packet.impl;
\ No newline at end of file
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.serializers;
import java.nio.ByteBuffer;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Serializer for a default outbound packet.
*/
public class DefaultOutboundPacketSerializer extends Serializer<DefaultOutboundPacket> {
/**
* Creates {@link DefaultOutboundPacket} serializer instance.
*/
public DefaultOutboundPacketSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public DefaultOutboundPacket read(Kryo kryo, Input input,
Class<DefaultOutboundPacket> type) {
DeviceId sendThrough = (DeviceId) kryo.readClassAndObject(input);
TrafficTreatment treatment = (TrafficTreatment) kryo.readClassAndObject(input);
byte[] data = (byte[]) kryo.readClassAndObject(input);
return new DefaultOutboundPacket(sendThrough, treatment, ByteBuffer.wrap(data));
}
@Override
public void write(Kryo kryo, Output output, DefaultOutboundPacket object) {
kryo.writeClassAndObject(output, object.sendThrough());
kryo.writeClassAndObject(output, object.treatment());
kryo.writeClassAndObject(output, object.data().array());
}
}
......@@ -74,6 +74,7 @@ import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
import org.onlab.onos.net.intent.PathIntent;
import org.onlab.onos.net.intent.PointToPointIntent;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.Timestamp;
import org.onlab.packet.ChassisId;
......@@ -115,6 +116,7 @@ public final class KryoNamespaces {
HashMap.class,
HashSet.class,
LinkedList.class,
byte[].class,
//
//
ControllerNode.State.class,
......@@ -194,6 +196,7 @@ public final class KryoNamespaces {
.register(DefaultLink.class, new DefaultLinkSerializer())
.register(MastershipTerm.class, new MastershipTermSerializer())
.register(HostLocation.class, new HostLocationSerializer())
.register(DefaultOutboundPacket.class, new DefaultOutboundPacketSerializer())
.build();
......
......@@ -15,9 +15,10 @@
*/
package org.onlab.onos.store.serializers;
import org.onlab.util.KryoNamespace;
import java.nio.ByteBuffer;
import org.onlab.util.KryoNamespace;
/**
* StoreSerializer implementation using Kryo.
*/
......@@ -30,7 +31,7 @@ public class KryoSerializer implements StoreSerializer {
}
/**
* Sets up the common serialzers pool.
* Sets up the common serializers pool.
*/
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.trivial.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.packet.OutboundPacket;
import org.onlab.onos.net.packet.PacketEvent;
import org.onlab.onos.net.packet.PacketEvent.Type;
import org.onlab.onos.net.packet.PacketStore;
import org.onlab.onos.net.packet.PacketStoreDelegate;
import org.onlab.onos.store.AbstractStore;
/**
* Simple single instance implementation of the packet store.
*/
@Component(immediate = true)
@Service
public class SimplePacketStore
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
@Override
public void emit(OutboundPacket packet) {
notifyDelegate(new PacketEvent(Type.EMIT, packet));
}
}
......@@ -115,10 +115,10 @@ public final class KryoNamespace implements KryoFactory {
/**
* Creates a Kryo instance pool.
*
* @param registerdTypes types to register
* @param registeredTypes types to register
*/
private KryoNamespace(final List<Pair<Class<?>, Serializer<?>>> registerdTypes) {
this.registeredTypes = ImmutableList.copyOf(registerdTypes);
private KryoNamespace(final List<Pair<Class<?>, Serializer<?>>> registeredTypes) {
this.registeredTypes = ImmutableList.copyOf(registeredTypes);
// always true for now
this.registrationRequired = true;
}
......