Madan Jampani

getFlowEntries now operates correctly in a distributed setting

...@@ -11,6 +11,7 @@ import java.util.Arrays; ...@@ -11,6 +11,7 @@ import java.util.Arrays;
11 import java.util.Collection; 11 import java.util.Collection;
12 import java.util.Collections; 12 import java.util.Collections;
13 import java.util.Map; 13 import java.util.Map;
14 +import java.util.Set;
14 import java.util.concurrent.ExecutorService; 15 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors; 16 import java.util.concurrent.Executors;
16 import java.util.concurrent.Future; 17 import java.util.concurrent.Future;
...@@ -159,6 +160,21 @@ public class DistributedFlowRuleStore ...@@ -159,6 +160,21 @@ public class DistributedFlowRuleStore
159 } 160 }
160 }); 161 });
161 162
163 + clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
164 +
165 + @Override
166 + public void handle(ClusterMessage message) {
167 + DeviceId deviceId = SERIALIZER.decode(message.payload());
168 + log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
169 + Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
170 + try {
171 + message.respond(SERIALIZER.encode(flowEntries));
172 + } catch (IOException e) {
173 + log.error("Failed to respond to peer's getFlowEntries request", e);
174 + }
175 + }
176 + });
177 +
162 log.info("Started"); 178 log.info("Started");
163 } 179 }
164 180
...@@ -217,9 +233,33 @@ public class DistributedFlowRuleStore ...@@ -217,9 +233,33 @@ public class DistributedFlowRuleStore
217 233
218 @Override 234 @Override
219 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) { 235 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
236 +
237 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
238 + if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
239 + return getFlowEntriesInternal(deviceId);
240 + }
241 +
242 + log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
243 + replicaInfo.master().orNull(), deviceId);
244 +
245 + ClusterMessage message = new ClusterMessage(
246 + clusterService.getLocalNode().id(),
247 + GET_DEVICE_FLOW_ENTRIES,
248 + SERIALIZER.encode(deviceId));
249 +
250 + try {
251 + ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
252 + return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
253 + } catch (IOException | TimeoutException e) {
254 + // FIXME: throw a FlowStoreException
255 + throw new RuntimeException(e);
256 + }
257 + }
258 +
259 + private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
220 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId); 260 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
221 if (rules == null) { 261 if (rules == null) {
222 - return Collections.emptyList(); 262 + return Collections.emptySet();
223 } 263 }
224 return ImmutableSet.copyOf(rules); 264 return ImmutableSet.copyOf(rules);
225 } 265 }
......
...@@ -13,4 +13,7 @@ public final class FlowStoreMessageSubjects { ...@@ -13,4 +13,7 @@ public final class FlowStoreMessageSubjects {
13 13
14 public static final MessageSubject GET_FLOW_ENTRY 14 public static final MessageSubject GET_FLOW_ENTRY
15 = new MessageSubject("peer-forward-get-flow-entry"); 15 = new MessageSubject("peer-forward-get-flow-entry");
16 +
17 + public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
18 + = new MessageSubject("peer-forward-get-device-flow-entries");
16 } 19 }
......