Carmelo Cascone
Committed by Gerrit Code Review

BMv2 performance improvements

- Implemented a non-blocking Thrift server for the controller (before it
	was limiting the number of active connections)
- Improved configuration swap times by forcing it
- Minor bugfixes and polishing
- Update onos-bmv2 repo URL in thrift-api pom.xml

Change-Id: I13b61f5aa22558c395768e3b445f302b20c5bd33
......@@ -53,9 +53,14 @@ public interface Bmv2DeviceContextService {
void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader);
/**
* Returns a default context.
* Returns the default context.
*
* @return a BMv2 device context
*/
Bmv2DeviceContext defaultContext();
/**
* Sets the default context for the given device.
*/
void setDefaultContext(DeviceId deviceId);
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bmv2.ctl;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.thrift.TProcessor;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
* A Thrift TThreadedSelectorServer that keeps track of the clients' IP address.
*/
final class Bmv2ControlPlaneThriftServer extends TThreadedSelectorServer {
private static final int MAX_WORKER_THREADS = 20;
private static final int MAX_SELECTOR_THREADS = 4;
private static final int ACCEPT_QUEUE_LEN = 8;
private final Map<TTransport, InetAddress> clientAddresses = Maps.newConcurrentMap();
private final Set<TrackingSelectorThread> selectorThreads = Sets.newHashSet();
private AcceptThread acceptThread;
private final Logger log = LoggerFactory.getLogger(this.getClass());
/**
* Creates a new server.
*
* @param port a listening port
* @param processor a processor
* @param executorService an executor service
* @throws TTransportException
*/
public Bmv2ControlPlaneThriftServer(int port, TProcessor processor, ExecutorService executorService)
throws TTransportException {
super(new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
.workerThreads(MAX_WORKER_THREADS)
.selectorThreads(MAX_SELECTOR_THREADS)
.acceptQueueSizePerThread(ACCEPT_QUEUE_LEN)
.executorService(executorService)
.processor(processor));
}
/**
* Returns the IP address of the client associated with the given input framed transport.
*
* @param inputTransport a framed transport instance
* @return the IP address of the client or null
*/
InetAddress getClientAddress(TFramedTransport inputTransport) {
return clientAddresses.get(inputTransport);
}
@Override
protected boolean startThreads() {
try {
for (int i = 0; i < MAX_SELECTOR_THREADS; ++i) {
selectorThreads.add(new TrackingSelectorThread(ACCEPT_QUEUE_LEN));
}
acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
createSelectorThreadLoadBalancer(selectorThreads));
selectorThreads.forEach(Thread::start);
acceptThread.start();
return true;
} catch (IOException e) {
log.error("Failed to start threads!", e);
return false;
}
}
@Override
protected void joinThreads() throws InterruptedException {
// Wait until the io threads exit.
acceptThread.join();
for (TThreadedSelectorServer.SelectorThread thread : selectorThreads) {
thread.join();
}
}
@Override
public void stop() {
stopped_ = true;
// Stop queuing connect attempts asap.
stopListening();
if (acceptThread != null) {
acceptThread.wakeupSelector();
}
if (selectorThreads != null) {
selectorThreads.stream()
.filter(thread -> thread != null)
.forEach(TrackingSelectorThread::wakeupSelector);
}
}
private class TrackingSelectorThread extends TThreadedSelectorServer.SelectorThread {
TrackingSelectorThread(int maxPendingAccepts) throws IOException {
super(maxPendingAccepts);
}
@Override
protected FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
AbstractSelectThread selectThread) {
TrackingFrameBuffer frameBuffer = new TrackingFrameBuffer(trans, selectionKey, selectThread);
if (trans instanceof TNonblockingSocket) {
try {
SocketChannel socketChannel = ((TNonblockingSocket) trans).getSocketChannel();
InetAddress addr = ((InetSocketAddress) socketChannel.getRemoteAddress()).getAddress();
clientAddresses.put(frameBuffer.getInputFramedTransport(), addr);
} catch (IOException e) {
log.warn("Exception while tracking client address", e);
clientAddresses.remove(frameBuffer.getInputFramedTransport());
}
} else {
log.warn("Unknown TNonblockingTransport instance: {}", trans.getClass().getName());
clientAddresses.remove(frameBuffer.getInputFramedTransport());
}
return frameBuffer;
}
}
private class TrackingFrameBuffer extends FrameBuffer {
TrackingFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
AbstractSelectThread selectThread) {
super(trans, selectionKey, selectThread);
}
TTransport getInputFramedTransport() {
return this.inTrans_;
}
}
}
......@@ -34,19 +34,18 @@ import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.onlab.util.ImmutableByteSequence;
import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.runtime.Bmv2Device;
import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
import org.onosproject.bmv2.api.service.Bmv2PacketListener;
import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
import org.onosproject.bmv2.thriftapi.BmConfig;
import org.onosproject.bmv2.thriftapi.ControlPlaneService;
import org.onosproject.bmv2.thriftapi.SimpleSwitch;
import org.onosproject.bmv2.thriftapi.Standard;
......@@ -55,6 +54,7 @@ import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
......@@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.bmv2.thriftapi.ControlPlaneService.Processor;
/**
* Default implementation of a BMv2 controller.
......@@ -93,10 +94,10 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
.removalListener(new ClientRemovalListener())
.build(new ClientLoader());
private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
private final TProcessor trackingProcessor = new TrackingProcessor();
private final ExecutorService executorService = Executors
.newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log));
.newFixedThreadPool(32, groupedThreads("onos/bmv2", "controller", log));
private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
......@@ -104,7 +105,7 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private TThreadPoolServer thriftServer;
private Bmv2ControlPlaneThriftServer server;
// TODO: make it configurable trough component config
private int serverPort = DEFAULT_PORT;
......@@ -124,12 +125,9 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
private void startServer(int port) {
try {
TServerTransport transport = new TServerSocket(port);
log.info("Starting server on port {}...", port);
this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
.processor(trackingProcessor)
.executorService(executorService));
executorService.execute(thriftServer::serve);
this.server = new Bmv2ControlPlaneThriftServer(port, trackingProcessor, executorService);
executorService.execute(server::serve);
} catch (TTransportException e) {
log.error("Unable to start server", e);
}
......@@ -137,8 +135,9 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
private void stopServer() {
// Stop the server if running...
if (thriftServer != null && !thriftServer.isServing()) {
thriftServer.stop();
if (server != null && !server.isServing()) {
server.setShouldStop(true);
server.stop();
}
try {
executorService.shutdown();
......@@ -162,8 +161,11 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
@Override
public boolean isReacheable(DeviceId deviceId) {
try {
return getAgent(deviceId).ping();
} catch (Bmv2RuntimeException e) {
Bmv2DeviceThriftClient client = (Bmv2DeviceThriftClient) getAgent(deviceId);
BmConfig config = client.standardClient.bm_mgmt_get_info();
// The BMv2 instance running at this thrift IP and port might have a different BMv2 internal ID.
return config.getDevice_id() == Integer.valueOf(deviceId.uri().getFragment());
} catch (Bmv2RuntimeException | TException e) {
return false;
}
}
......@@ -213,15 +215,15 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
}
/**
* Handles Thrift calls from BMv2 devices using registered listeners.
* Handles requests from BMv2 devices using the registered listeners.
*/
private final class InternalServiceHandler implements ControlPlaneService.Iface {
private final class ServiceHandler implements ControlPlaneService.Iface {
private final TSocket socket;
private final InetAddress clientAddress;
private Bmv2Device remoteDevice;
private InternalServiceHandler(TSocket socket) {
this.socket = socket;
ServiceHandler(InetAddress clientAddress) {
this.clientAddress = clientAddress;
}
@Override
......@@ -231,20 +233,19 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
@Override
public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
// Locally note the remote device for future uses.
String host = socket.getSocket().getInetAddress().getHostAddress();
// Store a reference to the remote device for future uses.
String host = clientAddress.getHostAddress();
remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
if (deviceListeners.size() == 0) {
log.debug("Received hello, but there's no listener registered.");
} else {
deviceListeners.forEach(
l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
deviceListeners.forEach(l -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5));
}
}
@Override
public void packet_in(int port, ByteBuffer packet) {
public void packet_in(int port, ByteBuffer data, int dataLength) {
if (remoteDevice == null) {
log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
return;
......@@ -253,41 +254,42 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
if (packetListeners.size() == 0) {
log.debug("Received packet-in, but there's no listener registered.");
} else {
packetListeners.forEach(
l -> executorService.execute(() -> l.handlePacketIn(remoteDevice,
port,
ImmutableByteSequence.copyFrom(packet))));
byte[] bytes = new byte[dataLength];
data.get(bytes);
ImmutableByteSequence pkt = ImmutableByteSequence.copyFrom(bytes);
packetListeners.forEach(l -> l.handlePacketIn(remoteDevice, port, pkt));
}
}
}
/**
* Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
* Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
* Decorator of a Thrift processor needed in order to keep track of the client's IP address that originated the
* request.
*/
private final class InternalTrackingProcessor implements TProcessor {
private final class TrackingProcessor implements TProcessor {
// Map sockets to processors.
// TODO: implement it as a cache so unused sockets are expired automatically
private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
Maps.newConcurrentMap();
// Map transports to processors.
private final ConcurrentMap<TTransport, Processor<ServiceHandler>> processors = Maps.newConcurrentMap();
@Override
public boolean process(final TProtocol in, final TProtocol out) throws TException {
// Get the socket for this request.
TSocket socket = (TSocket) in.getTransport();
// Get or create a processor for this socket
ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
InternalServiceHandler handler = new InternalServiceHandler(s);
return new ControlPlaneService.Processor<>(handler);
});
// Delegate to the processor we are decorating.
return processor.process(in, out);
// Get the client address for this request.
InetAddress clientAddress = server.getClientAddress((TFramedTransport) in.getTransport());
if (clientAddress != null) {
// Get or create a processor for this input transport, i.e. the client on the other side.
Processor<ServiceHandler> processor = processors.computeIfAbsent(
in.getTransport(), t -> new Processor<>(new ServiceHandler(clientAddress)));
// Delegate to the processor we are decorating.
return processor.process(in, out);
} else {
log.warn("Unable to retrieve client IP address of incoming request");
return false;
}
}
}
/**
* Transport/client cache loader.
* Cache loader of BMv2 Thrift clients.
*/
private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
......
......@@ -171,6 +171,17 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
return defaultContext;
}
@Override
public void setDefaultContext(DeviceId deviceId) {
Versioned<Bmv2DeviceContext> previous = contexts.put(deviceId, defaultContext);
if (mastershipService.getMasterFor(deviceId) == null) {
// Checking for who is the master here is ugly but necessary, as this method is called by Bmv2DeviceProvider
// prior to master election. A solution could be to use a separate leadership contest instead of the
// mastership service.
triggerConfigCheck(deviceId, defaultContext);
}
}
private void configCheck(DeviceId deviceId, Bmv2DeviceContext storedContext) {
if (storedContext == null) {
return;
......@@ -206,14 +217,14 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
}
private void triggerConfigCheck(DeviceId deviceId, Bmv2DeviceContext context) {
if (mastershipService.isLocalMaster(deviceId)) {
scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS);
}
}
private void checkDevices() {
deviceService.getAvailableDevices().forEach(device -> {
triggerConfigCheck(device.id(), getContext(device.id()));
if (mastershipService.isLocalMaster(device.id())) {
triggerConfigCheck(device.id(), getContext(device.id()));
}
});
}
......@@ -235,8 +246,10 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
DeviceId deviceId = event.key();
if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
log.trace("Context {} for {}", event.type().name(), deviceId);
triggerConfigCheck(deviceId, event.newValue().value());
if (mastershipService.isLocalMaster(deviceId)) {
log.trace("Context {} for {}", event.type().name(), deviceId);
triggerConfigCheck(deviceId, event.newValue().value());
}
}
}
}
......
......@@ -68,7 +68,7 @@ public final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent {
// See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
private static final int CONTEXT_ID = 0;
private final Standard.Iface standardClient;
protected final Standard.Iface standardClient;
private final SimpleSwitch.Iface simpleSwitchClient;
private final TTransport transport;
private final DeviceId deviceId;
......@@ -418,6 +418,7 @@ public final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent {
try {
standardClient.bm_swap_configs();
simpleSwitchClient.force_swap();
log.debug("JSON config swapped! > deviceId={}", deviceId);
} catch (TException e) {
log.debug("Exception while swapping JSON config: {} > deviceId={}", e, deviceId);
......
......@@ -32,9 +32,9 @@
<properties>
<!-- BMv2 Commit ID and Thrift version -->
<bmv2.commit>024aa03e3b52f8d32c26774511e8e5b1dc11ec65</bmv2.commit>
<bmv2.commit>8f675d0284e9e014f1b8ed502ba54e61d68108cf</bmv2.commit>
<bmv2.thrift.version>0.9.3</bmv2.thrift.version>
<bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/behavioral-model/${bmv2.commit}</bmv2.baseurl>
<bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/onos-bmv2/${bmv2.commit}</bmv2.baseurl>
<bmv2.thrift.javanamespace>org.onosproject.bmv2.thriftapi</bmv2.thrift.javanamespace>
<bmv2.thrift.srcdir>${project.build.directory}/thrift-sources/${bmv2.commit}/</bmv2.thrift.srcdir>
<thrift.exedir>${project.build.directory}/thrift-compiler/</thrift.exedir>
......
......@@ -20,10 +20,7 @@ import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.util.Timer;
import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.bmv2.api.runtime.Bmv2Device;
import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
import org.onosproject.bmv2.api.service.Bmv2Controller;
......@@ -34,6 +31,7 @@ import org.onosproject.common.net.AbstractDeviceProvider;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.config.basics.ConfigException;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
......@@ -55,14 +53,20 @@ import org.slf4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.bmv2.api.runtime.Bmv2Device.*;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.getPortStatistics;
import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.initCounters;
......@@ -76,20 +80,22 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
private static final String APP_NAME = "org.onosproject.bmv2";
private static final int POLL_INTERVAL = 5_000; // milliseconds
private static final int POLL_PERIOD = 5_000; // milliseconds
private final Logger log = getLogger(this.getClass());
private final ExecutorService executorService = Executors
.newFixedThreadPool(16, groupedThreads("onos/bmv2", "device-discovery", log));
private final ScheduledExecutorService scheduledExecutorService = SharedScheduledExecutors.getPoolThreadExecutor();
private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
private final ConfigFactory cfgFactory = new InternalConfigFactory();
private final ConcurrentMap<DeviceId, DeviceDescription> activeDevices = Maps.newConcurrentMap();
private final Map<DeviceId, DeviceDescription> lastDescriptions = Maps.newHashMap();
private final DevicePoller devicePoller = new DevicePoller();
private final ConcurrentMap<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
private final InternalDeviceListener deviceListener = new InternalDeviceListener();
......@@ -103,6 +109,9 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected Bmv2Controller controller;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -112,6 +121,7 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
protected Bmv2TableEntryService tableEntryService;
private ApplicationId appId;
private ScheduledFuture<?> poller;
/**
* Creates a Bmv2 device provider with the supplied identifier.
......@@ -126,19 +136,24 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
netCfgService.registerConfigFactory(cfgFactory);
netCfgService.addListener(cfgListener);
controller.addDeviceListener(deviceListener);
devicePoller.start();
if (poller != null) {
poller.cancel(false);
}
poller = scheduledExecutorService.scheduleAtFixedRate(this::pollDevices, 1_000, POLL_PERIOD, MILLISECONDS);
super.activate();
}
@Override
protected void deactivate() {
devicePoller.stop();
if (poller != null) {
poller.cancel(false);
}
controller.removeDeviceListener(deviceListener);
try {
activeDevices.forEach((did, value) -> {
lastDescriptions.forEach((did, value) -> {
executorService.execute(() -> disconnectDevice(did));
});
executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
executorService.awaitTermination(1000, MILLISECONDS);
} catch (InterruptedException e) {
log.error("Device discovery threads did not terminate");
}
......@@ -181,32 +196,43 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
}
private void discoverDevice(DeviceId did) {
log.debug("Starting device discovery... deviceId={}", did);
activeDevices.compute(did, (k, lastDescription) -> {
// Serialize discovery for the same device.
Lock lock = deviceLocks.computeIfAbsent(did, k -> new ReentrantLock());
lock.lock();
try {
log.debug("Starting device discovery... deviceId={}", did);
if (contextService.getContext(did) == null) {
// Device is a first timer.
log.info("Setting DEFAULT context for {}", did);
// It is important to do this before creating the device in the core
// so other services won't find a null context.
contextService.setDefaultContext(did);
// Abort discovery, we'll receive a new hello once the swap has been performed.
return;
}
DeviceDescription lastDescription = lastDescriptions.get(did);
DeviceDescription thisDescription = getDeviceDescription(did);
if (thisDescription != null) {
boolean descriptionChanged = lastDescription != null &&
(!Objects.equals(thisDescription, lastDescription) ||
!Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
boolean descriptionChanged = lastDescription == null ||
(!Objects.equals(thisDescription, lastDescription) ||
!Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
if (descriptionChanged || !deviceService.isAvailable(did)) {
if (deviceService.getDevice(did) == null) {
// Device is a first timer.
log.info("Setting DEFAULT context for {}", did);
// It is important to do this before connecting the device so other
// services won't find a null context.
contextService.setContext(did, contextService.defaultContext());
}
resetDeviceState(did);
initPortCounters(did);
providerService.deviceConnected(did, thisDescription);
updatePortsAndStats(did);
}
return thisDescription;
lastDescriptions.put(did, thisDescription);
} else {
log.warn("Unable to get device description for {}", did);
return lastDescription;
lastDescriptions.put(did, lastDescription);
}
});
} finally {
lock.unlock();
}
}
private DeviceDescription getDeviceDescription(DeviceId did) {
......@@ -269,11 +295,27 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
}
private void disconnectDevice(DeviceId did) {
log.debug("Trying to disconnect device from core... deviceId={}", did);
if (deviceService.isAvailable(did)) {
providerService.deviceDisconnected(did);
log.debug("Disconnecting device from core... deviceId={}", did);
providerService.deviceDisconnected(did);
lastDescriptions.remove(did);
}
private void pollDevices() {
for (Device device: deviceService.getAvailableDevices(SWITCH)) {
if (device.id().uri().getScheme().equals(SCHEME) &&
mastershipService.isLocalMaster(device.id())) {
executorService.execute(() -> pollingTask(device.id()));
}
}
}
private void pollingTask(DeviceId deviceId) {
log.debug("Polling device {}...", deviceId);
if (isReachable(deviceId)) {
updatePortsAndStats(deviceId);
} else {
disconnectDevice(deviceId);
}
activeDevices.remove(did);
}
/**
......@@ -332,50 +374,4 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
triggerProbe(device.asDeviceId());
}
}
/**
* Task that periodically trigger device probes to check for device status and update port information.
*/
private class DevicePoller implements TimerTask {
private final HashedWheelTimer timer = Timer.getTimer();
private Timeout timeout;
@Override
public void run(Timeout tout) throws Exception {
if (tout.isCancelled()) {
return;
}
activeDevices.keySet()
.stream()
// Filter out devices not yet created in the core.
.filter(did -> deviceService.getDevice(did) != null)
.forEach(did -> executorService.execute(() -> pollingTask(did)));
tout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.MILLISECONDS);
}
private void pollingTask(DeviceId deviceId) {
if (isReachable(deviceId)) {
updatePortsAndStats(deviceId);
} else {
disconnectDevice(deviceId);
}
}
/**
* Starts the collector.
*/
synchronized void start() {
log.info("Starting device poller...");
timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
}
/**
* Stops the collector.
*/
synchronized void stop() {
log.info("Stopping device poller...");
timeout.cancel();
}
}
}
......