sangyun-han
Committed by Gerrit Code Review

[ONOS-4004] DistributedStatisticStore and DistributedFlowStatisticStore make configurable

- Using @Property and @Modified annotations
- Fix DistributedPacketStore / DistributedStatisticStore / DistributedFlowStatisticStore

Change-Id: I6c907498496b9f21a8ef13b7badeb24770cb88ff
......@@ -21,6 +21,8 @@ import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
......@@ -44,14 +46,20 @@ import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Dictionary;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.retryable;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -68,10 +76,7 @@ public class DistributedPacketStore
private final Logger log = getLogger(getClass());
private static final int MAX_BACKOFF = 50;
// TODO: make this configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
......@@ -102,10 +107,17 @@ public class DistributedPacketStore
private ExecutorService messageHandlingExecutor;
private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
@Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Size of thread pool to assign message handler")
private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
private static final int MAX_BACKOFF = 50;
@Activate
public void activate() {
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
messageHandlerThreadPoolSize,
groupedThreads("onos/store/packet", "message-handlers"));
communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
......@@ -126,6 +138,33 @@ public class DistributedPacketStore
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
int newMessageHandlerThreadPoolSize;
try {
String s = get(properties, "messageHandlerThreadPoolSize");
newMessageHandlerThreadPoolSize =
isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
} catch (NumberFormatException e) {
log.warn(e.getMessage());
newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
}
// Any change in the following parameters implies thread pool restart
if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
restartMessageHandlerThreadPool();
}
log.info(FORMAT, messageHandlerThreadPoolSize);
}
@Override
public void emit(OutboundPacket packet) {
NodeId myId = clusterService.getLocalNode().id();
......@@ -239,4 +278,32 @@ public class DistributedPacketStore
return list;
}
}
/**
* Sets thread pool size of message handler.
*
* @param poolSize
*/
private void setMessageHandlerThreadPoolSize(int poolSize) {
checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
messageHandlerThreadPoolSize = poolSize;
}
/**
* Restarts thread pool of message handler.
*/
private void restartMessageHandlerThreadPool() {
ExecutorService prevExecutor = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
prevExecutor.shutdown();
}
/**
* Gets current thread pool size of message handler.
*
* @return messageHandlerThreadPoolSize
*/
private int getMessageHandlerThreadPoolSize() {
return messageHandlerThreadPoolSize;
}
}
......
......@@ -20,6 +20,8 @@ import com.google.common.base.Objects;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
......@@ -39,18 +41,24 @@ import org.onosproject.net.statistic.FlowStatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
......@@ -65,8 +73,7 @@ import static org.slf4j.LoggerFactory.getLogger;
public class DistributedFlowStatisticStore implements FlowStatisticStore {
private final Logger log = getLogger(getClass());
// TODO: Make configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
......@@ -97,6 +104,12 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
private NodeId local;
private ExecutorService messageHandlingExecutor;
private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
@Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Size of thread pool to assign message handler")
private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
@Activate
......@@ -104,7 +117,7 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
messageHandlerThreadPoolSize,
groupedThreads("onos/store/statistic", "message-handlers"));
clusterCommunicator.addSubscriber(
......@@ -126,6 +139,32 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
int newMessageHandlerThreadPoolSize;
try {
String s = get(properties, "messageHandlerThreadPoolSize");
newMessageHandlerThreadPoolSize =
isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
} catch (NumberFormatException e) {
log.warn(e.getMessage());
newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
}
// Any change in the following parameters implies thread pool restart
if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
restartMessageHandlerThreadPool();
}
log.info(FORMAT, messageHandlerThreadPoolSize);
}
@Override
public synchronized void removeFlowStatistic(FlowRule rule) {
ConnectPoint cp = buildConnectPoint(rule);
......@@ -134,10 +173,16 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
}
// remove this rule if present from current map
current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
current.computeIfPresent(cp, (c, e) -> {
e.remove(rule);
return e;
});
// remove this on if present from previous map
previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
previous.computeIfPresent(cp, (c, e) -> {
e.remove(rule);
return e;
});
}
@Override
......@@ -286,4 +331,32 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
}
return null;
}
/**
* Sets thread pool size of message handler.
*
* @param poolSize
*/
private void setMessageHandlerThreadPoolSize(int poolSize) {
checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
messageHandlerThreadPoolSize = poolSize;
}
/**
* Restarts thread pool of message handler.
*/
private void restartMessageHandlerThreadPool() {
ExecutorService prevExecutor = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
prevExecutor.shutdown();
}
/**
* Gets current thread pool size of message handler.
*
* @return messageHandlerThreadPoolSize
*/
private int getMessageHandlerThreadPoolSize() {
return messageHandlerThreadPoolSize;
}
}
\ No newline at end of file
......
......@@ -20,6 +20,8 @@ import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
......@@ -39,11 +41,14 @@ import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
......@@ -51,6 +56,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
......@@ -67,8 +75,7 @@ public class DistributedStatisticStore implements StatisticStore {
private final Logger log = getLogger(getClass());
// TODO: Make configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
......@@ -101,13 +108,18 @@ public class DistributedStatisticStore implements StatisticStore {
private ExecutorService messageHandlingExecutor;
private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
@Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Size of thread pool to assign message handler")
private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
@Activate
public void activate() {
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
messageHandlerThreadPoolSize,
groupedThreads("onos/store/statistic", "message-handlers"));
clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
......@@ -133,6 +145,33 @@ public class DistributedStatisticStore implements StatisticStore {
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
int newMessageHandlerThreadPoolSize;
try {
String s = get(properties, "messageHandlerThreadPoolSize");
newMessageHandlerThreadPoolSize =
isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
} catch (NumberFormatException e) {
log.warn(e.getMessage());
newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
}
// Any change in the following parameters implies thread pool restart
if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
restartMessageHandlerThreadPool();
}
log.info(FORMAT, messageHandlerThreadPoolSize);
}
@Override
public void prepareForStatistics(FlowRule rule) {
ConnectPoint cp = buildConnectPoint(rule);
......@@ -314,4 +353,32 @@ public class DistributedStatisticStore implements StatisticStore {
}
/**
* Sets thread pool size of message handler.
*
* @param poolSize
*/
private void setMessageHandlerThreadPoolSize(int poolSize) {
checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
messageHandlerThreadPoolSize = poolSize;
}
/**
* Restarts thread pool of message handler.
*/
private void restartMessageHandlerThreadPool() {
ExecutorService prevExecutor = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
prevExecutor.shutdown();
}
/**
* Gets current thread pool size of message handler.
*
* @return messageHandlerThreadPoolSize
*/
private int getMessageHandlerThreadPoolSize() {
return messageHandlerThreadPoolSize;
}
}
......