Thomas Vachuska

ONOS-841 Renaming ONOS threads to simplify profiling.

Change-Id: I83a96bd875a0af9f3b78c06a9f9107c093b8e64e
Showing 24 changed files with 65 additions and 65 deletions
......@@ -15,17 +15,6 @@
*/
package org.onosproject.sdnip.bgp;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
......@@ -37,33 +26,44 @@ import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.Ip4Prefix;
import org.onlab.packet.Ip6Prefix;
import org.onlab.packet.IpPrefix;
import org.onosproject.sdnip.RouteListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.onlab.util.Tools.namedThreads;
/**
* BGP Session Manager class.
*/
public class BgpSessionManager {
private static final Logger log =
LoggerFactory.getLogger(BgpSessionManager.class);
LoggerFactory.getLogger(BgpSessionManager.class);
boolean isShutdown = true;
private Channel serverChannel; // Listener for incoming BGP connections
private ServerBootstrap serverBootstrap;
private ChannelGroup allChannels = new DefaultChannelGroup();
private ConcurrentMap<SocketAddress, BgpSession> bgpSessions =
new ConcurrentHashMap<>();
new ConcurrentHashMap<>();
private Ip4Address myBgpId; // Same BGP ID for all peers
private BgpRouteSelector bgpRouteSelector = new BgpRouteSelector(this);
private ConcurrentMap<Ip4Prefix, BgpRouteEntry> bgpRoutes4 =
new ConcurrentHashMap<>();
new ConcurrentHashMap<>();
private ConcurrentMap<Ip6Prefix, BgpRouteEntry> bgpRoutes6 =
new ConcurrentHashMap<>();
new ConcurrentHashMap<>();
private final RouteListener routeListener;
......@@ -200,7 +200,7 @@ public class BgpSessionManager {
//
if (bgpSession.getLocalAddress() instanceof InetSocketAddress) {
InetAddress inetAddr =
((InetSocketAddress) bgpSession.getLocalAddress()).getAddress();
((InetSocketAddress) bgpSession.getLocalAddress()).getAddress();
Ip4Address ip4Address = Ip4Address.valueOf(inetAddr.getAddress());
updateMyBgpId(ip4Address);
}
......@@ -252,33 +252,33 @@ public class BgpSessionManager {
* Starts up BGP Session Manager operation.
*
* @param listenPortNumber the port number to listen on. By default
* it should be BgpConstants.BGP_PORT (179)
* it should be BgpConstants.BGP_PORT (179)
*/
public void start(int listenPortNumber) {
log.debug("BGP Session Manager start.");
isShutdown = false;
ChannelFactory channelFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(namedThreads("BGP-SM-boss-%d")),
Executors.newCachedThreadPool(namedThreads("BGP-SM-worker-%d")));
newCachedThreadPool(namedThreads("onos-bgp-sm-boss-%d")),
newCachedThreadPool(namedThreads("onos-bgp-sm-worker-%d")));
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
// Allocate a new session per connection
BgpSession bgpSessionHandler =
@Override
public ChannelPipeline getPipeline() throws Exception {
// Allocate a new session per connection
BgpSession bgpSessionHandler =
new BgpSession(BgpSessionManager.this);
BgpFrameDecoder bgpFrameDecoder =
BgpFrameDecoder bgpFrameDecoder =
new BgpFrameDecoder(bgpSessionHandler);
// Setup the processing pipeline
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("BgpFrameDecoder", bgpFrameDecoder);
pipeline.addLast("BgpSession", bgpSessionHandler);
return pipeline;
}
};
// Setup the processing pipeline
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("BgpFrameDecoder", bgpFrameDecoder);
pipeline.addLast("BgpSession", bgpSessionHandler);
return pipeline;
}
};
InetSocketAddress listenAddress =
new InetSocketAddress(listenPortNumber);
new InetSocketAddress(listenPortNumber);
serverBootstrap = new ServerBootstrap(channelFactory);
// serverBootstrap.setOptions("reuseAddr", true);
......
......@@ -45,7 +45,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
private final Logger log = getLogger(getClass());
private final ExecutorService executor =
newSingleThreadExecutor(namedThreads("event-dispatch-%d"));
newSingleThreadExecutor(namedThreads("onos-event-dispatch-%d"));
@SuppressWarnings("unchecked")
private static final Event KILL_PILL = new AbstractEvent(null, 0) {
......
......@@ -111,7 +111,7 @@ public class DeviceManager
@Activate
public void activate() {
backgroundService = Executors.newSingleThreadScheduledExecutor(namedThreads("device-manager-background"));
backgroundService = Executors.newSingleThreadScheduledExecutor(namedThreads("onos-device-manager-background"));
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
......
......@@ -104,7 +104,7 @@ public class FlowRuleManager
@Activate
public void activate() {
futureService =
Executors.newFixedThreadPool(32, namedThreads("provider-future-listeners-%d"));
Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
......
......@@ -116,7 +116,7 @@ public class DefaultTopologyProvider extends AbstractProvider
@Activate
public synchronized void activate(ComponentContext context) {
executor = newFixedThreadPool(MAX_THREADS, namedThreads("topo-build-%d"));
executor = newFixedThreadPool(MAX_THREADS, namedThreads("onos-topo-build-%d"));
accumulator = new TopologyChangeAccumulator();
logConfig("Configured");
......
......@@ -266,7 +266,7 @@ public class HazelcastLeadershipService implements LeadershipService,
*/
private void start() {
isShutdown = false;
String threadPoolName = "leader-election-" + topicName + "-%d";
String threadPoolName = "onos-leader-election-" + topicName + "-%d";
leaderElectionExecutor = Executors.newScheduledThreadPool(2,
namedThreads(threadPoolName));
......
......@@ -74,7 +74,7 @@ public class LeadershipManager implements LeadershipService {
// TODO: Make Thread pool size configurable.
private final ScheduledExecutorService threadPool =
Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
Executors.newScheduledThreadPool(25, namedThreads("onos-leadership-manager-%d"));
private static final MessageSubject LEADERSHIP_UPDATES =
new MessageSubject("leadership-contest-updates");
......
......@@ -187,10 +187,10 @@ public class GossipDeviceStore
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-device-bg-%d")));
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
......
......@@ -146,10 +146,10 @@ public class DistributedFlowRuleStore
private final ExecutorService futureListeners =
Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
Executors.newCachedThreadPool(namedThreads("onos-flowstore-peer-responders"));
private final ExecutorService backupExecutors =
Executors.newSingleThreadExecutor(namedThreads("async-backups"));
Executors.newSingleThreadExecutor(namedThreads("onos-async-backups"));
private boolean syncBackup = false;
......
......@@ -165,10 +165,10 @@ public class GossipHostStore
HOST_ANTI_ENTROPY_ADVERTISEMENT,
new InternalHostAntiEntropyAdvertisementListener());
executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
executor = Executors.newCachedThreadPool(namedThreads("onos-host-fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-host-bg-%d")));
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
......
......@@ -132,10 +132,10 @@ public class GossipIntentStore
INTENT_ANTI_ENTROPY_ADVERTISEMENT,
new InternalIntentAntiEntropyAdvertisementListener());
executor = Executors.newCachedThreadPool(namedThreads("intent-fg-%d"));
executor = Executors.newCachedThreadPool(namedThreads("onos-intent-fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("intent-bg-%d")));
newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-intent-bg-%d")));
// start anti-entropy thread
//backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
......
......@@ -163,10 +163,10 @@ public class GossipLinkStore
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
executor = Executors.newCachedThreadPool(namedThreads("link-fg-%d"));
executor = Executors.newCachedThreadPool(namedThreads("onos-link-fg-%d"));
backgroundExecutors =
newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-link-bg-%d")));
long initialDelaySec = 5;
long periodSec = 5;
......
......@@ -103,7 +103,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
public synchronized CompletableFuture<Void> connect() {
if (pool == null || pool.isShutdown()) {
// TODO include remote name?
pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-client-%d"));
}
return CompletableFuture.completedFuture(null);
}
......
......@@ -62,7 +62,7 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
@Override
public CompletableFuture<Void> listen() {
if (pool == null || pool.isShutdown()) {
pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-server-%d"));
pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-server-%d"));
}
clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
......
......@@ -55,7 +55,7 @@ public class DatabaseEntryExpirationTracker implements
DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
private static final ExecutorService THREAD_POOL =
Executors.newCachedThreadPool(namedThreads("database-stale-entry-expirer-%d"));
Executors.newCachedThreadPool(namedThreads("onos-db-stale-entry-expirer-%d"));
private final Logger log = LoggerFactory.getLogger(getClass());
......
......@@ -218,7 +218,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
copycat.start().get();
executor =
newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
newSingleThreadScheduledExecutor(namedThreads("onos-db-heartbeat-%d"));
executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
}
......
......@@ -66,7 +66,7 @@ public class DatabaseStateMachine implements StateMachine {
private final Logger log = getLogger(getClass());
private final ExecutorService updatesExecutor =
Executors.newSingleThreadExecutor(namedThreads("database-statemachine-updates"));
Executors.newSingleThreadExecutor(namedThreads("onos-db-statemachine-updates"));
// message subject for database update notifications.
public static final MessageSubject DATABASE_UPDATE_EVENTS =
......
......@@ -53,7 +53,7 @@ import com.google.common.collect.Multimaps;
public class DistributedLockManager implements LockService {
private static final ExecutorService THREAD_POOL =
Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
Executors.newCachedThreadPool(namedThreads("onos-lock-manager-%d"));
private final Logger log = getLogger(getClass());
......
......@@ -137,13 +137,13 @@ public class Controller {
if (workerThreads == 0) {
execFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(namedThreads("Controller-boss-%d")),
Executors.newCachedThreadPool(namedThreads("Controller-worker-%d")));
Executors.newCachedThreadPool(namedThreads("onos-of-boss-%d")),
Executors.newCachedThreadPool(namedThreads("onos-of-worker-%d")));
return new ServerBootstrap(execFactory);
} else {
execFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(namedThreads("Controller-boss-%d")),
Executors.newCachedThreadPool(namedThreads("Controller-worker-%d")), workerThreads);
Executors.newCachedThreadPool(namedThreads("onos-of-boss-%d")),
Executors.newCachedThreadPool(namedThreads("onos-of-worker-%d")), workerThreads);
return new ServerBootstrap(execFactory);
}
}
......
......@@ -69,11 +69,11 @@ public class OpenFlowControllerImpl implements OpenFlowController {
private final ExecutorService executorMsgs =
Executors.newFixedThreadPool(32,
namedThreads("of-event-stats-%d"));
namedThreads("onos-of-event-stats-%d"));
private final ExecutorService executorBarrier =
Executors.newFixedThreadPool(4,
namedThreads("of-event-barrier-%d"));
namedThreads("onos-of-event-barrier-%d"));
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
......
......@@ -161,7 +161,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
}
executor = newSingleThreadScheduledExecutor(namedThreads("device-sync-%d"));
executor = newSingleThreadScheduledExecutor(namedThreads("onos-device-sync-%d"));
executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY,
DELAY, TimeUnit.SECONDS);
......
......@@ -73,7 +73,7 @@ public class NullDeviceProvider extends AbstractProvider implements DeviceProvid
private DeviceProviderService providerService;
private ExecutorService deviceBuilder = Executors.newFixedThreadPool(1,
namedThreads("null-device-creator"));
namedThreads("onos-null-device-creator"));
//currently hardcoded. will be made configurable via rest/cli.
......
......@@ -93,7 +93,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
private final List<DeviceId> devices = Lists.newArrayList();
private ExecutorService linkDriver = Executors.newFixedThreadPool(1,
namedThreads("null-link-driver"));
namedThreads("onos-null-link-driver"));
// If true, 'flickers' links by alternating link up/down events at eventRate
@Property(name = "flicker", boolValue = FLICKER,
......
......@@ -83,7 +83,7 @@ public class NullPacketProvider extends AbstractProvider implements
private int pktRate = DEFAULT_RATE;
private ExecutorService packetDriver = Executors.newFixedThreadPool(1,
namedThreads("null-packet-driver"));
namedThreads("onos-null-packet-driver"));
public NullPacketProvider() {
super(new ProviderId("null", "org.onosproject.provider.nil"));
......