Madan Jampani
Committed by Gerrit Code Review

ONOS-1286: Backing out chages to use ECMap for dist flow rule store backups

Change-Id: I93a60ef183aa335fecf63b97d300830369d2b9d7
1 -/* 1 + /*
2 * Copyright 2014 Open Networking Laboratory 2 * Copyright 2014 Open Networking Laboratory
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
...@@ -15,11 +15,15 @@ ...@@ -15,11 +15,15 @@
15 */ 15 */
16 package org.onosproject.store.flow.impl; 16 package org.onosproject.store.flow.impl;
17 17
18 +import com.google.common.cache.CacheBuilder;
19 +import com.google.common.cache.CacheLoader;
20 +import com.google.common.cache.LoadingCache;
18 import com.google.common.collect.ImmutableList; 21 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.Iterables; 22 import com.google.common.collect.Iterables;
20 import com.google.common.collect.Maps; 23 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets; 24 import com.google.common.collect.Sets;
22 -import org.apache.commons.lang.math.RandomUtils; 25 +import com.hazelcast.core.IMap;
26 +
23 import org.apache.felix.scr.annotations.Activate; 27 import org.apache.felix.scr.annotations.Activate;
24 import org.apache.felix.scr.annotations.Component; 28 import org.apache.felix.scr.annotations.Component;
25 import org.apache.felix.scr.annotations.Deactivate; 29 import org.apache.felix.scr.annotations.Deactivate;
...@@ -28,7 +32,9 @@ import org.apache.felix.scr.annotations.Property; ...@@ -28,7 +32,9 @@ import org.apache.felix.scr.annotations.Property;
28 import org.apache.felix.scr.annotations.Reference; 32 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.ReferenceCardinality; 33 import org.apache.felix.scr.annotations.ReferenceCardinality;
30 import org.apache.felix.scr.annotations.Service; 34 import org.apache.felix.scr.annotations.Service;
35 +import org.onlab.util.BoundedThreadPool;
31 import org.onlab.util.KryoNamespace; 36 import org.onlab.util.KryoNamespace;
37 +import org.onlab.util.NewConcurrentHashMap;
32 import org.onosproject.cfg.ComponentConfigService; 38 import org.onosproject.cfg.ComponentConfigService;
33 import org.onosproject.cluster.ClusterService; 39 import org.onosproject.cluster.ClusterService;
34 import org.onosproject.cluster.NodeId; 40 import org.onosproject.cluster.NodeId;
...@@ -36,7 +42,6 @@ import org.onosproject.core.CoreService; ...@@ -36,7 +42,6 @@ import org.onosproject.core.CoreService;
36 import org.onosproject.core.IdGenerator; 42 import org.onosproject.core.IdGenerator;
37 import org.onosproject.net.Device; 43 import org.onosproject.net.Device;
38 import org.onosproject.net.DeviceId; 44 import org.onosproject.net.DeviceId;
39 -import org.onosproject.net.device.DeviceClockService;
40 import org.onosproject.net.device.DeviceService; 45 import org.onosproject.net.device.DeviceService;
41 import org.onosproject.net.flow.CompletedBatchOperation; 46 import org.onosproject.net.flow.CompletedBatchOperation;
42 import org.onosproject.net.flow.DefaultFlowEntry; 47 import org.onosproject.net.flow.DefaultFlowEntry;
...@@ -55,19 +60,15 @@ import org.onosproject.net.flow.FlowRuleService; ...@@ -55,19 +60,15 @@ import org.onosproject.net.flow.FlowRuleService;
55 import org.onosproject.net.flow.FlowRuleStore; 60 import org.onosproject.net.flow.FlowRuleStore;
56 import org.onosproject.net.flow.FlowRuleStoreDelegate; 61 import org.onosproject.net.flow.FlowRuleStoreDelegate;
57 import org.onosproject.net.flow.StoredFlowEntry; 62 import org.onosproject.net.flow.StoredFlowEntry;
58 -import org.onosproject.store.AbstractStore;
59 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 63 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60 import org.onosproject.store.cluster.messaging.ClusterMessage; 64 import org.onosproject.store.cluster.messaging.ClusterMessage;
61 import org.onosproject.store.cluster.messaging.ClusterMessageHandler; 65 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62 -import org.onosproject.store.ecmap.EventuallyConsistentMap;
63 -import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
64 import org.onosproject.store.flow.ReplicaInfo; 66 import org.onosproject.store.flow.ReplicaInfo;
65 import org.onosproject.store.flow.ReplicaInfoEvent; 67 import org.onosproject.store.flow.ReplicaInfoEvent;
66 import org.onosproject.store.flow.ReplicaInfoEventListener; 68 import org.onosproject.store.flow.ReplicaInfoEventListener;
67 import org.onosproject.store.flow.ReplicaInfoService; 69 import org.onosproject.store.flow.ReplicaInfoService;
68 -import org.onosproject.store.impl.ClockService; 70 +import org.onosproject.store.hz.AbstractHazelcastStore;
69 -import org.onosproject.store.impl.MastershipBasedTimestamp; 71 +import org.onosproject.store.hz.SMap;
70 -import org.onosproject.store.serializers.KryoNamespaces;
71 import org.onosproject.store.serializers.KryoSerializer; 72 import org.onosproject.store.serializers.KryoSerializer;
72 import org.onosproject.store.serializers.StoreSerializer; 73 import org.onosproject.store.serializers.StoreSerializer;
73 import org.onosproject.store.serializers.impl.DistributedStoreSerializers; 74 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
...@@ -75,12 +76,14 @@ import org.osgi.service.component.ComponentContext; ...@@ -75,12 +76,14 @@ import org.osgi.service.component.ComponentContext;
75 import org.slf4j.Logger; 76 import org.slf4j.Logger;
76 77
77 import java.io.IOException; 78 import java.io.IOException;
79 +import java.util.ArrayList;
78 import java.util.Arrays; 80 import java.util.Arrays;
79 -import java.util.Collection;
80 import java.util.Collections; 81 import java.util.Collections;
81 import java.util.Dictionary; 82 import java.util.Dictionary;
83 +import java.util.HashSet;
82 import java.util.List; 84 import java.util.List;
83 import java.util.Map; 85 import java.util.Map;
86 +import java.util.Map.Entry;
84 import java.util.Set; 87 import java.util.Set;
85 import java.util.concurrent.ConcurrentHashMap; 88 import java.util.concurrent.ConcurrentHashMap;
86 import java.util.concurrent.ConcurrentMap; 89 import java.util.concurrent.ConcurrentMap;
...@@ -93,8 +96,9 @@ import java.util.concurrent.TimeUnit; ...@@ -93,8 +96,9 @@ import java.util.concurrent.TimeUnit;
93 import java.util.concurrent.TimeoutException; 96 import java.util.concurrent.TimeoutException;
94 import java.util.stream.Collectors; 97 import java.util.stream.Collectors;
95 98
99 +import static com.google.common.base.Preconditions.checkNotNull;
100 +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
96 import static com.google.common.base.Strings.isNullOrEmpty; 101 import static com.google.common.base.Strings.isNullOrEmpty;
97 -import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
98 import static org.onlab.util.Tools.get; 102 import static org.onlab.util.Tools.get;
99 import static org.onlab.util.Tools.groupedThreads; 103 import static org.onlab.util.Tools.groupedThreads;
100 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 104 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
...@@ -107,13 +111,13 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -107,13 +111,13 @@ import static org.slf4j.LoggerFactory.getLogger;
107 @Component(immediate = true) 111 @Component(immediate = true)
108 @Service 112 @Service
109 public class DistributedFlowRuleStore 113 public class DistributedFlowRuleStore
110 - extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate> 114 + extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
111 implements FlowRuleStore { 115 implements FlowRuleStore {
112 116
113 private final Logger log = getLogger(getClass()); 117 private final Logger log = getLogger(getClass());
114 118
115 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8; 119 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
116 - private static final boolean DEFAULT_BACKUP_ENABLED = false; 120 + private static final boolean DEFAULT_BACKUP_ENABLED = true;
117 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000; 121 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
118 122
119 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE, 123 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
...@@ -124,7 +128,10 @@ public class DistributedFlowRuleStore ...@@ -124,7 +128,10 @@ public class DistributedFlowRuleStore
124 label = "Indicates whether backups are enabled or not") 128 label = "Indicates whether backups are enabled or not")
125 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED; 129 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
126 130
127 - private InternalFlowTable flowTable; 131 + private InternalFlowTable flowTable = new InternalFlowTable();
132 +
133 + /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
134 + flowEntries = new ConcurrentHashMap<>();*/
128 135
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected ReplicaInfoService replicaInfoManager; 137 protected ReplicaInfoService replicaInfoManager;
...@@ -139,18 +146,24 @@ public class DistributedFlowRuleStore ...@@ -139,18 +146,24 @@ public class DistributedFlowRuleStore
139 protected DeviceService deviceService; 146 protected DeviceService deviceService;
140 147
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 148 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 - protected DeviceClockService deviceClockService;
143 -
144 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected CoreService coreService; 149 protected CoreService coreService;
146 150
147 - @Reference(cardinality = MANDATORY_UNARY) 151 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ComponentConfigService configService; 152 protected ComponentConfigService configService;
149 153
150 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap(); 154 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
151 155
156 + // Cache of SMaps used for backup data. each SMap contain device flow table
157 + private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
158 +
152 private ExecutorService messageHandlingExecutor; 159 private ExecutorService messageHandlingExecutor;
153 160
161 + private final ExecutorService backupExecutors =
162 + BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
163 + //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
164 +
165 + private boolean syncBackup = false;
166 +
154 protected static final StoreSerializer SERIALIZER = new KryoSerializer() { 167 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
155 @Override 168 @Override
156 protected void setupKryoPool() { 169 protected void setupKryoPool() {
...@@ -170,11 +183,16 @@ public class DistributedFlowRuleStore ...@@ -170,11 +183,16 @@ public class DistributedFlowRuleStore
170 @Activate 183 @Activate
171 public void activate(ComponentContext context) { 184 public void activate(ComponentContext context) {
172 configService.registerProperties(getClass()); 185 configService.registerProperties(getClass());
173 - 186 + super.serializer = SERIALIZER;
174 - flowTable = new InternalFlowTable().withBackupsEnabled(backupEnabled); 187 + super.theInstance = storeService.getHazelcastInstance();
175 188
176 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC); 189 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
177 190
191 + // Cache to create SMap on demand
192 + smaps = CacheBuilder.newBuilder()
193 + .softValues()
194 + .build(new SMapLoader());
195 +
178 final NodeId local = clusterService.getLocalNode().id(); 196 final NodeId local = clusterService.getLocalNode().id();
179 197
180 messageHandlingExecutor = Executors.newFixedThreadPool( 198 messageHandlingExecutor = Executors.newFixedThreadPool(
...@@ -212,7 +230,7 @@ public class DistributedFlowRuleStore ...@@ -212,7 +230,7 @@ public class DistributedFlowRuleStore
212 public void handle(ClusterMessage message) { 230 public void handle(ClusterMessage message) {
213 DeviceId deviceId = SERIALIZER.decode(message.payload()); 231 DeviceId deviceId = SERIALIZER.decode(message.payload());
214 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender()); 232 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
215 - Set<StoredFlowEntry> flowEntries = flowTable.getFlowEntries(deviceId); 233 + Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
216 try { 234 try {
217 message.respond(SERIALIZER.encode(flowEntries)); 235 message.respond(SERIALIZER.encode(flowEntries));
218 } catch (IOException e) { 236 } catch (IOException e) {
...@@ -282,8 +300,6 @@ public class DistributedFlowRuleStore ...@@ -282,8 +300,6 @@ public class DistributedFlowRuleStore
282 if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) { 300 if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
283 msgHandlerPoolSize = newPoolSize; 301 msgHandlerPoolSize = newPoolSize;
284 backupEnabled = newBackupEnabled; 302 backupEnabled = newBackupEnabled;
285 - // reconfigure the store
286 - flowTable.withBackupsEnabled(backupEnabled);
287 ExecutorService oldMsgHandler = messageHandlingExecutor; 303 ExecutorService oldMsgHandler = messageHandlingExecutor;
288 messageHandlingExecutor = Executors.newFixedThreadPool( 304 messageHandlingExecutor = Executors.newFixedThreadPool(
289 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers")); 305 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
...@@ -297,6 +313,7 @@ public class DistributedFlowRuleStore ...@@ -297,6 +313,7 @@ public class DistributedFlowRuleStore
297 prefix, msgHandlerPoolSize, backupEnabled); 313 prefix, msgHandlerPoolSize, backupEnabled);
298 } 314 }
299 315
316 +
300 // This is not a efficient operation on a distributed sharded 317 // This is not a efficient operation on a distributed sharded
301 // flow store. We need to revisit the need for this operation or at least 318 // flow store. We need to revisit the need for this operation or at least
302 // make it device specific. 319 // make it device specific.
...@@ -354,7 +371,7 @@ public class DistributedFlowRuleStore ...@@ -354,7 +371,7 @@ public class DistributedFlowRuleStore
354 } 371 }
355 372
356 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 373 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
357 - return flowTable.getFlowEntries(deviceId).stream().collect(Collectors.toSet()); 374 + return flowTable.getFlowEntries(deviceId);
358 } 375 }
359 376
360 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}", 377 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
...@@ -451,6 +468,7 @@ public class DistributedFlowRuleStore ...@@ -451,6 +468,7 @@ public class DistributedFlowRuleStore
451 new CompletedBatchOperation(true, Collections.emptySet(), did))); 468 new CompletedBatchOperation(true, Collections.emptySet(), did)));
452 return; 469 return;
453 } 470 }
471 + updateBackup(did, currentOps);
454 472
455 notifyDelegate(FlowRuleBatchEvent.requested(new 473 notifyDelegate(FlowRuleBatchEvent.requested(new
456 FlowRuleBatchRequest(operation.id(), 474 FlowRuleBatchRequest(operation.id(),
...@@ -489,6 +507,23 @@ public class DistributedFlowRuleStore ...@@ -489,6 +507,23 @@ public class DistributedFlowRuleStore
489 ).filter(op -> op != null).collect(Collectors.toSet()); 507 ).filter(op -> op != null).collect(Collectors.toSet());
490 } 508 }
491 509
510 + private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
511 + if (!backupEnabled) {
512 + return;
513 + }
514 +
515 + Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
516 +
517 + if (syncBackup) {
518 + // wait for backup to complete
519 + try {
520 + backup.get();
521 + } catch (InterruptedException | ExecutionException e) {
522 + log.error("Failed to create backups", e);
523 + }
524 + }
525 + }
526 +
492 @Override 527 @Override
493 public void deleteFlowRule(FlowRule rule) { 528 public void deleteFlowRule(FlowRule rule) {
494 storeBatch( 529 storeBatch(
...@@ -504,7 +539,7 @@ public class DistributedFlowRuleStore ...@@ -504,7 +539,7 @@ public class DistributedFlowRuleStore
504 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId()); 539 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
505 final NodeId localId = clusterService.getLocalNode().id(); 540 final NodeId localId = clusterService.getLocalNode().id();
506 if (localId.equals(replicaInfo.master().orNull())) { 541 if (localId.equals(replicaInfo.master().orNull())) {
507 - return addOrUpdateFlowRuleInternal((StoredFlowEntry) rule); 542 + return addOrUpdateFlowRuleInternal(rule);
508 } 543 }
509 544
510 log.warn("Tried to update FlowRule {} state," 545 log.warn("Tried to update FlowRule {} state,"
...@@ -512,7 +547,10 @@ public class DistributedFlowRuleStore ...@@ -512,7 +547,10 @@ public class DistributedFlowRuleStore
512 return null; 547 return null;
513 } 548 }
514 549
515 - private FlowRuleEvent addOrUpdateFlowRuleInternal(StoredFlowEntry rule) { 550 + private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
551 + final DeviceId did = rule.deviceId();
552 +
553 +
516 // check if this new rule is an update to an existing entry 554 // check if this new rule is an update to an existing entry
517 StoredFlowEntry stored = flowTable.getFlowEntry(rule); 555 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
518 if (stored != null) { 556 if (stored != null) {
...@@ -521,15 +559,21 @@ public class DistributedFlowRuleStore ...@@ -521,15 +559,21 @@ public class DistributedFlowRuleStore
521 stored.setPackets(rule.packets()); 559 stored.setPackets(rule.packets());
522 if (stored.state() == FlowEntryState.PENDING_ADD) { 560 if (stored.state() == FlowEntryState.PENDING_ADD) {
523 stored.setState(FlowEntryState.ADDED); 561 stored.setState(FlowEntryState.ADDED);
562 + FlowRuleBatchEntry entry =
563 + new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
564 + updateBackup(did, Sets.newHashSet(entry));
524 return new FlowRuleEvent(Type.RULE_ADDED, rule); 565 return new FlowRuleEvent(Type.RULE_ADDED, rule);
525 } 566 }
526 return new FlowRuleEvent(Type.RULE_UPDATED, rule); 567 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
527 } 568 }
528 569
529 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore 570 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
571 + // TODO: also update backup if the behavior is correct.
530 flowTable.add(rule); 572 flowTable.add(rule);
531 573
574 +
532 return null; 575 return null;
576 +
533 } 577 }
534 578
535 @Override 579 @Override
...@@ -570,6 +614,9 @@ public class DistributedFlowRuleStore ...@@ -570,6 +614,9 @@ public class DistributedFlowRuleStore
570 final DeviceId deviceId = rule.deviceId(); 614 final DeviceId deviceId = rule.deviceId();
571 // This is where one could mark a rule as removed and still keep it in the store. 615 // This is where one could mark a rule as removed and still keep it in the store.
572 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule); 616 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
617 + FlowRuleBatchEntry entry =
618 + new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
619 + updateBackup(deviceId, Sets.newHashSet(entry));
573 if (removed) { 620 if (removed) {
574 return new FlowRuleEvent(RULE_REMOVED, rule); 621 return new FlowRuleEvent(RULE_REMOVED, rule);
575 } else { 622 } else {
...@@ -596,10 +643,35 @@ public class DistributedFlowRuleStore ...@@ -596,10 +643,35 @@ public class DistributedFlowRuleStore
596 } 643 }
597 } 644 }
598 645
646 + private void loadFromBackup(final DeviceId did) {
647 + if (!backupEnabled) {
648 + return;
649 + }
650 + log.info("We are now the master for {}. Will load flow rules from backup", did);
651 + try {
652 + log.debug("Loading FlowRules for {} from backups", did);
653 + SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
654 + for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
655 + : backupFlowTable.entrySet()) {
656 +
657 + log.trace("loading {}", e.getValue());
658 + for (StoredFlowEntry entry : e.getValue()) {
659 + flowTable.getFlowEntriesById(entry).remove(entry);
660 + flowTable.getFlowEntriesById(entry).add(entry);
661 +
662 +
663 + }
664 + }
665 + } catch (ExecutionException e) {
666 + log.error("Failed to load backup flowtable for {}", did, e);
667 + }
668 + }
669 +
599 private void removeFromPrimary(final DeviceId did) { 670 private void removeFromPrimary(final DeviceId did) {
600 flowTable.clearDevice(did); 671 flowTable.clearDevice(did);
601 } 672 }
602 673
674 +
603 private final class OnStoreBatch implements ClusterMessageHandler { 675 private final class OnStoreBatch implements ClusterMessageHandler {
604 private final NodeId local; 676 private final NodeId local;
605 677
...@@ -616,11 +688,10 @@ public class DistributedFlowRuleStore ...@@ -616,11 +688,10 @@ public class DistributedFlowRuleStore
616 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId); 688 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
617 if (!local.equals(replicaInfo.master().orNull())) { 689 if (!local.equals(replicaInfo.master().orNull())) {
618 690
619 - Set<FlowRule> failures = operation.getOperations() 691 + Set<FlowRule> failures = new HashSet<>(operation.size());
620 - .stream() 692 + for (FlowRuleBatchEntry op : operation.getOperations()) {
621 - .map(FlowRuleBatchEntry::target) 693 + failures.add(op.target());
622 - .collect(Collectors.toSet()); 694 + }
623 -
624 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId); 695 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
625 // This node is no longer the master, respond as all failed. 696 // This node is no longer the master, respond as all failed.
626 // TODO: we might want to wrap response in envelope 697 // TODO: we might want to wrap response in envelope
...@@ -641,6 +712,17 @@ public class DistributedFlowRuleStore ...@@ -641,6 +712,17 @@ public class DistributedFlowRuleStore
641 } 712 }
642 } 713 }
643 714
715 + private final class SMapLoader
716 + extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
717 +
718 + @Override
719 + public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
720 + throws Exception {
721 + IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
722 + return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
723 + }
724 + }
725 +
644 private final class InternalReplicaInfoEventListener 726 private final class InternalReplicaInfoEventListener
645 implements ReplicaInfoEventListener { 727 implements ReplicaInfoEventListener {
646 728
...@@ -655,7 +737,7 @@ public class DistributedFlowRuleStore ...@@ -655,7 +737,7 @@ public class DistributedFlowRuleStore
655 if (local.equals(rInfo.master().orNull())) { 737 if (local.equals(rInfo.master().orNull())) {
656 // This node is the new master, populate local structure 738 // This node is the new master, populate local structure
657 // from backup 739 // from backup
658 - flowTable.loadFromBackup(did); 740 + loadFromBackup(did);
659 } 741 }
660 //else { 742 //else {
661 // This node is no longer the master holder, 743 // This node is no longer the master holder,
...@@ -672,133 +754,146 @@ public class DistributedFlowRuleStore ...@@ -672,133 +754,146 @@ public class DistributedFlowRuleStore
672 } 754 }
673 } 755 }
674 756
675 - private class InternalFlowTable { 757 + // Task to update FlowEntries in backup HZ store
758 + private final class UpdateBackup implements Runnable {
676 759
677 - private boolean backupsEnabled = true; 760 + private final DeviceId deviceId;
761 + private final Set<FlowRuleBatchEntry> ops;
678 762
679 - /** 763 +
680 - * Turns backups on or off. 764 + public UpdateBackup(DeviceId deviceId,
681 - * @param backupsEnabled whether backups should be enabled or not 765 + Set<FlowRuleBatchEntry> ops) {
682 - * @return this instance 766 + this.deviceId = checkNotNull(deviceId);
683 - */ 767 + this.ops = checkNotNull(ops);
684 - public InternalFlowTable withBackupsEnabled(boolean backupsEnabled) { 768 +
685 - this.backupsEnabled = backupsEnabled; 769 + }
686 - return this; 770 +
687 - } 771 + @Override
688 - 772 + public void run() {
689 - private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> 773 + try {
690 - flowEntries = Maps.newConcurrentMap(); 774 + log.trace("update backup {} {}", deviceId, ops
691 - 775 + );
692 - private final KryoNamespace.Builder flowSerializer = KryoNamespace.newBuilder() 776 + final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
693 - .register(KryoNamespaces.API) 777 +
694 - .register(MastershipBasedTimestamp.class); 778 +
695 - 779 + ops.stream().forEach(
696 - private final ClockService<FlowId, StoredFlowEntry> clockService = 780 + op -> {
697 - (flowId, flowEntry) -> 781 + final FlowRule entry = op.target();
698 - (flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId()); 782 + final FlowId id = entry.id();
699 - 783 + ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
700 - private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap = 784 + List<StoredFlowEntry> list = new ArrayList<>();
701 - new EventuallyConsistentMapImpl<>("flow-backup", 785 + if (original != null) {
702 - clusterService, 786 + list.addAll(original);
703 - clusterCommunicator, 787 + }
704 - flowSerializer, 788 + list.remove(op.target());
705 - clockService, 789 + if (op.operator() == FlowRuleOperation.ADD) {
706 - (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true); 790 + list.add((StoredFlowEntry) entry);
707 - 791 + }
708 - private Collection<NodeId> getPeerNodes() { 792 +
709 - List<NodeId> nodes = clusterService.getNodes() 793 + ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
710 - .stream() 794 + boolean success;
711 - .map(node -> node.id()) 795 + if (original == null) {
712 - .filter(id -> !id.equals(clusterService.getLocalNode().id())) 796 + success = (backupFlowTable.putIfAbsent(id, newValue) == null);
713 - .collect(Collectors.toList());
714 -
715 - if (nodes.isEmpty()) {
716 - return ImmutableList.of();
717 } else { 797 } else {
718 - // get a random peer 798 + success = backupFlowTable.replace(id, original, newValue);
719 - return ImmutableList.of(nodes.get(RandomUtils.nextInt(nodes.size())));
720 } 799 }
800 + if (!success) {
801 + log.error("Updating backup failed.");
721 } 802 }
722 803
723 - public void loadFromBackup(DeviceId deviceId) {
724 - if (!backupsEnabled) {
725 - return;
726 } 804 }
727 - log.info("We are now the master for {}. Will load flow rules from backup", deviceId); 805 + );
806 + } catch (ExecutionException e) {
807 + log.error("Failed to write to backups", e);
808 + }
728 809
729 - ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>(); 810 + }
811 + }
730 812
731 - backupMap.values() 813 + private class InternalFlowTable {
732 - .stream() 814 +
733 - .filter(entry -> entry.deviceId().equals(deviceId)) 815 + /*
734 - .forEach(entry -> flowTable.computeIfPresent(entry.id(), (k, v) -> { 816 + TODO: This needs to be cleaned up. Perhaps using the eventually consistent
735 - if (v == null) { 817 + map when it supports distributed to a sequence of instances.
736 - return Sets.newHashSet(entry); 818 + */
737 - } else { 819 +
738 - v.add(entry); 820 +
821 + private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
822 + flowEntries = new ConcurrentHashMap<>();
823 +
824 +
825 + private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
826 + return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
739 } 827 }
740 - return v; 828 +
741 - })); 829 + /**
742 - flowEntries.putIfAbsent(deviceId, flowTable); 830 + * Returns the flow table for specified device.
831 + *
832 + * @param deviceId identifier of the device
833 + * @return Map representing Flow Table of given device.
834 + */
835 + private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
836 + return createIfAbsentUnchecked(flowEntries,
837 + deviceId, lazyEmptyFlowTable());
743 } 838 }
744 839
745 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) { 840 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
746 - return flowEntries 841 + final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
747 - .computeIfAbsent(deviceId, key -> Maps.newConcurrentMap()) 842 + Set<StoredFlowEntry> r = flowTable.get(flowId);
748 - .computeIfAbsent(flowId, k -> new CopyOnWriteArraySet<>()); 843 + if (r == null) {
844 + final Set<StoredFlowEntry> concurrentlyAdded;
845 + r = new CopyOnWriteArraySet<>();
846 + concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
847 + if (concurrentlyAdded != null) {
848 + return concurrentlyAdded;
849 + }
850 + }
851 + return r;
749 } 852 }
750 853
751 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) { 854 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
752 - return getFlowEntriesInternal(rule.deviceId(), rule.id()) 855 + for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
753 - .stream() 856 + if (f.equals(rule)) {
754 - .filter(element -> element.equals(rule)) 857 + return f;
755 - .findFirst() 858 + }
756 - .orElse(null); 859 + }
860 + return null;
757 } 861 }
758 862
759 - private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId) { 863 + private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
760 - Set<StoredFlowEntry> entries = Sets.newHashSet(); 864 + return getFlowTable(deviceId).values().stream()
761 - flowEntries.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap()) 865 + .flatMap((list -> list.stream())).collect(Collectors.toSet());
762 - .values() 866 +
763 - .forEach(entries::addAll);
764 - return entries;
765 } 867 }
766 868
869 +
767 public StoredFlowEntry getFlowEntry(FlowRule rule) { 870 public StoredFlowEntry getFlowEntry(FlowRule rule) {
768 return getFlowEntryInternal(rule); 871 return getFlowEntryInternal(rule);
769 } 872 }
770 873
771 - public Set<StoredFlowEntry> getFlowEntries(DeviceId deviceId) { 874 + public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
772 return getFlowEntriesInternal(deviceId); 875 return getFlowEntriesInternal(deviceId);
773 } 876 }
774 877
775 - public void add(StoredFlowEntry rule) { 878 + public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
776 - getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule); 879 + return getFlowEntriesInternal(entry.deviceId(), entry.id());
777 - if (backupsEnabled) {
778 - try {
779 - backupMap.put(rule.id(), rule);
780 - } catch (Exception e) {
781 - log.warn("Failed to backup flow rule", e);
782 - }
783 } 880 }
881 +
882 + public void add(FlowEntry rule) {
883 + ((CopyOnWriteArraySet)
884 + getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
784 } 885 }
785 886
786 public boolean remove(DeviceId deviceId, FlowEntry rule) { 887 public boolean remove(DeviceId deviceId, FlowEntry rule) {
787 - boolean status = 888 + return ((CopyOnWriteArraySet)
788 - getFlowEntriesInternal(deviceId, rule.id()).remove(rule); 889 + getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
789 - if (backupsEnabled && status) { 890 + //return flowEntries.remove(deviceId, rule);
790 - try {
791 - backupMap.remove(rule.id(), (DefaultFlowEntry) rule);
792 - } catch (Exception e) {
793 - log.warn("Failed to remove backup of flow rule", e);
794 - }
795 - }
796 - return status;
797 } 891 }
798 892
799 public void clearDevice(DeviceId did) { 893 public void clearDevice(DeviceId did) {
800 flowEntries.remove(did); 894 flowEntries.remove(did);
801 - // Flow entries should continue to remain in backup map.
802 } 895 }
803 } 896 }
897 +
898 +
804 } 899 }
......