helenyrwu
Committed by Gerrit Code Review

[ONOS-4681] Enables device registration in GossipDeviceStore,

exposes availability, and polls NETCONF device reachability.

Change-Id: I5492c7b6109c3431d71555a9104c7e97fc6e75be
......@@ -38,6 +38,7 @@ public class DefaultDeviceDescription extends AbstractDescription
private final String swVersion;
private final String serialNumber;
private final ChassisId chassisId;
private final boolean defaultAvailable;
/**
* Creates a device description using the supplied information.
......@@ -55,6 +56,28 @@ public class DefaultDeviceDescription extends AbstractDescription
String hwVersion, String swVersion,
String serialNumber, ChassisId chassis,
SparseAnnotations... annotations) {
this(uri, type, manufacturer, hwVersion, swVersion, serialNumber,
chassis, true, annotations);
}
/**
* Creates a device description using the supplied information.
*
* @param uri device URI
* @param type device type
* @param manufacturer device manufacturer
* @param hwVersion device HW version
* @param swVersion device SW version
* @param serialNumber device serial number
* @param chassis chassis id
* @param defaultAvailable optional whether device is by default available
* @param annotations optional key/value annotations map
*/
public DefaultDeviceDescription(URI uri, Type type, String manufacturer,
String hwVersion, String swVersion,
String serialNumber, ChassisId chassis,
boolean defaultAvailable,
SparseAnnotations... annotations) {
super(annotations);
this.uri = checkNotNull(uri, "Device URI cannot be null");
this.type = checkNotNull(type, "Device type cannot be null");
......@@ -63,6 +86,7 @@ public class DefaultDeviceDescription extends AbstractDescription
this.swVersion = swVersion;
this.serialNumber = serialNumber;
this.chassisId = chassis;
this.defaultAvailable = defaultAvailable;
}
/**
......@@ -74,7 +98,7 @@ public class DefaultDeviceDescription extends AbstractDescription
SparseAnnotations... annotations) {
this(base.deviceUri(), base.type(), base.manufacturer(),
base.hwVersion(), base.swVersion(), base.serialNumber(),
base.chassisId(), annotations);
base.chassisId(), base.isDefaultAvailable(), annotations);
}
/**
......@@ -83,10 +107,26 @@ public class DefaultDeviceDescription extends AbstractDescription
* @param type device type
* @param annotations Annotations to use.
*/
public DefaultDeviceDescription(DeviceDescription base, Type type, SparseAnnotations... annotations) {
public DefaultDeviceDescription(DeviceDescription base, Type type,
SparseAnnotations... annotations) {
this(base.deviceUri(), type, base.manufacturer(),
base.hwVersion(), base.swVersion(), base.serialNumber(),
base.chassisId(), annotations);
base.chassisId(), base.isDefaultAvailable(), annotations);
}
/**
* Creates a device description using the supplied information.
*
* @param base DeviceDescription to basic information (except for defaultAvailable)
* @param defaultAvailable whether device should be made available by default
* @param annotations Annotations to use.
*/
public DefaultDeviceDescription(DeviceDescription base,
boolean defaultAvailable,
SparseAnnotations... annotations) {
this(base.deviceUri(), base.type(), base.manufacturer(),
base.hwVersion(), base.swVersion(), base.serialNumber(),
base.chassisId(), defaultAvailable, annotations);
}
@Override
......@@ -125,6 +165,11 @@ public class DefaultDeviceDescription extends AbstractDescription
}
@Override
public boolean isDefaultAvailable() {
return defaultAvailable;
}
@Override
public String toString() {
return toStringHelper(this)
.add("uri", uri).add("type", type).add("mfr", manufacturer)
......@@ -137,7 +182,8 @@ public class DefaultDeviceDescription extends AbstractDescription
@Override
public int hashCode() {
return Objects.hashCode(super.hashCode(), uri, type, manufacturer,
hwVersion, swVersion, serialNumber, chassisId);
hwVersion, swVersion, serialNumber, chassisId,
defaultAvailable);
}
@Override
......@@ -153,7 +199,8 @@ public class DefaultDeviceDescription extends AbstractDescription
&& Objects.equal(this.hwVersion, that.hwVersion)
&& Objects.equal(this.swVersion, that.swVersion)
&& Objects.equal(this.serialNumber, that.serialNumber)
&& Objects.equal(this.chassisId, that.chassisId);
&& Objects.equal(this.chassisId, that.chassisId)
&& Objects.equal(this.defaultAvailable, that.defaultAvailable);
}
return false;
}
......@@ -167,5 +214,6 @@ public class DefaultDeviceDescription extends AbstractDescription
this.swVersion = null;
this.serialNumber = null;
this.chassisId = null;
this.defaultAvailable = true;
}
}
......
......@@ -77,4 +77,11 @@ public interface DeviceDescription extends Description {
*/
ChassisId chassisId();
/**
* Return whether device should be made available by default.
*
* @return default availability
*/
boolean isDefaultAvailable();
}
......
......@@ -52,8 +52,6 @@ public interface DeviceStore extends Store<DeviceEvent, DeviceStoreDelegate> {
*/
Iterable<Device> getAvailableDevices();
/**
* Returns the device with the specified identifier.
*
......@@ -74,6 +72,7 @@ public interface DeviceStore extends Store<DeviceEvent, DeviceStoreDelegate> {
DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription);
// TODO: We may need to enforce that ancillary cannot interfere this state
/**
* Removes the specified infrastructure device.
......@@ -84,6 +83,14 @@ public interface DeviceStore extends Store<DeviceEvent, DeviceStoreDelegate> {
DeviceEvent markOffline(DeviceId deviceId);
/**
* Marks the device as available.
*
* @param deviceId device identifier
* @return true if availability change request was accepted and changed the state
*/
boolean markOnline(DeviceId deviceId);
/**
* Updates the ports of the specified infrastructure device using the given
* list of port descriptions. The list is assumed to be comprehensive.
*
......
......@@ -57,6 +57,11 @@ public class DeviceStoreAdapter implements DeviceStore {
}
@Override
public boolean markOnline(DeviceId deviceId) {
return false;
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
return null;
}
......
......@@ -245,6 +245,13 @@ public class SimpleDeviceStore
}
}
// implement differently if desired
@Override
public boolean markOnline(DeviceId deviceId) {
log.warn("Mark online not supported");
return false;
}
@Override
public List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId,
......
......@@ -79,7 +79,8 @@ public final class BasicDeviceOperator implements ConfigOperator {
SparseAnnotations sa = combine(bdc, descr.annotations());
return new DefaultDeviceDescription(descr.deviceUri(), type, manufacturer,
hwVersion, swVersion,
serial, descr.chassisId(), sa);
serial, descr.chassisId(),
descr.isDefaultAvailable(), sa);
}
/**
......
......@@ -64,7 +64,6 @@ import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.OpticalPortConfig;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.net.device.DeviceDescription;
......@@ -288,6 +287,19 @@ public class DeviceManager
log.trace("Checking device {}", deviceId);
if (!isReachable(deviceId)) {
if (mastershipService.getLocalRole(deviceId) != NONE) {
// can't be master if device is not reachable
try {
post(store.markOffline(deviceId));
//relinquish master role and ability to be backup.
mastershipService.relinquishMastership(deviceId).get();
} catch (InterruptedException e) {
log.warn("Interrupted while reliquishing role for {}", deviceId);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Exception thrown while relinquishing role for {}", deviceId, e);
}
}
continue;
}
......@@ -334,7 +346,6 @@ public class DeviceManager
}
provider.roleChanged(deviceId, newRole);
// not triggering probe when triggered by provider service event
return true;
}
......@@ -360,12 +371,15 @@ public class DeviceManager
DeviceEvent event = store.createOrUpdateDevice(provider().id(), deviceId,
deviceDescription);
log.info("Device {} connected", deviceId);
if (deviceDescription.isDefaultAvailable()) {
log.info("Device {} connected", deviceId);
} else {
log.info("Device {} registered", deviceId);
}
if (event != null) {
log.trace("event: {} {}", event.type(), event);
post(event);
}
}
private PortDescription ensurePortEnabledState(PortDescription desc, boolean enabled) {
......@@ -684,19 +698,7 @@ public class DeviceManager
case MASTER:
final Device device = getDevice(did);
if ((device != null) && !isAvailable(did)) {
//flag the device as online. Is there a better way to do this?
DefaultDeviceDescription deviceDescription
= new DefaultDeviceDescription(did.uri(),
device.type(),
device.manufacturer(),
device.hwVersion(),
device.swVersion(),
device.serialNumber(),
device.chassisId());
DeviceEvent devEvent =
store.createOrUpdateDevice(device.providerId(), did,
deviceDescription);
post(devEvent);
store.markOnline(did);
}
// TODO: should apply role only if there is mismatch
log.debug("Applying role {} to {}", myNextRole, did);
......
......@@ -275,6 +275,7 @@ public class ECDeviceStore
return devices.get(deviceId);
}
// FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
@Override
public DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
......@@ -392,8 +393,13 @@ public class ECDeviceStore
return null;
}
private boolean markOnline(DeviceId deviceId) {
return availableDevices.add(deviceId);
// FIXME publicization of markOnline -- trigger some action independently?
public boolean markOnline(DeviceId deviceId) {
if (devices.containsKey(deviceId)) {
return availableDevices.add(deviceId);
}
log.warn("Device {} does not exist in store", deviceId);
return false;
}
@Override
......
......@@ -341,7 +341,7 @@ public class GossipDeviceStore
if (deviceEvent != null) {
log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
providerId, deviceId);
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
}
......@@ -406,11 +406,16 @@ public class GossipDeviceStore
return null;
}
if (oldDevice == null) {
// REGISTER
if (!deltaDesc.value().isDefaultAvailable()) {
return registerDevice(providerId, newDevice);
}
// ADD
return createDevice(providerId, newDevice, deltaDesc.timestamp());
} else {
// UPDATE or ignore (no change or stale)
return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp(),
deltaDesc.value().isDefaultAvailable());
}
}
}
......@@ -437,7 +442,8 @@ public class GossipDeviceStore
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent updateDevice(ProviderId providerId,
Device oldDevice,
Device newDevice, Timestamp newTimestamp) {
Device newDevice, Timestamp newTimestamp,
boolean forceAvailable) {
// We allow only certain attributes to trigger update
boolean propertiesChanged =
!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
......@@ -461,7 +467,7 @@ public class GossipDeviceStore
event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
}
if (!providerId.isAncillary()) {
if (!providerId.isAncillary() && forceAvailable) {
boolean wasOnline = availableDevices.contains(newDevice.id());
markOnline(newDevice.id(), newTimestamp);
if (!wasOnline) {
......@@ -471,6 +477,20 @@ public class GossipDeviceStore
return event;
}
private DeviceEvent registerDevice(ProviderId providerId, Device newDevice) {
// update composed device cache
Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
verify(oldDevice == null,
"Unexpected Device in cache. PID:%s [old=%s, new=%s]",
providerId, oldDevice, newDevice);
if (!providerId.isAncillary()) {
markOffline(newDevice.id());
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
......@@ -514,6 +534,24 @@ public class GossipDeviceStore
}
}
public boolean markOnline(DeviceId deviceId) {
if (devices.containsKey(deviceId)) {
final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Map<?, ?> deviceLock = getOrCreateDeviceDescriptionsMap(deviceId);
synchronized (deviceLock) {
if (markOnline(deviceId, timestamp)) {
notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, getDevice(deviceId), null));
return true;
} else {
return false;
}
}
}
log.warn("Device {} does not exist in store", deviceId);
return false;
}
/**
* Marks the device as available if the given timestamp is not outdated,
* compared to the time the device has been marked offline.
......@@ -1576,7 +1614,8 @@ public class GossipDeviceStore
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
try {
notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
deviceDescription));
} catch (Exception e) {
log.warn("Exception thrown handling device update", e);
}
......
......@@ -450,7 +450,6 @@ public class GrpcRemoteServiceTest {
deviceConnected.countDown();
}
final CountDownLatch updatePorts = new CountDownLatch(1);
DeviceId updatePortsDid;
List<PortDescription> updatePortsDescs;
......
......@@ -258,7 +258,6 @@ public class IsisTopologyProviderTest {
}
@Override
public void deviceDisconnected(DeviceId deviceId) {
......
......@@ -67,6 +67,9 @@ import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
......@@ -114,17 +117,21 @@ public class NetconfDeviceProvider extends AbstractProvider
private static final String IPADDRESS = "ipaddress";
private static final String NETCONF = "netconf";
private static final String PORT = "port";
private static final int CORE_POOL_SIZE = 10;
//FIXME eventually a property
private static final int ISREACHABLE_TIMEOUT = 2000;
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 30;
private final ExecutorService executor =
Executors.newFixedThreadPool(5, groupedThreads("onos/netconfdeviceprovider",
"device-installer-%d", log));
protected ScheduledExecutorService connectionExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
private DeviceProviderService providerService;
private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();
private InternalDeviceListener deviceListener = new InternalDeviceListener();
private NodeId localNodeId;
private ScheduledFuture<?> scheduledTask;
private final ConfigFactory factory =
new ConfigFactory<ApplicationId, NetconfProviderConfig>(APP_SUBJECT_FACTORY,
......@@ -152,6 +159,7 @@ public class NetconfDeviceProvider extends AbstractProvider
deviceService.addListener(deviceListener);
executor.execute(NetconfDeviceProvider.this::connectDevices);
localNodeId = clusterService.getLocalNode().id();
scheduledTask = schedulePolling();
log.info("Started");
}
......@@ -169,6 +177,7 @@ public class NetconfDeviceProvider extends AbstractProvider
providerRegistry.unregister(this);
providerService = null;
cfgService.unregisterConfigFactory(factory);
scheduledTask.cancel(true);
executor.shutdown();
log.info("Stopped");
}
......@@ -177,6 +186,15 @@ public class NetconfDeviceProvider extends AbstractProvider
super(new ProviderId(SCHEME_NAME, DEVICE_PROVIDER_PACKAGE));
}
// Checks connection to devices in the config file
// every DEFAULT_POLL_FREQUENCY_SECONDS seconds.
private ScheduledFuture schedulePolling() {
return connectionExecutor.scheduleAtFixedRate(this::checkAndUpdateDevices,
DEFAULT_POLL_FREQUENCY_SECONDS / 10,
DEFAULT_POLL_FREQUENCY_SECONDS,
TimeUnit.SECONDS);
}
@Override
public void triggerProbe(DeviceId deviceId) {
// TODO: This will be implemented later.
......@@ -270,8 +288,14 @@ public class NetconfDeviceProvider extends AbstractProvider
@Override
public void deviceRemoved(DeviceId deviceId) {
Preconditions.checkNotNull(deviceId, ISNULL);
log.debug("Netconf device {} removed from Netconf subController", deviceId);
providerService.deviceDisconnected(deviceId);
if (deviceService.getDevice(deviceId) != null) {
providerService.deviceDisconnected(deviceId);
log.debug("Netconf device {} removed from Netconf subController", deviceId);
} else {
log.warn("Netconf device {} does not exist in the store, " +
"it may already have been removed", deviceId);
}
}
}
......@@ -295,13 +319,67 @@ public class NetconfDeviceProvider extends AbstractProvider
Device.Type.SWITCH,
UNKNOWN, UNKNOWN,
UNKNOWN, UNKNOWN,
cid,
cid, false,
annotations);
deviceKeyAdminService.addKey(
DeviceKey.createDeviceKeyUsingUsernamePassword(
DeviceKeyId.deviceKeyId(deviceId.toString()),
null, addr.name(), addr.password()));
if (deviceService.getDevice(deviceId) == null) {
providerService.deviceConnected(deviceId, deviceDescription);
}
checkAndUpdateDevice(deviceId, deviceDescription);
});
} catch (ConfigException e) {
log.error("Cannot read config error " + e);
}
}
}
private void checkAndUpdateDevice(DeviceId deviceId, DeviceDescription deviceDescription) {
if (deviceService.getDevice(deviceId) == null) {
log.warn("Device {} has not been added to store, " +
"maybe due to a problem in connectivity", deviceId);
} else {
boolean isReachable = isReachable(deviceId);
if (isReachable && !deviceService.isAvailable(deviceId)) {
providerService.deviceConnected(
deviceId, new DefaultDeviceDescription(
deviceDescription, true, deviceDescription.annotations()));
} else if (!isReachable && deviceService.isAvailable(deviceId)) {
providerService.deviceDisconnected(deviceId);
}
}
}
private void checkAndUpdateDevices() {
NetconfProviderConfig cfg = cfgService.getConfig(appId, NetconfProviderConfig.class);
if (cfg != null) {
log.info("Checking connection to devices in configuration");
try {
cfg.getDevicesAddresses().stream().forEach(addr -> {
DeviceId deviceId = getDeviceId(addr.ip().toString(), addr.port());
Preconditions.checkNotNull(deviceId, ISNULL);
//Netconf configuration object
ChassisId cid = new ChassisId();
String ipAddress = addr.ip().toString();
SparseAnnotations annotations = DefaultAnnotations.builder()
.set(IPADDRESS, ipAddress)
.set(PORT, String.valueOf(addr.port()))
.set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase())
.build();
DeviceDescription deviceDescription = new DefaultDeviceDescription(
deviceId.uri(),
Device.Type.SWITCH,
UNKNOWN, UNKNOWN,
UNKNOWN, UNKNOWN,
cid, false,
annotations);
deviceKeyAdminService.addKey(
DeviceKey.createDeviceKeyUsingUsernamePassword(
DeviceKeyId.deviceKeyId(deviceId.toString()),
null, addr.name(), addr.password()));
providerService.deviceConnected(deviceId, deviceDescription);
checkAndUpdateDevice(deviceId, deviceDescription);
});
} catch (ConfigException e) {
log.error("Cannot read config error " + e);
......