add basic backup to DistributedFlowRuleStore
Change-Id: I8eedf0cf30a2555d45145889b5ef210e826b0ac0
Showing
2 changed files
with
222 additions
and
13 deletions
... | @@ -13,7 +13,7 @@ public class FlowRuleBatchRequest { | ... | @@ -13,7 +13,7 @@ public class FlowRuleBatchRequest { |
13 | private final List<FlowEntry> toAdd; | 13 | private final List<FlowEntry> toAdd; |
14 | private final List<FlowEntry> toRemove; | 14 | private final List<FlowEntry> toRemove; |
15 | 15 | ||
16 | - public FlowRuleBatchRequest(int batchId, List<FlowEntry> toAdd, List<FlowEntry> toRemove) { | 16 | + public FlowRuleBatchRequest(int batchId, List<? extends FlowEntry> toAdd, List<? extends FlowEntry> toRemove) { |
17 | this.batchId = batchId; | 17 | this.batchId = batchId; |
18 | this.toAdd = Collections.unmodifiableList(toAdd); | 18 | this.toAdd = Collections.unmodifiableList(toAdd); |
19 | this.toRemove = Collections.unmodifiableList(toRemove); | 19 | this.toRemove = Collections.unmodifiableList(toRemove); | ... | ... |
1 | package org.onlab.onos.store.flow.impl; | 1 | package org.onlab.onos.store.flow.impl; |
2 | 2 | ||
3 | +import static com.google.common.base.Preconditions.checkNotNull; | ||
3 | import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; | 4 | import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; |
4 | import static org.slf4j.LoggerFactory.getLogger; | 5 | import static org.slf4j.LoggerFactory.getLogger; |
5 | import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*; | 6 | import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*; |
... | @@ -10,6 +11,7 @@ import java.util.ArrayList; | ... | @@ -10,6 +11,7 @@ import java.util.ArrayList; |
10 | import java.util.Arrays; | 11 | import java.util.Arrays; |
11 | import java.util.Collection; | 12 | import java.util.Collection; |
12 | import java.util.Collections; | 13 | import java.util.Collections; |
14 | +import java.util.Map.Entry; | ||
13 | import java.util.Set; | 15 | import java.util.Set; |
14 | import java.util.concurrent.ExecutionException; | 16 | import java.util.concurrent.ExecutionException; |
15 | import java.util.concurrent.ExecutorService; | 17 | import java.util.concurrent.ExecutorService; |
... | @@ -27,6 +29,7 @@ import org.apache.felix.scr.annotations.Reference; | ... | @@ -27,6 +29,7 @@ import org.apache.felix.scr.annotations.Reference; |
27 | import org.apache.felix.scr.annotations.ReferenceCardinality; | 29 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
28 | import org.apache.felix.scr.annotations.Service; | 30 | import org.apache.felix.scr.annotations.Service; |
29 | import org.onlab.onos.cluster.ClusterService; | 31 | import org.onlab.onos.cluster.ClusterService; |
32 | +import org.onlab.onos.cluster.NodeId; | ||
30 | import org.onlab.onos.net.Device; | 33 | import org.onlab.onos.net.Device; |
31 | import org.onlab.onos.net.DeviceId; | 34 | import org.onlab.onos.net.DeviceId; |
32 | import org.onlab.onos.net.device.DeviceService; | 35 | import org.onlab.onos.net.device.DeviceService; |
... | @@ -34,6 +37,7 @@ import org.onlab.onos.net.flow.CompletedBatchOperation; | ... | @@ -34,6 +37,7 @@ import org.onlab.onos.net.flow.CompletedBatchOperation; |
34 | import org.onlab.onos.net.flow.DefaultFlowEntry; | 37 | import org.onlab.onos.net.flow.DefaultFlowEntry; |
35 | import org.onlab.onos.net.flow.FlowEntry; | 38 | import org.onlab.onos.net.flow.FlowEntry; |
36 | import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; | 39 | import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; |
40 | +import org.onlab.onos.net.flow.FlowId; | ||
37 | import org.onlab.onos.net.flow.FlowRule; | 41 | import org.onlab.onos.net.flow.FlowRule; |
38 | import org.onlab.onos.net.flow.FlowRuleBatchEntry; | 42 | import org.onlab.onos.net.flow.FlowRuleBatchEntry; |
39 | import org.onlab.onos.net.flow.FlowRuleBatchEvent; | 43 | import org.onlab.onos.net.flow.FlowRuleBatchEvent; |
... | @@ -45,12 +49,15 @@ import org.onlab.onos.net.flow.FlowRuleEvent.Type; | ... | @@ -45,12 +49,15 @@ import org.onlab.onos.net.flow.FlowRuleEvent.Type; |
45 | import org.onlab.onos.net.flow.FlowRuleStore; | 49 | import org.onlab.onos.net.flow.FlowRuleStore; |
46 | import org.onlab.onos.net.flow.FlowRuleStoreDelegate; | 50 | import org.onlab.onos.net.flow.FlowRuleStoreDelegate; |
47 | import org.onlab.onos.net.flow.StoredFlowEntry; | 51 | import org.onlab.onos.net.flow.StoredFlowEntry; |
48 | -import org.onlab.onos.store.AbstractStore; | ||
49 | import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; | 52 | import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; |
50 | import org.onlab.onos.store.cluster.messaging.ClusterMessage; | 53 | import org.onlab.onos.store.cluster.messaging.ClusterMessage; |
51 | import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; | 54 | import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; |
52 | import org.onlab.onos.store.flow.ReplicaInfo; | 55 | import org.onlab.onos.store.flow.ReplicaInfo; |
56 | +import org.onlab.onos.store.flow.ReplicaInfoEvent; | ||
57 | +import org.onlab.onos.store.flow.ReplicaInfoEventListener; | ||
53 | import org.onlab.onos.store.flow.ReplicaInfoService; | 58 | import org.onlab.onos.store.flow.ReplicaInfoService; |
59 | +import org.onlab.onos.store.hz.AbstractHazelcastStore; | ||
60 | +import org.onlab.onos.store.hz.SMap; | ||
54 | import org.onlab.onos.store.serializers.DistributedStoreSerializers; | 61 | import org.onlab.onos.store.serializers.DistributedStoreSerializers; |
55 | import org.onlab.onos.store.serializers.KryoSerializer; | 62 | import org.onlab.onos.store.serializers.KryoSerializer; |
56 | import org.onlab.util.KryoNamespace; | 63 | import org.onlab.util.KryoNamespace; |
... | @@ -59,13 +66,17 @@ import org.slf4j.Logger; | ... | @@ -59,13 +66,17 @@ import org.slf4j.Logger; |
59 | import com.google.common.base.Function; | 66 | import com.google.common.base.Function; |
60 | import com.google.common.cache.Cache; | 67 | import com.google.common.cache.Cache; |
61 | import com.google.common.cache.CacheBuilder; | 68 | import com.google.common.cache.CacheBuilder; |
69 | +import com.google.common.cache.CacheLoader; | ||
70 | +import com.google.common.cache.LoadingCache; | ||
62 | import com.google.common.collect.ArrayListMultimap; | 71 | import com.google.common.collect.ArrayListMultimap; |
72 | +import com.google.common.collect.ImmutableList; | ||
63 | import com.google.common.collect.ImmutableSet; | 73 | import com.google.common.collect.ImmutableSet; |
64 | import com.google.common.collect.Iterables; | 74 | import com.google.common.collect.Iterables; |
65 | import com.google.common.collect.Multimap; | 75 | import com.google.common.collect.Multimap; |
66 | import com.google.common.util.concurrent.Futures; | 76 | import com.google.common.util.concurrent.Futures; |
67 | import com.google.common.util.concurrent.ListenableFuture; | 77 | import com.google.common.util.concurrent.ListenableFuture; |
68 | import com.google.common.util.concurrent.SettableFuture; | 78 | import com.google.common.util.concurrent.SettableFuture; |
79 | +import com.hazelcast.core.IMap; | ||
69 | 80 | ||
70 | /** | 81 | /** |
71 | * Manages inventory of flow rules using a distributed state management protocol. | 82 | * Manages inventory of flow rules using a distributed state management protocol. |
... | @@ -73,7 +84,7 @@ import com.google.common.util.concurrent.SettableFuture; | ... | @@ -73,7 +84,7 @@ import com.google.common.util.concurrent.SettableFuture; |
73 | @Component(immediate = true) | 84 | @Component(immediate = true) |
74 | @Service | 85 | @Service |
75 | public class DistributedFlowRuleStore | 86 | public class DistributedFlowRuleStore |
76 | - extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate> | 87 | + extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate> |
77 | implements FlowRuleStore { | 88 | implements FlowRuleStore { |
78 | 89 | ||
79 | private final Logger log = getLogger(getClass()); | 90 | private final Logger log = getLogger(getClass()); |
... | @@ -82,8 +93,6 @@ public class DistributedFlowRuleStore | ... | @@ -82,8 +93,6 @@ public class DistributedFlowRuleStore |
82 | private final Multimap<DeviceId, StoredFlowEntry> flowEntries = | 93 | private final Multimap<DeviceId, StoredFlowEntry> flowEntries = |
83 | ArrayListMultimap.<DeviceId, StoredFlowEntry>create(); | 94 | ArrayListMultimap.<DeviceId, StoredFlowEntry>create(); |
84 | 95 | ||
85 | - private final Multimap<Short, FlowRule> flowEntriesById = | ||
86 | - ArrayListMultimap.<Short, FlowRule>create(); | ||
87 | 96 | ||
88 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 97 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
89 | protected ReplicaInfoService replicaInfoManager; | 98 | protected ReplicaInfoService replicaInfoManager; |
... | @@ -109,10 +118,17 @@ public class DistributedFlowRuleStore | ... | @@ -109,10 +118,17 @@ public class DistributedFlowRuleStore |
109 | //.removalListener(listener) | 118 | //.removalListener(listener) |
110 | .build(); | 119 | .build(); |
111 | 120 | ||
121 | + private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps; | ||
122 | + | ||
112 | 123 | ||
113 | private final ExecutorService futureListeners = | 124 | private final ExecutorService futureListeners = |
114 | Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders")); | 125 | Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders")); |
115 | 126 | ||
127 | + private final ExecutorService backupExecutors = | ||
128 | + Executors.newSingleThreadExecutor(namedThreads("async-backups")); | ||
129 | + | ||
130 | + // TODO make this configurable | ||
131 | + private boolean syncBackup = false; | ||
116 | 132 | ||
117 | protected static final KryoSerializer SERIALIZER = new KryoSerializer() { | 133 | protected static final KryoSerializer SERIALIZER = new KryoSerializer() { |
118 | @Override | 134 | @Override |
... | @@ -127,8 +143,20 @@ public class DistributedFlowRuleStore | ... | @@ -127,8 +143,20 @@ public class DistributedFlowRuleStore |
127 | // TODO: make this configurable | 143 | // TODO: make this configurable |
128 | private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000; | 144 | private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000; |
129 | 145 | ||
146 | + private ReplicaInfoEventListener replicaInfoEventListener; | ||
147 | + | ||
148 | + @Override | ||
130 | @Activate | 149 | @Activate |
131 | public void activate() { | 150 | public void activate() { |
151 | + | ||
152 | + super.serializer = SERIALIZER; | ||
153 | + super.theInstance = storeService.getHazelcastInstance(); | ||
154 | + | ||
155 | + // Cache to create SMap on demand | ||
156 | + smaps = CacheBuilder.newBuilder() | ||
157 | + .softValues() | ||
158 | + .build(new SMapLoader()); | ||
159 | + | ||
132 | clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() { | 160 | clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() { |
133 | 161 | ||
134 | @Override | 162 | @Override |
... | @@ -182,11 +210,16 @@ public class DistributedFlowRuleStore | ... | @@ -182,11 +210,16 @@ public class DistributedFlowRuleStore |
182 | } | 210 | } |
183 | }); | 211 | }); |
184 | 212 | ||
213 | + replicaInfoEventListener = new InternalReplicaInfoEventListener(); | ||
214 | + | ||
215 | + replicaInfoManager.addListener(replicaInfoEventListener); | ||
216 | + | ||
185 | log.info("Started"); | 217 | log.info("Started"); |
186 | } | 218 | } |
187 | 219 | ||
188 | @Deactivate | 220 | @Deactivate |
189 | public void deactivate() { | 221 | public void deactivate() { |
222 | + replicaInfoManager.removeListener(replicaInfoEventListener); | ||
190 | log.info("Stopped"); | 223 | log.info("Stopped"); |
191 | } | 224 | } |
192 | 225 | ||
... | @@ -276,8 +309,10 @@ public class DistributedFlowRuleStore | ... | @@ -276,8 +309,10 @@ public class DistributedFlowRuleStore |
276 | storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)))); | 309 | storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)))); |
277 | } | 310 | } |
278 | 311 | ||
312 | + // FIXME document that all of the FlowEntries must be about same device | ||
279 | @Override | 313 | @Override |
280 | public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) { | 314 | public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) { |
315 | + | ||
281 | if (operation.getOperations().isEmpty()) { | 316 | if (operation.getOperations().isEmpty()) { |
282 | return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet())); | 317 | return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet())); |
283 | } | 318 | } |
... | @@ -313,12 +348,17 @@ public class DistributedFlowRuleStore | ... | @@ -313,12 +348,17 @@ public class DistributedFlowRuleStore |
313 | } | 348 | } |
314 | 349 | ||
315 | private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) { | 350 | private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) { |
316 | - List<FlowEntry> toRemove = new ArrayList<>(); | 351 | + final List<StoredFlowEntry> toRemove = new ArrayList<>(); |
317 | - List<FlowEntry> toAdd = new ArrayList<>(); | 352 | + final List<StoredFlowEntry> toAdd = new ArrayList<>(); |
318 | - // TODO: backup changes to hazelcast map | 353 | + DeviceId did = null; |
354 | + | ||
355 | + | ||
319 | for (FlowRuleBatchEntry batchEntry : operation.getOperations()) { | 356 | for (FlowRuleBatchEntry batchEntry : operation.getOperations()) { |
320 | FlowRule flowRule = batchEntry.getTarget(); | 357 | FlowRule flowRule = batchEntry.getTarget(); |
321 | FlowRuleOperation op = batchEntry.getOperator(); | 358 | FlowRuleOperation op = batchEntry.getOperator(); |
359 | + if (did == null) { | ||
360 | + did = flowRule.deviceId(); | ||
361 | + } | ||
322 | if (op.equals(FlowRuleOperation.REMOVE)) { | 362 | if (op.equals(FlowRuleOperation.REMOVE)) { |
323 | StoredFlowEntry entry = getFlowEntryInternal(flowRule); | 363 | StoredFlowEntry entry = getFlowEntryInternal(flowRule); |
324 | if (entry != null) { | 364 | if (entry != null) { |
... | @@ -330,7 +370,6 @@ public class DistributedFlowRuleStore | ... | @@ -330,7 +370,6 @@ public class DistributedFlowRuleStore |
330 | DeviceId deviceId = flowRule.deviceId(); | 370 | DeviceId deviceId = flowRule.deviceId(); |
331 | if (!flowEntries.containsEntry(deviceId, flowEntry)) { | 371 | if (!flowEntries.containsEntry(deviceId, flowEntry)) { |
332 | flowEntries.put(deviceId, flowEntry); | 372 | flowEntries.put(deviceId, flowEntry); |
333 | - flowEntriesById.put(flowRule.appId(), flowEntry); | ||
334 | toAdd.add(flowEntry); | 373 | toAdd.add(flowEntry); |
335 | } | 374 | } |
336 | } | 375 | } |
... | @@ -339,14 +378,39 @@ public class DistributedFlowRuleStore | ... | @@ -339,14 +378,39 @@ public class DistributedFlowRuleStore |
339 | return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet())); | 378 | return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet())); |
340 | } | 379 | } |
341 | 380 | ||
381 | + // create remote backup copies | ||
382 | + final DeviceId deviceId = did; | ||
383 | + updateBackup(deviceId, toAdd, toRemove); | ||
384 | + | ||
342 | SettableFuture<CompletedBatchOperation> r = SettableFuture.create(); | 385 | SettableFuture<CompletedBatchOperation> r = SettableFuture.create(); |
343 | final int batchId = localBatchIdGen.incrementAndGet(); | 386 | final int batchId = localBatchIdGen.incrementAndGet(); |
344 | 387 | ||
345 | pendingFutures.put(batchId, r); | 388 | pendingFutures.put(batchId, r); |
346 | notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove))); | 389 | notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove))); |
390 | + | ||
347 | return r; | 391 | return r; |
348 | } | 392 | } |
349 | 393 | ||
394 | + private void updateBackup(final DeviceId deviceId, | ||
395 | + final List<StoredFlowEntry> toAdd, | ||
396 | + final List<? extends FlowRule> list) { | ||
397 | + | ||
398 | + Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list)); | ||
399 | + | ||
400 | + if (syncBackup) { | ||
401 | + // wait for backup to complete | ||
402 | + try { | ||
403 | + submit.get(); | ||
404 | + } catch (InterruptedException | ExecutionException e) { | ||
405 | + log.error("Failed to create backups", e); | ||
406 | + } | ||
407 | + } | ||
408 | + } | ||
409 | + | ||
410 | + private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) { | ||
411 | + updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList()); | ||
412 | + } | ||
413 | + | ||
350 | @Override | 414 | @Override |
351 | public void deleteFlowRule(FlowRule rule) { | 415 | public void deleteFlowRule(FlowRule rule) { |
352 | storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule)))); | 416 | storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule)))); |
... | @@ -365,7 +429,7 @@ public class DistributedFlowRuleStore | ... | @@ -365,7 +429,7 @@ public class DistributedFlowRuleStore |
365 | } | 429 | } |
366 | 430 | ||
367 | private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) { | 431 | private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) { |
368 | - DeviceId did = rule.deviceId(); | 432 | + final DeviceId did = rule.deviceId(); |
369 | 433 | ||
370 | // check if this new rule is an update to an existing entry | 434 | // check if this new rule is an update to an existing entry |
371 | StoredFlowEntry stored = getFlowEntryInternal(rule); | 435 | StoredFlowEntry stored = getFlowEntryInternal(rule); |
... | @@ -375,16 +439,18 @@ public class DistributedFlowRuleStore | ... | @@ -375,16 +439,18 @@ public class DistributedFlowRuleStore |
375 | stored.setPackets(rule.packets()); | 439 | stored.setPackets(rule.packets()); |
376 | if (stored.state() == FlowEntryState.PENDING_ADD) { | 440 | if (stored.state() == FlowEntryState.PENDING_ADD) { |
377 | stored.setState(FlowEntryState.ADDED); | 441 | stored.setState(FlowEntryState.ADDED); |
442 | + // update backup. | ||
443 | + updateBackup(did, Arrays.asList(stored)); | ||
378 | return new FlowRuleEvent(Type.RULE_ADDED, rule); | 444 | return new FlowRuleEvent(Type.RULE_ADDED, rule); |
379 | } | 445 | } |
380 | return new FlowRuleEvent(Type.RULE_UPDATED, rule); | 446 | return new FlowRuleEvent(Type.RULE_UPDATED, rule); |
381 | } | 447 | } |
382 | 448 | ||
383 | // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore | 449 | // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore |
450 | + // TODO: also update backup. | ||
384 | flowEntries.put(did, new DefaultFlowEntry(rule)); | 451 | flowEntries.put(did, new DefaultFlowEntry(rule)); |
385 | return null; | 452 | return null; |
386 | 453 | ||
387 | - // TODO: also update backup. | ||
388 | } | 454 | } |
389 | 455 | ||
390 | @Override | 456 | @Override |
... | @@ -401,13 +467,15 @@ public class DistributedFlowRuleStore | ... | @@ -401,13 +467,15 @@ public class DistributedFlowRuleStore |
401 | } | 467 | } |
402 | 468 | ||
403 | private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) { | 469 | private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) { |
470 | + final DeviceId deviceId = rule.deviceId(); | ||
404 | // This is where one could mark a rule as removed and still keep it in the store. | 471 | // This is where one could mark a rule as removed and still keep it in the store. |
405 | - if (flowEntries.remove(rule.deviceId(), rule)) { | 472 | + final boolean removed = flowEntries.remove(deviceId, rule); |
473 | + updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule)); | ||
474 | + if (removed) { | ||
406 | return new FlowRuleEvent(RULE_REMOVED, rule); | 475 | return new FlowRuleEvent(RULE_REMOVED, rule); |
407 | } else { | 476 | } else { |
408 | return null; | 477 | return null; |
409 | } | 478 | } |
410 | - // TODO: also update backup. | ||
411 | } | 479 | } |
412 | 480 | ||
413 | @Override | 481 | @Override |
... | @@ -421,4 +489,145 @@ public class DistributedFlowRuleStore | ... | @@ -421,4 +489,145 @@ public class DistributedFlowRuleStore |
421 | } | 489 | } |
422 | notifyDelegate(event); | 490 | notifyDelegate(event); |
423 | } | 491 | } |
492 | + | ||
493 | + private synchronized void loadFromBackup(final DeviceId did) { | ||
494 | + // should relax synchronized condition | ||
495 | + | ||
496 | + try { | ||
497 | + log.info("Loading FlowRules for {} from backups", did); | ||
498 | + SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did); | ||
499 | + for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e | ||
500 | + : backupFlowTable.entrySet()) { | ||
501 | + | ||
502 | + // TODO: should we be directly updating internal structure or | ||
503 | + // should we be triggering event? | ||
504 | + log.debug("loading {}", e.getValue()); | ||
505 | + for (StoredFlowEntry entry : e.getValue()) { | ||
506 | + flowEntries.remove(did, entry); | ||
507 | + flowEntries.put(did, entry); | ||
508 | + } | ||
509 | + } | ||
510 | + } catch (ExecutionException e) { | ||
511 | + log.error("Failed to load backup flowtable for {}", did, e); | ||
512 | + } | ||
513 | + } | ||
514 | + | ||
515 | + private synchronized void removeFromPrimary(final DeviceId did) { | ||
516 | + Collection<StoredFlowEntry> removed = flowEntries.removeAll(did); | ||
517 | + log.debug("removedFromPrimary {}", removed); | ||
518 | + } | ||
519 | + | ||
520 | + private final class SMapLoader | ||
521 | + extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> { | ||
522 | + | ||
523 | + @Override | ||
524 | + public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id) | ||
525 | + throws Exception { | ||
526 | + IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString()); | ||
527 | + return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER); | ||
528 | + } | ||
529 | + } | ||
530 | + | ||
531 | + private final class InternalReplicaInfoEventListener | ||
532 | + implements ReplicaInfoEventListener { | ||
533 | + | ||
534 | + @Override | ||
535 | + public void event(ReplicaInfoEvent event) { | ||
536 | + final NodeId local = clusterService.getLocalNode().id(); | ||
537 | + final DeviceId did = event.subject(); | ||
538 | + final ReplicaInfo rInfo = event.replicaInfo(); | ||
539 | + | ||
540 | + switch (event.type()) { | ||
541 | + case MASTER_CHANGED: | ||
542 | + if (local.equals(rInfo.master().orNull())) { | ||
543 | + // This node is the new master, populate local structure | ||
544 | + // from backup | ||
545 | + loadFromBackup(did); | ||
546 | + } else { | ||
547 | + // This node is no longer the master holder, | ||
548 | + // clean local structure | ||
549 | + removeFromPrimary(did); | ||
550 | + // FIXME: probably should stop pending backup activities in | ||
551 | + // executors to avoid overwriting with old value | ||
552 | + } | ||
553 | + break; | ||
554 | + default: | ||
555 | + break; | ||
556 | + | ||
557 | + } | ||
558 | + } | ||
559 | + } | ||
560 | + | ||
561 | + // Task to update FlowEntries in backup HZ store | ||
562 | + private final class UpdateBackup implements Runnable { | ||
563 | + | ||
564 | + private final DeviceId deviceId; | ||
565 | + private final List<StoredFlowEntry> toAdd; | ||
566 | + private final List<? extends FlowRule> toRemove; | ||
567 | + | ||
568 | + public UpdateBackup(DeviceId deviceId, | ||
569 | + List<StoredFlowEntry> toAdd, | ||
570 | + List<? extends FlowRule> list) { | ||
571 | + this.deviceId = checkNotNull(deviceId); | ||
572 | + this.toAdd = checkNotNull(toAdd); | ||
573 | + this.toRemove = checkNotNull(list); | ||
574 | + } | ||
575 | + | ||
576 | + @Override | ||
577 | + public void run() { | ||
578 | + try { | ||
579 | + log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove); | ||
580 | + final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId); | ||
581 | + // Following should be rewritten using async APIs | ||
582 | + for (StoredFlowEntry entry : toAdd) { | ||
583 | + final FlowId id = entry.id(); | ||
584 | + ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id); | ||
585 | + List<StoredFlowEntry> list = new ArrayList<>(); | ||
586 | + if (original != null) { | ||
587 | + list.addAll(original); | ||
588 | + } | ||
589 | + | ||
590 | + list.remove(entry); | ||
591 | + list.add(entry); | ||
592 | + | ||
593 | + ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list); | ||
594 | + boolean success; | ||
595 | + if (original == null) { | ||
596 | + success = (backupFlowTable.putIfAbsent(id, newValue) == null); | ||
597 | + } else { | ||
598 | + success = backupFlowTable.replace(id, original, newValue); | ||
599 | + } | ||
600 | + // TODO retry? | ||
601 | + if (!success) { | ||
602 | + log.error("Updating backup failed."); | ||
603 | + } | ||
604 | + } | ||
605 | + for (FlowRule entry : toRemove) { | ||
606 | + final FlowId id = entry.id(); | ||
607 | + ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id); | ||
608 | + List<StoredFlowEntry> list = new ArrayList<>(); | ||
609 | + if (original != null) { | ||
610 | + list.addAll(original); | ||
611 | + } | ||
612 | + | ||
613 | + list.remove(entry); | ||
614 | + | ||
615 | + ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list); | ||
616 | + boolean success; | ||
617 | + if (original == null) { | ||
618 | + success = (backupFlowTable.putIfAbsent(id, newValue) == null); | ||
619 | + } else { | ||
620 | + success = backupFlowTable.replace(id, original, newValue); | ||
621 | + } | ||
622 | + // TODO retry? | ||
623 | + if (!success) { | ||
624 | + log.error("Updating backup failed."); | ||
625 | + } | ||
626 | + } | ||
627 | + } catch (ExecutionException e) { | ||
628 | + log.error("Failed to write to backups", e); | ||
629 | + } | ||
630 | + | ||
631 | + } | ||
632 | + } | ||
424 | } | 633 | } | ... | ... |
-
Please register or login to post a comment