Yuta HIGUCHI

Initial DistributedDlowRuleStore

- known bug: responding to ClusterMessage not possible.

Change-Id: Iaa4245c64d2a6219d7c48ed30ddca7d558dbc177
...@@ -24,7 +24,18 @@ public class FlowRuleEvent extends AbstractEvent<FlowRuleEvent.Type, FlowRule> { ...@@ -24,7 +24,18 @@ public class FlowRuleEvent extends AbstractEvent<FlowRuleEvent.Type, FlowRule> {
24 /** 24 /**
25 * Signifies that a rule has been updated. 25 * Signifies that a rule has been updated.
26 */ 26 */
27 - RULE_UPDATED 27 + RULE_UPDATED,
28 +
29 + // internal event between Manager <-> Store
30 +
31 + /*
32 + * Signifies that a request to add flow rule has been added to the store.
33 + */
34 + RULE_ADD_REQUESTED,
35 + /*
36 + * Signifies that a request to remove flow rule has been added to the store.
37 + */
38 + RULE_REMOVE_REQUESTED,
28 } 39 }
29 40
30 /** 41 /**
......
...@@ -44,16 +44,18 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat ...@@ -44,16 +44,18 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
44 * Stores a new flow rule without generating events. 44 * Stores a new flow rule without generating events.
45 * 45 *
46 * @param rule the flow rule to add 46 * @param rule the flow rule to add
47 + * @return true if the rule should be handled locally
47 */ 48 */
48 - void storeFlowRule(FlowRule rule); 49 + boolean storeFlowRule(FlowRule rule);
49 50
50 /** 51 /**
51 * Marks a flow rule for deletion. Actual deletion will occur 52 * Marks a flow rule for deletion. Actual deletion will occur
52 * when the provider indicates that the flow has been removed. 53 * when the provider indicates that the flow has been removed.
53 * 54 *
54 * @param rule the flow rule to delete 55 * @param rule the flow rule to delete
56 + * @return true if the rule should be handled locally
55 */ 57 */
56 - void deleteFlowRule(FlowRule rule); 58 + boolean deleteFlowRule(FlowRule rule);
57 59
58 /** 60 /**
59 * Stores a new flow rule, or updates an existing entry. 61 * Stores a new flow rule, or updates an existing entry.
......
...@@ -104,24 +104,52 @@ public class FlowRuleManager ...@@ -104,24 +104,52 @@ public class FlowRuleManager
104 public void applyFlowRules(FlowRule... flowRules) { 104 public void applyFlowRules(FlowRule... flowRules) {
105 for (int i = 0; i < flowRules.length; i++) { 105 for (int i = 0; i < flowRules.length; i++) {
106 FlowRule f = flowRules[i]; 106 FlowRule f = flowRules[i];
107 - final Device device = deviceService.getDevice(f.deviceId()); 107 + boolean local = store.storeFlowRule(f);
108 - final FlowRuleProvider frp = getProvider(device.providerId()); 108 + if (local) {
109 - store.storeFlowRule(f); 109 + // TODO: aggregate all local rules and push down once?
110 - frp.applyFlowRule(f); 110 + applyFlowRulesToProviders(f);
111 + }
112 + }
113 + }
114 +
115 + private void applyFlowRulesToProviders(FlowRule... flowRules) {
116 + DeviceId did = null;
117 + FlowRuleProvider frp = null;
118 + for (FlowRule f : flowRules) {
119 + if (!f.deviceId().equals(did)) {
120 + did = f.deviceId();
121 + final Device device = deviceService.getDevice(did);
122 + frp = getProvider(device.providerId());
123 + }
124 + if (frp != null) {
125 + frp.applyFlowRule(f);
126 + }
111 } 127 }
112 } 128 }
113 129
114 @Override 130 @Override
115 public void removeFlowRules(FlowRule... flowRules) { 131 public void removeFlowRules(FlowRule... flowRules) {
116 FlowRule f; 132 FlowRule f;
117 - FlowRuleProvider frp;
118 - Device device;
119 for (int i = 0; i < flowRules.length; i++) { 133 for (int i = 0; i < flowRules.length; i++) {
120 f = flowRules[i]; 134 f = flowRules[i];
121 - device = deviceService.getDevice(f.deviceId()); 135 + boolean local = store.deleteFlowRule(f);
122 - store.deleteFlowRule(f); 136 + if (local) {
123 - if (device != null) { 137 + // TODO: aggregate all local rules and push down once?
138 + removeFlowRulesFromProviders(f);
139 + }
140 + }
141 + }
142 +
143 + private void removeFlowRulesFromProviders(FlowRule... flowRules) {
144 + DeviceId did = null;
145 + FlowRuleProvider frp = null;
146 + for (FlowRule f : flowRules) {
147 + if (!f.deviceId().equals(did)) {
148 + did = f.deviceId();
149 + final Device device = deviceService.getDevice(did);
124 frp = getProvider(device.providerId()); 150 frp = getProvider(device.providerId());
151 + }
152 + if (frp != null) {
125 frp.removeFlowRule(f); 153 frp.removeFlowRule(f);
126 } 154 }
127 } 155 }
...@@ -135,8 +163,11 @@ public class FlowRuleManager ...@@ -135,8 +163,11 @@ public class FlowRuleManager
135 163
136 for (FlowRule f : rules) { 164 for (FlowRule f : rules) {
137 store.deleteFlowRule(f); 165 store.deleteFlowRule(f);
166 + // FIXME: only accept request and push to provider on internal event
138 device = deviceService.getDevice(f.deviceId()); 167 device = deviceService.getDevice(f.deviceId());
139 frp = getProvider(device.providerId()); 168 frp = getProvider(device.providerId());
169 + // FIXME: flows removed from store and flows removed from might diverge
170 + // get rid of #removeRulesById?
140 frp.removeRulesById(id, f); 171 frp.removeRulesById(id, f);
141 } 172 }
142 } 173 }
...@@ -352,7 +383,23 @@ public class FlowRuleManager ...@@ -352,7 +383,23 @@ public class FlowRuleManager
352 private class InternalStoreDelegate implements FlowRuleStoreDelegate { 383 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
353 @Override 384 @Override
354 public void notify(FlowRuleEvent event) { 385 public void notify(FlowRuleEvent event) {
355 - eventDispatcher.post(event); 386 + switch (event.type()) {
387 + case RULE_ADD_REQUESTED:
388 + applyFlowRulesToProviders(event.subject());
389 + break;
390 + case RULE_REMOVE_REQUESTED:
391 + removeFlowRulesFromProviders(event.subject());
392 + break;
393 +
394 + case RULE_ADDED:
395 + case RULE_REMOVED:
396 + case RULE_UPDATED:
397 + // only dispatch events related to switch
398 + eventDispatcher.post(event);
399 + break;
400 + default:
401 + break;
402 + }
356 } 403 }
357 } 404 }
358 405
......
...@@ -2,6 +2,7 @@ package org.onlab.onos.store.flow.impl; ...@@ -2,6 +2,7 @@ package org.onlab.onos.store.flow.impl;
2 2
3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4 import static org.slf4j.LoggerFactory.getLogger; 4 import static org.slf4j.LoggerFactory.getLogger;
5 +import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
5 6
6 import java.io.IOException; 7 import java.io.IOException;
7 import java.util.Collection; 8 import java.util.Collection;
...@@ -30,6 +31,7 @@ import org.onlab.onos.net.flow.StoredFlowEntry; ...@@ -30,6 +31,7 @@ import org.onlab.onos.net.flow.StoredFlowEntry;
30 import org.onlab.onos.store.AbstractStore; 31 import org.onlab.onos.store.AbstractStore;
31 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 32 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
32 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 33 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
34 +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
33 import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse; 35 import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
34 import org.onlab.onos.store.flow.ReplicaInfo; 36 import org.onlab.onos.store.flow.ReplicaInfo;
35 import org.onlab.onos.store.flow.ReplicaInfoService; 37 import org.onlab.onos.store.flow.ReplicaInfoService;
...@@ -80,10 +82,44 @@ public class DistributedFlowRuleStore ...@@ -80,10 +82,44 @@ public class DistributedFlowRuleStore
80 }; 82 };
81 83
82 // TODO: make this configurable 84 // TODO: make this configurable
83 - private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000; 85 + private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
84 86
85 @Activate 87 @Activate
86 public void activate() { 88 public void activate() {
89 + clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
90 +
91 + @Override
92 + public void handle(ClusterMessage message) {
93 + FlowRule rule = SERIALIZER.decode(message.payload());
94 + log.info("received add request for {}", rule);
95 + storeFlowEntryInternal(rule);
96 + // FIXME what to respond.
97 + try {
98 + // FIXME: #respond() not working. responded message is
99 + // handled by this sender node and never goes back.
100 + message.respond(SERIALIZER.encode("ACK"));
101 + } catch (IOException e) {
102 + log.error("Failed to respond back", e);
103 + }
104 + }
105 + });
106 +
107 + clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
108 +
109 + @Override
110 + public void handle(ClusterMessage message) {
111 + FlowRule rule = SERIALIZER.decode(message.payload());
112 + log.info("received delete request for {}", rule);
113 + deleteFlowRuleInternal(rule);
114 + // FIXME what to respond.
115 + try {
116 + message.respond(SERIALIZER.encode("ACK"));
117 + } catch (IOException e) {
118 + log.error("Failed to respond back", e);
119 + }
120 +
121 + }
122 + });
87 log.info("Started"); 123 log.info("Started");
88 } 124 }
89 125
...@@ -131,13 +167,14 @@ public class DistributedFlowRuleStore ...@@ -131,13 +167,14 @@ public class DistributedFlowRuleStore
131 } 167 }
132 168
133 @Override 169 @Override
134 - public void storeFlowRule(FlowRule rule) { 170 + public boolean storeFlowRule(FlowRule rule) {
135 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId()); 171 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
136 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 172 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
137 - storeFlowEntryInternal(rule); 173 + return storeFlowEntryInternal(rule);
138 - return;
139 } 174 }
140 175
176 + log.warn("Not my flow forwarding to {}", replicaInfo.master().orNull());
177 +
141 ClusterMessage message = new ClusterMessage( 178 ClusterMessage message = new ClusterMessage(
142 clusterService.getLocalNode().id(), 179 clusterService.getLocalNode().id(),
143 FlowStoreMessageSubjects.STORE_FLOW_RULE, 180 FlowStoreMessageSubjects.STORE_FLOW_RULE,
...@@ -150,26 +187,29 @@ public class DistributedFlowRuleStore ...@@ -150,26 +187,29 @@ public class DistributedFlowRuleStore
150 // FIXME: throw a FlowStoreException 187 // FIXME: throw a FlowStoreException
151 throw new RuntimeException(e); 188 throw new RuntimeException(e);
152 } 189 }
190 + return false;
153 } 191 }
154 192
155 - private synchronized void storeFlowEntryInternal(FlowRule flowRule) { 193 + private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
156 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule); 194 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
157 DeviceId deviceId = flowRule.deviceId(); 195 DeviceId deviceId = flowRule.deviceId();
158 // write to local copy. 196 // write to local copy.
159 if (!flowEntries.containsEntry(deviceId, flowEntry)) { 197 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
160 flowEntries.put(deviceId, flowEntry); 198 flowEntries.put(deviceId, flowEntry);
161 flowEntriesById.put(flowRule.appId(), flowEntry); 199 flowEntriesById.put(flowRule.appId(), flowEntry);
200 + notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
201 + return true;
162 } 202 }
163 // write to backup. 203 // write to backup.
164 // TODO: write to a hazelcast map. 204 // TODO: write to a hazelcast map.
205 + return false;
165 } 206 }
166 207
167 @Override 208 @Override
168 - public synchronized void deleteFlowRule(FlowRule rule) { 209 + public synchronized boolean deleteFlowRule(FlowRule rule) {
169 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId()); 210 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
170 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 211 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
171 - deleteFlowRuleInternal(rule); 212 + return deleteFlowRuleInternal(rule);
172 - return;
173 } 213 }
174 214
175 ClusterMessage message = new ClusterMessage( 215 ClusterMessage message = new ClusterMessage(
...@@ -184,15 +224,21 @@ public class DistributedFlowRuleStore ...@@ -184,15 +224,21 @@ public class DistributedFlowRuleStore
184 // FIXME: throw a FlowStoreException 224 // FIXME: throw a FlowStoreException
185 throw new RuntimeException(e); 225 throw new RuntimeException(e);
186 } 226 }
227 + return false;
187 } 228 }
188 229
189 - private synchronized void deleteFlowRuleInternal(FlowRule flowRule) { 230 + private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
190 StoredFlowEntry entry = getFlowEntryInternal(flowRule); 231 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
191 if (entry == null) { 232 if (entry == null) {
192 - return; 233 + return false;
193 } 234 }
194 entry.setState(FlowEntryState.PENDING_REMOVE); 235 entry.setState(FlowEntryState.PENDING_REMOVE);
236 +
195 // TODO: also update backup. 237 // TODO: also update backup.
238 +
239 + notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
240 +
241 + return true;
196 } 242 }
197 243
198 @Override 244 @Override
......
...@@ -148,8 +148,9 @@ public class SimpleFlowRuleStore ...@@ -148,8 +148,9 @@ public class SimpleFlowRuleStore
148 } 148 }
149 149
150 @Override 150 @Override
151 - public void storeFlowRule(FlowRule rule) { 151 + public boolean storeFlowRule(FlowRule rule) {
152 final boolean added = storeFlowRuleInternal(rule); 152 final boolean added = storeFlowRuleInternal(rule);
153 + return added;
153 } 154 }
154 155
155 private boolean storeFlowRuleInternal(FlowRule rule) { 156 private boolean storeFlowRuleInternal(FlowRule rule) {
...@@ -166,13 +167,14 @@ public class SimpleFlowRuleStore ...@@ -166,13 +167,14 @@ public class SimpleFlowRuleStore
166 } 167 }
167 // new flow rule added 168 // new flow rule added
168 existing.add(f); 169 existing.add(f);
169 - // TODO: notify through delegate about remote event? 170 + // TODO: Should we notify only if it's "remote" event?
171 + //notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
170 return true; 172 return true;
171 } 173 }
172 } 174 }
173 175
174 @Override 176 @Override
175 - public void deleteFlowRule(FlowRule rule) { 177 + public boolean deleteFlowRule(FlowRule rule) {
176 178
177 List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id()); 179 List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
178 synchronized (entries) { 180 synchronized (entries) {
...@@ -180,12 +182,15 @@ public class SimpleFlowRuleStore ...@@ -180,12 +182,15 @@ public class SimpleFlowRuleStore
180 if (entry.equals(rule)) { 182 if (entry.equals(rule)) {
181 synchronized (entry) { 183 synchronized (entry) {
182 entry.setState(FlowEntryState.PENDING_REMOVE); 184 entry.setState(FlowEntryState.PENDING_REMOVE);
183 - return; 185 + // TODO: Should we notify only if it's "remote" event?
186 + //notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
187 + return true;
184 } 188 }
185 } 189 }
186 } 190 }
187 } 191 }
188 //log.warn("Cannot find rule {}", rule); 192 //log.warn("Cannot find rule {}", rule);
193 + return false;
189 } 194 }
190 195
191 @Override 196 @Override
......