Carmelo Cascone
Committed by Ray Milkey

Fixed race condition in bmv2 SafeThriftClient

This was due to the fact that multiple clients were instantiated over
the same tranposrt. Now SafeThriftClient locks over the transport.
Also fixed a bug where some exceptions where uncaught in
SafeThriftClient.

Change-Id: I841ef64e64c28fc78016a0e0dc33e59052cd983e
...@@ -236,16 +236,6 @@ public final class Bmv2ThriftClient implements Bmv2Client { ...@@ -236,16 +236,6 @@ public final class Bmv2ThriftClient implements Bmv2Client {
236 return buffers; 236 return buffers;
237 } 237 }
238 238
239 - private void closeTransport() {
240 - LOG.debug("Closing transport session... > deviceId={}", deviceId);
241 - if (this.transport.isOpen()) {
242 - this.transport.close();
243 - LOG.debug("Transport session closed! > deviceId={}", deviceId);
244 - } else {
245 - LOG.debug("Transport session was already closed! deviceId={}", deviceId);
246 - }
247 - }
248 -
249 @Override 239 @Override
250 public final long addTableEntry(Bmv2TableEntry entry) throws Bmv2RuntimeException { 240 public final long addTableEntry(Bmv2TableEntry entry) throws Bmv2RuntimeException {
251 241
...@@ -527,7 +517,7 @@ public final class Bmv2ThriftClient implements Bmv2Client { ...@@ -527,7 +517,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
527 TTransport transport = new TSocket( 517 TTransport transport = new TSocket(
528 info.getLeft(), info.getRight()); 518 info.getLeft(), info.getRight());
529 TProtocol protocol = new TBinaryProtocol(transport); 519 TProtocol protocol = new TBinaryProtocol(transport);
530 - // Our BMv2 device implements multiple Thrift services, create a client for each one. 520 + // Our BMv2 device implements multiple Thrift services, create a client for each one on the same transport.
531 Standard.Client standardClient = new Standard.Client( 521 Standard.Client standardClient = new Standard.Client(
532 new TMultiplexedProtocol(protocol, "standard")); 522 new TMultiplexedProtocol(protocol, "standard"));
533 SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client( 523 SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client(
...@@ -551,11 +541,20 @@ public final class Bmv2ThriftClient implements Bmv2Client { ...@@ -551,11 +541,20 @@ public final class Bmv2ThriftClient implements Bmv2Client {
551 RemovalListener<DeviceId, Bmv2ThriftClient> { 541 RemovalListener<DeviceId, Bmv2ThriftClient> {
552 542
553 @Override 543 @Override
554 - public void onRemoval( 544 + public void onRemoval(RemovalNotification<DeviceId, Bmv2ThriftClient> notification) {
555 - RemovalNotification<DeviceId, Bmv2ThriftClient> notification) {
556 // close the transport connection 545 // close the transport connection
557 - LOG.debug("Removing client from cache... > deviceId={}", notification.getKey()); 546 + Bmv2ThriftClient client = notification.getValue();
558 - notification.getValue().closeTransport(); 547 + // Locking here is ugly, but needed (see SafeThriftClient).
548 + synchronized (client.transport) {
549 + LOG.debug("Closing transport session... > deviceId={}", client.deviceId);
550 + if (client.transport.isOpen()) {
551 + client.transport.close();
552 + LOG.debug("Transport session closed! > deviceId={}", client.deviceId);
553 + } else {
554 + LOG.debug("Transport session was already closed! deviceId={}", client.deviceId);
555 + }
556 + }
557 + LOG.debug("Removing client from cache... > deviceId={}", client.deviceId);
559 } 558 }
560 } 559 }
561 } 560 }
......
...@@ -35,8 +35,8 @@ import java.lang.reflect.Proxy; ...@@ -35,8 +35,8 @@ import java.lang.reflect.Proxy;
35 import java.util.Set; 35 import java.util.Set;
36 36
37 /** 37 /**
38 - * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It al provides 38 + * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It also provides
39 - * synchronization between calls (automatically serialize multiple calls to the same client from different threads). 39 + * synchronization between calls over the same transport.
40 */ 40 */
41 public final class SafeThriftClient { 41 public final class SafeThriftClient {
42 42
...@@ -161,23 +161,28 @@ public final class SafeThriftClient { ...@@ -161,23 +161,28 @@ public final class SafeThriftClient {
161 */ 161 */
162 private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler { 162 private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
163 private final T baseClient; 163 private final T baseClient;
164 + private final TTransport transport;
164 private final int maxRetries; 165 private final int maxRetries;
165 private final long timeBetweenRetries; 166 private final long timeBetweenRetries;
166 167
167 public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) { 168 public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
168 this.baseClient = baseClient; 169 this.baseClient = baseClient;
170 + this.transport = baseClient.getInputProtocol().getTransport();
169 this.maxRetries = maxRetries; 171 this.maxRetries = maxRetries;
170 this.timeBetweenRetries = timeBetweenRetries; 172 this.timeBetweenRetries = timeBetweenRetries;
171 } 173 }
172 174
173 - private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries) 175 + private void reconnectOrThrowException()
174 throws TTransportException { 176 throws TTransportException {
175 int errors = 0; 177 int errors = 0;
176 try { 178 try {
177 - transport.close(); 179 + if (transport.isOpen()) {
180 + transport.close();
181 + }
178 } catch (Exception e) { 182 } catch (Exception e) {
179 // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown. 183 // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
180 // However, such an exception is not advertised by .close(), hence the general-purpose catch. 184 // However, such an exception is not advertised by .close(), hence the general-purpose catch.
185 + LOG.debug("Exception while closing transport", e);
181 } 186 }
182 187
183 while (errors < maxRetries) { 188 while (errors < maxRetries) {
...@@ -210,42 +215,37 @@ public final class SafeThriftClient { ...@@ -210,42 +215,37 @@ public final class SafeThriftClient {
210 @Override 215 @Override
211 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 216 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
212 217
213 - // With Thrift clients must be instantiated for each different transport session, i.e. server instance. 218 + // Thrift transport layer is not thread-safe (it's a wrapper on a socket), hence we need locking.
214 - // Hence, using baseClient as lock, only calls towards the same server will be synchronized. 219 + synchronized (transport) {
215 -
216 - synchronized (baseClient) {
217 220
218 LOG.debug("Invoking client method... > method={}, fromThread={}", 221 LOG.debug("Invoking client method... > method={}, fromThread={}",
219 method.getName(), Thread.currentThread().getId()); 222 method.getName(), Thread.currentThread().getId());
220 223
221 - Object result = null;
222 -
223 try { 224 try {
224 - result = method.invoke(baseClient, args);
225 225
226 + return method.invoke(baseClient, args);
226 } catch (InvocationTargetException e) { 227 } catch (InvocationTargetException e) {
227 if (e.getTargetException() instanceof TTransportException) { 228 if (e.getTargetException() instanceof TTransportException) {
228 TTransportException cause = (TTransportException) e.getTargetException(); 229 TTransportException cause = (TTransportException) e.getTargetException();
229 230
230 if (RESTARTABLE_CAUSES.contains(cause.getType())) { 231 if (RESTARTABLE_CAUSES.contains(cause.getType())) {
231 - reconnectOrThrowException(baseClient.getInputProtocol().getTransport(), 232 + // Try to reconnect. If fail, a TTransportException will be thrown.
232 - maxRetries, 233 + reconnectOrThrowException();
233 - timeBetweenRetries); 234 + try {
234 - result = method.invoke(baseClient, args); 235 + // If here, transport has been successfully open, hence new exceptions will be thrown.
236 + return method.invoke(baseClient, args);
237 + } catch (InvocationTargetException e1) {
238 + LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
239 + e1, method.getName(), Thread.currentThread().getId());
240 + throw e1.getTargetException();
241 + }
235 } 242 }
236 } 243 }
237 - 244 + // Target exception is neither a TTransportException nor a restartable cause.
238 - if (result == null) { 245 + LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
239 - LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}", 246 + e, method.getName(), Thread.currentThread().getId());
240 - e, method.getName(), Thread.currentThread().getId()); 247 + throw e.getTargetException();
241 - throw e.getTargetException();
242 - }
243 } 248 }
244 -
245 - LOG.debug("Method invoke complete! > method={}, fromThread={}",
246 - method.getName(), Thread.currentThread().getId());
247 -
248 - return result;
249 } 249 }
250 } 250 }
251 } 251 }
......