Jonathan Hart
Committed by Ray Milkey

Don't run anti-entropy when under high load

Change-Id: I9e480708b9eced73da98e5c4cb27a18aeb08f09a
......@@ -19,6 +19,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -49,6 +50,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -100,6 +102,12 @@ public class EventuallyConsistentMapImpl<K, V>
private long periodSec = 5;
private boolean lightweightAntiEntropy = true;
private static final int WINDOW_SIZE = 5;
private static final int HIGH_LOAD_THRESHOLD = 0;
private static final int LOAD_WINDOW = 2;
SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
AtomicLong operations = new AtomicLong();
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
......@@ -162,7 +170,7 @@ public class EventuallyConsistentMapImpl<K, V>
//FIXME anti-entropy can take >60 seconds and it blocks fg workers
// ... dropping minPriority to try to help until this can be parallel
newSingleThreadScheduledExecutor(//minPriority(
groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
// start anti-entropy thread
//TODO disable anti-entropy for now in testing (it is unstable)
......@@ -271,6 +279,7 @@ public class EventuallyConsistentMapImpl<K, V>
}
private boolean putInternal(K key, V value, Timestamp timestamp) {
counter.incrementCount();
Timestamp removed = removedItems.get(key);
if (removed != null && removed.isNewerThan(timestamp)) {
log.debug("ecmap - removed was newer {}", value);
......@@ -318,6 +327,7 @@ public class EventuallyConsistentMapImpl<K, V>
}
private boolean removeInternal(K key, Timestamp timestamp) {
counter.incrementCount();
final MutableBoolean updated = new MutableBoolean(false);
items.compute(key, (k, existing) -> {
......@@ -515,6 +525,10 @@ public class EventuallyConsistentMapImpl<K, V>
clusterCommunicator.unicast(message, peer);
}
private boolean underHighLoad() {
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
private final class SendAdvertisementTask implements Runnable {
@Override
public void run() {
......@@ -523,6 +537,10 @@ public class EventuallyConsistentMapImpl<K, V>
return;
}
if (underHighLoad()) {
return;
}
try {
final NodeId self = clusterService.getLocalNode().id();
Set<ControllerNode> nodes = clusterService.getNodes();
......@@ -745,7 +763,9 @@ public class EventuallyConsistentMapImpl<K, V>
message.sender());
AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
try {
handleAntiEntropyAdvertisement(advertisement);
if (!underHighLoad()) {
handleAntiEntropyAdvertisement(advertisement);
}
} catch (Exception e) {
log.warn("Exception thrown handling advertisements", e);
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Maintains a sliding window of value counts. The sliding window counter is
* initialized with a number of window slots. Calls to #incrementCount() will
* increment the value in the current window slot. Periodically the window
* slides and the oldest value count is dropped. Calls to #get() will get the
* total count for the last N window slots.
*/
public final class SlidingWindowCounter {
private volatile int headSlot;
private final int windowSlots;
private final List<AtomicLong> counters;
private final ScheduledExecutorService background;
private static final int SLIDE_WINDOW_PERIOD_SECONDS = 1;
/**
* Creates a new sliding window counter with the given total number of
* window slots.
*
* @param windowSlots total number of window slots
*/
public SlidingWindowCounter(int windowSlots) {
checkArgument(windowSlots > 0, "Window size must be a positive integer");
this.windowSlots = windowSlots;
this.headSlot = 0;
// Initialize each item in the list to an AtomicLong of 0
this.counters = Collections.nCopies(windowSlots, 0)
.stream()
.map(AtomicLong::new)
.collect(Collectors.toCollection(ArrayList::new));
background = Executors.newSingleThreadScheduledExecutor();
background.scheduleWithFixedDelay(this::advanceHead, 0,
SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS);
}
/**
* Releases resources used by the SlidingWindowCounter.
*/
public void destroy() {
background.shutdownNow();
}
/**
* Increments the count of the current window slot by 1.
*/
public void incrementCount() {
incrementCount(headSlot, 1);
}
/**
* Increments the count of the current window slot by the given value.
*
* @param value value to increment by
*/
public void incrementCount(long value) {
incrementCount(headSlot, value);
}
private void incrementCount(int slot, long value) {
counters.get(slot).addAndGet(value);
}
/**
* Gets the total count for the last N window slots.
*
* @param slots number of slots to include in the count
* @return total count for last N slots
*/
public long get(int slots) {
checkArgument(slots <= windowSlots,
"Requested window must be less than the total window slots");
long sum = 0;
for (int i = 0; i < slots; i++) {
int currentIndex = headSlot - i;
if (currentIndex < 0) {
currentIndex = counters.size() + currentIndex;
}
sum += counters.get(currentIndex).get();
}
return sum;
}
void advanceHead() {
counters.get(slotAfter(headSlot)).set(0);
headSlot = slotAfter(headSlot);
}
private int slotAfter(int slot) {
return (slot + 1) % windowSlots;
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for the sliding window counter.
*/
public class SlidingWindowCounterTest {
private SlidingWindowCounter counter;
@Before
public void setUp() {
counter = new SlidingWindowCounter(2);
}
@After
public void tearDown() {
counter.destroy();
}
@Test
public void testIncrementCount() {
assertEquals(0, counter.get(1));
assertEquals(0, counter.get(2));
counter.incrementCount();
assertEquals(1, counter.get(1));
assertEquals(1, counter.get(2));
counter.incrementCount(2);
assertEquals(3, counter.get(2));
}
@Test
public void testSlide() {
counter.incrementCount();
counter.advanceHead();
assertEquals(0, counter.get(1));
assertEquals(1, counter.get(2));
counter.incrementCount(2);
assertEquals(2, counter.get(1));
assertEquals(3, counter.get(2));
}
@Test
public void testWrap() {
counter.incrementCount();
counter.advanceHead();
counter.incrementCount(2);
counter.advanceHead();
assertEquals(0, counter.get(1));
assertEquals(2, counter.get(2));
counter.advanceHead();
assertEquals(0, counter.get(1));
assertEquals(0, counter.get(2));
}
@Test
public void testCornerCases() {
try {
counter.get(3);
fail("Exception should have been thrown");
} catch (IllegalArgumentException e) {
assertTrue(true);
}
try {
new SlidingWindowCounter(0);
fail("Exception should have been thrown");
} catch (IllegalArgumentException e) {
assertTrue(true);
}
try {
new SlidingWindowCounter(-1);
fail("Exception should have been thrown");
} catch (IllegalArgumentException e) {
assertTrue(true);
}
}
}