Carmelo Cascone
Committed by Gerrit Code Review

ONOS-4118 Added synchronization and resiliency to Bmv2ThriftClient

Due to the multi-threaded nature of drivers, calls to a Bmv2ThriftClient
could result in a race condition if not properly synchronized. Also,
once open, transport session might close due to several reasons. Now the
client calls are synchronized and automatically wrapped in a try/catch
that tries to re-open the session for fixed number of times before
giving up.

Change-Id: I5dcdd5a6304406dc6d9d3a0ccf7f16cdbf3b9573
...@@ -20,6 +20,7 @@ import com.google.common.collect.Lists; ...@@ -20,6 +20,7 @@ import com.google.common.collect.Lists;
20 import com.google.common.collect.Maps; 20 import com.google.common.collect.Maps;
21 import org.apache.commons.lang3.tuple.Pair; 21 import org.apache.commons.lang3.tuple.Pair;
22 import org.apache.commons.lang3.tuple.Triple; 22 import org.apache.commons.lang3.tuple.Triple;
23 +import org.onosproject.bmv2.api.runtime.Bmv2Client;
23 import org.onosproject.bmv2.api.runtime.Bmv2MatchKey; 24 import org.onosproject.bmv2.api.runtime.Bmv2MatchKey;
24 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException; 25 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
25 import org.onosproject.bmv2.api.runtime.Bmv2TableEntry; 26 import org.onosproject.bmv2.api.runtime.Bmv2TableEntry;
...@@ -84,7 +85,7 @@ public class Bmv2FlowRuleDriver extends AbstractHandlerBehaviour ...@@ -84,7 +85,7 @@ public class Bmv2FlowRuleDriver extends AbstractHandlerBehaviour
84 85
85 DeviceId deviceId = handler().data().deviceId(); 86 DeviceId deviceId = handler().data().deviceId();
86 87
87 - Bmv2ThriftClient deviceClient; 88 + Bmv2Client deviceClient;
88 try { 89 try {
89 deviceClient = Bmv2ThriftClient.of(deviceId); 90 deviceClient = Bmv2ThriftClient.of(deviceId);
90 } catch (Bmv2RuntimeException e) { 91 } catch (Bmv2RuntimeException e) {
......
...@@ -18,6 +18,7 @@ package org.onosproject.drivers.bmv2; ...@@ -18,6 +18,7 @@ package org.onosproject.drivers.bmv2;
18 18
19 import com.google.common.collect.ImmutableList; 19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Lists; 20 import com.google.common.collect.Lists;
21 +import org.onosproject.bmv2.api.runtime.Bmv2Client;
21 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException; 22 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
22 import org.onosproject.bmv2.ctl.Bmv2ThriftClient; 23 import org.onosproject.bmv2.ctl.Bmv2ThriftClient;
23 import org.onosproject.net.DefaultAnnotations; 24 import org.onosproject.net.DefaultAnnotations;
...@@ -41,7 +42,7 @@ public class Bmv2PortGetterDriver extends AbstractHandlerBehaviour ...@@ -41,7 +42,7 @@ public class Bmv2PortGetterDriver extends AbstractHandlerBehaviour
41 42
42 @Override 43 @Override
43 public List<PortDescription> getPorts() { 44 public List<PortDescription> getPorts() {
44 - Bmv2ThriftClient deviceClient; 45 + Bmv2Client deviceClient;
45 try { 46 try {
46 deviceClient = Bmv2ThriftClient.of(handler().data().deviceId()); 47 deviceClient = Bmv2ThriftClient.of(handler().data().deviceId());
47 } catch (Bmv2RuntimeException e) { 48 } catch (Bmv2RuntimeException e) {
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.bmv2.api.runtime;
18 +
19 +import java.util.Collection;
20 +
21 +/**
22 + * RPC client to control a BMv2 device.
23 + */
24 +public interface Bmv2Client {
25 + /**
26 + * Adds a new table entry.
27 + *
28 + * @param entry a table entry value
29 + * @return table-specific entry ID
30 + * @throws Bmv2RuntimeException if any error occurs
31 + */
32 + long addTableEntry(Bmv2TableEntry entry) throws Bmv2RuntimeException;
33 +
34 + /**
35 + * Modifies a currently installed entry by updating its action.
36 + *
37 + * @param tableName string value of table name
38 + * @param entryId long value of entry ID
39 + * @param action an action value
40 + * @throws Bmv2RuntimeException if any error occurs
41 + */
42 + void modifyTableEntry(String tableName,
43 + long entryId, Bmv2Action action)
44 + throws Bmv2RuntimeException;
45 +
46 + /**
47 + * Deletes currently installed entry.
48 + *
49 + * @param tableName string value of table name
50 + * @param entryId long value of entry ID
51 + * @throws Bmv2RuntimeException if any error occurs
52 + */
53 + void deleteTableEntry(String tableName,
54 + long entryId) throws Bmv2RuntimeException;
55 +
56 + /**
57 + * Sets table default action.
58 + *
59 + * @param tableName string value of table name
60 + * @param action an action value
61 + * @throws Bmv2RuntimeException if any error occurs
62 + */
63 + void setTableDefaultAction(String tableName, Bmv2Action action)
64 + throws Bmv2RuntimeException;
65 +
66 + /**
67 + * Returns information of the ports currently configured in the switch.
68 + *
69 + * @return collection of port information
70 + * @throws Bmv2RuntimeException if any error occurs
71 + */
72 + Collection<Bmv2PortInfo> getPortsInfo() throws Bmv2RuntimeException;
73 +
74 + /**
75 + * Return a string representation of a table content.
76 + *
77 + * @param tableName string value of table name
78 + * @return table string dump
79 + * @throws Bmv2RuntimeException if any error occurs
80 + */
81 + String dumpTable(String tableName) throws Bmv2RuntimeException;
82 +
83 + /**
84 + * Reset the state of the switch (e.g. delete all entries, etc.).
85 + *
86 + * @throws Bmv2RuntimeException if any error occurs
87 + */
88 + void resetState() throws Bmv2RuntimeException;
89 +}
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +/*
18 + * Most of the code of this class was copied from:
19 + * http://liveramp.com/engineering/reconnecting-thrift-client/
20 + */
21 +
22 +package org.onosproject.bmv2.ctl;
23 +
24 +import com.google.common.collect.ImmutableSet;
25 +import org.apache.thrift.TServiceClient;
26 +import org.apache.thrift.transport.TTransport;
27 +import org.apache.thrift.transport.TTransportException;
28 +import org.slf4j.Logger;
29 +import org.slf4j.LoggerFactory;
30 +
31 +import java.lang.reflect.InvocationHandler;
32 +import java.lang.reflect.InvocationTargetException;
33 +import java.lang.reflect.Method;
34 +import java.lang.reflect.Proxy;
35 +import java.util.Set;
36 +
37 +/**
38 + * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It al provides
39 + * synchronization between calls (automatically serialize multiple calls to the same client from different threads).
40 + */
41 +public final class SafeThriftClient {
42 +
43 + private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
44 +
45 + /**
46 + * List of causes which suggest a restart might fix things (defined as constants in {@link TTransportException}).
47 + */
48 + private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
49 + TTransportException.END_OF_FILE,
50 + TTransportException.TIMED_OUT,
51 + TTransportException.UNKNOWN);
52 +
53 + private SafeThriftClient() {
54 + // ban constructor.
55 + }
56 +
57 + /**
58 + * Reflectively wraps an already existing Thrift client.
59 + *
60 + * @param baseClient the client to wrap
61 + * @param clientInterface the interface that the client implements
62 + * @param options options that control behavior of the reconnecting client
63 + * @param <T>
64 + * @param <C>
65 + * @return
66 + */
67 + public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
68 + Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
69 + new Class<?>[]{clientInterface},
70 + new ReconnectingClientProxy<T>(baseClient,
71 + options.getNumRetries(),
72 + options.getTimeBetweenRetries()));
73 +
74 + return (C) proxyObject;
75 + }
76 +
77 + /**
78 + * Reflectively wraps an already existing Thrift client.
79 + *
80 + * @param baseClient the client to wrap
81 + * @param options options that control behavior of the reconnecting client
82 + * @param <T>
83 + * @param <C>
84 + * @return
85 + */
86 + public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
87 + Class<?>[] interfaces = baseClient.getClass().getInterfaces();
88 +
89 + for (Class<?> iface : interfaces) {
90 + if (iface.getSimpleName().equals("Iface")
91 + && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) {
92 + return (C) wrap(baseClient, iface, options);
93 + }
94 + }
95 +
96 + throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
97 + }
98 +
99 + /**
100 + * Reflectively wraps an already existing Thrift client.
101 + *
102 + * @param baseClient the client to wrap
103 + * @param clientInterface the interface that the client implements
104 + * @param <T>
105 + * @param <C>
106 + * @return
107 + */
108 + public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
109 + return wrap(baseClient, clientInterface, Options.defaults());
110 + }
111 +
112 + /**
113 + * Reflectively wraps an already existing Thrift client.
114 + *
115 + * @param baseClient the client to wrap
116 + * @param <T>
117 + * @param <C>
118 + * @return
119 + */
120 + public static <T extends TServiceClient, C> C wrap(T baseClient) {
121 + return wrap(baseClient, Options.defaults());
122 + }
123 +
124 + /**
125 + * Reconnection options for {@link SafeThriftClient}.
126 + */
127 + public static class Options {
128 + private int numRetries;
129 + private long timeBetweenRetries;
130 +
131 + /**
132 + * Creates new options with the given parameters.
133 + *
134 + * @param numRetries the maximum number of times to try reconnecting before giving up and throwing an
135 + * exception
136 + * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
137 + */
138 + public Options(int numRetries, long timeBetweenRetries) {
139 + this.numRetries = numRetries;
140 + this.timeBetweenRetries = timeBetweenRetries;
141 + }
142 +
143 + private static Options defaults() {
144 + return new Options(5, 10000L);
145 + }
146 +
147 + private int getNumRetries() {
148 + return numRetries;
149 + }
150 +
151 + private long getTimeBetweenRetries() {
152 + return timeBetweenRetries;
153 + }
154 + }
155 +
156 + /**
157 + * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
158 + * reconnect and tries the method again.
159 + *
160 + * @param <T>
161 + */
162 + private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
163 + private final T baseClient;
164 + private final int maxRetries;
165 + private final long timeBetweenRetries;
166 +
167 + public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
168 + this.baseClient = baseClient;
169 + this.maxRetries = maxRetries;
170 + this.timeBetweenRetries = timeBetweenRetries;
171 + }
172 +
173 + private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries)
174 + throws TTransportException {
175 + int errors = 0;
176 + transport.close();
177 +
178 + while (errors < maxRetries) {
179 + try {
180 + LOG.debug("Attempting to reconnect...");
181 + transport.open();
182 + LOG.debug("Reconnection successful");
183 + break;
184 + } catch (TTransportException e) {
185 + LOG.error("Error while reconnecting:", e);
186 + errors++;
187 +
188 + if (errors < maxRetries) {
189 + try {
190 + LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
191 + Thread.sleep(timeBetweenRetries);
192 + } catch (InterruptedException e2) {
193 + Thread.currentThread().interrupt();
194 + throw new RuntimeException(e);
195 + }
196 + }
197 + }
198 + }
199 +
200 + if (errors >= maxRetries) {
201 + throw new TTransportException("Failed to reconnect");
202 + }
203 + }
204 +
205 + @Override
206 + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
207 +
208 + // With Thrift clients must be instantiated for each different transport session, i.e. server instance.
209 + // Hence, using baseClient as lock, only calls towards the same server will be synchronized.
210 +
211 + synchronized (baseClient) {
212 +
213 + LOG.debug("Invoking client method... > method={}, fromThread={}",
214 + method.getName(), Thread.currentThread().getId());
215 +
216 + Object result = null;
217 +
218 + try {
219 + result = method.invoke(baseClient, args);
220 +
221 + } catch (InvocationTargetException e) {
222 + if (e.getTargetException() instanceof TTransportException) {
223 + TTransportException cause = (TTransportException) e.getTargetException();
224 +
225 + if (RESTARTABLE_CAUSES.contains(cause.getType())) {
226 + reconnectOrThrowException(baseClient.getInputProtocol().getTransport(),
227 + maxRetries,
228 + timeBetweenRetries);
229 + result = method.invoke(baseClient, args);
230 + }
231 + }
232 +
233 + if (result == null) {
234 + LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
235 + e, method.getName(), Thread.currentThread().getId());
236 + throw e.getTargetException();
237 + }
238 + }
239 +
240 + LOG.debug("Method invoke complete! > method={}, fromThread={}",
241 + method.getName(), Thread.currentThread().getId());
242 +
243 + return result;
244 + }
245 + }
246 + }
247 +}