Thomas Vachuska
Committed by Gerrit Code Review

Fixed a slew of shutdown exceptions that arose due to improper or out-of-order r…

…esource clean-up, e.g. listeners, timers, executors.

Change-Id: I37c351c4202b32e92c076d9d566b96d7ff8d313a
......@@ -114,11 +114,8 @@ public class ComponentConfigManager implements ComponentConfigService {
String componentName = componentClass.getName();
checkNotNull(componentName, COMPONENT_NULL);
Map<String, ConfigProperty> cps = properties.remove(componentName);
if (cps != null) {
if (clear && cps != null) {
cps.keySet().forEach(name -> store.unsetProperty(componentName, name));
}
if (clear) {
clearExistingValues(componentName);
}
}
......
......@@ -95,11 +95,11 @@ public class ProxyArpManager implements ProxyArpService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private final Multimap<Device, PortNumber> internalPorts =
HashMultimap.<Device, PortNumber>create();
private final Multimap<Device, PortNumber> internalPorts = HashMultimap.create();
private final Multimap<Device, PortNumber> externalPorts = HashMultimap.create();
private final Multimap<Device, PortNumber> externalPorts =
HashMultimap.<Device, PortNumber>create();
private final DeviceListener deviceListener = new InternalDeviceListener();
private final InternalLinkListener linkListener = new InternalLinkListener();
/**
* Listens to both device service and link service to determine
......@@ -107,16 +107,17 @@ public class ProxyArpManager implements ProxyArpService {
*/
@Activate
public void activate() {
deviceService.addListener(new InternalDeviceListener());
linkService.addListener(new InternalLinkListener());
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
determinePortLocations();
log.info("Started");
}
@Deactivate
public void deactivate() {
deviceService.removeListener(deviceListener);
linkService.removeListener(linkListener);
log.info("Stopped");
}
......
......@@ -96,14 +96,14 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public final void sendMsg(OFMessage m) {
if (role == RoleState.MASTER) {
if (role == RoleState.MASTER && channel.isWritable()) {
channel.write(Collections.singletonList(m));
}
}
@Override
public final void sendMsg(List<OFMessage> msgs) {
if (role == RoleState.MASTER) {
if (role == RoleState.MASTER && channel.isWritable()) {
channel.write(msgs);
}
}
......
......@@ -73,6 +73,8 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
private static final String PROVIDER_NAME = "org.onosproject.provider.lldp";
private static final String PROP_USE_BDDP = "useBDDP";
private static final String PROP_DISABLE_LD = "disableLinkDiscovery";
private static final String PROP_LLDP_SUPPRESSION = "lldpSuppression";
......@@ -132,13 +134,13 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
* Creates an OpenFlow link provider.
*/
public LLDPLinkProvider() {
super(new ProviderId("lldp", "org.onosproject.provider.lldp"));
super(new ProviderId("lldp", PROVIDER_NAME));
}
@Activate
public void activate(ComponentContext context) {
cfgService.registerProperties(getClass());
appId = coreService.registerApplication("org.onosproject.provider.lldp");
appId = coreService.registerApplication(PROVIDER_NAME);
// to load configuration at startup
modified(context);
......@@ -188,14 +190,14 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
if (disableLinkDiscovery) {
return;
}
executor.shutdownNow();
for (LinkDiscovery ld : discoverers.values()) {
ld.stop();
}
providerRegistry.unregister(this);
deviceService.removeListener(listener);
packetService.removeProcessor(listener);
masterService.removeListener(roleListener);
executor.shutdownNow();
discoverers.values().forEach(LinkDiscovery::stop);
discoverers.clear();
providerService = null;
log.info("Stopped");
......
......@@ -104,8 +104,8 @@ public class LinkDiscovery implements TimerTask {
this.pktService = pktService;
this.mastershipService = checkNotNull(masterService, "WTF!");
this.slowPorts = Collections.synchronizedSet(new HashSet<Long>());
this.fastPorts = Collections.synchronizedSet(new HashSet<Long>());
this.slowPorts = Collections.synchronizedSet(new HashSet<>());
this.fastPorts = Collections.synchronizedSet(new HashSet<>());
this.portProbeCount = new HashMap<>();
this.lldpPacket = new ONOSLLDP();
this.lldpPacket.setChassisId(device.chassisId());
......@@ -296,14 +296,14 @@ public class LinkDiscovery implements TimerTask {
}
public synchronized void stop() {
timeout.cancel();
isStopped = true;
timeout.cancel();
}
public synchronized void start() {
if (isStopped) {
timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
isStopped = false;
timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
} else {
log.warn("LinkDiscovery started multiple times?");
}
......@@ -361,8 +361,8 @@ public class LinkDiscovery implements TimerTask {
return slowPorts.contains(portNumber) || fastPorts.contains(portNumber);
}
public boolean isStopped() {
return isStopped;
public synchronized boolean isStopped() {
return isStopped || timeout.isCancelled();
}
}
......
......@@ -268,8 +268,8 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
providerService.deviceConnected(did, description);
providerService.updatePorts(did, buildPortDescriptions(sw));
PortStatsCollector psc = new PortStatsCollector(
controller.getSwitch(dpid), POLL_INTERVAL);
PortStatsCollector psc =
new PortStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
psc.start();
collectors.put(dpid, psc);
}
......@@ -314,7 +314,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
/**
* Translates a RoleState to the corresponding MastershipRole.
*
* @param response
* @param response role state
* @return a MastershipRole
*/
private MastershipRole roleOf(RoleState response) {
......@@ -334,7 +334,6 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
/**
* Builds a list of port descriptions for a given list of ports.
*
* @param ports the list of ports
* @return list of portdescriptions
*/
private List<PortDescription> buildPortDescriptions(OpenFlowSwitch sw) {
......
......@@ -45,8 +45,7 @@ public class PortStatsCollector implements TimerTask {
private final AtomicLong xidAtomic = new AtomicLong(1);
private Timeout timeout;
private boolean stopTimer = false;
private volatile boolean stopped;
/**
* Creates a GroupStatsCollector object.
......@@ -60,23 +59,22 @@ public class PortStatsCollector implements TimerTask {
}
@Override
public void run(Timeout timeout) throws Exception {
public void run(Timeout to) throws Exception {
if (stopped || timeout.isCancelled()) {
return;
}
log.trace("Collecting stats for {}", sw.getStringId());
sendPortStatistic();
if (!this.stopTimer) {
if (!stopped && !timeout.isCancelled()) {
log.trace("Scheduling stats collection in {} seconds for {}",
this.refreshInterval, this.sw.getStringId());
timeout.getTimer().newTimeout(this, refreshInterval,
TimeUnit.SECONDS);
timeout.getTimer().newTimeout(this, refreshInterval, TimeUnit.SECONDS);
}
}
private void sendPortStatistic() {
if (log.isTraceEnabled()) {
log.trace("sendGroupStatistics {}:{}", sw.getStringId(), sw.getRole());
}
if (sw.getRole() != RoleState.MASTER) {
return;
}
......@@ -91,17 +89,18 @@ public class PortStatsCollector implements TimerTask {
/**
* Starts the collector.
*/
public void start() {
public synchronized void start() {
log.info("Starting Port Stats collection thread for {}", sw.getStringId());
stopped = false;
timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
}
/**
* Stops the collector.
*/
public void stop() {
public synchronized void stop() {
log.info("Stopping Port Stats collection thread for {}", sw.getStringId());
this.stopTimer = true;
stopped = true;
timeout.cancel();
}
}
......
......@@ -43,6 +43,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
......@@ -301,7 +302,11 @@ public class NettyMessagingManager implements MessagingService {
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
dispatchLocally(message);
try {
dispatchLocally(message);
} catch (RejectedExecutionException e) {
log.warn("Unable to dispatch message due to {}", e.getMessage());
}
}
@Override
......
......@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.eclipse.jetty.websocket.WebSocket;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.osgi.ServiceNotFoundException;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.ui.UiConnection;
......@@ -100,11 +101,19 @@ public class UiWebSocket
@Override
public void onOpen(Connection connection) {
log.info("GUI client connected");
this.connection = connection;
this.control = (FrameConnection) connection;
createHandlers();
sendInstanceData();
try {
createHandlers();
sendInstanceData();
log.info("GUI client connected");
} catch (ServiceNotFoundException e) {
log.warn("Unable to open GUI connection; services have been shut-down");
this.connection.close();
this.connection = null;
this.control = null;
}
}
@Override
......