Carmelo Cascone
Committed by Brian O'Connor

BMv2 performance improvements

- Implemented a non-blocking Thrift server for the controller (before it
	was limiting the number of active connections)
- Improved configuration swap times by forcing it
- Minor bugfixes and polishing
- Update onos-bmv2 repo URL in thrift-api pom.xml

Change-Id: I13b61f5aa22558c395768e3b445f302b20c5bd33
...@@ -53,9 +53,14 @@ public interface Bmv2DeviceContextService { ...@@ -53,9 +53,14 @@ public interface Bmv2DeviceContextService {
53 void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader); 53 void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader);
54 54
55 /** 55 /**
56 - * Returns a default context. 56 + * Returns the default context.
57 * 57 *
58 * @return a BMv2 device context 58 * @return a BMv2 device context
59 */ 59 */
60 Bmv2DeviceContext defaultContext(); 60 Bmv2DeviceContext defaultContext();
61 +
62 + /**
63 + * Sets the default context for the given device.
64 + */
65 + void setDefaultContext(DeviceId deviceId);
61 } 66 }
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.bmv2.ctl;
18 +
19 +import com.google.common.collect.Maps;
20 +import com.google.common.collect.Sets;
21 +import org.apache.thrift.TProcessor;
22 +import org.apache.thrift.server.TThreadedSelectorServer;
23 +import org.apache.thrift.transport.TFramedTransport;
24 +import org.apache.thrift.transport.TNonblockingServerSocket;
25 +import org.apache.thrift.transport.TNonblockingServerTransport;
26 +import org.apache.thrift.transport.TNonblockingSocket;
27 +import org.apache.thrift.transport.TNonblockingTransport;
28 +import org.apache.thrift.transport.TTransport;
29 +import org.apache.thrift.transport.TTransportException;
30 +import org.slf4j.Logger;
31 +import org.slf4j.LoggerFactory;
32 +
33 +import java.io.IOException;
34 +import java.net.InetAddress;
35 +import java.net.InetSocketAddress;
36 +import java.nio.channels.SelectionKey;
37 +import java.nio.channels.SocketChannel;
38 +import java.util.Map;
39 +import java.util.Set;
40 +import java.util.concurrent.ExecutorService;
41 +
42 +/**
43 + * A Thrift TThreadedSelectorServer that keeps track of the clients' IP address.
44 + */
45 +final class Bmv2ControlPlaneThriftServer extends TThreadedSelectorServer {
46 +
47 + private static final int MAX_WORKER_THREADS = 20;
48 + private static final int MAX_SELECTOR_THREADS = 4;
49 + private static final int ACCEPT_QUEUE_LEN = 8;
50 +
51 + private final Map<TTransport, InetAddress> clientAddresses = Maps.newConcurrentMap();
52 + private final Set<TrackingSelectorThread> selectorThreads = Sets.newHashSet();
53 +
54 + private AcceptThread acceptThread;
55 +
56 + private final Logger log = LoggerFactory.getLogger(this.getClass());
57 +
58 + /**
59 + * Creates a new server.
60 + *
61 + * @param port a listening port
62 + * @param processor a processor
63 + * @param executorService an executor service
64 + * @throws TTransportException
65 + */
66 + public Bmv2ControlPlaneThriftServer(int port, TProcessor processor, ExecutorService executorService)
67 + throws TTransportException {
68 + super(new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
69 + .workerThreads(MAX_WORKER_THREADS)
70 + .selectorThreads(MAX_SELECTOR_THREADS)
71 + .acceptQueueSizePerThread(ACCEPT_QUEUE_LEN)
72 + .executorService(executorService)
73 + .processor(processor));
74 + }
75 +
76 + /**
77 + * Returns the IP address of the client associated with the given input framed transport.
78 + *
79 + * @param inputTransport a framed transport instance
80 + * @return the IP address of the client or null
81 + */
82 + InetAddress getClientAddress(TFramedTransport inputTransport) {
83 + return clientAddresses.get(inputTransport);
84 + }
85 +
86 + @Override
87 + protected boolean startThreads() {
88 + try {
89 + for (int i = 0; i < MAX_SELECTOR_THREADS; ++i) {
90 + selectorThreads.add(new TrackingSelectorThread(ACCEPT_QUEUE_LEN));
91 + }
92 + acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
93 + createSelectorThreadLoadBalancer(selectorThreads));
94 + selectorThreads.forEach(Thread::start);
95 + acceptThread.start();
96 + return true;
97 + } catch (IOException e) {
98 + log.error("Failed to start threads!", e);
99 + return false;
100 + }
101 + }
102 +
103 + @Override
104 + protected void joinThreads() throws InterruptedException {
105 + // Wait until the io threads exit.
106 + acceptThread.join();
107 + for (TThreadedSelectorServer.SelectorThread thread : selectorThreads) {
108 + thread.join();
109 + }
110 + }
111 +
112 + @Override
113 + public void stop() {
114 + stopped_ = true;
115 + // Stop queuing connect attempts asap.
116 + stopListening();
117 + if (acceptThread != null) {
118 + acceptThread.wakeupSelector();
119 + }
120 + if (selectorThreads != null) {
121 + selectorThreads.stream()
122 + .filter(thread -> thread != null)
123 + .forEach(TrackingSelectorThread::wakeupSelector);
124 + }
125 + }
126 +
127 + private class TrackingSelectorThread extends TThreadedSelectorServer.SelectorThread {
128 +
129 + TrackingSelectorThread(int maxPendingAccepts) throws IOException {
130 + super(maxPendingAccepts);
131 + }
132 +
133 + @Override
134 + protected FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
135 + AbstractSelectThread selectThread) {
136 + TrackingFrameBuffer frameBuffer = new TrackingFrameBuffer(trans, selectionKey, selectThread);
137 + if (trans instanceof TNonblockingSocket) {
138 + try {
139 + SocketChannel socketChannel = ((TNonblockingSocket) trans).getSocketChannel();
140 + InetAddress addr = ((InetSocketAddress) socketChannel.getRemoteAddress()).getAddress();
141 + clientAddresses.put(frameBuffer.getInputFramedTransport(), addr);
142 + } catch (IOException e) {
143 + log.warn("Exception while tracking client address", e);
144 + clientAddresses.remove(frameBuffer.getInputFramedTransport());
145 + }
146 + } else {
147 + log.warn("Unknown TNonblockingTransport instance: {}", trans.getClass().getName());
148 + clientAddresses.remove(frameBuffer.getInputFramedTransport());
149 + }
150 + return frameBuffer;
151 + }
152 + }
153 +
154 + private class TrackingFrameBuffer extends FrameBuffer {
155 +
156 + TrackingFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
157 + AbstractSelectThread selectThread) {
158 + super(trans, selectionKey, selectThread);
159 + }
160 +
161 + TTransport getInputFramedTransport() {
162 + return this.inTrans_;
163 + }
164 + }
165 +}
...@@ -34,19 +34,18 @@ import org.apache.thrift.TProcessor; ...@@ -34,19 +34,18 @@ import org.apache.thrift.TProcessor;
34 import org.apache.thrift.protocol.TBinaryProtocol; 34 import org.apache.thrift.protocol.TBinaryProtocol;
35 import org.apache.thrift.protocol.TMultiplexedProtocol; 35 import org.apache.thrift.protocol.TMultiplexedProtocol;
36 import org.apache.thrift.protocol.TProtocol; 36 import org.apache.thrift.protocol.TProtocol;
37 -import org.apache.thrift.server.TThreadPoolServer; 37 +import org.apache.thrift.transport.TFramedTransport;
38 -import org.apache.thrift.transport.TServerSocket;
39 -import org.apache.thrift.transport.TServerTransport;
40 import org.apache.thrift.transport.TSocket; 38 import org.apache.thrift.transport.TSocket;
41 import org.apache.thrift.transport.TTransport; 39 import org.apache.thrift.transport.TTransport;
42 import org.apache.thrift.transport.TTransportException; 40 import org.apache.thrift.transport.TTransportException;
43 import org.onlab.util.ImmutableByteSequence; 41 import org.onlab.util.ImmutableByteSequence;
44 -import org.onosproject.bmv2.api.service.Bmv2Controller;
45 import org.onosproject.bmv2.api.runtime.Bmv2Device; 42 import org.onosproject.bmv2.api.runtime.Bmv2Device;
46 import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent; 43 import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
44 +import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
45 +import org.onosproject.bmv2.api.service.Bmv2Controller;
47 import org.onosproject.bmv2.api.service.Bmv2DeviceListener; 46 import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
48 import org.onosproject.bmv2.api.service.Bmv2PacketListener; 47 import org.onosproject.bmv2.api.service.Bmv2PacketListener;
49 -import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException; 48 +import org.onosproject.bmv2.thriftapi.BmConfig;
50 import org.onosproject.bmv2.thriftapi.ControlPlaneService; 49 import org.onosproject.bmv2.thriftapi.ControlPlaneService;
51 import org.onosproject.bmv2.thriftapi.SimpleSwitch; 50 import org.onosproject.bmv2.thriftapi.SimpleSwitch;
52 import org.onosproject.bmv2.thriftapi.Standard; 51 import org.onosproject.bmv2.thriftapi.Standard;
...@@ -55,6 +54,7 @@ import org.onosproject.net.DeviceId; ...@@ -55,6 +54,7 @@ import org.onosproject.net.DeviceId;
55 import org.slf4j.Logger; 54 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory; 55 import org.slf4j.LoggerFactory;
57 56
57 +import java.net.InetAddress;
58 import java.nio.ByteBuffer; 58 import java.nio.ByteBuffer;
59 import java.util.List; 59 import java.util.List;
60 import java.util.Set; 60 import java.util.Set;
...@@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit; ...@@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit;
67 67
68 import static com.google.common.base.Preconditions.checkNotNull; 68 import static com.google.common.base.Preconditions.checkNotNull;
69 import static org.onlab.util.Tools.groupedThreads; 69 import static org.onlab.util.Tools.groupedThreads;
70 +import static org.onosproject.bmv2.thriftapi.ControlPlaneService.Processor;
70 71
71 /** 72 /**
72 * Default implementation of a BMv2 controller. 73 * Default implementation of a BMv2 controller.
...@@ -93,10 +94,10 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -93,10 +94,10 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
93 .removalListener(new ClientRemovalListener()) 94 .removalListener(new ClientRemovalListener())
94 .build(new ClientLoader()); 95 .build(new ClientLoader());
95 96
96 - private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor(); 97 + private final TProcessor trackingProcessor = new TrackingProcessor();
97 98
98 private final ExecutorService executorService = Executors 99 private final ExecutorService executorService = Executors
99 - .newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log)); 100 + .newFixedThreadPool(32, groupedThreads("onos/bmv2", "controller", log));
100 101
101 private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>(); 102 private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
102 private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>(); 103 private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
...@@ -104,7 +105,7 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -104,7 +105,7 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected CoreService coreService; 106 protected CoreService coreService;
106 107
107 - private TThreadPoolServer thriftServer; 108 + private Bmv2ControlPlaneThriftServer server;
108 109
109 // TODO: make it configurable trough component config 110 // TODO: make it configurable trough component config
110 private int serverPort = DEFAULT_PORT; 111 private int serverPort = DEFAULT_PORT;
...@@ -124,12 +125,9 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -124,12 +125,9 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
124 125
125 private void startServer(int port) { 126 private void startServer(int port) {
126 try { 127 try {
127 - TServerTransport transport = new TServerSocket(port);
128 log.info("Starting server on port {}...", port); 128 log.info("Starting server on port {}...", port);
129 - this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport) 129 + this.server = new Bmv2ControlPlaneThriftServer(port, trackingProcessor, executorService);
130 - .processor(trackingProcessor) 130 + executorService.execute(server::serve);
131 - .executorService(executorService));
132 - executorService.execute(thriftServer::serve);
133 } catch (TTransportException e) { 131 } catch (TTransportException e) {
134 log.error("Unable to start server", e); 132 log.error("Unable to start server", e);
135 } 133 }
...@@ -137,8 +135,9 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -137,8 +135,9 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
137 135
138 private void stopServer() { 136 private void stopServer() {
139 // Stop the server if running... 137 // Stop the server if running...
140 - if (thriftServer != null && !thriftServer.isServing()) { 138 + if (server != null && !server.isServing()) {
141 - thriftServer.stop(); 139 + server.setShouldStop(true);
140 + server.stop();
142 } 141 }
143 try { 142 try {
144 executorService.shutdown(); 143 executorService.shutdown();
...@@ -162,8 +161,11 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -162,8 +161,11 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
162 @Override 161 @Override
163 public boolean isReacheable(DeviceId deviceId) { 162 public boolean isReacheable(DeviceId deviceId) {
164 try { 163 try {
165 - return getAgent(deviceId).ping(); 164 + Bmv2DeviceThriftClient client = (Bmv2DeviceThriftClient) getAgent(deviceId);
166 - } catch (Bmv2RuntimeException e) { 165 + BmConfig config = client.standardClient.bm_mgmt_get_info();
166 + // The BMv2 instance running at this thrift IP and port might have a different BMv2 internal ID.
167 + return config.getDevice_id() == Integer.valueOf(deviceId.uri().getFragment());
168 + } catch (Bmv2RuntimeException | TException e) {
167 return false; 169 return false;
168 } 170 }
169 } 171 }
...@@ -213,15 +215,15 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -213,15 +215,15 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
213 } 215 }
214 216
215 /** 217 /**
216 - * Handles Thrift calls from BMv2 devices using registered listeners. 218 + * Handles requests from BMv2 devices using the registered listeners.
217 */ 219 */
218 - private final class InternalServiceHandler implements ControlPlaneService.Iface { 220 + private final class ServiceHandler implements ControlPlaneService.Iface {
219 221
220 - private final TSocket socket; 222 + private final InetAddress clientAddress;
221 private Bmv2Device remoteDevice; 223 private Bmv2Device remoteDevice;
222 224
223 - private InternalServiceHandler(TSocket socket) { 225 + ServiceHandler(InetAddress clientAddress) {
224 - this.socket = socket; 226 + this.clientAddress = clientAddress;
225 } 227 }
226 228
227 @Override 229 @Override
...@@ -231,20 +233,19 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -231,20 +233,19 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
231 233
232 @Override 234 @Override
233 public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) { 235 public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
234 - // Locally note the remote device for future uses. 236 + // Store a reference to the remote device for future uses.
235 - String host = socket.getSocket().getInetAddress().getHostAddress(); 237 + String host = clientAddress.getHostAddress();
236 remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId); 238 remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
237 239
238 if (deviceListeners.size() == 0) { 240 if (deviceListeners.size() == 0) {
239 log.debug("Received hello, but there's no listener registered."); 241 log.debug("Received hello, but there's no listener registered.");
240 } else { 242 } else {
241 - deviceListeners.forEach( 243 + deviceListeners.forEach(l -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5));
242 - l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
243 } 244 }
244 } 245 }
245 246
246 @Override 247 @Override
247 - public void packet_in(int port, ByteBuffer packet) { 248 + public void packet_in(int port, ByteBuffer data, int dataLength) {
248 if (remoteDevice == null) { 249 if (remoteDevice == null) {
249 log.debug("Received packet-in, but the remote device is still unknown. Need a hello first..."); 250 log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
250 return; 251 return;
...@@ -253,41 +254,42 @@ public class Bmv2ControllerImpl implements Bmv2Controller { ...@@ -253,41 +254,42 @@ public class Bmv2ControllerImpl implements Bmv2Controller {
253 if (packetListeners.size() == 0) { 254 if (packetListeners.size() == 0) {
254 log.debug("Received packet-in, but there's no listener registered."); 255 log.debug("Received packet-in, but there's no listener registered.");
255 } else { 256 } else {
256 - packetListeners.forEach( 257 + byte[] bytes = new byte[dataLength];
257 - l -> executorService.execute(() -> l.handlePacketIn(remoteDevice, 258 + data.get(bytes);
258 - port, 259 + ImmutableByteSequence pkt = ImmutableByteSequence.copyFrom(bytes);
259 - ImmutableByteSequence.copyFrom(packet)))); 260 + packetListeners.forEach(l -> l.handlePacketIn(remoteDevice, port, pkt));
260 } 261 }
261 } 262 }
262 } 263 }
263 264
264 /** 265 /**
265 - * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call. 266 + * Decorator of a Thrift processor needed in order to keep track of the client's IP address that originated the
266 - * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello()) 267 + * request.
267 */ 268 */
268 - private final class InternalTrackingProcessor implements TProcessor { 269 + private final class TrackingProcessor implements TProcessor {
269 270
270 - // Map sockets to processors. 271 + // Map transports to processors.
271 - // TODO: implement it as a cache so unused sockets are expired automatically 272 + private final ConcurrentMap<TTransport, Processor<ServiceHandler>> processors = Maps.newConcurrentMap();
272 - private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
273 - Maps.newConcurrentMap();
274 273
275 @Override 274 @Override
276 public boolean process(final TProtocol in, final TProtocol out) throws TException { 275 public boolean process(final TProtocol in, final TProtocol out) throws TException {
277 - // Get the socket for this request. 276 + // Get the client address for this request.
278 - TSocket socket = (TSocket) in.getTransport(); 277 + InetAddress clientAddress = server.getClientAddress((TFramedTransport) in.getTransport());
279 - // Get or create a processor for this socket 278 + if (clientAddress != null) {
280 - ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> { 279 + // Get or create a processor for this input transport, i.e. the client on the other side.
281 - InternalServiceHandler handler = new InternalServiceHandler(s); 280 + Processor<ServiceHandler> processor = processors.computeIfAbsent(
282 - return new ControlPlaneService.Processor<>(handler); 281 + in.getTransport(), t -> new Processor<>(new ServiceHandler(clientAddress)));
283 - }); 282 + // Delegate to the processor we are decorating.
284 - // Delegate to the processor we are decorating. 283 + return processor.process(in, out);
285 - return processor.process(in, out); 284 + } else {
285 + log.warn("Unable to retrieve client IP address of incoming request");
286 + return false;
287 + }
286 } 288 }
287 } 289 }
288 290
289 /** 291 /**
290 - * Transport/client cache loader. 292 + * Cache loader of BMv2 Thrift clients.
291 */ 293 */
292 private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> { 294 private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
293 295
......
...@@ -171,6 +171,17 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService { ...@@ -171,6 +171,17 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
171 return defaultContext; 171 return defaultContext;
172 } 172 }
173 173
174 + @Override
175 + public void setDefaultContext(DeviceId deviceId) {
176 + Versioned<Bmv2DeviceContext> previous = contexts.put(deviceId, defaultContext);
177 + if (mastershipService.getMasterFor(deviceId) == null) {
178 + // Checking for who is the master here is ugly but necessary, as this method is called by Bmv2DeviceProvider
179 + // prior to master election. A solution could be to use a separate leadership contest instead of the
180 + // mastership service.
181 + triggerConfigCheck(deviceId, defaultContext);
182 + }
183 + }
184 +
174 private void configCheck(DeviceId deviceId, Bmv2DeviceContext storedContext) { 185 private void configCheck(DeviceId deviceId, Bmv2DeviceContext storedContext) {
175 if (storedContext == null) { 186 if (storedContext == null) {
176 return; 187 return;
...@@ -206,14 +217,14 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService { ...@@ -206,14 +217,14 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
206 } 217 }
207 218
208 private void triggerConfigCheck(DeviceId deviceId, Bmv2DeviceContext context) { 219 private void triggerConfigCheck(DeviceId deviceId, Bmv2DeviceContext context) {
209 - if (mastershipService.isLocalMaster(deviceId)) {
210 scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS); 220 scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS);
211 - }
212 } 221 }
213 222
214 private void checkDevices() { 223 private void checkDevices() {
215 deviceService.getAvailableDevices().forEach(device -> { 224 deviceService.getAvailableDevices().forEach(device -> {
216 - triggerConfigCheck(device.id(), getContext(device.id())); 225 + if (mastershipService.isLocalMaster(device.id())) {
226 + triggerConfigCheck(device.id(), getContext(device.id()));
227 + }
217 }); 228 });
218 } 229 }
219 230
...@@ -235,8 +246,10 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService { ...@@ -235,8 +246,10 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
235 public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) { 246 public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
236 DeviceId deviceId = event.key(); 247 DeviceId deviceId = event.key();
237 if (event.type().equals(INSERT) || event.type().equals(UPDATE)) { 248 if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
238 - log.trace("Context {} for {}", event.type().name(), deviceId); 249 + if (mastershipService.isLocalMaster(deviceId)) {
239 - triggerConfigCheck(deviceId, event.newValue().value()); 250 + log.trace("Context {} for {}", event.type().name(), deviceId);
251 + triggerConfigCheck(deviceId, event.newValue().value());
252 + }
240 } 253 }
241 } 254 }
242 } 255 }
......
...@@ -68,7 +68,7 @@ public final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent { ...@@ -68,7 +68,7 @@ public final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent {
68 // See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h 68 // See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
69 private static final int CONTEXT_ID = 0; 69 private static final int CONTEXT_ID = 0;
70 70
71 - private final Standard.Iface standardClient; 71 + protected final Standard.Iface standardClient;
72 private final SimpleSwitch.Iface simpleSwitchClient; 72 private final SimpleSwitch.Iface simpleSwitchClient;
73 private final TTransport transport; 73 private final TTransport transport;
74 private final DeviceId deviceId; 74 private final DeviceId deviceId;
...@@ -418,6 +418,7 @@ public final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent { ...@@ -418,6 +418,7 @@ public final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent {
418 418
419 try { 419 try {
420 standardClient.bm_swap_configs(); 420 standardClient.bm_swap_configs();
421 + simpleSwitchClient.force_swap();
421 log.debug("JSON config swapped! > deviceId={}", deviceId); 422 log.debug("JSON config swapped! > deviceId={}", deviceId);
422 } catch (TException e) { 423 } catch (TException e) {
423 log.debug("Exception while swapping JSON config: {} > deviceId={}", e, deviceId); 424 log.debug("Exception while swapping JSON config: {} > deviceId={}", e, deviceId);
......
...@@ -32,9 +32,9 @@ ...@@ -32,9 +32,9 @@
32 32
33 <properties> 33 <properties>
34 <!-- BMv2 Commit ID and Thrift version --> 34 <!-- BMv2 Commit ID and Thrift version -->
35 - <bmv2.commit>024aa03e3b52f8d32c26774511e8e5b1dc11ec65</bmv2.commit> 35 + <bmv2.commit>8f675d0284e9e014f1b8ed502ba54e61d68108cf</bmv2.commit>
36 <bmv2.thrift.version>0.9.3</bmv2.thrift.version> 36 <bmv2.thrift.version>0.9.3</bmv2.thrift.version>
37 - <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/behavioral-model/${bmv2.commit}</bmv2.baseurl> 37 + <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/onos-bmv2/${bmv2.commit}</bmv2.baseurl>
38 <bmv2.thrift.javanamespace>org.onosproject.bmv2.thriftapi</bmv2.thrift.javanamespace> 38 <bmv2.thrift.javanamespace>org.onosproject.bmv2.thriftapi</bmv2.thrift.javanamespace>
39 <bmv2.thrift.srcdir>${project.build.directory}/thrift-sources/${bmv2.commit}/</bmv2.thrift.srcdir> 39 <bmv2.thrift.srcdir>${project.build.directory}/thrift-sources/${bmv2.commit}/</bmv2.thrift.srcdir>
40 <thrift.exedir>${project.build.directory}/thrift-compiler/</thrift.exedir> 40 <thrift.exedir>${project.build.directory}/thrift-compiler/</thrift.exedir>
......
...@@ -20,10 +20,7 @@ import com.google.common.collect.Maps; ...@@ -20,10 +20,7 @@ import com.google.common.collect.Maps;
20 import org.apache.felix.scr.annotations.Component; 20 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 -import org.jboss.netty.util.HashedWheelTimer; 23 +import org.onlab.util.SharedScheduledExecutors;
24 -import org.jboss.netty.util.Timeout;
25 -import org.jboss.netty.util.TimerTask;
26 -import org.onlab.util.Timer;
27 import org.onosproject.bmv2.api.runtime.Bmv2Device; 24 import org.onosproject.bmv2.api.runtime.Bmv2Device;
28 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException; 25 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
29 import org.onosproject.bmv2.api.service.Bmv2Controller; 26 import org.onosproject.bmv2.api.service.Bmv2Controller;
...@@ -34,6 +31,7 @@ import org.onosproject.common.net.AbstractDeviceProvider; ...@@ -34,6 +31,7 @@ import org.onosproject.common.net.AbstractDeviceProvider;
34 import org.onosproject.core.ApplicationId; 31 import org.onosproject.core.ApplicationId;
35 import org.onosproject.core.CoreService; 32 import org.onosproject.core.CoreService;
36 import org.onosproject.incubator.net.config.basics.ConfigException; 33 import org.onosproject.incubator.net.config.basics.ConfigException;
34 +import org.onosproject.mastership.MastershipService;
37 import org.onosproject.net.Device; 35 import org.onosproject.net.Device;
38 import org.onosproject.net.DeviceId; 36 import org.onosproject.net.DeviceId;
39 import org.onosproject.net.MastershipRole; 37 import org.onosproject.net.MastershipRole;
...@@ -55,14 +53,20 @@ import org.slf4j.Logger; ...@@ -55,14 +53,20 @@ import org.slf4j.Logger;
55 53
56 import java.util.Collection; 54 import java.util.Collection;
57 import java.util.List; 55 import java.util.List;
56 +import java.util.Map;
58 import java.util.Objects; 57 import java.util.Objects;
59 import java.util.concurrent.ConcurrentMap; 58 import java.util.concurrent.ConcurrentMap;
60 import java.util.concurrent.ExecutorService; 59 import java.util.concurrent.ExecutorService;
61 import java.util.concurrent.Executors; 60 import java.util.concurrent.Executors;
62 -import java.util.concurrent.TimeUnit; 61 +import java.util.concurrent.ScheduledExecutorService;
62 +import java.util.concurrent.ScheduledFuture;
63 +import java.util.concurrent.locks.Lock;
64 +import java.util.concurrent.locks.ReentrantLock;
63 65
66 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
64 import static org.onlab.util.Tools.groupedThreads; 67 import static org.onlab.util.Tools.groupedThreads;
65 import static org.onosproject.bmv2.api.runtime.Bmv2Device.*; 68 import static org.onosproject.bmv2.api.runtime.Bmv2Device.*;
69 +import static org.onosproject.net.Device.Type.SWITCH;
66 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY; 70 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
67 import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.getPortStatistics; 71 import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.getPortStatistics;
68 import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.initCounters; 72 import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.initCounters;
...@@ -76,20 +80,22 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -76,20 +80,22 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
76 80
77 private static final String APP_NAME = "org.onosproject.bmv2"; 81 private static final String APP_NAME = "org.onosproject.bmv2";
78 82
79 - private static final int POLL_INTERVAL = 5_000; // milliseconds 83 + private static final int POLL_PERIOD = 5_000; // milliseconds
80 84
81 private final Logger log = getLogger(this.getClass()); 85 private final Logger log = getLogger(this.getClass());
82 86
83 private final ExecutorService executorService = Executors 87 private final ExecutorService executorService = Executors
84 .newFixedThreadPool(16, groupedThreads("onos/bmv2", "device-discovery", log)); 88 .newFixedThreadPool(16, groupedThreads("onos/bmv2", "device-discovery", log));
85 89
90 + private final ScheduledExecutorService scheduledExecutorService = SharedScheduledExecutors.getPoolThreadExecutor();
91 +
86 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener(); 92 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
87 93
88 private final ConfigFactory cfgFactory = new InternalConfigFactory(); 94 private final ConfigFactory cfgFactory = new InternalConfigFactory();
89 95
90 - private final ConcurrentMap<DeviceId, DeviceDescription> activeDevices = Maps.newConcurrentMap(); 96 + private final Map<DeviceId, DeviceDescription> lastDescriptions = Maps.newHashMap();
91 97
92 - private final DevicePoller devicePoller = new DevicePoller(); 98 + private final ConcurrentMap<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
93 99
94 private final InternalDeviceListener deviceListener = new InternalDeviceListener(); 100 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
95 101
...@@ -103,6 +109,9 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -103,6 +109,9 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
103 protected DeviceService deviceService; 109 protected DeviceService deviceService;
104 110
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 + protected MastershipService mastershipService;
113 +
114 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected Bmv2Controller controller; 115 protected Bmv2Controller controller;
107 116
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -112,6 +121,7 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -112,6 +121,7 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
112 protected Bmv2TableEntryService tableEntryService; 121 protected Bmv2TableEntryService tableEntryService;
113 122
114 private ApplicationId appId; 123 private ApplicationId appId;
124 + private ScheduledFuture<?> poller;
115 125
116 /** 126 /**
117 * Creates a Bmv2 device provider with the supplied identifier. 127 * Creates a Bmv2 device provider with the supplied identifier.
...@@ -126,19 +136,24 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -126,19 +136,24 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
126 netCfgService.registerConfigFactory(cfgFactory); 136 netCfgService.registerConfigFactory(cfgFactory);
127 netCfgService.addListener(cfgListener); 137 netCfgService.addListener(cfgListener);
128 controller.addDeviceListener(deviceListener); 138 controller.addDeviceListener(deviceListener);
129 - devicePoller.start(); 139 + if (poller != null) {
140 + poller.cancel(false);
141 + }
142 + poller = scheduledExecutorService.scheduleAtFixedRate(this::pollDevices, 1_000, POLL_PERIOD, MILLISECONDS);
130 super.activate(); 143 super.activate();
131 } 144 }
132 145
133 @Override 146 @Override
134 protected void deactivate() { 147 protected void deactivate() {
135 - devicePoller.stop(); 148 + if (poller != null) {
149 + poller.cancel(false);
150 + }
136 controller.removeDeviceListener(deviceListener); 151 controller.removeDeviceListener(deviceListener);
137 try { 152 try {
138 - activeDevices.forEach((did, value) -> { 153 + lastDescriptions.forEach((did, value) -> {
139 executorService.execute(() -> disconnectDevice(did)); 154 executorService.execute(() -> disconnectDevice(did));
140 }); 155 });
141 - executorService.awaitTermination(1000, TimeUnit.MILLISECONDS); 156 + executorService.awaitTermination(1000, MILLISECONDS);
142 } catch (InterruptedException e) { 157 } catch (InterruptedException e) {
143 log.error("Device discovery threads did not terminate"); 158 log.error("Device discovery threads did not terminate");
144 } 159 }
...@@ -181,32 +196,43 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -181,32 +196,43 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
181 } 196 }
182 197
183 private void discoverDevice(DeviceId did) { 198 private void discoverDevice(DeviceId did) {
184 - log.debug("Starting device discovery... deviceId={}", did); 199 + // Serialize discovery for the same device.
185 - activeDevices.compute(did, (k, lastDescription) -> { 200 + Lock lock = deviceLocks.computeIfAbsent(did, k -> new ReentrantLock());
201 + lock.lock();
202 + try {
203 + log.debug("Starting device discovery... deviceId={}", did);
204 +
205 + if (contextService.getContext(did) == null) {
206 + // Device is a first timer.
207 + log.info("Setting DEFAULT context for {}", did);
208 + // It is important to do this before creating the device in the core
209 + // so other services won't find a null context.
210 + contextService.setDefaultContext(did);
211 + // Abort discovery, we'll receive a new hello once the swap has been performed.
212 + return;
213 + }
214 +
215 + DeviceDescription lastDescription = lastDescriptions.get(did);
186 DeviceDescription thisDescription = getDeviceDescription(did); 216 DeviceDescription thisDescription = getDeviceDescription(did);
217 +
187 if (thisDescription != null) { 218 if (thisDescription != null) {
188 - boolean descriptionChanged = lastDescription != null && 219 + boolean descriptionChanged = lastDescription == null ||
189 - (!Objects.equals(thisDescription, lastDescription) || 220 + (!Objects.equals(thisDescription, lastDescription) ||
190 - !Objects.equals(thisDescription.annotations(), lastDescription.annotations())); 221 + !Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
191 if (descriptionChanged || !deviceService.isAvailable(did)) { 222 if (descriptionChanged || !deviceService.isAvailable(did)) {
192 - if (deviceService.getDevice(did) == null) {
193 - // Device is a first timer.
194 - log.info("Setting DEFAULT context for {}", did);
195 - // It is important to do this before connecting the device so other
196 - // services won't find a null context.
197 - contextService.setContext(did, contextService.defaultContext());
198 - }
199 resetDeviceState(did); 223 resetDeviceState(did);
200 initPortCounters(did); 224 initPortCounters(did);
201 providerService.deviceConnected(did, thisDescription); 225 providerService.deviceConnected(did, thisDescription);
202 updatePortsAndStats(did); 226 updatePortsAndStats(did);
203 } 227 }
204 - return thisDescription; 228 + lastDescriptions.put(did, thisDescription);
205 } else { 229 } else {
206 log.warn("Unable to get device description for {}", did); 230 log.warn("Unable to get device description for {}", did);
207 - return lastDescription; 231 + lastDescriptions.put(did, lastDescription);
208 } 232 }
209 - }); 233 + } finally {
234 + lock.unlock();
235 + }
210 } 236 }
211 237
212 private DeviceDescription getDeviceDescription(DeviceId did) { 238 private DeviceDescription getDeviceDescription(DeviceId did) {
...@@ -269,11 +295,27 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -269,11 +295,27 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
269 } 295 }
270 296
271 private void disconnectDevice(DeviceId did) { 297 private void disconnectDevice(DeviceId did) {
272 - log.debug("Trying to disconnect device from core... deviceId={}", did); 298 + log.debug("Disconnecting device from core... deviceId={}", did);
273 - if (deviceService.isAvailable(did)) { 299 + providerService.deviceDisconnected(did);
274 - providerService.deviceDisconnected(did); 300 + lastDescriptions.remove(did);
301 + }
302 +
303 + private void pollDevices() {
304 + for (Device device: deviceService.getAvailableDevices(SWITCH)) {
305 + if (device.id().uri().getScheme().equals(SCHEME) &&
306 + mastershipService.isLocalMaster(device.id())) {
307 + executorService.execute(() -> pollingTask(device.id()));
308 + }
309 + }
310 + }
311 +
312 + private void pollingTask(DeviceId deviceId) {
313 + log.debug("Polling device {}...", deviceId);
314 + if (isReachable(deviceId)) {
315 + updatePortsAndStats(deviceId);
316 + } else {
317 + disconnectDevice(deviceId);
275 } 318 }
276 - activeDevices.remove(did);
277 } 319 }
278 320
279 /** 321 /**
...@@ -332,50 +374,4 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -332,50 +374,4 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
332 triggerProbe(device.asDeviceId()); 374 triggerProbe(device.asDeviceId());
333 } 375 }
334 } 376 }
335 -
336 - /**
337 - * Task that periodically trigger device probes to check for device status and update port information.
338 - */
339 - private class DevicePoller implements TimerTask {
340 -
341 - private final HashedWheelTimer timer = Timer.getTimer();
342 - private Timeout timeout;
343 -
344 - @Override
345 - public void run(Timeout tout) throws Exception {
346 - if (tout.isCancelled()) {
347 - return;
348 - }
349 - activeDevices.keySet()
350 - .stream()
351 - // Filter out devices not yet created in the core.
352 - .filter(did -> deviceService.getDevice(did) != null)
353 - .forEach(did -> executorService.execute(() -> pollingTask(did)));
354 - tout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.MILLISECONDS);
355 - }
356 -
357 - private void pollingTask(DeviceId deviceId) {
358 - if (isReachable(deviceId)) {
359 - updatePortsAndStats(deviceId);
360 - } else {
361 - disconnectDevice(deviceId);
362 - }
363 - }
364 -
365 - /**
366 - * Starts the collector.
367 - */
368 - synchronized void start() {
369 - log.info("Starting device poller...");
370 - timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
371 - }
372 -
373 - /**
374 - * Stops the collector.
375 - */
376 - synchronized void stop() {
377 - log.info("Stopping device poller...");
378 - timeout.cancel();
379 - }
380 - }
381 } 377 }
......