tom

Added notion of a general Store abstraction and wired it up in ClusterStore.

Showing 24 changed files with 231 additions and 19 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-apps</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-foo</artifactId>
<packaging>bundle</packaging>
<description>ONOS application for miscellaneous experiments</description>
</project>
package org.onlab.onos.foo;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Playground app component.
*/
@Component(immediate = true)
public class FooComponent {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private ClusterEventListener clusterListener = new InnerClusterListener();
@Activate
public void activate() {
clusterService.addListener(clusterListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
clusterService.removeListener(clusterListener);
log.info("Stopped");
}
private class InnerClusterListener implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
log.info("WOOOOT! {}", event);
}
}
}
......@@ -19,6 +19,7 @@
<modules>
<module>tvue</module>
<module>fwd</module>
<module>foo</module>
</modules>
<properties>
......
package org.onlab.onos.cluster;
import org.onlab.onos.store.Store;
import java.util.Set;
/**
* Manages inventory of controller cluster nodes; not intended for direct use.
*/
public interface ClusterStore {
public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate> {
/**
* Returns the local controller node.
......
package org.onlab.onos.cluster;
import org.onlab.onos.store.StoreDelegate;
/**
* Cluster store delegate abstraction.
*/
public interface ClusterStoreDelegate extends StoreDelegate<ClusterEvent> {
}
package org.onlab.onos.cluster;
import org.onlab.onos.store.StoreDelegate;
/**
* Mastership store delegate abstraction.
*/
public interface MastershipStoreDelegate extends StoreDelegate<MastershipEvent> {
}
package org.onlab.onos.net.device;
import org.onlab.onos.store.StoreDelegate;
/**
* Infrastructure device store delegate abstraction.
*/
public interface DeviceStoreDelegate extends StoreDelegate<DeviceEvent> {
}
package org.onlab.onos.store;
import org.onlab.onos.event.Event;
/**
* Base implementation of a store.
*/
public class AbstractStore<E extends Event, D extends StoreDelegate<E>>
implements Store<E, D> {
protected D delegate;
@Override
public void setDelegate(D delegate) {
this.delegate = delegate;
}
@Override
public D getDelegate() {
return delegate;
}
/**
* Notifies the delegate with the specified event.
*
* @param event event to delegate
*/
protected void notifyDelegate(E event) {
if (delegate != null) {
delegate.notify(event);
}
}
}
package org.onlab.onos.store;
import org.onlab.onos.event.Event;
/**
* Abstraction of a entity capable of storing and/or distributing information
* across a cluster.
*/
public interface Store<E extends Event, D extends StoreDelegate<E>> {
/**
* Sets the delegate on the store.
*
* @param delegate new store delegate
*/
void setDelegate(D delegate);
/**
* Get the current store delegate.
*
* @return store delegate
*/
D getDelegate();
}
package org.onlab.onos.store;
import org.onlab.onos.event.Event;
/**
* Entity associated with a store and capable of receiving notifications of
* events within the store.
*/
public interface StoreDelegate<E extends Event> {
void notify(E event);
}
/**
* Abstractions for creating and interacting with distributed stores.
*/
package org.onlab.onos.store;
\ No newline at end of file
......@@ -11,6 +11,7 @@ import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
......@@ -32,6 +33,8 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
private final Logger log = getLogger(getClass());
private ClusterStoreDelegate delegate = new InternalStoreDelegate();
protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
listenerRegistry = new AbstractListenerRegistry<>();
......@@ -43,6 +46,7 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
@Activate
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
log.info("Started");
}
......@@ -90,4 +94,13 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
public void removeListener(ClusterEventListener listener) {
listenerRegistry.removeListener(listener);
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements ClusterStoreDelegate {
@Override
public void notify(ClusterEvent event) {
checkNotNull(event, "Event cannot be null");
eventDispatcher.post(event);
}
}
}
......
......@@ -30,7 +30,7 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.StoreService;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.impl.StoreManager;
......
......@@ -12,7 +12,9 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
......@@ -26,6 +28,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
import static org.onlab.onos.cluster.ControllerNode.State;
/**
......@@ -33,14 +37,15 @@ import static org.onlab.onos.cluster.ControllerNode.State;
*/
@Component(immediate = true)
@Service
public class DistributedClusterStore extends AbstractDistributedStore
public class DistributedClusterStore
extends AbstractDistributedStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private IMap<byte[], byte[]> rawNodes;
private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
private String listenerId;
private final MembershipListener listener = new InnerMembershipListener();
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Activate
......@@ -106,11 +111,12 @@ public class DistributedClusterStore extends AbstractDistributedStore
}
// Adds a new node based on the specified member
private synchronized void addMember(Member member) {
private synchronized ControllerNode addMember(Member member) {
DefaultControllerNode node = node(member);
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
states.put(node.id(), State.ACTIVE);
return node;
}
// Creates a controller node descriptor from the Hazelcast member.
......@@ -125,18 +131,20 @@ public class DistributedClusterStore extends AbstractDistributedStore
}
// Interceptor for membership events.
private class InnerMembershipListener implements MembershipListener {
private class InternalMembershipListener implements MembershipListener {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
log.info("Member {} added", membershipEvent.getMember());
addMember(membershipEvent.getMember());
ControllerNode node = addMember(membershipEvent.getMember());
notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
log.info("Member {} removed", membershipEvent.getMember());
states.put(new NodeId(memberAddress(membershipEvent.getMember()).toString()),
State.INACTIVE);
NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
states.put(nodeId, State.INACTIVE);
notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
}
@Override
......
......@@ -13,6 +13,7 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipStore;
import org.onlab.onos.cluster.MastershipStoreDelegate;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
......@@ -31,7 +32,8 @@ import static com.google.common.cache.CacheBuilder.newBuilder;
*/
@Component(immediate = true)
@Service
public class DistributedMastershipStore extends AbstractDistributedStore
public class DistributedMastershipStore
extends AbstractDistributedStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
private IMap<byte[], byte[]> rawMasters;
......
package org.onlab.onos.store;
package org.onlab.onos.store.common;
import com.hazelcast.core.HazelcastInstance;
......
......@@ -2,4 +2,4 @@
* Common abstractions and facilities for implementing distributed store
* using Hazelcast.
*/
package org.onlab.onos.store;
package org.onlab.onos.store.common;
......
......@@ -21,6 +21,7 @@ import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStore;
import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
......@@ -48,7 +49,8 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
@Component(immediate = true)
@Service
public class DistributedDeviceStore extends AbstractDistributedStore
public class DistributedDeviceStore
extends AbstractDistributedStore<DeviceEvent, DeviceStoreDelegate>
implements DeviceStore {
private final Logger log = getLogger(getClass());
......
......@@ -10,7 +10,10 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.store.StoreService;
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.common.StoreService;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -20,7 +23,8 @@ import static org.slf4j.LoggerFactory.getLogger;
* Abstraction of a distributed store based on Hazelcast.
*/
@Component(componentAbstract = true)
public abstract class AbstractDistributedStore {
public abstract class AbstractDistributedStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> {
protected final Logger log = getLogger(getClass());
......
......@@ -2,7 +2,7 @@ package org.onlab.onos.store.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.StoreService;
import org.onlab.onos.store.common.StoreService;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
......
......@@ -21,7 +21,7 @@ import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.StoreService;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
......
......@@ -5,10 +5,13 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
......@@ -22,7 +25,9 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
@Component(immediate = true)
@Service
public class SimpleClusterStore implements ClusterStore {
public class SimpleClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
public static final IpPrefix LOCALHOST = IpPrefix.valueOf("127.0.0.1");
......
......@@ -104,4 +104,10 @@
<bundle>mvn:org.onlab.onos/onos-app-fwd/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-foo" version="1.0.0"
description="ONOS sample playground application">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle>
</feature>
</features>
......
......@@ -51,7 +51,7 @@ perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-feature
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution file to load ONOS features
perl -pi.old -e 's|^(featuresBoot=.*)|\1,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd|' \
perl -pi.old -e 's|^(featuresBoot=.*)|\1,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo|' \
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution with ONOS branding bundle
......