ClusterMessageResponse to provide Future interface
Change-Id: I6d43382a1b572f34c5d7d1d41ca1e41dd472f6f2
Showing
2 changed files
with
44 additions
and
8 deletions
1 | package org.onlab.onos.store.cluster.messaging; | 1 | package org.onlab.onos.store.cluster.messaging; |
2 | 2 | ||
3 | +import java.util.concurrent.Future; | ||
3 | import java.util.concurrent.TimeUnit; | 4 | import java.util.concurrent.TimeUnit; |
4 | import java.util.concurrent.TimeoutException; | 5 | import java.util.concurrent.TimeoutException; |
5 | 6 | ||
6 | import org.onlab.onos.cluster.NodeId; | 7 | import org.onlab.onos.cluster.NodeId; |
7 | 8 | ||
8 | -public interface ClusterMessageResponse { | 9 | +public interface ClusterMessageResponse extends Future<byte[]> { |
10 | + | ||
9 | public NodeId sender(); | 11 | public NodeId sender(); |
10 | - public byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException; | 12 | + |
11 | - public byte[] get(long timeout) throws InterruptedException; | 13 | + // TODO InterruptedException, ExecutionException removed from original |
14 | + // Future declaration. Revisit if we ever need those. | ||
15 | + @Override | ||
16 | + public byte[] get(long timeout, TimeUnit unit) throws TimeoutException; | ||
17 | + | ||
12 | } | 18 | } | ... | ... |
... | @@ -4,9 +4,9 @@ import static com.google.common.base.Preconditions.checkArgument; | ... | @@ -4,9 +4,9 @@ import static com.google.common.base.Preconditions.checkArgument; |
4 | 4 | ||
5 | import java.io.IOException; | 5 | import java.io.IOException; |
6 | import java.util.Set; | 6 | import java.util.Set; |
7 | +import java.util.concurrent.ExecutionException; | ||
7 | import java.util.concurrent.TimeUnit; | 8 | import java.util.concurrent.TimeUnit; |
8 | import java.util.concurrent.TimeoutException; | 9 | import java.util.concurrent.TimeoutException; |
9 | - | ||
10 | import org.apache.felix.scr.annotations.Activate; | 10 | import org.apache.felix.scr.annotations.Activate; |
11 | import org.apache.felix.scr.annotations.Component; | 11 | import org.apache.felix.scr.annotations.Component; |
12 | import org.apache.felix.scr.annotations.Deactivate; | 12 | import org.apache.felix.scr.annotations.Deactivate; |
... | @@ -181,10 +181,13 @@ public class ClusterCommunicationManager | ... | @@ -181,10 +181,13 @@ public class ClusterCommunicationManager |
181 | } | 181 | } |
182 | } | 182 | } |
183 | 183 | ||
184 | - private static final class InternalClusterMessageResponse implements ClusterMessageResponse { | 184 | + private static final class InternalClusterMessageResponse |
185 | + implements ClusterMessageResponse { | ||
185 | 186 | ||
186 | private final NodeId sender; | 187 | private final NodeId sender; |
187 | private final Response responseFuture; | 188 | private final Response responseFuture; |
189 | + private volatile boolean isCancelled = false; | ||
190 | + private volatile boolean isDone = false; | ||
188 | 191 | ||
189 | public InternalClusterMessageResponse(NodeId sender, Response responseFuture) { | 192 | public InternalClusterMessageResponse(NodeId sender, Response responseFuture) { |
190 | this.sender = sender; | 193 | this.sender = sender; |
... | @@ -198,12 +201,39 @@ public class ClusterCommunicationManager | ... | @@ -198,12 +201,39 @@ public class ClusterCommunicationManager |
198 | @Override | 201 | @Override |
199 | public byte[] get(long timeout, TimeUnit timeunit) | 202 | public byte[] get(long timeout, TimeUnit timeunit) |
200 | throws TimeoutException { | 203 | throws TimeoutException { |
201 | - return responseFuture.get(timeout, timeunit); | 204 | + final byte[] result = responseFuture.get(timeout, timeunit); |
205 | + isDone = true; | ||
206 | + return result; | ||
207 | + } | ||
208 | + | ||
209 | + @Override | ||
210 | + public boolean cancel(boolean mayInterruptIfRunning) { | ||
211 | + if (isDone()) { | ||
212 | + return false; | ||
213 | + } | ||
214 | + // doing nothing for now | ||
215 | + // when onlab.netty Response support cancel, call them. | ||
216 | + isCancelled = true; | ||
217 | + return true; | ||
218 | + } | ||
219 | + | ||
220 | + @Override | ||
221 | + public boolean isCancelled() { | ||
222 | + return isCancelled; | ||
223 | + } | ||
224 | + | ||
225 | + @Override | ||
226 | + public boolean isDone() { | ||
227 | + return this.isDone || isCancelled(); | ||
202 | } | 228 | } |
203 | 229 | ||
204 | @Override | 230 | @Override |
205 | - public byte[] get(long timeout) throws InterruptedException { | 231 | + public byte[] get() throws InterruptedException, ExecutionException { |
206 | - return responseFuture.get(); | 232 | + // TODO: consider forbidding this call and force the use of timed get |
233 | + // to enforce handling of remote peer failure scenario | ||
234 | + final byte[] result = responseFuture.get(); | ||
235 | + isDone = true; | ||
236 | + return result; | ||
207 | } | 237 | } |
208 | } | 238 | } |
209 | } | 239 | } | ... | ... |
-
Please register or login to post a comment