sangyun-han
Committed by Gerrit Code Review

[ONOS-4668] Refactoring port statistic collector using SharedExecutor

- Add OpenFlowSwitchAdapter

Change-Id: I7bd9c61d8961bee18eca2c1ac0e5fca610e166e5
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.openflow.controller;
import org.onosproject.net.Device;
import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPortDesc;
import java.util.List;
/**
* Test adapter for the OpenFlow switch interface.
*/
public class OpenFlowSwitchAdapter implements OpenFlowSwitch {
@Override
public void sendMsg(OFMessage msg) {
}
@Override
public void sendMsg(List<OFMessage> msgs) {
}
@Override
public void handleMessage(OFMessage fromSwitch) {
}
@Override
public void setRole(RoleState role) {
}
@Override
public RoleState getRole() {
return null;
}
@Override
public List<OFPortDesc> getPorts() {
return null;
}
@Override
public OFFactory factory() {
return null;
}
@Override
public String getStringId() {
return null;
}
@Override
public long getId() {
return 0;
}
@Override
public String manufacturerDescription() {
return null;
}
@Override
public String datapathDescription() {
return null;
}
@Override
public String hardwareDescription() {
return null;
}
@Override
public String softwareDescription() {
return null;
}
@Override
public String serialNumber() {
return null;
}
@Override
public boolean isConnected() {
return false;
}
@Override
public void disconnectSwitch() {
}
@Override
public void returnRoleReply(RoleState requested, RoleState response) {
}
@Override
public Device.Type deviceType() {
return null;
}
@Override
public String channelId() {
return null;
}
}
\ No newline at end of file
/*
* Copyright 2014-present Open Networking Laboratory
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -159,6 +160,8 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
label = "Frequency (in seconds) for polling switch Port statistics")
private int portStatsPollFrequency = POLL_INTERVAL;
private final Timer timer = new Timer("onos-openflow-collector");
private HashMap<Dpid, PortStatsCollector> collectors = Maps.newHashMap();
/**
......@@ -221,7 +224,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
// disconnect to trigger switch-add later
sw.disconnectSwitch();
}
PortStatsCollector psc = new PortStatsCollector(sw, portStatsPollFrequency);
PortStatsCollector psc = new PortStatsCollector(timer, sw, portStatsPollFrequency);
psc.start();
collectors.put(new Dpid(sw.getId()), psc);
}
......@@ -382,7 +385,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
providerService.deviceConnected(did, description);
providerService.updatePorts(did, buildPortDescriptions(sw));
PortStatsCollector psc = new PortStatsCollector(sw, portStatsPollFrequency);
PortStatsCollector psc = new PortStatsCollector(timer, sw, portStatsPollFrequency);
stopCollectorIfNeeded(collectors.put(dpid, psc));
psc.start();
......
......@@ -16,69 +16,87 @@
package org.onosproject.provider.of.device.impl;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.util.Timer;
import org.onlab.util.SharedExecutors;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFPortStatsRequest;
import org.projectfloodlight.openflow.types.OFPort;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Sends Port Stats Request and collect the port statistics with a time interval.
*/
public class PortStatsCollector implements TimerTask {
public class PortStatsCollector {
// TODO: Refactoring is required using ScheduledExecutorService
private final HashedWheelTimer timer = Timer.getTimer();
private final OpenFlowSwitch sw;
private final Logger log = getLogger(getClass());
private static final int SECONDS = 1000;
private OpenFlowSwitch sw;
private Timer timer;
private TimerTask task;
private int refreshInterval;
private final AtomicLong xidAtomic = new AtomicLong(1);
private Timeout timeout;
private volatile boolean stopped;
/**
* Creates a PortStatsCollector object.
* Creates a port states collector object.
*
* @param sw Open Flow switch
* @param interval time interval for collecting port statistic
* @param timer timer to use for scheduling
* @param sw switch to pull
* @param interval interval for collecting port statistic
*/
public PortStatsCollector(OpenFlowSwitch sw, int interval) {
this.sw = sw;
PortStatsCollector(Timer timer, OpenFlowSwitch sw, int interval) {
this.timer = timer;
this.sw = checkNotNull(sw, "Null switch");
this.refreshInterval = interval;
}
@Override
public void run(Timeout to) throws Exception {
if (stopped || timeout.isCancelled()) {
return;
private class InternalTimerTask extends TimerTask {
@Override
public void run() {
sendPortStatistic();
}
log.trace("Collecting stats for {}", sw.getStringId());
}
sendPortStatistic();
/**
* Starts the port statistic collector.
*/
public synchronized void start() {
log.info("Starting Port Stats collection thread for {}", sw.getStringId());
task = new InternalTimerTask();
SharedExecutors.getTimer().scheduleAtFixedRate(task, 1 * SECONDS,
refreshInterval * SECONDS);
}
if (!stopped && !timeout.isCancelled()) {
log.trace("Scheduling stats collection in {} seconds for {}",
this.refreshInterval, this.sw.getStringId());
timeout.getTimer().newTimeout(this, refreshInterval, TimeUnit.SECONDS);
}
/**
* Stops the port statistic collector.
*/
public synchronized void stop() {
log.info("Stopping Port Stats collection thread for {}", sw.getStringId());
task.cancel();
task = null;
}
synchronized void adjustPollInterval(int pollInterval) {
/**
* Adjusts poll interval of the port statistic collector and restart.
*
* @param pollInterval period of collecting port statistic
*/
public synchronized void adjustPollInterval(int pollInterval) {
this.refreshInterval = pollInterval;
// task.cancel();
// task = new InternalTimerTask();
// timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000);
task.cancel();
task = new InternalTimerTask();
timer.scheduleAtFixedRate(task, refreshInterval * SECONDS,
refreshInterval * SECONDS);
}
/**
......@@ -95,22 +113,4 @@ public class PortStatsCollector implements TimerTask {
.build();
sw.sendMsg(statsRequest);
}
/**
* Starts the collector.
*/
public synchronized void start() {
log.info("Starting Port Stats collection thread for {}", sw.getStringId());
stopped = false;
timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
}
/**
* Stops the collector.
*/
public synchronized void stop() {
log.info("Stopping Port Stats collection thread for {}", sw.getStringId());
stopped = true;
timeout.cancel();
}
}
}
\ No newline at end of file
......