Madan Jampani

Initial implementation for DistributedFlowRuleStore utilizing master/backup replication

...@@ -3,14 +3,20 @@ package org.onlab.onos.store.flow.impl; ...@@ -3,14 +3,20 @@ package org.onlab.onos.store.flow.impl;
3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4 import static org.slf4j.LoggerFactory.getLogger; 4 import static org.slf4j.LoggerFactory.getLogger;
5 5
6 +import java.io.IOException;
6 import java.util.Collection; 7 import java.util.Collection;
7 import java.util.Collections; 8 import java.util.Collections;
9 +import java.util.concurrent.TimeUnit;
10 +import java.util.concurrent.TimeoutException;
8 11
9 import org.apache.felix.scr.annotations.Activate; 12 import org.apache.felix.scr.annotations.Activate;
10 import org.apache.felix.scr.annotations.Component; 13 import org.apache.felix.scr.annotations.Component;
11 import org.apache.felix.scr.annotations.Deactivate; 14 import org.apache.felix.scr.annotations.Deactivate;
15 +import org.apache.felix.scr.annotations.Reference;
16 +import org.apache.felix.scr.annotations.ReferenceCardinality;
12 import org.apache.felix.scr.annotations.Service; 17 import org.apache.felix.scr.annotations.Service;
13 import org.onlab.onos.ApplicationId; 18 import org.onlab.onos.ApplicationId;
19 +import org.onlab.onos.cluster.ClusterService;
14 import org.onlab.onos.net.DeviceId; 20 import org.onlab.onos.net.DeviceId;
15 import org.onlab.onos.net.flow.DefaultFlowEntry; 21 import org.onlab.onos.net.flow.DefaultFlowEntry;
16 import org.onlab.onos.net.flow.FlowEntry; 22 import org.onlab.onos.net.flow.FlowEntry;
...@@ -21,6 +27,13 @@ import org.onlab.onos.net.flow.FlowRuleEvent.Type; ...@@ -21,6 +27,13 @@ import org.onlab.onos.net.flow.FlowRuleEvent.Type;
21 import org.onlab.onos.net.flow.FlowRuleStore; 27 import org.onlab.onos.net.flow.FlowRuleStore;
22 import org.onlab.onos.net.flow.FlowRuleStoreDelegate; 28 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
23 import org.onlab.onos.store.AbstractStore; 29 import org.onlab.onos.store.AbstractStore;
30 +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
31 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
32 +import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
33 +import org.onlab.onos.store.flow.ReplicaInfo;
34 +import org.onlab.onos.store.serializers.DistributedStoreSerializers;
35 +import org.onlab.onos.store.serializers.KryoSerializer;
36 +import org.onlab.util.KryoPool;
24 import org.slf4j.Logger; 37 import org.slf4j.Logger;
25 38
26 import com.google.common.collect.ArrayListMultimap; 39 import com.google.common.collect.ArrayListMultimap;
...@@ -28,9 +41,8 @@ import com.google.common.collect.ImmutableSet; ...@@ -28,9 +41,8 @@ import com.google.common.collect.ImmutableSet;
28 import com.google.common.collect.Multimap; 41 import com.google.common.collect.Multimap;
29 42
30 /** 43 /**
31 - * Manages inventory of flow rules using trivial in-memory implementation. 44 + * Manages inventory of flow rules using a distributed state management protocol.
32 */ 45 */
33 -//FIXME I LIE. I AIN'T DISTRIBUTED
34 @Component(immediate = true) 46 @Component(immediate = true)
35 @Service 47 @Service
36 public class DistributedFlowRuleStore 48 public class DistributedFlowRuleStore
...@@ -46,6 +58,28 @@ public class DistributedFlowRuleStore ...@@ -46,6 +58,28 @@ public class DistributedFlowRuleStore
46 private final Multimap<Short, FlowRule> flowEntriesById = 58 private final Multimap<Short, FlowRule> flowEntriesById =
47 ArrayListMultimap.<Short, FlowRule>create(); 59 ArrayListMultimap.<Short, FlowRule>create();
48 60
61 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 + private ReplicaInfoManager replicaInfoManager;
63 +
64 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 + private ClusterCommunicationService clusterCommunicator;
66 +
67 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 + private ClusterService clusterService;
69 +
70 + protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
71 + @Override
72 + protected void setupKryoPool() {
73 + serializerPool = KryoPool.newBuilder()
74 + .register(DistributedStoreSerializers.COMMON)
75 + .build()
76 + .populate(1);
77 + }
78 + };
79 +
80 + // TODO: make this configurable
81 + private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
82 +
49 @Activate 83 @Activate
50 public void activate() { 84 public void activate() {
51 log.info("Started"); 85 log.info("Started");
...@@ -91,26 +125,92 @@ public class DistributedFlowRuleStore ...@@ -91,26 +125,92 @@ public class DistributedFlowRuleStore
91 } 125 }
92 126
93 @Override 127 @Override
94 - public synchronized void storeFlowRule(FlowRule rule) { 128 + public void storeFlowRule(FlowRule rule) {
95 - FlowEntry f = new DefaultFlowEntry(rule); 129 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
96 - DeviceId did = f.deviceId(); 130 + if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
97 - if (!flowEntries.containsEntry(did, f)) { 131 + storeFlowEntryInternal(rule);
98 - flowEntries.put(did, f); 132 + return;
99 - flowEntriesById.put(rule.appId(), f);
100 } 133 }
134 +
135 + ClusterMessage message = new ClusterMessage(
136 + clusterService.getLocalNode().id(),
137 + FlowStoreMessageSubjects.STORE_FLOW_RULE,
138 + SERIALIZER.encode(rule));
139 +
140 + try {
141 + ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
142 + response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
143 + } catch (IOException | TimeoutException e) {
144 + // FIXME: throw a FlowStoreException
145 + throw new RuntimeException(e);
146 + }
147 + }
148 +
149 + public synchronized void storeFlowEntryInternal(FlowRule flowRule) {
150 + FlowEntry flowEntry = new DefaultFlowEntry(flowRule);
151 + DeviceId deviceId = flowRule.deviceId();
152 + // write to local copy.
153 + if (!flowEntries.containsEntry(deviceId, flowEntry)) {
154 + flowEntries.put(deviceId, flowEntry);
155 + flowEntriesById.put(flowRule.appId(), flowEntry);
156 + }
157 + // write to backup.
158 + // TODO: write to a hazelcast map.
101 } 159 }
102 160
103 @Override 161 @Override
104 public synchronized void deleteFlowRule(FlowRule rule) { 162 public synchronized void deleteFlowRule(FlowRule rule) {
105 - FlowEntry entry = getFlowEntry(rule); 163 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
164 + if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
165 + deleteFlowRuleInternal(rule);
166 + return;
167 + }
168 +
169 + ClusterMessage message = new ClusterMessage(
170 + clusterService.getLocalNode().id(),
171 + FlowStoreMessageSubjects.DELETE_FLOW_RULE,
172 + SERIALIZER.encode(rule));
173 +
174 + try {
175 + ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
176 + response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
177 + } catch (IOException | TimeoutException e) {
178 + // FIXME: throw a FlowStoreException
179 + throw new RuntimeException(e);
180 + }
181 + }
182 +
183 + public synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
184 + FlowEntry entry = getFlowEntry(flowRule);
106 if (entry == null) { 185 if (entry == null) {
107 return; 186 return;
108 } 187 }
109 entry.setState(FlowEntryState.PENDING_REMOVE); 188 entry.setState(FlowEntryState.PENDING_REMOVE);
189 + // TODO: also update backup.
110 } 190 }
111 191
112 @Override 192 @Override
113 - public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) { 193 + public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
194 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
195 + if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
196 + return addOrUpdateFlowRuleInternal(rule);
197 + }
198 +
199 + ClusterMessage message = new ClusterMessage(
200 + clusterService.getLocalNode().id(),
201 + FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
202 + SERIALIZER.encode(rule));
203 +
204 + try {
205 + ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
206 + return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
207 + } catch (IOException | TimeoutException e) {
208 + // FIXME: throw a FlowStoreException
209 + throw new RuntimeException(e);
210 + }
211 + }
212 +
213 + private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
114 DeviceId did = rule.deviceId(); 214 DeviceId did = rule.deviceId();
115 215
116 // check if this new rule is an update to an existing entry 216 // check if this new rule is an update to an existing entry
...@@ -128,15 +228,39 @@ public class DistributedFlowRuleStore ...@@ -128,15 +228,39 @@ public class DistributedFlowRuleStore
128 228
129 flowEntries.put(did, rule); 229 flowEntries.put(did, rule);
130 return null; 230 return null;
231 +
232 + // TODO: also update backup.
131 } 233 }
132 234
133 @Override 235 @Override
134 - public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) { 236 + public FlowRuleEvent removeFlowRule(FlowEntry rule) {
237 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
238 + if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
239 + // bypass and handle it locally
240 + return removeFlowRuleInternal(rule);
241 + }
242 +
243 + ClusterMessage message = new ClusterMessage(
244 + clusterService.getLocalNode().id(),
245 + FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
246 + SERIALIZER.encode(rule));
247 +
248 + try {
249 + ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
250 + return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
251 + } catch (IOException | TimeoutException e) {
252 + // FIXME: throw a FlowStoreException
253 + throw new RuntimeException(e);
254 + }
255 + }
256 +
257 + private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
135 // This is where one could mark a rule as removed and still keep it in the store. 258 // This is where one could mark a rule as removed and still keep it in the store.
136 if (flowEntries.remove(rule.deviceId(), rule)) { 259 if (flowEntries.remove(rule.deviceId(), rule)) {
137 return new FlowRuleEvent(RULE_REMOVED, rule); 260 return new FlowRuleEvent(RULE_REMOVED, rule);
138 } else { 261 } else {
139 return null; 262 return null;
140 } 263 }
264 + // TODO: also update backup.
141 } 265 }
142 } 266 }
......
1 +package org.onlab.onos.store.flow.impl;
2 +
3 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
4 +
5 +/**
6 + * MessageSubjects used by DistributedFlowRuleStore peer-peer communication.
7 + */
8 +public final class FlowStoreMessageSubjects {
9 + private FlowStoreMessageSubjects() {}
10 + public static final MessageSubject STORE_FLOW_RULE = new MessageSubject("peer-forward-store-flow-rule");
11 + public static final MessageSubject DELETE_FLOW_RULE = new MessageSubject("peer-forward-delete-flow-rule");
12 + public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
13 + new MessageSubject("peer-forward-add-or-update-flow-rule");
14 + public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
15 +}
...@@ -26,6 +26,7 @@ import org.onlab.onos.net.Port; ...@@ -26,6 +26,7 @@ import org.onlab.onos.net.Port;
26 import org.onlab.onos.net.PortNumber; 26 import org.onlab.onos.net.PortNumber;
27 import org.onlab.onos.net.device.DefaultDeviceDescription; 27 import org.onlab.onos.net.device.DefaultDeviceDescription;
28 import org.onlab.onos.net.device.DefaultPortDescription; 28 import org.onlab.onos.net.device.DefaultPortDescription;
29 +import org.onlab.onos.net.flow.DefaultFlowRule;
29 import org.onlab.onos.net.host.DefaultHostDescription; 30 import org.onlab.onos.net.host.DefaultHostDescription;
30 import org.onlab.onos.net.host.HostDescription; 31 import org.onlab.onos.net.host.HostDescription;
31 import org.onlab.onos.net.link.DefaultLinkDescription; 32 import org.onlab.onos.net.link.DefaultLinkDescription;
...@@ -86,7 +87,8 @@ public final class KryoPoolUtil { ...@@ -86,7 +87,8 @@ public final class KryoPoolUtil {
86 Timestamp.class, 87 Timestamp.class,
87 HostId.class, 88 HostId.class,
88 HostDescription.class, 89 HostDescription.class,
89 - DefaultHostDescription.class 90 + DefaultHostDescription.class,
91 + DefaultFlowRule.class
90 ) 92 )
91 .register(URI.class, new URISerializer()) 93 .register(URI.class, new URISerializer())
92 .register(NodeId.class, new NodeIdSerializer()) 94 .register(NodeId.class, new NodeIdSerializer())
......