Brian O'Connor
Committed by Gerrit Code Review

cleaning up some TODOs

Change-Id: Ib2380e9533ba30c6f9fdf79aed1879dbbe3589a7
@@ -194,8 +194,8 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
                 || (linkEvent.type() == LINK_UPDATED &&
                         linkEvent.subject().isDurable())) {
             final LinkKey linkKey = linkKey(linkEvent.subject());
-            Set<IntentId> intentIds = intentsByLink.get(linkKey);
             synchronized (intentsByLink) {
+                Set<IntentId> intentIds = intentsByLink.get(linkKey);
                 log.debug("recompile triggered by LinkDown {} {}", linkKey, intentIds);
                 toBeRecompiled.addAll(intentIds);
             }
@@ -206,7 +206,6 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
                         linkEvent.subject().isDurable()));
             }
         }
-
         delegate.triggerCompile(toBeRecompiled, !recompileOnly);
     }
 }
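Note: the first hunk fixes a check-then-act race, not just style. `intentsByLink` is a multimap guarded by `synchronized` blocks, and fetching the `Set` view outside the lock lets another thread mutate it before (or while) it is read. A minimal sketch of the hazard and the fix, using hypothetical element types (only the `intentsByLink` name comes from the patch):

```java
import java.util.Set;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;

public class GuardedMultimapSketch {
    // Guarded by synchronized (intentsByLink), as in ObjectiveTracker.
    private final SetMultimap<String, Long> intentsByLink = HashMultimap.create();

    public void racy(String linkKey) {
        // BAD: get() runs outside the lock, so the returned view can be
        // concurrently modified (or observed mid-update) by writers.
        Set<Long> intentIds = intentsByLink.get(linkKey);
        synchronized (intentsByLink) {
            intentIds.forEach(System.out::println);
        }
    }

    public void safe(String linkKey) {
        synchronized (intentsByLink) {
            // GOOD: both the lookup and the traversal hold the lock.
            Set<Long> intentIds = intentsByLink.get(linkKey);
            intentIds.forEach(System.out::println);
        }
    }
}
```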
...
@@ -175,7 +175,7 @@ public class HazelcastLeadershipService implements LeadershipService,
         for (Topic topic : topics.values()) {
             Leadership leadership = new Leadership(topic.topicName(),
                                                    topic.leader(),
-                                                   0L); // TODO: epoch not used
+                                                   0L);
             result.put(topic.topicName(), leadership);
         }
         return result;
...
@@ -51,7 +51,6 @@ public class DistributedIdBlockStore implements IdBlockStore {

     @Override
     public IdBlock getIdBlock(String topic) {
-        //TODO need to persist this value across cluster failures
         Long blockBase = theInstance.getAtomicLong(topic).getAndAdd(DEFAULT_BLOCK_SIZE);
         return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE);
     }
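For context: the store carves the ID space into blocks by atomically bumping a cluster-wide counter, so concurrent callers on different nodes receive disjoint ranges. A self-contained sketch of the same pattern (Hazelcast 3.x API; the instance setup and block size are assumptions, not taken from the patch):

```java
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class IdBlockAllocatorSketch {
    private static final long BLOCK_SIZE = 0x100000L; // assumed block size

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(new Config());
        // getAndAdd atomically reserves [base, base + BLOCK_SIZE) for this
        // caller; concurrent callers on any node get disjoint ranges.
        long base = hz.getAtomicLong("intent-ids").getAndAdd(BLOCK_SIZE);
        System.out.printf("allocated ids [%d, %d)%n", base, base + BLOCK_SIZE);
        hz.shutdown();
    }
}
```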
...
@@ -130,7 +130,6 @@ public class DistributedFlowRuleStore

     private final AtomicInteger localBatchIdGen = new AtomicInteger();

-    // TODO: make this configurable
     private int pendingFutureTimeoutMinutes = 5;

     private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
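The `pendingFutures` initializer is cut off at the hunk boundary. A plausible shape for such a cache, assuming Guava's `CacheBuilder` (a sketch of the idiom, not the file's actual initializer; `String` stands in for `CompletedBatchOperation`): entries expire after `pendingFutureTimeoutMinutes`, so a batch whose completion message never arrives cannot strand its waiter forever.

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.SettableFuture;

public class PendingFuturesSketch {
    private final int pendingFutureTimeoutMinutes = 5;

    // Futures keyed by batch id; unanswered entries are evicted after the
    // timeout instead of accumulating indefinitely.
    private final Cache<Integer, SettableFuture<String>> pendingFutures =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
                    .build();
}
```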
@@ -149,7 +148,6 @@ public class DistributedFlowRuleStore
     private final ExecutorService backupExecutors =
             Executors.newSingleThreadExecutor(namedThreads("async-backups"));

-    // TODO make this configurable
     private boolean syncBackup = false;

     protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@@ -163,7 +161,6 @@ public class DistributedFlowRuleStore
         }
     };

-    // TODO: make this configurable
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;

     private ReplicaInfoEventListener replicaInfoEventListener;
@@ -247,7 +244,7 @@ public class DistributedFlowRuleStore
     }


-    // TODO: This is not a efficient operation on a distributed sharded
+    // This is not an efficient operation on a distributed sharded
     // flow store. We need to revisit the need for this operation or at least
     // make it device specific.
     @Override
@@ -267,7 +264,6 @@ public class DistributedFlowRuleStore

         if (!replicaInfo.master().isPresent()) {
             log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
-            // TODO: should we try returning from backup?
             return null;
         }

@@ -287,9 +283,9 @@ public class DistributedFlowRuleStore
             Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
             return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
         } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
+            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
         }
+        return null;
     }

     private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
@@ -313,7 +309,6 @@ public class DistributedFlowRuleStore

         if (!replicaInfo.master().isPresent()) {
             log.warn("Failed to getFlowEntries: No master for {}", deviceId);
-            // TODO: should we try returning from backup?
             return Collections.emptyList();
         }

@@ -333,9 +328,9 @@ public class DistributedFlowRuleStore
             Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
             return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
         } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
+            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
         }
+        return null;
     }

     private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
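Both getters now degrade gracefully instead of rethrowing as `RuntimeException`, at the cost of returning `null` past the catch block; for the collection-returning variant, an empty collection would match the no-master branch and be kinder to callers. A sketch of the bounded remote-fetch pattern under assumed stand-in types (`Cluster` and `Serializer` here are hypothetical, not the store's API):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RemoteFetchSketch {
    private static final long TIMEOUT_MILLIS = 5000;

    // Hypothetical stand-ins for the store's cluster messenger and serializer.
    interface Cluster { Future<byte[]> sendAndReceive(byte[] request, String master); }
    interface Serializer { <T> T decode(byte[] bytes); }

    Set<String> fetchFromMaster(Cluster cluster, Serializer serializer,
                                byte[] request, String master) {
        try {
            Future<byte[]> response = cluster.sendAndReceive(request, master);
            // Bound the wait so an unresponsive master cannot block the caller.
            return serializer.decode(response.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return Collections.emptySet();
        } catch (TimeoutException | ExecutionException e) {
            // Degrade to an empty result rather than surface a RuntimeException.
            return Collections.emptySet();
        }
    }
}
```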
...
@@ -88,7 +88,7 @@ public class HazelcastIntentBatchQueue
     private IntentBatchDelegate delegate;
     private InternalLeaderListener leaderListener = new InternalLeaderListener();
     private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
-            = Maps.newHashMap(); // FIXME make distributed?
+            = Maps.newHashMap();
     private final Set<ApplicationId> myTopics = Sets.newHashSet();
     private final Map<ApplicationId, IntentOperations> outstandingOps
             = Maps.newHashMap();
@@ -158,7 +158,7 @@ public class HazelcastIntentBatchQueue
     public void addIntentOperations(IntentOperations ops) {
         checkNotNull(ops, "Intent operations cannot be null.");
         ApplicationId appId = ops.appId();
-        getQueue(appId).add(ops); // TODO consider using put here
+        getQueue(appId).add(ops);
         dispatchNextOperation(appId);
     }

@@ -175,7 +175,6 @@ public class HazelcastIntentBatchQueue
             log.warn("Operation {} not found", ops);
         }
         SQueue<IntentOperations> queue = batchQueues.get(appId);
-        // TODO consider alternatives to remove
         checkState(queue.remove().equals(ops),
                    "Operations are wrong.");
         outstandingOps.remove(appId);
@@ -214,7 +213,6 @@ public class HazelcastIntentBatchQueue
      */
     private void leaderChanged(String topic, boolean leader) {
         ApplicationId appId = getAppId(topic);
-        //TODO we are using the event caller's thread, should we use our own?
         synchronized (this) {
             if (leader) {
                 myTopics.add(appId);
...
@@ -99,7 +99,6 @@ public class HazelcastIntentStore
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MetricsService metricsService;

-    // TODO make this configurable
     private boolean onlyLogTransitionError = true;

     private Timer createIntentTimer;
@@ -142,7 +141,7 @@ public class HazelcastIntentStore
         getIntentTimer = createResponseTimer("getIntent");
         getIntentStateTimer = createResponseTimer("getIntentState");

-        // FIXME: We need a way to add serializer for intents which has been plugged-in.
+        // We need a way to add serializers for intents that have been plugged in.
         // As a short term workaround, relax Kryo config to
         // registrationRequired=false
         super.activate();
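The workaround this comment describes trades type safety for flexibility: with registration required, Kryo rejects classes it has not been told about; relaxing it lets plugged-in intent types serialize, at the cost of embedding fully qualified class names in the stream. A minimal sketch (plain Kryo API; the sample class is hypothetical):

```java
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoRegistrationSketch {
    public static class PluggedInIntent { public String key = "example"; }

    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // With registrationRequired=true (the strict default in newer Kryo),
        // writing PluggedInIntent without kryo.register(...) throws.
        kryo.setRegistrationRequired(false);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeClassAndObject(output, new PluggedInIntent());
        output.close();

        Input input = new Input(bytes.toByteArray());
        PluggedInIntent copy = (PluggedInIntent) kryo.readClassAndObject(input);
        System.out.println(copy.key);
    }
}
```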
@@ -164,7 +163,6 @@ public class HazelcastIntentStore
         MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
         intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());

-        // TODO: enable near cache, allow read from backup for this IMap
         IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
         intents = new SMap<>(rawIntents , super.serializer);
         intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
@@ -556,7 +554,6 @@ public class HazelcastIntentStore
         }

         log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
-        // TODO sanity check and log?
     } catch (InterruptedException e) {
         log.error("Batch write was interrupted while processing {}", op, e);
         failed.add(op);
...
@@ -164,7 +164,6 @@ public class GossipLinkStore
         backgroundExecutors =
                 newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));

-        // TODO: Make these configurable
         long initialDelaySec = 5;
         long periodSec = 5;
         // start anti-entropy thread
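The two constants feed a periodic anti-entropy task; the scheduling idiom is standard `ScheduledExecutorService` usage. A sketch with a placeholder task (only the 5-second delay and period come from the hunk; the task body is an assumption):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AntiEntropySketch {
    public static void main(String[] args) {
        ScheduledExecutorService backgroundExecutors =
                Executors.newSingleThreadScheduledExecutor();

        long initialDelaySec = 5;
        long periodSec = 5;

        // Periodically reconcile local state with a peer; the real store
        // sends an anti-entropy advertisement here.
        backgroundExecutors.scheduleAtFixedRate(
                () -> System.out.println("anti-entropy pass"),
                initialDelaySec, periodSec, TimeUnit.SECONDS);
    }
}
```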
...
@@ -48,8 +48,10 @@ import org.slf4j.Logger;
 /**
  * Manages inventory of topology snapshots using trivial in-memory
  * structures implementation.
+ *
+ * Note: This component is not distributed per se. It runs on every
+ * instance and feeds off other distributed stores.
  */
-//FIXME: I LIE I AM NOT DISTRIBUTED
 @Component(immediate = true)
 @Service
 public class DistributedTopologyStore
...
@@ -37,7 +37,6 @@ public class Ip4AddressSerializer extends Serializer<Ip4Address> {
     @Override
     public void write(Kryo kryo, Output output, Ip4Address object) {
         byte[] octs = object.toOctets();
-        // TODO: Writing (and reading) the number of octets is redundant:
         // It is always Ip4Address.BYTE_LENGTH
         output.writeInt(octs.length);
         output.writeBytes(octs);
...
@@ -39,7 +39,6 @@ public final class Ip4PrefixSerializer extends Serializer<Ip4Prefix> {
     public void write(Kryo kryo, Output output,
                       Ip4Prefix object) {
         byte[] octs = object.address().toOctets();
-        // TODO: Writing (and reading) the number of octets is redundant:
         // It is always Ip4Address.BYTE_LENGTH
         output.writeInt(octs.length);
         output.writeBytes(octs);
...
@@ -37,7 +37,6 @@ public class Ip6AddressSerializer extends Serializer<Ip6Address> {
     @Override
     public void write(Kryo kryo, Output output, Ip6Address object) {
         byte[] octs = object.toOctets();
-        // TODO: Writing (and reading) the number of octets is redundant:
         // It is always Ip6Address.BYTE_LENGTH
         output.writeInt(octs.length);
         output.writeBytes(octs);
...
@@ -39,7 +39,6 @@ public final class Ip6PrefixSerializer extends Serializer<Ip6Prefix> {
     public void write(Kryo kryo, Output output,
                       Ip6Prefix object) {
         byte[] octs = object.address().toOctets();
-        // TODO: Writing (and reading) the number of octets is redundant:
         // It is always Ip6Address.BYTE_LENGTH
         output.writeInt(octs.length);
         output.writeBytes(octs);
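All four hunks delete the same TODO, but its point stands: for a fixed-length type, writing the octet count serializes four extra bytes per value that the reader could infer. A sketch of the leaner form (plain Kryo API with `InetAddress` as a stand-in; this is a hypothetical rewrite, not the patch's code):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Fixed-length serializer: no length prefix, since the reader knows to
// expect exactly four octets.
public class FixedLengthIp4Serializer extends Serializer<InetAddress> {
    private static final int BYTE_LENGTH = 4;

    @Override
    public void write(Kryo kryo, Output output, InetAddress object) {
        output.writeBytes(object.getAddress()); // always BYTE_LENGTH bytes
    }

    @Override
    public InetAddress read(Kryo kryo, Input input, Class<InetAddress> type) {
        try {
            return InetAddress.getByAddress(input.readBytes(BYTE_LENGTH));
        } catch (UnknownHostException e) {
            throw new IllegalStateException(e); // unreachable for 4 octets
        }
    }
}
```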
...