Brian O'Connor
Committed by Gerrit Code Review

Adding BoundedThreadPool and BlockingBoolean

Updating EventuallyConsistentMap to use BoundedThreadPool for broadcast threads,
and disabling anti-entropy for now.

Change-Id: Id1bfcdaf1d0a19745fe7336e4ac9eaf649871d5d
......@@ -53,6 +53,12 @@
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-common</artifactId>
</dependency>
......
......@@ -54,8 +54,8 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.minPriority;
/**
* Distributed Map implementation which uses optimistic replication and gossip
......@@ -149,16 +149,23 @@ public class EventuallyConsistentMapImpl<K, V>
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
executor = Executors //FIXME
.newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
// should be a normal executor; it's used for receiving messages
//TODO make # of threads configurable
executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
broadcastMessageExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
// sending executor; should be capped
//TODO make # of threads configurable
broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(
groupedThreads("onos/ecm", mapName + "-bg-%d")));
//FIXME anti-entropy can take >60 seconds and it blocks fg workers
// ... dropping minPriority to try to help until this can be parallel
newSingleThreadScheduledExecutor(//minPriority(
groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
// start anti-entropy thread
//TODO disable anti-entropy for now in testing (it is unstable)
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec,
TimeUnit.SECONDS);
......@@ -494,8 +501,8 @@ public class EventuallyConsistentMapImpl<K, V>
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
//broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
clusterCommunicator.broadcast(message);
broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
// clusterCommunicator.broadcast(message);
}
private void unicastMessage(NodeId peer,
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Mutable boolean that allows threads to wait for a specified value.
*/
public class BlockingBoolean extends AbstractQueuedSynchronizer {

    // The synchronizer state stores the boolean, encoded as one of these ints.
    private static final int TRUE = 1;
    private static final int FALSE = 0;

    // NOTE(review): all waiters share one FIFO queue regardless of the value
    // they wait for. If the value flips back before a signalled waiter
    // re-checks it, waiters queued behind it for the other value may not be
    // woken until the next change. Confirm this is acceptable for callers, or
    // prefer the timed await.

    /**
     * Creates a new blocking boolean with the specified value.
     *
     * @param value the starting value
     */
    public BlockingBoolean(boolean value) {
        setState(encode(value));
    }

    /**
     * Causes the current thread to wait until the boolean equals the specified
     * value unless the thread is {@linkplain Thread#interrupt interrupted}.
     * Returns immediately if the boolean already has the specified value.
     *
     * @param value specified value
     * @throws InterruptedException if the current thread is interrupted
     *                              while waiting
     */
    public void await(boolean value) throws InterruptedException {
        acquireSharedInterruptibly(encode(value));
    }

    /**
     * Causes the current thread to wait until the boolean equals the specified
     * value unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * @param value   specified value
     * @param timeout the maximum time to wait
     * @param unit    the time unit of the {@code timeout} argument
     * @return {@code true} if the boolean equalled the specified value and
     * {@code false} if the waiting time elapsed before that happened
     * @throws InterruptedException if the current thread is interrupted
     *                              while waiting
     */
    public boolean await(boolean value, long timeout, TimeUnit unit)
            throws InterruptedException {
        return tryAcquireSharedNanos(encode(value), unit.toNanos(timeout));
    }

    /**
     * Succeeds only while the current state matches the awaited value.
     *
     * @param acquires the encoded value the caller is waiting for
     * @return a positive number if the state matches, a negative number otherwise
     */
    @Override
    protected int tryAcquireShared(int acquires) {
        return (getState() == acquires) ? 1 : -1;
    }

    /**
     * Sets the value of the blocking boolean.
     *
     * @param value new value
     */
    public void set(boolean value) {
        releaseShared(encode(value));
    }

    /**
     * Gets the value of the blocking boolean.
     *
     * @return current value
     */
    public boolean get() {
        return getState() == TRUE;
    }

    /**
     * Moves the state to the requested value.
     *
     * @param releases the encoded new value
     * @return {@code true} if this call changed the state (so waiters should
     * be signalled), {@code false} otherwise
     */
    @Override
    protected boolean tryReleaseShared(int releases) {
        // Signal on state change only
        int state = getState();
        if (state == releases) {
            return false;
        }
        // A failed CAS means a concurrent set() changed the state first; that
        // call is then the one that reports the change and signals waiters.
        return compareAndSetState(state, releases);
    }

    // Maps a boolean onto the int encoding used for the synchronizer state.
    private static int encode(boolean value) {
        return value ? TRUE : FALSE;
    }
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Implementation of ThreadPoolExecutor that bounds the work queue.
* <p>
* When a new job would exceed the queue bound, the job is run on the caller's
* thread rather than on a thread from the pool.
* </p>
*/
public final class BoundedThreadPool extends ThreadPoolExecutor {

    private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);

    // Capacity given to the work queue of pools created after this is set.
    // Left non-final so that tests in this package can shrink it.
    protected static int maxQueueSize = 80_000; //TODO tune this value
    private static final long STATS_INTERVAL = 5_000; //ms

    // The pool is considered under high load while less than this fraction
    // of the work queue is free.
    private static final double HIGH_LOAD_FREE_RATIO = 0.2;

    // Shared with the rejection policy, which waits for it to become false.
    private final BlockingBoolean underHighLoad;

    // Capacity of this pool's own queue, captured at construction so that later
    // changes to maxQueueSize do not skew the load computation.
    private final int queueCapacity;

    //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
    private final Counter submitted = new Counter();
    private final Counter taken = new Counter();

    // TODO schedule this with a fixed delay from a scheduled executor
    private final AtomicLong lastPrinted = new AtomicLong(0L);

    private BoundedThreadPool(int numberOfThreads,
                              ThreadFactory threadFactory) {
        this(numberOfThreads, threadFactory, new CallerFeedbackPolicy());
    }

    // Takes the policy as an argument so its load flag can be read without
    // casting the handler back from getRejectedExecutionHandler().
    private BoundedThreadPool(int numberOfThreads,
                              ThreadFactory threadFactory,
                              CallerFeedbackPolicy policy) {
        super(numberOfThreads, numberOfThreads,
              0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(maxQueueSize),
              threadFactory,
              policy);
        underHighLoad = policy.load();
        // The queue is still empty here, so its remaining capacity is its capacity.
        queueCapacity = getQueue().remainingCapacity();
    }

    /**
     * Returns a single-thread, bounded executor service.
     *
     * @param threadFactory thread factory for the worker thread.
     * @return the bounded thread pool
     */
    public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new BoundedThreadPool(1, threadFactory);
    }

    /**
     * Returns a fixed-size, bounded executor service.
     *
     * @param numberOfThreads number of worker threads in the pool
     * @param threadFactory thread factory for the worker threads.
     * @return the bounded thread pool
     */
    public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
        return new BoundedThreadPool(numberOfThreads, threadFactory);
    }

    /**
     * Counts the job as submitted and hands it to the pool.
     * <p>
     * The inherited {@code submit(...)} variants all funnel into this method,
     * so counting here (and only here) records each job exactly once.
     * </p>
     *
     * @param command the task to execute
     */
    @Override
    public void execute(Runnable command) {
        submitted.add(1);
        super.execute(command);
    }

    /**
     * Counts the job as taken, then refreshes the statistics log and the
     * high-load flag.
     *
     * @param t the thread that will run the task
     * @param r the task that will be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taken.add(1);
        periodicallyPrintStats();
        updateLoad();
    }

    // Logs queue size and throughput at most once per STATS_INTERVAL; the CAS
    // on lastPrinted lets only one worker emit the line for a given interval.
    private void periodicallyPrintStats() {
        long now = System.currentTimeMillis();
        long prev = lastPrinted.get();
        if (now - prev > STATS_INTERVAL) {
            if (lastPrinted.compareAndSet(prev, now)) {
                log.warn("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
                         getQueue().size(),
                         submitted.throughput(), taken.throughput());
                submitted.reset();
                taken.reset();
            }
        }
    }

    // TODO consider updating load whenever queue changes
    // Raises the high-load flag while the free share of the queue is below
    // HIGH_LOAD_FREE_RATIO and clears it otherwise.
    private void updateLoad() {
        underHighLoad.set(getQueue().remainingCapacity() / (double) queueCapacity < HIGH_LOAD_FREE_RATIO);
    }

    /**
     * Feedback policy that delays the caller's thread until the executor's work
     * queue falls below a threshold, then runs the job on the caller's thread.
     */
    private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {

        private final BlockingBoolean underLoad = new BlockingBoolean(false);

        /**
         * Returns the flag this policy waits on.
         *
         * @return the high-load flag
         */
        public BlockingBoolean load() {
            return underLoad;
        }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * <p>
         * Before running the task, the caller is held back for up to one
         * second while the high-load flag is set.
         * </p>
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (e.isShutdown()) {
                return;
            }

            // Wait for up to 1 second while the queue drains...
            boolean interrupted = false;
            try {
                if (underLoad.await(false, 1, TimeUnit.SECONDS)) {
                    log.debug("High-load flag cleared for {}. Proceeding with work...",
                              Thread.currentThread().getName());
                } else {
                    log.info("Waited for 1 second on {}. Proceeding with work...",
                             Thread.currentThread().getName());
                }
            } catch (InterruptedException exception) {
                interrupted = true;
                log.debug("Got exception waiting for notification:", exception);
            }

            // Do the work on the submitter's thread
            try {
                r.run();
            } finally {
                // Restore the interrupt only after the task has run, so the
                // task itself does not observe it but the caller still does.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
/**
* Tests of the BlockingBoolean utility.
*/
public class BlockingBooleanTest {

    /**
     * Checks that get() reflects the constructor value and every set(),
     * including a set() that does not change the value.
     */
    @Test
    public void basics() {
        BlockingBoolean b = new BlockingBoolean(false);
        assertEquals(false, b.get());
        b.set(true);
        assertEquals(true, b.get());
        b.set(true);
        assertEquals(true, b.get());
        b.set(false);
        assertEquals(false, b.get());
    }

    /**
     * Starts {@code numThreads} workers awaiting {@code value} on a boolean
     * that starts at the opposite value, flips it, and checks that every
     * worker gets through.
     *
     * @param value      the value the workers wait for
     * @param numThreads number of waiting workers
     */
    private void waitChange(boolean value, int numThreads) {
        BlockingBoolean b = new BlockingBoolean(!value);

        CountDownLatch latch = new CountDownLatch(numThreads);
        ExecutorService exec = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0; i < numThreads; i++) {
                exec.submit(() -> {
                    try {
                        b.await(value);
                        latch.countDown();
                    } catch (InterruptedException e) {
                        // An assertion thrown here would be swallowed by the
                        // executor; the latch check below reports the failure.
                        Thread.currentThread().interrupt();
                    }
                });
            }
            b.set(value);
            assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            fail();
        } finally {
            // Interrupts any worker still parked so a failed run leaks no threads.
            exec.shutdownNow();
        }
    }

    @Test
    public void waitTrueChange() {
        waitChange(true, 4);
    }

    @Test
    public void waitFalseChange() {
        waitChange(false, 4);
    }

    /**
     * Awaiting the value the boolean already has must not block.
     */
    @Test
    public void waitSame() {
        BlockingBoolean b = new BlockingBoolean(true);

        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            exec.submit(() -> {
                try {
                    b.await(true);
                    latch.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            fail();
        } finally {
            exec.shutdownNow();
        }
    }

    /**
     * Half of the workers await the current value and pass straight through;
     * the other half await the opposite value and pass only after set().
     */
    @Test
    public void someWait() {
        BlockingBoolean b = new BlockingBoolean(false);

        int numThreads = 4;
        CountDownLatch sameLatch = new CountDownLatch(numThreads / 2);
        CountDownLatch waitLatch = new CountDownLatch(numThreads / 2);

        ExecutorService exec = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0; i < numThreads; i++) {
                final boolean value = (i % 2 == 1);
                exec.submit(() -> {
                    try {
                        b.await(value);
                        if (value) {
                            waitLatch.countDown();
                        } else {
                            sameLatch.countDown();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            assertTrue(sameLatch.await(10, TimeUnit.MILLISECONDS));
            // None of the workers awaiting "true" may have passed yet.
            assertEquals(numThreads / 2, waitLatch.getCount());

            b.set(true);
            assertTrue(waitLatch.await(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            fail();
        } finally {
            exec.shutdownNow();
        }
    }

    /**
     * A timed await for a value that never arrives must report a timeout.
     */
    @Test
    public void waitTimeout() {
        BlockingBoolean b = new BlockingBoolean(true);

        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            exec.submit(() -> {
                try {
                    // Count down only on a timeout; an unexpected success
                    // leaves the latch untouched and fails the check below.
                    if (!b.await(false, 1, TimeUnit.NANOSECONDS)) {
                        latch.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            fail();
        } finally {
            exec.shutdownNow();
        }
    }

    /**
     * Informal timing comparison against MutableBoolean for sets that do not
     * change the value; prints the result instead of asserting.
     */
    @Test
    @Ignore
    public void samePerf() {
        int iters = 10_000;

        BlockingBoolean b1 = new BlockingBoolean(false);
        long t1 = System.nanoTime();
        for (int i = 0; i < iters; i++) {
            b1.set(false);
        }
        long t2 = System.nanoTime();
        MutableBoolean b2 = new MutableBoolean(false);
        for (int i = 0; i < iters; i++) {
            b2.setValue(false);
        }
        long t3 = System.nanoTime();
        System.out.println((t2 - t1) + " " + (t3 - t2) + " " + ((t2 - t1) <= (t3 - t2)));
    }

    /**
     * Informal timing comparison against MutableBoolean for sets that toggle
     * the value on every call; prints the result instead of asserting.
     */
    @Test
    @Ignore
    public void changePerf() {
        int iters = 10_000;

        BlockingBoolean b1 = new BlockingBoolean(false);
        boolean v = true;
        long t1 = System.nanoTime();
        for (int i = 0; i < iters; i++) {
            b1.set(v);
            v = !v;
        }
        long t2 = System.nanoTime();
        MutableBoolean b2 = new MutableBoolean(false);
        for (int i = 0; i < iters; i++) {
            b2.setValue(v);
            v = !v;
        }
        long t3 = System.nanoTime();
        System.out.println((t2 - t1) + " " + (t3 - t2) + " " + ((t2 - t1) <= (t3 - t2)));
    }
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.*;
import static org.onlab.util.BoundedThreadPool.*;
import static org.onlab.util.Tools.namedThreads;
/**
* Test of BoundedThreadPool.
*/
public final class BoundedThreadPoolTest {

    /**
     * A job submitted to an idle pool must run, and must run on a pool thread
     * rather than on the submitting thread.
     */
    @Test
    public void simpleJob() {
        final Thread myThread = Thread.currentThread();
        final AtomicBoolean sameThread = new AtomicBoolean(true);
        final CountDownLatch latch = new CountDownLatch(1);

        BoundedThreadPool exec = newSingleThreadExecutor(namedThreads("test"));
        exec.submit(() -> {
            sameThread.set(myThread.equals(Thread.currentThread()));
            latch.countDown();
        });

        try {
            assertTrue("Job not run", latch.await(100, TimeUnit.MILLISECONDS));
            assertFalse("Runnable used caller thread", sameThread.get());
        } catch (InterruptedException e) {
            fail();
        } finally {
            exec.shutdown();
        }

        awaitShutdown(exec);
    }

    /**
     * Asserts that an executor that has already been shut down terminates
     * within one second.
     *
     * @param exec the executor to wait for
     */
    private static void awaitShutdown(BoundedThreadPool exec) {
        try {
            assertTrue(exec.awaitTermination(1, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            fail();
        }
    }

    /**
     * Occupies every worker thread and fills the work queue with jobs that
     * each block on their own latch.
     *
     * @param exec the executor to saturate
     * @return the latches of the jobs that are still blocked; counting one
     * down releases the corresponding job
     */
    private List<CountDownLatch> fillExecutor(BoundedThreadPool exec) {
        int numThreads = exec.getMaximumPoolSize();
        List<CountDownLatch> latches = Lists.newArrayList();
        final CountDownLatch started = new CountDownLatch(numThreads);
        List<CountDownLatch> finished = Lists.newArrayList();

        // seed the executor's threads
        for (int i = 0; i < numThreads; i++) {
            final CountDownLatch latch = new CountDownLatch(1);
            final CountDownLatch fin = new CountDownLatch(1);
            latches.add(latch);
            finished.add(fin);
            exec.submit(() -> {
                try {
                    started.countDown();
                    latch.await();
                    fin.countDown();
                } catch (InterruptedException e) {
                    fail();
                }
            });
        }
        try {
            assertTrue(started.await(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            fail();
        }

        // fill the queue
        CountDownLatch startedBlocked = new CountDownLatch(1);
        while (exec.getQueue().remainingCapacity() > 0) {
            final CountDownLatch latch = new CountDownLatch(1);
            latches.add(latch);
            exec.submit(() -> {
                try {
                    startedBlocked.countDown();
                    latch.await();
                } catch (InterruptedException e) {
                    fail();
                }
            });
        }

        latches.remove(0).countDown(); // release one of the executors
        // ... we need to do this because load is recomputed when jobs are taken
        // Note: For this to work, 1 / maxQueueSize must be less than the load threshold (0.2)

        // verify that the old job has terminated
        try {
            assertTrue("Job didn't finish",
                       finished.remove(0).await(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            fail();
        }

        // verify that a previously blocked thread has started
        try {
            assertTrue(startedBlocked.await(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            fail();
        }

        // add another job to fill the queue
        final CountDownLatch latch = new CountDownLatch(1);
        latches.add(latch);
        exec.submit(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                fail();
            }
        });
        assertEquals(maxQueueSize, exec.getQueue().size());

        return latches;
    }

    /**
     * A submitter that hits a full queue must stay blocked until enough jobs
     * have been released, and then run its job on its own thread.
     */
    @Test
    public void releaseOneThread() {
        // maxQueueSize is a shared static; restore it so other tests are unaffected.
        final int savedQueueSize = maxQueueSize;
        maxQueueSize = 10;
        ExecutorService myExec = Executors.newSingleThreadExecutor();
        try {
            BoundedThreadPool exec = newFixedThreadPool(4, namedThreads("test"));
            List<CountDownLatch> latches = fillExecutor(exec);

            CountDownLatch myLatch = new CountDownLatch(1);
            Future<Thread> expected = myExec.submit(Thread::currentThread);

            assertEquals(maxQueueSize, exec.getQueue().size());
            long start = System.nanoTime();
            Future<Thread> actual = myExec.submit(() -> {
                return exec.submit(() -> {
                    myLatch.countDown();
                    return Thread.currentThread();
                }).get();
            });

            try {
                assertFalse("Thread should still be blocked",
                            myLatch.await(10, TimeUnit.MILLISECONDS));

                latches.remove(0).countDown(); // release the first thread
                assertFalse("Thread should still be blocked",
                            myLatch.await(10, TimeUnit.MILLISECONDS));
                latches.remove(0).countDown(); // release the second thread

                assertTrue("Thread should be unblocked",
                           myLatch.await(10, TimeUnit.MILLISECONDS));
                long delta = System.nanoTime() - start;
                double load = exec.getQueue().size() / (double) maxQueueSize;
                assertTrue("Load is greater than threshold", load <= 0.8);
                assertTrue("Load is less than threshold", load >= 0.6);
                assertEquals("Work done on wrong thread", expected.get(), actual.get());
                assertTrue("Took more than one second", delta < TimeUnit.SECONDS.toNanos(1));
            } catch (InterruptedException | ExecutionException e) {
                fail();
            } finally {
                latches.forEach(CountDownLatch::countDown);
                exec.shutdown();
            }

            awaitShutdown(exec);
        } finally {
            myExec.shutdownNow();
            maxQueueSize = savedQueueSize;
        }
    }

    /**
     * When the pool stays saturated, a submitter must be held back for about
     * one second and then run its job on its own thread.
     */
    @Test
    public void highLoadTimeout() {
        // maxQueueSize is a shared static; restore it so other tests are unaffected.
        final int savedQueueSize = maxQueueSize;
        maxQueueSize = 10;
        try {
            BoundedThreadPool exec = newFixedThreadPool(2, namedThreads("test"));
            List<CountDownLatch> latches = fillExecutor(exec);

            // true if the job is executed and it is done on the test thread
            final AtomicBoolean sameThread = new AtomicBoolean(false);
            final Thread myThread = Thread.currentThread();
            try {
                long start = System.nanoTime();
                exec.submit(() -> {
                    sameThread.set(myThread.equals(Thread.currentThread()));
                });

                long delta = System.nanoTime() - start;
                assertEquals(maxQueueSize, exec.getQueue().size());
                assertTrue("Work done on wrong thread (or didn't happen)", sameThread.get());
                assertTrue("Took less than one second. Actual: " + delta / 1_000_000.0 + "ms",
                           delta > TimeUnit.SECONDS.toNanos(1));
                assertTrue("Took more than two seconds", delta < TimeUnit.SECONDS.toNanos(2));
            } finally {
                // Release the blocked jobs even when an assertion fails, so
                // the pool's threads do not outlive the test.
                latches.forEach(CountDownLatch::countDown);
                exec.shutdown();
            }

            awaitShutdown(exec);
        } finally {
            maxQueueSize = savedQueueSize;
        }
    }
}
\ No newline at end of file