Jonathan Hart

Added a trivial implementation of the LeadershipService.

Also renamed SdnIpLeadershipService to HazelcastLeadershipService, and moved
it into the distributed core bundle.

This allows applications which depend on LeadershipService to be used with
the trivial core.

Change-Id: Ie71a946d95653a4d2209afd3af0e7f23b5a4f818
......@@ -29,18 +29,15 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
// import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.sdnip.bgp.BgpRouteEntry;
import org.onlab.onos.sdnip.bgp.BgpSession;
import org.onlab.onos.sdnip.bgp.BgpSessionManager;
import org.onlab.onos.sdnip.config.SdnIpConfigReader;
import org.onlab.onos.store.hz.StoreService;
import org.slf4j.Logger;
/**
......@@ -70,13 +67,7 @@ public class SdnIp implements SdnIpService {
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
// @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SdnIpLeadershipService leadershipService;
protected LeadershipService leadershipService;
private IntentSynchronizer intentSynchronizer;
private SdnIpConfigReader config;
......@@ -114,10 +105,6 @@ public class SdnIp implements SdnIpService {
interfaceService, hostService);
router.start();
leadershipService = new SdnIpLeadershipService(clusterService,
storeService,
eventDispatcher);
leadershipService.start();
leadershipService.addListener(leadershipEventListener);
leadershipService.runForLeadership(appId.name());
......@@ -138,7 +125,6 @@ public class SdnIp implements SdnIpService {
leadershipService.withdraw(appId.name());
leadershipService.removeListener(leadershipEventListener);
leadershipService.stop();
log.info("SDN-IP Stopped");
}
......
......@@ -13,7 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.sdnip;
package org.onlab.onos.store.cluster.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.namedThreads;
import java.util.Map;
import java.util.concurrent.ExecutorService;
......@@ -21,6 +24,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
......@@ -34,18 +43,15 @@ import org.onlab.onos.store.hz.StoreService;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import static org.onlab.util.Tools.namedThreads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Distributed implementation of LeadershipService that is based on Hazelcast.
* <p>
......@@ -63,9 +69,11 @@ import static com.google.common.base.Preconditions.checkArgument;
* the current leader (e.g., for informational purpose).
* </p>
*/
public class SdnIpLeadershipService implements LeadershipService {
@Component(immediate = true)
@Service
public class HazelcastLeadershipService implements LeadershipService {
private static final Logger log =
LoggerFactory.getLogger(SdnIpLeadershipService.class);
LoggerFactory.getLogger(HazelcastLeadershipService.class);
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
......@@ -80,45 +88,31 @@ public class SdnIpLeadershipService implements LeadershipService {
private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
private ClusterService clusterService;
private StoreService storeService;
private EventDeliveryService eventDispatcher;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Topic> topics = Maps.newConcurrentMap();
private ControllerNode localNode;
/**
* Constructor.
*
* @param clusterService the cluster service to use
* @param storeService the store service to use
* @param eventDispatcher the event dispacher to use
*/
SdnIpLeadershipService(ClusterService clusterService,
StoreService storeService,
EventDeliveryService eventDispatcher) {
this.clusterService = clusterService;
this.storeService = storeService;
this.eventDispatcher = eventDispatcher;
}
/**
* Starts operation.
*/
void start() {
@Activate
protected void activate() {
localNode = clusterService.getLocalNode();
listenerRegistry = new AbstractListenerRegistry<>();
eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
log.info("SDN-IP Leadership Service started");
log.info("Hazelcast Leadership Service started");
}
/**
* Stops operation.
*/
void stop() {
@Deactivate
protected void deactivate() {
eventDispatcher.removeSink(LeadershipEvent.class);
for (Topic topic : topics.values()) {
......@@ -126,7 +120,7 @@ public class SdnIpLeadershipService implements LeadershipService {
}
topics.clear();
log.info("SDN-IP Leadership Service stopped");
log.info("Hazelcast Leadership Service stopped");
}
@Override
......
......@@ -43,7 +43,7 @@ import com.google.common.collect.Sets;
* Distributed implementation of LeadershipService that is based on the primitives exposed by
* LockService.
*/
@Component(immediate = true)
@Component(enabled = false)
@Service
public class LeadershipManager implements LeadershipService {
......
package org.onlab.onos.store.trivial.impl;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEvent.Type;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
/**
* A trivial implementation of the leadership service.
* <p></p>
* The service is not distributed, so it can assume there's a single leadership
* contender. This contender is always granted leadership whenever it asks.
*/
@Component(immediate = true)
@Service
public class SimpleLeadershipManager implements LeadershipService {
private Set<LeadershipEventListener> listeners = new CopyOnWriteArraySet<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
private Map<String, Boolean> elections = new ConcurrentHashMap<>();
@Override
public ControllerNode getLeader(String path) {
return elections.get(path) ? clusterService.getLocalNode() : null;
}
@Override
public void runForLeadership(String path) {
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
new Leadership(path, clusterService.getLocalNode(), 0)));
}
}
@Override
public void withdraw(String path) {
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
new Leadership(path, clusterService.getLocalNode(), 0)));
}
}
@Override
public Map<String, Leadership> getLeaderBoard() {
throw new UnsupportedOperationException("I don't know what to do." +
" I wish you luck.");
}
@Override
public void addListener(LeadershipEventListener listener) {
listeners.add(listener);
}
@Override
public void removeListener(LeadershipEventListener listener) {
listeners.remove(listener);
}
}