Madan Jampani

Bug fixes/improvements:

1. DeviceManager must not have a dependency on DeviceClockService. Clocks and Timestamps are purely distribution concerns.
2. Eliminated DeviceClockProviderService, which merely served as a cache for mastership terms and thereby introduced a source of staleness. Now we directly query the mastership service, which is already optimized for high-volume reads.
3. Fixed DistributedLeadershipManager to ensure that an election won by the local node is immediately reflected in the local leaderboard. This ensures that a subsequent read does not return a stale value.

Change-Id: I774b64923e382b788df5f8bde2a9fafc0294bad0
......@@ -15,6 +15,8 @@
*/
package org.onosproject.mastership;
import static org.onosproject.net.MastershipRole.MASTER;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -43,6 +45,16 @@ public interface MastershipService
MastershipRole getLocalRole(DeviceId deviceId);
/**
 * Tells whether the role reported by {@link #getLocalRole(DeviceId)} for the
 * given device is MASTER.
 *
 * @param deviceId the identifier of the device
 * @return true if the local node is master; false otherwise
 */
default boolean isLocalMaster(DeviceId deviceId) {
    // Enum equality is identity-based, so a null role simply yields false.
    return MASTER.equals(getLocalRole(deviceId));
}
/**
* Returns the mastership status of the local controller for a given
* device forcing master selection if necessary.
*
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.device;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
/**
* Interface for feeding mastership term information to a logical clock
* service that vends per-device timestamps.
* <p>
* A caller reports the term for a device through
* {@link #setMastershipTerm(DeviceId, MastershipTerm)} and can ask through
* {@link #isTimestampAvailable(DeviceId)} whether a timestamp can be issued.
*/
public interface DeviceClockProviderService {
/**
* Checks if this service can issue a Timestamp for the specified device.
* <p>
* NOTE(review): the implementations shown alongside this interface answer
* true once a term has been set for the device and never revert to false;
* confirm whether that is the intended contract.
*
* @param deviceId device identifier
* @return true if a timestamp can be issued for the specified device
*/
boolean isTimestampAvailable(DeviceId deviceId);
/**
* Updates the mastership term for the specified deviceId.
*
* @param deviceId device identifier
* @param term mastership term
*/
void setMastershipTerm(DeviceId deviceId, MastershipTerm term);
}
package org.onosproject.net.device;
import org.onosproject.net.DeviceId;
import org.onosproject.store.Timestamp;
/**
 * Test adapter for device clock service.
 * <p>
 * Both operations are inert stubs; test-specific subclasses override the
 * ones they care about.
 */
public class DeviceClockServiceAdapter implements DeviceClockService {

    /**
     * Stub that never produces a timestamp.
     *
     * @param deviceId device identifier (unused)
     * @return always null
     */
    @Override
    public Timestamp getTimestamp(DeviceId deviceId) {
        return null;
    }

    /**
     * Stub that reports no timestamp can be issued.
     *
     * @param deviceId device identifier (unused)
     * @return always false
     */
    @Override
    public boolean isTimestampAvailable(DeviceId deviceId) {
        return false;
    }
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.trivial;
import java.util.Set;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceClockProviderService;
import com.google.common.collect.Sets;
/**
 * Dummy implementation of {@link DeviceClockProviderService}.
 * <p>
 * No term data is retained; the service only remembers which devices have
 * had a mastership term reported, and answers availability queries from
 * that record.
 */
@Component(immediate = true)
@Service
public class NoOpClockProviderService implements DeviceClockProviderService {

    // Devices for which setMastershipTerm has been invoked at least once.
    // Final: the reference is never reassigned, only the set contents change.
    private final Set<DeviceId> registeredBefore = Sets.newConcurrentHashSet();

    /**
     * Records that a term was reported for the device; the term itself is
     * discarded.
     *
     * @param deviceId device identifier
     * @param term mastership term (unused)
     */
    @Override
    public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
        registeredBefore.add(deviceId);
    }

    /**
     * Reports whether a term has previously been set for the device.
     *
     * @param deviceId device identifier
     * @return true if setMastershipTerm was called for this device before
     */
    @Override
    public boolean isTimestampAvailable(DeviceId deviceId) {
        return registeredBefore.contains(deviceId);
    }
}
......@@ -16,6 +16,8 @@
package org.onosproject.net.device.impl;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -46,7 +48,6 @@ import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.net.device.DeviceClockProviderService;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
......@@ -118,12 +119,8 @@ public class DeviceManager
protected MastershipTermService termService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceClockProviderService deviceClockProviderService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigService networkConfigService;
@Activate
public void activate() {
backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
......@@ -316,31 +313,13 @@ public class DeviceManager
checkValidity();
deviceDescription = validateDevice(deviceDescription, deviceId);
// check my Role
CompletableFuture<MastershipRole> role = mastershipService.requestRoleFor(deviceId);
try {
// Device subsystem must wait for role assignment
// to avoid losing Device information.
// (This node could be the only Node connected to the Device.)
role.get();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting role-assignment for {}", deviceId);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Exception thrown while waiting role-assignment for {}",
deviceId, e);
}
// Establish my Role
Futures.getUnchecked(mastershipService.requestRoleFor(deviceId)
.thenAccept(role -> {
log.info("Local role is {} for {}", role, deviceId);
applyRole(deviceId, role);
}));
final MastershipTerm term = termService.getMastershipTerm(deviceId);
if (term == null || !localNodeId.equals(term.master())) {
log.info("Role of this node is STANDBY for {}", deviceId);
applyRole(deviceId, MastershipRole.STANDBY);
} else {
log.info("Role of this node is MASTER for {}", deviceId);
// tell clock provider if this instance is the master
deviceClockProviderService.setMastershipTerm(deviceId, term);
applyRole(deviceId, MastershipRole.MASTER);
}
DeviceEvent event = store.createOrUpdateDevice(provider().id(), deviceId,
deviceDescription);
if (event != null) {
......@@ -383,7 +362,7 @@ public class DeviceManager
port.portSpeed())));
store.updatePorts(this.provider().id(), deviceId, descs);
try {
if (mastershipService.getLocalRole(deviceId) == MASTER) {
if (mastershipService.isLocalMaster(deviceId)) {
post(store.markOffline(deviceId));
}
} catch (IllegalStateException e) {
......@@ -405,7 +384,6 @@ public class DeviceManager
// TODO: Move this type of check inside device clock manager, etc.
if (term != null && localNodeId.equals(term.master())) {
log.info("Retry marking {} offline", deviceId);
deviceClockProviderService.setMastershipTerm(deviceId, term);
post(store.markOffline(deviceId));
} else {
log.info("Failed again marking {} offline. {}", deviceId, role);
......@@ -431,7 +409,7 @@ public class DeviceManager
checkNotNull(portDescriptions,
"Port descriptions list cannot be null");
checkValidity();
if (!deviceClockProviderService.isTimestampAvailable(deviceId)) {
if (!mastershipService.isLocalMaster(deviceId)) {
// Never been a master for this device
// any update will be ignored.
log.trace("Ignoring {} port updates on standby node. {}", deviceId, portDescriptions);
......@@ -452,7 +430,7 @@ public class DeviceManager
checkNotNull(portDescription, PORT_DESCRIPTION_NULL);
checkValidity();
if (!deviceClockProviderService.isTimestampAvailable(deviceId)) {
if (!mastershipService.isLocalMaster(deviceId)) {
// Never been a master for this device
// any update will be ignored.
log.trace("Ignoring {} port update on standby node. {}", deviceId,
......@@ -664,7 +642,6 @@ public class DeviceManager
MastershipTerm term = termService.getMastershipTerm(did);
final boolean iHaveControl = term != null && localNodeId.equals(term.master());
if (iHaveControl) {
deviceClockProviderService.setMastershipTerm(did, term);
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
......
......@@ -39,7 +39,6 @@ import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.net.device.DeviceClockProviderService;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
......@@ -106,7 +105,6 @@ public class DeviceManagerTest {
mgr.mastershipService = mastershipManager;
mgr.termService = mastershipManager;
mgr.clusterService = new TestClusterService();
mgr.deviceClockProviderService = new TestClockProviderService();
mgr.networkConfigService = new TestNetworkConfigService();
mgr.activate();
......@@ -328,22 +326,6 @@ public class DeviceManagerTest {
}
private final class TestClockProviderService implements
DeviceClockProviderService {
private Set<DeviceId> registerdBefore = Sets.newConcurrentHashSet();
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
registerdBefore.add(deviceId);
}
@Override
public boolean isTimestampAvailable(DeviceId deviceId) {
return registerdBefore.contains(deviceId);
}
}
private class TestNetworkConfigService extends NetworkConfigServiceAdapter {
}
}
......
......@@ -374,6 +374,10 @@ public class DistributedLeadershipManager implements LeadershipService {
leader.value(),
leader.version(),
leader.creationTime());
// Since reads only go through the local copy of leader board, we ought to update it
// first before returning from this method.
// This is to ensure a subsequent read will not read a stale value.
onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
return newLeadership;
}
} catch (Exception e) {
......
......@@ -17,17 +17,19 @@ package org.onosproject.store.device.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.mastership.MastershipTermService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceClockProviderService;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.impl.MastershipBasedTimestamp;
......@@ -38,16 +40,23 @@ import org.slf4j.Logger;
*/
@Component(immediate = true)
@Service
public class DeviceClockManager implements DeviceClockService, DeviceClockProviderService {
public class DeviceClockManager implements DeviceClockService {
private final Logger log = getLogger(getClass());
// TODO: Implement per device ticker that is reset to 0 at the beginning of a new term.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipTermService mastershipTermService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
protected NodeId localNodeId;
private final AtomicLong ticker = new AtomicLong(0);
private ConcurrentMap<DeviceId, MastershipTerm> deviceMastershipTerms = new ConcurrentHashMap<>();
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
log.info("Started");
}
......@@ -58,23 +67,16 @@ public class DeviceClockManager implements DeviceClockService, DeviceClockProvid
@Override
public Timestamp getTimestamp(DeviceId deviceId) {
MastershipTerm term = deviceMastershipTerms.get(deviceId);
log.trace("term info for {} is: {}", deviceId, term);
if (term == null) {
MastershipTerm term = mastershipTermService.getMastershipTerm(deviceId);
if (term == null || !localNodeId.equals(term.master())) {
throw new IllegalStateException("Requesting timestamp for " + deviceId + " without mastership");
}
return new MastershipBasedTimestamp(term.termNumber(), ticker.incrementAndGet());
}
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
log.debug("adding term info {} {}", deviceId, term.master());
deviceMastershipTerms.put(deviceId, term);
}
@Override
public boolean isTimestampAvailable(DeviceId deviceId) {
return deviceMastershipTerms.containsKey(deviceId);
MastershipTerm term = mastershipTermService.getMastershipTerm(deviceId);
return term != null && localNodeId.equals(term.master());
}
}
......
......@@ -32,7 +32,6 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.Annotations;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
......@@ -44,18 +43,21 @@ import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceClockServiceAdapter;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceStore;
import org.onosproject.net.device.DeviceStoreDelegate;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.StaticClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.consistent.impl.DatabaseManager;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import java.io.IOException;
import java.util.Arrays;
......@@ -68,6 +70,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static java.util.Arrays.asList;
......@@ -134,8 +137,7 @@ public class GossipDeviceStoreTest {
private GossipDeviceStore gossipDeviceStore;
private DeviceStore deviceStore;
private DeviceClockManager deviceClockManager;
private DeviceClockService deviceClockService;
private DeviceClockService deviceClockService = new TestDeviceClockService();
private ClusterCommunicationService clusterCommunicator;
@BeforeClass
......@@ -149,13 +151,6 @@ public class GossipDeviceStoreTest {
@Before
public void setUp() throws Exception {
deviceClockManager = new DeviceClockManager();
deviceClockManager.activate();
deviceClockService = deviceClockManager;
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(NID1, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(NID1, 2));
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
......@@ -169,6 +164,7 @@ public class GossipDeviceStoreTest {
TestDatabaseManager testDatabaseManager = new TestDatabaseManager();
testDatabaseManager.init(clusterService, clusterCommunicator);
testGossipDeviceStore.storageService = testDatabaseManager;
testGossipDeviceStore.deviceClockService = deviceClockService;
gossipDeviceStore = testGossipDeviceStore;
gossipDeviceStore.activate();
......@@ -180,7 +176,6 @@ public class GossipDeviceStoreTest {
@After
public void tearDown() throws Exception {
gossipDeviceStore.deactivate();
deviceClockManager.deactivate();
}
private void putDevice(DeviceId deviceId, String swVersion,
......@@ -891,6 +886,27 @@ public class GossipDeviceStoreTest {
}
}
/**
 * Clock service stub that issues timestamps only for DID1 (term 1) and
 * DID2 (term 2), drawing sequence numbers from one shared counter.
 */
private final class TestDeviceClockService extends DeviceClockServiceAdapter {

    // Sequence counter shared by both devices; advanced once per issued timestamp.
    private final AtomicLong ticker = new AtomicLong();

    /**
     * Issues the next timestamp for a known device.
     *
     * @param deviceId device identifier
     * @return timestamp carrying the device's fixed term and the next sequence number
     * @throws IllegalStateException if the device is neither DID1 nor DID2
     */
    @Override
    public Timestamp getTimestamp(DeviceId deviceId) {
        final int termNumber;
        if (DID1.equals(deviceId)) {
            termNumber = 1;
        } else if (DID2.equals(deviceId)) {
            termNumber = 2;
        } else {
            // Unknown device: fail before touching the counter.
            throw new IllegalStateException();
        }
        return new MastershipBasedTimestamp(termNumber, ticker.getAndIncrement());
    }

    /**
     * Reports whether the device is one of the two known devices.
     *
     * @param deviceId device identifier
     * @return true for DID1 or DID2; false otherwise
     */
    @Override
    public boolean isTimestampAvailable(DeviceId deviceId) {
        if (DID1.equals(deviceId)) {
            return true;
        }
        return DID2.equals(deviceId);
    }
}
private class TestDatabaseManager extends DatabaseManager {
void init(ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
......
......@@ -29,7 +29,6 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DeviceId;
......@@ -39,17 +38,19 @@ import org.onosproject.net.LinkKey;
import org.onosproject.net.PortNumber;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceClockServiceAdapter;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkStore;
import org.onosproject.net.link.LinkStoreDelegate;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.StaticClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.device.impl.DeviceClockManager;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import java.util.HashMap;
import java.util.Map;
......@@ -57,6 +58,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static org.easymock.EasyMock.*;
......@@ -115,7 +117,7 @@ public class GossipLinkStoreTest {
private GossipLinkStore linkStoreImpl;
private LinkStore linkStore;
private DeviceClockManager deviceClockManager;
private final AtomicLong ticker = new AtomicLong();
private DeviceClockService deviceClockService;
private ClusterCommunicationService clusterCommunicator;
......@@ -129,14 +131,6 @@ public class GossipLinkStoreTest {
@Before
public void setUp() throws Exception {
deviceClockManager = new DeviceClockManager();
deviceClockManager.activate();
deviceClockService = deviceClockManager;
// set initial terms
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(NID1, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(NID1, 2));
// TODO mock clusterCommunicator
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
......@@ -149,6 +143,7 @@ public class GossipLinkStoreTest {
linkStoreImpl.deviceClockService = deviceClockService;
linkStoreImpl.clusterCommunicator = clusterCommunicator;
linkStoreImpl.clusterService = new TestClusterService();
linkStoreImpl.deviceClockService = new TestDeviceClockService();
linkStoreImpl.mastershipService = new TestMastershipService();
linkStoreImpl.activate();
linkStore = linkStoreImpl;
......@@ -607,6 +602,27 @@ public class GossipLinkStoreTest {
}
}
/**
 * Clock service stub for the link store tests: timestamps are available
 * only for DID1 (term 1) and DID2 (term 2).
 */
private final class TestDeviceClockService extends DeviceClockServiceAdapter {

    // Single counter supplying sequence numbers for every issued timestamp.
    private final AtomicLong ticker = new AtomicLong();

    /**
     * Issues the next timestamp for a known device.
     *
     * @param deviceId device identifier
     * @return timestamp with term 1 for DID1 or term 2 for DID2
     * @throws IllegalStateException if the device is neither DID1 nor DID2
     */
    @Override
    public Timestamp getTimestamp(DeviceId deviceId) {
        // Guard: reject unknown devices before consuming a sequence number.
        if (!isTimestampAvailable(deviceId)) {
            throw new IllegalStateException();
        }
        int termNumber = DID1.equals(deviceId) ? 1 : 2;
        long sequence = ticker.getAndIncrement();
        return new MastershipBasedTimestamp(termNumber, sequence);
    }

    /**
     * Reports whether the device is one of the two known devices.
     *
     * @param deviceId device identifier
     * @return true for DID1 or DID2; false otherwise
     */
    @Override
    public boolean isTimestampAvailable(DeviceId deviceId) {
        boolean isFirst = DID1.equals(deviceId);
        return isFirst || DID2.equals(deviceId);
    }
}
private final class TestMastershipService extends MastershipServiceAdapter {
@Override
public NodeId getMasterFor(DeviceId deviceId) {
......
......@@ -15,6 +15,21 @@
*/
package org.onosproject.provider.lldp.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onosproject.net.PortNumber.portNumber;
import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.packet.Ethernet;
......@@ -36,22 +51,6 @@ import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketService;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onosproject.net.MastershipRole.MASTER;
import static org.onosproject.net.PortNumber.portNumber;
import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
// TODO: add 'fast discovery' mode: drop LLDPs in destination switch but listen for flow_removed messages
/**
......@@ -149,7 +148,7 @@ public class LinkDiscovery implements TimerTask {
}
}
boolean isMaster = mastershipService.getLocalRole(device.id()) == MASTER;
boolean isMaster = mastershipService.isLocalMaster(device.id());
if (newPort && isMaster) {
this.log.debug("Sending init probe to port {}@{}",
port.number().toLong(), device.id());
......@@ -258,8 +257,7 @@ public class LinkDiscovery implements TimerTask {
if (isStopped()) {
return;
}
boolean isMaster = mastershipService.getLocalRole(device.id()) == MASTER;
if (!isMaster) {
if (!mastershipService.isLocalMaster(device.id())) {
if (!isStopped()) {
// reschedule timer
timeout = Timer.getTimer().newTimeout(this, this.probeRate, MILLISECONDS);
......