Ayaka Koshibe

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Conflicts:
	core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
	core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
	core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java

Change-Id: I6a8b756fc20968e18ea3fd145e155d6282cea945
Showing 117 changed files with 2902 additions and 159 deletions
......@@ -28,10 +28,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.livetribe.slp</groupId>
<artifactId>livetribe-slp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
......
......@@ -233,7 +233,7 @@ public class IOLoopTestClient {
}
@Override
protected void connect(SelectionKey key) {
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
......
livetribe.slp.da.expired.services.purge.period=60
livetribe.slp.sa.client.connect.address=127.0.0.1
livetribe.slp.sa.client.factory=org.livetribe.slp.sa.StandardServiceAgentClient$Factory
livetribe.slp.sa.factory=org.livetribe.slp.sa.StandardServiceAgent$Factory
livetribe.slp.sa.service.renewal.enabled=true
livetribe.slp.sa.unicast.prefer.tcp=false
livetribe.slp.tcp.connector.factory=org.livetribe.slp.spi.net.SocketTCPConnector$Factory
livetribe.slp.tcp.connector.server.factory=org.livetribe.slp.spi.net.SocketTCPConnectorServer$Factory
livetribe.slp.tcp.message.max.length=4096
livetribe.slp.tcp.read.timeout=300000
livetribe.slp.ua.client.factory=org.livetribe.slp.ua.StandardUserAgentClient$Factory
livetribe.slp.ua.factory=org.livetribe.slp.ua.StandardUserAgent$Factory
livetribe.slp.ua.unicast.prefer.tcp=false
livetribe.slp.udp.connector.factory=org.livetribe.slp.spi.net.SocketUDPConnector$Factory
livetribe.slp.udp.connector.server.factory=org.livetribe.slp.spi.net.SocketUDPConnectorServer$Factory
net.slp.DAAddresses=
net.slp.DAAttributes=
net.slp.DAHeartBeat=10800
net.slp.MTU=1400
net.slp.SAAttributes=
net.slp.broadcastAddress=255.255.255.255
net.slp.datagramTimeouts=150,250,400
net.slp.interfaces=0.0.0.0
net.slp.isBroadcastOnly=false
net.slp.locale=en
net.slp.multicastAddress=239.255.255.253
net.slp.multicastMaximumWait=15000
net.slp.multicastTTL=255
net.slp.multicastTimeouts=150,250,400,600,1000
net.slp.notificationPort=1847
net.slp.port=427
net.slp.useScopes=default
org.onlab.cluster.name = TV-ONOS
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterAdminService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Adds a new controller cluster node.
*/
@Command(scope = "onos", name = "add-node",
description = "Adds a new controller cluster node")
public class NodeAddCommand extends AbstractShellCommand {
@Argument(index = 0, name = "nodeId", description = "Node ID",
required = true, multiValued = false)
String nodeId = null;
@Argument(index = 1, name = "ip", description = "Node IP address",
required = true, multiValued = false)
String ip = null;
@Argument(index = 2, name = "tcpPort", description = "Node TCP listen port",
required = false, multiValued = false)
int tcpPort = 9876;
@Override
protected void execute() {
ClusterAdminService service = get(ClusterAdminService.class);
service.addNode(new NodeId(nodeId), IpPrefix.valueOf(ip), tcpPort);
}
}
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterAdminService;
import org.onlab.onos.cluster.NodeId;
/**
* Removes a controller cluster node.
*/
@Command(scope = "onos", name = "remove-node",
description = "Removes a new controller cluster node")
public class NodeRemoveCommand extends AbstractShellCommand {
@Argument(index = 0, name = "nodeId", description = "Node ID",
required = true, multiValued = false)
String nodeId = null;
@Override
protected void execute() {
ClusterAdminService service = get(ClusterAdminService.class);
service.removeNode(new NodeId(nodeId));
}
}
......@@ -17,7 +17,7 @@ import static com.google.common.collect.Lists.newArrayList;
public class NodesListCommand extends AbstractShellCommand {
private static final String FMT =
"id=%s, ip=%s, state=%s %s";
"id=%s, address=%s:%s, state=%s %s";
@Override
protected void execute() {
......@@ -26,7 +26,7 @@ public class NodesListCommand extends AbstractShellCommand {
Collections.sort(nodes, Comparators.NODE_COMPARATOR);
ControllerNode self = service.getLocalNode();
for (ControllerNode node : nodes) {
print(FMT, node.id(), node.ip(),
print(FMT, node.id(), node.ip(), node.tcpPort(),
service.getState(node.id()),
node.equals(self) ? "*" : "");
}
......
......@@ -5,6 +5,12 @@
<action class="org.onlab.onos.cli.NodesListCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.NodeAddCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.NodeRemoveCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.MastersListCommand"/>
<completers>
<ref component-id="clusterIdCompleter"/>
......
package org.onlab.onos.cluster;
import org.onlab.packet.IpPrefix;
/**
* Service for administering the cluster node membership.
*/
public interface ClusterAdminService {
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Removes the specified node from the cluster node list.
*
* @param nodeId controller node identifier
......
package org.onlab.onos.cluster;
import org.onlab.onos.store.Store;
import org.onlab.packet.IpPrefix;
import java.util.Set;
......@@ -40,6 +41,16 @@ public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate>
ControllerNode.State getState(NodeId nodeId);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Removes the specified node from the inventory of cluster nodes.
*
* @param nodeId controller instance identifier
......
......@@ -35,4 +35,12 @@ public interface ControllerNode {
*/
IpPrefix ip();
/**
* Returns the TCP port on which the node listens for connections.
*
* @return TCP port
*/
int tcpPort();
}
......
......@@ -11,13 +11,17 @@ import static com.google.common.base.MoreObjects.toStringHelper;
*/
public class DefaultControllerNode implements ControllerNode {
private static final int DEFAULT_PORT = 9876;
private final NodeId id;
private final IpPrefix ip;
private final int tcpPort;
// For serialization
private DefaultControllerNode() {
this.id = null;
this.ip = null;
this.tcpPort = 0;
}
/**
......@@ -27,8 +31,19 @@ public class DefaultControllerNode implements ControllerNode {
* @param ip instance IP address
*/
public DefaultControllerNode(NodeId id, IpPrefix ip) {
this(id, ip, DEFAULT_PORT);
}
/**
* Creates a new instance with the specified id and IP address and TCP port.
*
* @param id instance identifier
* @param ip instance IP address
*/
public DefaultControllerNode(NodeId id, IpPrefix ip, int tcpPort) {
this.id = id;
this.ip = ip;
this.tcpPort = tcpPort;
}
@Override
......@@ -42,6 +57,11 @@ public class DefaultControllerNode implements ControllerNode {
}
@Override
public int tcpPort() {
return tcpPort;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
......@@ -60,7 +80,8 @@ public class DefaultControllerNode implements ControllerNode {
@Override
public String toString() {
return toStringHelper(this).add("id", id).add("ip", ip).toString();
return toStringHelper(this).add("id", id)
.add("ip", ip).add("tcpPort", tcpPort).toString();
}
}
......
package org.onlab.onos.net.proxyarp;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
/**
* Service for processing arp requests on behalf of applications.
*/
public interface ProxyArpService {
/**
* Returns whether this particular ip address is known to the system.
*
* @param addr
* a ip address
* @return true if know, false otherwise
*/
boolean known(IpPrefix addr);
/**
* Sends a reply for a given request. If the host is not known then the arp
* will be flooded at all edge ports.
*
* @param request
* an arp request
*/
void reply(Ethernet request);
}
/**
* Base abstractions related to the proxy arp service.
*/
package org.onlab.onos.net.proxyarp;
package org.onlab.onos.store;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
// TODO: Consider renaming to DeviceClockService?
/**
* Interface for a logical clock service that vends per device timestamps.
*/
public interface ClockService {
/**
* Returns a new timestamp for the specified deviceId.
* @param deviceId device identifier.
* @return timestamp.
*/
public Timestamp getTimestamp(DeviceId deviceId);
// TODO: Should this be here or separate as Admin service, etc.?
/**
* Updates the mastership term for the specified deviceId.
* @param deviceId device identifier.
* @param term mastership term.
*/
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term);
}
......@@ -40,13 +40,14 @@
Currently required for DistributedDeviceManagerTest. -->
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-store</artifactId>
<artifactId>onos-core-hz-net</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-store</artifactId>
<!-- FIXME: should be somewhere else -->
<artifactId>onos-core-hz-common</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
......
......@@ -16,10 +16,12 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -81,6 +83,14 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address cannot be null");
checkArgument(tcpPort > 5000, "TCP port must be > 5000");
return store.addNode(nodeId, ip, tcpPort);
}
@Override
public void removeNode(NodeId nodeId) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
store.removeNode(nodeId);
......
package org.onlab.onos.cluster.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -14,6 +19,7 @@ import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipStore;
import org.onlab.onos.cluster.MastershipStoreDelegate;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.NodeId;
......@@ -23,15 +29,10 @@ import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.slf4j.Logger;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class MastershipManager
implements MastershipService, MastershipAdminService {
implements MastershipService, MastershipAdminService {
private static final String NODE_ID_NULL = "Node ID cannot be null";
private static final String DEVICE_ID_NULL = "Device ID cannot be null";
......@@ -42,6 +43,8 @@ public class MastershipManager
protected final AbstractListenerRegistry<MastershipEvent, MastershipListener>
listenerRegistry = new AbstractListenerRegistry<>();
private final MastershipStoreDelegate delegate = new InternalDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipStore store;
......@@ -57,6 +60,7 @@ public class MastershipManager
public void activate() {
eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
clusterService.addListener(clusterListener);
store.setDelegate(delegate);
log.info("Started");
}
......@@ -64,6 +68,7 @@ public class MastershipManager
public void deactivate() {
eventDispatcher.removeSink(MastershipEvent.class);
clusterService.removeListener(clusterListener);
store.unsetDelegate(delegate);
log.info("Stopped");
}
......@@ -188,4 +193,15 @@ public class MastershipManager
}
}
public class InternalDelegate implements MastershipStoreDelegate {
@Override
public void notify(MastershipEvent event) {
log.info("dispatching mastership event {}", event);
eventDispatcher.post(event);
}
}
}
......
package org.onlab.onos.net.device.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_MASTERSHIP_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -11,6 +17,7 @@ import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
......@@ -31,14 +38,9 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.onlab.onos.store.ClockService;
import org.slf4j.Logger;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_MASTERSHIP_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provides implementation of the device SB &amp; NB APIs.
*/
......@@ -56,10 +58,10 @@ public class DeviceManager
private final Logger log = getLogger(getClass());
protected final AbstractListenerRegistry<DeviceEvent, DeviceListener>
listenerRegistry = new AbstractListenerRegistry<>();
protected final AbstractListenerRegistry<DeviceEvent, DeviceListener> listenerRegistry =
new AbstractListenerRegistry<>();
private DeviceStoreDelegate delegate = new InternalStoreDelegate();
private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
private final MastershipListener mastershipListener = new InternalMastershipListener();
......@@ -77,6 +79,9 @@ public class DeviceManager
protected MastershipTermService termService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Activate
public void activate() {
store.setDelegate(delegate);
......@@ -168,7 +173,8 @@ public class DeviceManager
}
@Override
protected DeviceProviderService createProviderService(DeviceProvider provider) {
protected DeviceProviderService createProviderService(
DeviceProvider provider) {
return new InternalDeviceProviderService(provider);
}
......@@ -182,14 +188,16 @@ public class DeviceManager
}
@Override
public void deviceConnected(DeviceId deviceId, DeviceDescription deviceDescription) {
public void deviceConnected(DeviceId deviceId,
DeviceDescription deviceDescription) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
checkValidity();
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
// If there was a change of any kind, trigger role selection process.
// If there was a change of any kind, trigger role selection
// process.
if (event != null) {
log.info("Device {} connected", deviceId);
mastershipService.requestRoleFor(deviceId);
......@@ -214,25 +222,30 @@ public class DeviceManager
}
@Override
public void updatePorts(DeviceId deviceId, List<PortDescription> portDescriptions) {
public void updatePorts(DeviceId deviceId,
List<PortDescription> portDescriptions) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(portDescriptions, "Port descriptions list cannot be null");
checkNotNull(portDescriptions,
"Port descriptions list cannot be null");
checkValidity();
List<DeviceEvent> events = store.updatePorts(deviceId, portDescriptions);
List<DeviceEvent> events = store.updatePorts(deviceId,
portDescriptions);
for (DeviceEvent event : events) {
post(event);
}
}
@Override
public void portStatusChanged(DeviceId deviceId, PortDescription portDescription) {
public void portStatusChanged(DeviceId deviceId,
PortDescription portDescription) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(portDescription, PORT_DESCRIPTION_NULL);
checkValidity();
DeviceEvent event = store.updatePortStatus(deviceId, portDescription);
DeviceEvent event = store.updatePortStatus(deviceId,
portDescription);
if (event != null) {
log.info("Device {} port {} status changed", deviceId,
event.port().number());
log.info("Device {} port {} status changed", deviceId, event
.port().number());
post(event);
}
}
......@@ -240,8 +253,8 @@ public class DeviceManager
@Override
public void unableToAssertRole(DeviceId deviceId, MastershipRole role) {
// FIXME: implement response to this notification
log.warn("Failed to assert role [{}] onto Device {}",
role, deviceId);
log.warn("Failed to assert role [{}] onto Device {}", role,
deviceId);
}
}
......@@ -253,18 +266,24 @@ public class DeviceManager
}
// Intercepts mastership events
private class InternalMastershipListener implements MastershipListener {
private class InternalMastershipListener
implements MastershipListener {
@Override
public void event(MastershipEvent event) {
// FIXME: for now we're taking action only on becoming master
if (event.master().equals(clusterService.getLocalNode().id())) {
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(event.subject());
clockService.setMastershipTerm(event.subject(), term);
applyRole(event.subject(), MastershipRole.MASTER);
} else {
applyRole(event.subject(), MastershipRole.STANDBY);
}
}
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements DeviceStoreDelegate {
private class InternalStoreDelegate
implements DeviceStoreDelegate {
@Override
public void notify(DeviceEvent event) {
post(event);
......
package org.onlab.onos.proxyarp.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
public class ProxyArpManager implements ProxyArpService {
private static final String MAC_ADDR_NULL = "Mac address cannot be null.";
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
@Override
public boolean known(IpPrefix addr) {
checkNotNull(MAC_ADDR_NULL, addr);
Set<Host> hosts = hostService.getHostsByIp(addr);
return !hosts.isEmpty();
}
@Override
public void reply(Ethernet request) {
checkNotNull(REQUEST_NULL, request);
checkArgument(request.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) request.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REQUEST, NOT_ARP_REQUEST);
VlanId vlan = VlanId.vlanId(request.getVlanID());
Set<Host> hosts = hostService.getHostsByIp(IpPrefix.valueOf(arp
.getTargetProtocolAddress()));
Host h = null;
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
h = host;
break;
}
}
if (h == null) {
flood(request);
return;
}
Ethernet arpReply = buildArpReply(h, request);
// TODO: check send status with host service.
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(h.location().port());
packetService.emit(new DefaultOutboundPacket(h.location().deviceId(),
builder.build(), ByteBuffer.wrap(arpReply.serialize())));
}
private void flood(Ethernet request) {
// TODO: flood on all edge ports.
}
private Ethernet buildArpReply(Host h, Ethernet request) {
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(request.getSourceMACAddress());
eth.setSourceMACAddress(h.mac().getAddress());
eth.setEtherType(Ethernet.TYPE_ARP);
ARP arp = new ARP();
arp.setOpCode(ARP.OP_REPLY);
arp.setSenderHardwareAddress(h.mac().getAddress());
arp.setTargetHardwareAddress(request.getSourceMACAddress());
arp.setTargetProtocolAddress(((ARP) request.getPayload())
.getSenderProtocolAddress());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toInt());
eth.setPayload(arp);
return eth;
}
}
/**
* Core subsystem for responding to arp requests.
*/
package org.onlab.onos.proxyarp.impl;
......@@ -32,9 +32,9 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.impl.StoreManager;
import org.onlab.onos.store.impl.TestStoreManager;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
......@@ -163,7 +163,7 @@ public class DistributedDeviceManagerTest {
public void deviceDisconnected() {
connectDevice(DID1, SW1);
connectDevice(DID2, SW1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
assertTrue("device should be available", service.isAvailable(DID1));
// Disconnect
......@@ -182,10 +182,10 @@ public class DistributedDeviceManagerTest {
@Test
public void deviceUpdated() {
connectDevice(DID1, SW1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
validateEvents(DEVICE_ADDED);
connectDevice(DID1, SW2);
validateEvents(DEVICE_UPDATED, DEVICE_UPDATED);
validateEvents(DEVICE_UPDATED);
}
@Test
......@@ -202,7 +202,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P2, true));
pds.add(new DefaultPortDescription(P3, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
pds.clear();
pds.add(new DefaultPortDescription(P1, false));
......@@ -218,7 +218,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
validateEvents(PORT_UPDATED);
......@@ -233,7 +233,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
assertEquals("wrong port count", 2, service.getPorts(DID1).size());
Port port = service.getPort(DID1, P1);
......@@ -247,7 +247,7 @@ public class DistributedDeviceManagerTest {
connectDevice(DID2, SW2);
assertEquals("incorrect device count", 2, service.getDeviceCount());
admin.removeDevice(DID1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED, DEVICE_REMOVED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED);
assertNull("device should not be found", service.getDevice(DID1));
assertNotNull("device should be found", service.getDevice(DID2));
assertEquals("incorrect device count", 1, service.getDeviceCount());
......
......@@ -20,7 +20,6 @@
<module>api</module>
<module>net</module>
<module>store</module>
<module>trivial</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-store</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-dist</artifactId>
<packaging>bundle</packaging>
<description>ONOS Gossip based distributed store subsystems</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package org.onlab.onos.store.cluster.impl;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* Allows for reading and writing cluster definition as a JSON file.
*/
public class ClusterDefinitionStore {
private final File file;
/**
* Creates a reader/writer of the cluster definition file.
*
* @param filePath location of the definition file
*/
public ClusterDefinitionStore(String filePath) {
file = new File(filePath);
}
/**
* Returns set of the controller nodes, including self.
*
* @return set of controller nodes
*/
public Set<DefaultControllerNode> read() throws IOException {
Set<DefaultControllerNode> nodes = new HashSet<>();
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
while (it.hasNext()) {
ObjectNode nodeDef = (ObjectNode) it.next();
nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
IpPrefix.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(9876)));
}
return nodes;
}
/**
* Writes the given set of the controller nodes.
*
* @param nodes set of controller nodes
*/
public void write(Set<DefaultControllerNode> nodes) throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = mapper.createObjectNode();
ArrayNode nodeDefs = mapper.createArrayNode();
clusterNodeDef.set("nodes", nodeDefs);
for (DefaultControllerNode node : nodes) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort());
nodeDefs.add(nodeDef);
}
mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
clusterNodeDef);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AbstractMessage;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications using TLVs.
*/
public class TLVMessage extends AbstractMessage {
private final int type;
private final byte[] data;
/**
* Creates an immutable TLV message.
*
* @param type message type
* @param data message data bytes
*/
public TLVMessage(int type, byte[] data) {
this.length = data.length + TLVMessageStream.METADATA_LENGTH;
this.type = type;
this.data = data;
}
/**
* Returns the message type indicator.
*
* @return message type
*/
public int type() {
return type;
}
/**
* Returns the data bytes.
*
* @return message data
*/
public byte[] data() {
return data;
}
@Override
public int hashCode() {
return Objects.hash(type, data);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final TLVMessage other = (TLVMessage) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.data, other.data);
}
@Override
public String toString() {
return toStringHelper(this).add("type", type).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring TLV messages between cluster members.
*/
public class TLVMessageStream extends MessageStream<TLVMessage> {
public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafecafefeedL;
private DefaultControllerNode node;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
*/
protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
}
/**
* Returns the node with which this stream is associated.
*
* @return controller node
*/
DefaultControllerNode node() {
return node;
}
/**
* Sets the node with which this stream is affiliated.
*
* @param node controller node
*/
void setNode(DefaultControllerNode node) {
checkState(this.node == null, "Stream is already bound to a node");
this.node = node;
}
@Override
protected TLVMessage read(ByteBuffer buffer) {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int type = buffer.getInt();
length = buffer.getInt();
// TODO: add deserialization hook here
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return new TLVMessage(type, data);
}
@Override
protected void write(TLVMessage message, ByteBuffer buffer) {
buffer.putLong(MARKER);
buffer.putInt(message.type());
buffer.putInt(message.length());
// TODO: add serialization hook here
buffer.put(message.data());
}
}
package org.onlab.onos.store.device.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.impl.OnosTimestamp;
import org.slf4j.Logger;
@Component(immediate = true)
@Service
public class OnosClockService implements ClockService {
private final Logger log = getLogger(getClass());
// TODO: Implement per device ticker that is reset to 0 at the beginning of a new term.
private final AtomicInteger ticker = new AtomicInteger(0);
private ConcurrentMap<DeviceId, MastershipTerm> deviceMastershipTerms = new ConcurrentHashMap<>();
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public Timestamp getTimestamp(DeviceId deviceId) {
MastershipTerm term = deviceMastershipTerms.get(deviceId);
if (term == null) {
throw new IllegalStateException("Requesting timestamp for a deviceId without mastership");
}
return new OnosTimestamp(term.termNumber(), ticker.incrementAndGet());
}
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
deviceMastershipTerms.put(deviceId, term);
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.store.Timestamp;
/**
* Wrapper class for a entity that is versioned
* and can either be up or down.
*
* @param <T> type of the value.
*/
public class VersionedValue<T> {
private final T entity;
private final Timestamp timestamp;
private final boolean isUp;
public VersionedValue(T entity, boolean isUp, Timestamp timestamp) {
this.entity = entity;
this.isUp = isUp;
this.timestamp = timestamp;
}
/**
* Returns the value.
* @return value.
*/
public T entity() {
return entity;
}
/**
* Tells whether the entity is up or down.
* @return true if up, false otherwise.
*/
public boolean isUp() {
return isUp;
}
/**
* Returns the timestamp (version) associated with this entity.
* @return timestamp.
*/
public Timestamp timestamp() {
return timestamp;
}
}
/**
* Implementation of device store using distributed structures.
*/
package org.onlab.onos.store.device.impl;
package org.onlab.onos.store.flow.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Collections;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
/**
* Manages inventory of flow rules using trivial in-memory implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
// store entries as a pile of rules, no info about device tables
private final Multimap<DeviceId, FlowRule> flowEntries =
ArrayListMultimap.<DeviceId, FlowRule>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public synchronized FlowRule getFlowRule(FlowRule rule) {
for (FlowRule f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
}
return null;
}
@Override
public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
Collection<FlowRule> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptyList();
}
return ImmutableSet.copyOf(rules);
}
@Override
public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
if (rules == null) {
return Collections.emptyList();
}
return ImmutableSet.copyOf(rules);
}
@Override
public synchronized void storeFlowRule(FlowRule rule) {
FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
DeviceId did = f.deviceId();
if (!flowEntries.containsEntry(did, f)) {
flowEntries.put(did, f);
flowEntriesById.put(rule.appId(), f);
}
}
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
DeviceId did = f.deviceId();
/*
* find the rule and mark it for deletion.
* Ultimately a flow removed will come remove it.
*/
if (flowEntries.containsEntry(did, f)) {
//synchronized (flowEntries) {
flowEntries.remove(did, f);
flowEntries.put(did, f);
flowEntriesById.remove(rule.appId(), rule);
//}
}
}
@Override
public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
if (flowEntries.containsEntry(did, rule)) {
//synchronized (flowEntries) {
// Multimaps support duplicates so we have to remove our rule
// and replace it with the current version.
flowEntries.remove(did, rule);
flowEntries.put(did, rule);
//}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
flowEntries.put(did, rule);
return new FlowRuleEvent(RULE_ADDED, rule);
}
@Override
public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
//synchronized (this) {
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
//}
}
}
package org.onlab.onos.store.host.impl;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultHost;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.host.HostEvent;
import org.onlab.onos.net.host.HostStore;
import org.onlab.onos.net.host.HostStoreDelegate;
import org.onlab.onos.net.host.PortAddresses;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* Manages inventory of end-station hosts using trivial in-memory
* implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedHostStore
extends AbstractStore<HostEvent, HostStoreDelegate>
implements HostStore {
private final Logger log = getLogger(getClass());
// Host inventory
private final Map<HostId, Host> hosts = new ConcurrentHashMap<>();
// Hosts tracked by their location
private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
private final Map<ConnectPoint, PortAddresses> portAddresses =
new ConcurrentHashMap<>();
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
HostDescription hostDescription) {
Host host = hosts.get(hostId);
if (host == null) {
return createHost(providerId, hostId, hostDescription);
}
return updateHost(providerId, host, hostDescription);
}
// creates a new host and sends HOST_ADDED
private HostEvent createHost(ProviderId providerId, HostId hostId,
HostDescription descr) {
DefaultHost newhost = new DefaultHost(providerId, hostId,
descr.hwAddress(),
descr.vlan(),
descr.location(),
descr.ipAddresses());
synchronized (this) {
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
}
return new HostEvent(HOST_ADDED, newhost);
}
// checks for type of update to host, sends appropriate event
private HostEvent updateHost(ProviderId providerId, Host host,
HostDescription descr) {
DefaultHost updated;
HostEvent event;
if (!host.location().equals(descr.location())) {
updated = new DefaultHost(providerId, host.id(),
host.mac(),
host.vlan(),
descr.location(),
host.ipAddresses());
event = new HostEvent(HOST_MOVED, updated);
} else if (!(host.ipAddresses().equals(descr.ipAddresses()))) {
updated = new DefaultHost(providerId, host.id(),
host.mac(),
host.vlan(),
descr.location(),
descr.ipAddresses());
event = new HostEvent(HOST_UPDATED, updated);
} else {
return null;
}
synchronized (this) {
hosts.put(host.id(), updated);
locations.remove(host.location(), host);
locations.put(updated.location(), updated);
}
return event;
}
@Override
public HostEvent removeHost(HostId hostId) {
synchronized (this) {
Host host = hosts.remove(hostId);
if (host != null) {
locations.remove((host.location()), host);
return new HostEvent(HOST_REMOVED, host);
}
return null;
}
}
@Override
public int getHostCount() {
return hosts.size();
}
@Override
public Iterable<Host> getHosts() {
return Collections.unmodifiableSet(new HashSet<>(hosts.values()));
}
@Override
public Host getHost(HostId hostId) {
return hosts.get(hostId);
}
@Override
public Set<Host> getHosts(VlanId vlanId) {
Set<Host> vlanset = new HashSet<>();
for (Host h : hosts.values()) {
if (h.vlan().equals(vlanId)) {
vlanset.add(h);
}
}
return vlanset;
}
@Override
public Set<Host> getHosts(MacAddress mac) {
Set<Host> macset = new HashSet<>();
for (Host h : hosts.values()) {
if (h.mac().equals(mac)) {
macset.add(h);
}
}
return macset;
}
@Override
public Set<Host> getHosts(IpPrefix ip) {
Set<Host> ipset = new HashSet<>();
for (Host h : hosts.values()) {
if (h.ipAddresses().contains(ip)) {
ipset.add(h);
}
}
return ipset;
}
@Override
public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
return ImmutableSet.copyOf(locations.get(connectPoint));
}
@Override
public Set<Host> getConnectedHosts(DeviceId deviceId) {
Set<Host> hostset = new HashSet<>();
for (ConnectPoint p : locations.keySet()) {
if (p.deviceId().equals(deviceId)) {
hostset.addAll(locations.get(p));
}
}
return hostset;
}
@Override
public void updateAddressBindings(PortAddresses addresses) {
synchronized (portAddresses) {
PortAddresses existing = portAddresses.get(addresses.connectPoint());
if (existing == null) {
portAddresses.put(addresses.connectPoint(), addresses);
} else {
Set<IpPrefix> union = Sets.union(existing.ips(), addresses.ips())
.immutableCopy();
MacAddress newMac = (addresses.mac() == null) ? existing.mac()
: addresses.mac();
PortAddresses newAddresses =
new PortAddresses(addresses.connectPoint(), union, newMac);
portAddresses.put(newAddresses.connectPoint(), newAddresses);
}
}
}
@Override
public void removeAddressBindings(PortAddresses addresses) {
synchronized (portAddresses) {
PortAddresses existing = portAddresses.get(addresses.connectPoint());
if (existing != null) {
Set<IpPrefix> difference =
Sets.difference(existing.ips(), addresses.ips()).immutableCopy();
// If they removed the existing mac, set the new mac to null.
// Otherwise, keep the existing mac.
MacAddress newMac = existing.mac();
if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
newMac = null;
}
PortAddresses newAddresses =
new PortAddresses(addresses.connectPoint(), difference, newMac);
portAddresses.put(newAddresses.connectPoint(), newAddresses);
}
}
}
@Override
public void clearAddressBindings(ConnectPoint connectPoint) {
synchronized (portAddresses) {
portAddresses.remove(connectPoint);
}
}
@Override
public Set<PortAddresses> getAddressBindings() {
synchronized (portAddresses) {
return new HashSet<>(portAddresses.values());
}
}
@Override
public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
PortAddresses addresses;
synchronized (portAddresses) {
addresses = portAddresses.get(connectPoint);
}
if (addresses == null) {
addresses = new PortAddresses(connectPoint, null, null);
}
return addresses;
}
}
package org.onlab.onos.store.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Objects;
import org.onlab.onos.net.ElementId;
import org.onlab.onos.store.Timestamp;
import com.google.common.base.MoreObjects;
......@@ -14,22 +12,20 @@ import com.google.common.collect.ComparisonChain;
// If it is store specific, implement serializable interfaces?
/**
* Default implementation of Timestamp.
* TODO: Better documentation.
*/
public final class OnosTimestamp implements Timestamp {
private final ElementId id;
private final int termNumber;
private final int sequenceNumber;
/**
* Default version tuple.
*
* @param id identifier of the element
* @param termNumber the mastership termNumber
* @param sequenceNumber the sequenceNumber number within the termNumber
*/
public OnosTimestamp(ElementId id, int termNumber, int sequenceNumber) {
this.id = checkNotNull(id);
public OnosTimestamp(int termNumber, int sequenceNumber) {
this.termNumber = termNumber;
this.sequenceNumber = sequenceNumber;
}
......@@ -38,9 +34,6 @@ public final class OnosTimestamp implements Timestamp {
public int compareTo(Timestamp o) {
checkArgument(o instanceof OnosTimestamp, "Must be OnosTimestamp", o);
OnosTimestamp that = (OnosTimestamp) o;
checkArgument(this.id.equals(that.id),
"Cannot compare version for different element this:%s, that:%s",
this, that);
return ComparisonChain.start()
.compare(this.termNumber, that.termNumber)
......@@ -50,7 +43,7 @@ public final class OnosTimestamp implements Timestamp {
@Override
public int hashCode() {
return Objects.hash(id, termNumber, sequenceNumber);
return Objects.hash(termNumber, sequenceNumber);
}
@Override
......@@ -62,30 +55,19 @@ public final class OnosTimestamp implements Timestamp {
return false;
}
OnosTimestamp that = (OnosTimestamp) obj;
return Objects.equals(this.id, that.id) &&
Objects.equals(this.termNumber, that.termNumber) &&
return Objects.equals(this.termNumber, that.termNumber) &&
Objects.equals(this.sequenceNumber, that.sequenceNumber);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", id)
.add("termNumber", termNumber)
.add("sequenceNumber", sequenceNumber)
.toString();
}
/**
* Returns the element.
*
* @return element identifier
*/
public ElementId id() {
return id;
}
/**
* Returns the termNumber.
*
* @return termNumber
......
package org.onlab.onos.store.link.impl;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.device.impl.VersionedValue;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.ImmutableSet.Builder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
* Manages inventory of infrastructure links using a protocol that takes into consideration
* the order in which events occur.
*/
// FIXME: This does not yet implement the full protocol.
// The full protocol requires the sender of LLDP message to include the
// version information of src device/port and the receiver to
// take that into account when figuring out if a more recent src
// device/port down event renders the link discovery obsolete.
@Component(immediate = true)
@Service
public class OnosDistributedLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
private final Logger log = getLogger(getClass());
// Link inventory
private ConcurrentMap<LinkKey, VersionedValue<Link>> links;
public static final String LINK_NOT_FOUND = "Link between %s and %s not found";
// TODO synchronize?
// Egress and ingress link sets
private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Activate
public void activate() {
links = new ConcurrentHashMap<>();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public int getLinkCount() {
return links.size();
}
@Override
public Iterable<Link> getLinks() {
Builder<Link> builder = ImmutableSet.builder();
synchronized (this) {
for (VersionedValue<Link> link : links.values()) {
builder.add(link.entity());
}
return builder.build();
}
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId));
Set<Link> rawEgressLinks = new HashSet<>();
for (VersionedValue<Link> link : egressLinks) {
rawEgressLinks.add(link.entity());
}
return rawEgressLinks;
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId));
Set<Link> rawIngressLinks = new HashSet<>();
for (VersionedValue<Link> link : ingressLinks) {
rawIngressLinks.add(link.entity());
}
return rawIngressLinks;
}
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
VersionedValue<Link> link = links.get(new LinkKey(src, dst));
checkArgument(link != null, "LINK_NOT_FOUND", src, dst);
return link.entity();
}
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egressLinks = new HashSet<>();
for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) {
if (link.entity().src().equals(src)) {
egressLinks.add(link.entity());
}
}
return egressLinks;
}
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingressLinks = new HashSet<>();
for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) {
if (link.entity().dst().equals(dst)) {
ingressLinks.add(link.entity());
}
}
return ingressLinks;
}
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
final DeviceId destinationDeviceId = linkDescription.dst().deviceId();
final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId);
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
VersionedValue<Link> link = links.get(key);
if (link == null) {
return createLink(providerId, key, linkDescription, newTimestamp);
}
checkState(newTimestamp.compareTo(link.timestamp()) > 0,
"Existing Link has a timestamp in the future!");
return updateLink(providerId, link, key, linkDescription, newTimestamp);
}
// Creates and stores the link and returns the appropriate event.
private LinkEvent createLink(ProviderId providerId, LinkKey key,
LinkDescription linkDescription, Timestamp timestamp) {
VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(),
linkDescription.type()), true, timestamp);
synchronized (this) {
links.put(key, link);
addNewLink(link, timestamp);
}
// FIXME: notify peers.
return new LinkEvent(LINK_ADDED, link.entity());
}
// update Egress and ingress link sets
private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) {
Link rawLink = link.entity();
synchronized (this) {
srcLinks.put(rawLink.src().deviceId(), link);
dstLinks.put(rawLink.dst().deviceId(), link);
}
}
// Updates, if necessary the specified link and returns the appropriate event.
private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink,
LinkKey key, LinkDescription linkDescription, Timestamp timestamp) {
// FIXME confirm Link update condition is OK
if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) {
synchronized (this) {
VersionedValue<Link> updatedLink = new VersionedValue<Link>(
new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(),
linkDescription.type()), true, timestamp);
links.replace(key, existingLink, updatedLink);
replaceLink(existingLink, updatedLink);
// FIXME: notify peers.
return new LinkEvent(LINK_UPDATED, updatedLink.entity());
}
}
return null;
}
// update Egress and ingress link sets
private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) {
synchronized (this) {
srcLinks.remove(current.entity().src().deviceId(), current);
dstLinks.remove(current.entity().dst().deviceId(), current);
srcLinks.put(current.entity().src().deviceId(), updated);
dstLinks.put(current.entity().dst().deviceId(), updated);
}
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
synchronized (this) {
LinkKey key = new LinkKey(src, dst);
VersionedValue<Link> link = links.remove(key);
if (link != null) {
removeLink(link);
// notify peers
return new LinkEvent(LINK_REMOVED, link.entity());
}
return null;
}
}
// update Egress and ingress link sets
private void removeLink(VersionedValue<Link> link) {
synchronized (this) {
srcLinks.remove(link.entity().src().deviceId(), link);
dstLinks.remove(link.entity().dst().deviceId(), link);
}
}
}
package org.onlab.onos.store.serializers;
import org.onlab.onos.net.ElementId;
import org.onlab.onos.store.impl.OnosTimestamp;
import com.esotericsoftware.kryo.Kryo;
......@@ -20,18 +19,17 @@ public class OnosTimestampSerializer extends Serializer<OnosTimestamp> {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, OnosTimestamp object) {
kryo.writeClassAndObject(output, object.id());
output.writeInt(object.termNumber());
output.writeInt(object.sequenceNumber());
}
@Override
public OnosTimestamp read(Kryo kryo, Input input, Class<OnosTimestamp> type) {
ElementId id = (ElementId) kryo.readClassAndObject(input);
final int term = input.readInt();
final int sequence = input.readInt();
return new OnosTimestamp(id, term, sequence);
return new OnosTimestamp(term, sequence);
}
}
......
package org.onlab.onos.store.topology.impl;
import org.onlab.graph.AdjacencyListsGraph;
import org.onlab.onos.net.topology.TopologyEdge;
import org.onlab.onos.net.topology.TopologyGraph;
import org.onlab.onos.net.topology.TopologyVertex;
import java.util.Set;
/**
* Default implementation of an immutable topology graph based on a generic
* implementation of adjacency lists graph.
*/
public class DefaultTopologyGraph
extends AdjacencyListsGraph<TopologyVertex, TopologyEdge>
implements TopologyGraph {
/**
* Creates a topology graph comprising of the specified vertexes and edges.
*
* @param vertexes set of graph vertexes
* @param edges set of graph edges
*/
public DefaultTopologyGraph(Set<TopologyVertex> vertexes, Set<TopologyEdge> edges) {
super(vertexes, edges);
}
}
package org.onlab.onos.store.topology.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.Event;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.ClusterId;
import org.onlab.onos.net.topology.GraphDescription;
import org.onlab.onos.net.topology.LinkWeight;
import org.onlab.onos.net.topology.Topology;
import org.onlab.onos.net.topology.TopologyCluster;
import org.onlab.onos.net.topology.TopologyEvent;
import org.onlab.onos.net.topology.TopologyGraph;
import org.onlab.onos.net.topology.TopologyStore;
import org.onlab.onos.net.topology.TopologyStoreDelegate;
import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
/**
* Manages inventory of topology snapshots using trivial in-memory
* structures implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedTopologyStore
extends AbstractStore<TopologyEvent, TopologyStoreDelegate>
implements TopologyStore {
private final Logger log = getLogger(getClass());
private volatile DefaultTopology current;
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public Topology currentTopology() {
return current;
}
@Override
public boolean isLatest(Topology topology) {
// Topology is current only if it is the same as our current topology
return topology == current;
}
@Override
public TopologyGraph getGraph(Topology topology) {
return defaultTopology(topology).getGraph();
}
@Override
public Set<TopologyCluster> getClusters(Topology topology) {
return defaultTopology(topology).getClusters();
}
@Override
public TopologyCluster getCluster(Topology topology, ClusterId clusterId) {
return defaultTopology(topology).getCluster(clusterId);
}
@Override
public Set<DeviceId> getClusterDevices(Topology topology, TopologyCluster cluster) {
return defaultTopology(topology).getClusterDevices(cluster);
}
@Override
public Set<Link> getClusterLinks(Topology topology, TopologyCluster cluster) {
return defaultTopology(topology).getClusterLinks(cluster);
}
@Override
public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst) {
return defaultTopology(topology).getPaths(src, dst);
}
@Override
public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst,
LinkWeight weight) {
return defaultTopology(topology).getPaths(src, dst, weight);
}
@Override
public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) {
return defaultTopology(topology).isInfrastructure(connectPoint);
}
@Override
public boolean isBroadcastPoint(Topology topology, ConnectPoint connectPoint) {
return defaultTopology(topology).isBroadcastPoint(connectPoint);
}
@Override
public TopologyEvent updateTopology(ProviderId providerId,
GraphDescription graphDescription,
List<Event> reasons) {
// First off, make sure that what we're given is indeed newer than
// what we already have.
if (current != null && graphDescription.timestamp() < current.time()) {
return null;
}
// Have the default topology construct self from the description data.
DefaultTopology newTopology =
new DefaultTopology(providerId, graphDescription);
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
}
}
// Validates the specified topology and returns it as a default
private DefaultTopology defaultTopology(Topology topology) {
if (topology instanceof DefaultTopology) {
return (DefaultTopology) topology;
}
throw new IllegalArgumentException("Topology class " + topology.getClass() +
" not supported");
}
}
package org.onlab.onos.store.topology.impl;
import org.onlab.onos.net.DeviceId;
import java.util.Objects;
/**
* Key for filing pre-computed paths between source and destination devices.
*/
class PathKey {
private final DeviceId src;
private final DeviceId dst;
/**
* Creates a path key from the given source/dest pair.
* @param src source device
* @param dst destination device
*/
PathKey(DeviceId src, DeviceId dst) {
this.src = src;
this.dst = dst;
}
@Override
public int hashCode() {
return Objects.hash(src, dst);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof PathKey) {
final PathKey other = (PathKey) obj;
return Objects.equals(this.src, other.src) && Objects.equals(this.dst, other.dst);
}
return false;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-hz-cluster</artifactId>
<packaging>bundle</packaging>
<description>ONOS Hazelcast based distributed store subsystems</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
......@@ -8,6 +8,7 @@ import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -18,9 +19,9 @@ import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.OptionalCacheLoader;
import org.onlab.packet.IpPrefix;
import java.util.Map;
......@@ -38,7 +39,7 @@ import static org.onlab.onos.cluster.ControllerNode.State;
@Component(immediate = true)
@Service
public class DistributedClusterStore
extends AbstractDistributedStore<ClusterEvent, ClusterStoreDelegate>
extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private IMap<byte[], byte[]> rawNodes;
......@@ -57,7 +58,7 @@ public class DistributedClusterStore
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(storeService, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
loadClusterNodes();
......@@ -67,7 +68,7 @@ public class DistributedClusterStore
// Loads the initial set of cluster nodes
private void loadClusterNodes() {
for (Member member : theInstance.getCluster().getMembers()) {
addMember(member);
addNode(node(member));
}
}
......@@ -103,6 +104,11 @@ public class DistributedClusterStore
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
@Override
public void removeNode(NodeId nodeId) {
synchronized (this) {
rawNodes.remove(serialize(nodeId));
......@@ -111,8 +117,7 @@ public class DistributedClusterStore
}
// Adds a new node based on the specified member
private synchronized ControllerNode addMember(Member member) {
DefaultControllerNode node = node(member);
private synchronized ControllerNode addNode(DefaultControllerNode node) {
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
states.put(node.id(), State.ACTIVE);
......@@ -135,7 +140,7 @@ public class DistributedClusterStore
@Override
public void memberAdded(MembershipEvent membershipEvent) {
log.info("Member {} added", membershipEvent.getMember());
ControllerNode node = addMember(membershipEvent.getMember());
ControllerNode node = addNode(node(membershipEvent.getMember()));
notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
}
......
package org.onlab.onos.store.cluster.impl;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.IMap;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -19,15 +21,14 @@ import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.OptionalCacheLoader;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static com.google.common.cache.CacheBuilder.newBuilder;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.IMap;
/**
* Distributed implementation of the cluster nodes store.
......@@ -35,8 +36,8 @@ import static com.google.common.cache.CacheBuilder.newBuilder;
@Component(immediate = true)
@Service
public class DistributedMastershipStore
extends AbstractDistributedStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
private IMap<byte[], byte[]> rawMasters;
private LoadingCache<DeviceId, Optional<NodeId>> masters;
......@@ -53,7 +54,7 @@ public class DistributedMastershipStore
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(storeService, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteEventHandler<>(masters), true);
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
loadMasters();
......@@ -128,4 +129,25 @@ public class DistributedMastershipStore
return null;
}
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
}
@Override
protected void onAdd(DeviceId deviceId, NodeId nodeId) {
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
@Override
protected void onRemove(DeviceId deviceId, NodeId nodeId) {
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
@Override
protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) {
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
}
}
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-hz-common</artifactId>
<packaging>bundle</packaging>
<description>ONOS Hazelcast based distributed store subsystems</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package org.onlab.onos.store.impl;
package org.onlab.onos.store.common;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
......
package org.onlab.onos.store.impl;
package org.onlab.onos.store.common;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
......@@ -6,6 +6,8 @@ import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
......@@ -13,7 +15,6 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.common.StoreService;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -23,7 +24,7 @@ import static org.slf4j.LoggerFactory.getLogger;
* Abstraction of a distributed store based on Hazelcast.
*/
@Component(componentAbstract = true)
public abstract class AbstractDistributedStore<E extends Event, D extends StoreDelegate<E>>
public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> {
protected final Logger log = getLogger(getClass());
......@@ -66,8 +67,9 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
private LoadingCache<K, Optional<V>> cache;
/**
......@@ -75,17 +77,26 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
*
* @param cache cache to update
*/
public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) {
public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
this.localMember = theInstance.getCluster().getLocalMember();
this.cache = checkNotNull(cache);
}
@Override
public void mapCleared(MapEvent event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
cache.invalidateAll();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
......@@ -95,6 +106,10 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
Optional<V> oldValue = Optional.fromNullable(oldVal);
......@@ -106,6 +121,10 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getOldValue());
cache.invalidate(key);
......@@ -141,4 +160,80 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
}
}
/**
* Distributed object remote event entry listener.
*
* @param <K> Entry key type after deserialization
* @param <V> Entry value type after deserialization
*/
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
public RemoteEventHandler() {
this.localMember = theInstance.getCluster().getLocalMember();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
onAdd(key, newVal);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getValue());
onRemove(key, val);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
V newVal = deserialize(event.getValue());
onUpdate(key, oldVal, newVal);
}
/**
* Remote entry addition hook.
*
* @param key new key
* @param newVal new value
*/
protected void onAdd(K key, V newVal) {
}
/**
* Remote entry update hook.
*
* @param key new key
* @param oldValue old value
* @param newVal new value
*/
protected void onUpdate(K key, V oldValue, V newVal) {
}
/**
* Remote entry remove hook.
*
* @param key new key
* @param val old value
*/
protected void onRemove(K key, V val) {
}
}
}
......
package org.onlab.onos.store.impl;
package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.common.StoreService;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
import com.hazelcast.core.IMap;
......
package org.onlab.onos.store.impl;
package org.onlab.onos.store.common;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
......@@ -27,7 +27,6 @@ import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
......@@ -35,7 +34,6 @@ import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
import org.onlab.onos.store.serializers.LinkKeySerializer;
import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.OnosTimestampSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
import org.onlab.packet.IpPrefix;
......@@ -102,7 +100,6 @@ public class StoreManager implements StoreService {
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(OnosTimestamp.class, new OnosTimestampSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
......
package org.onlab.onos.store.impl;
package org.onlab.onos.store.common;
import java.io.FileNotFoundException;
import java.util.UUID;
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-hz-net</artifactId>
<packaging>bundle</packaging>
<description>ONOS Hazelcast based distributed store subsystems</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package org.onlab.onos.store.device.impl;
import static com.google.common.base.Predicates.notNull;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.FluentIterable;
......@@ -26,9 +27,9 @@ import org.onlab.onos.net.device.DeviceStore;
import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.OptionalCacheLoader;
import org.slf4j.Logger;
import java.util.ArrayList;
......@@ -52,7 +53,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class DistributedDeviceStore
extends AbstractDistributedStore<DeviceEvent, DeviceStoreDelegate>
extends AbstractHazelcastStore<DeviceEvent, DeviceStoreDelegate>
implements DeviceStore {
private final Logger log = getLogger(getClass());
......@@ -71,6 +72,10 @@ public class DistributedDeviceStore
private IMap<byte[], byte[]> rawDevicePorts;
private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts;
private String devicesListener;
private String portsListener;
@Override
@Activate
public void activate() {
......@@ -85,7 +90,7 @@ public class DistributedDeviceStore
= new OptionalCacheLoader<>(storeService, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
......@@ -95,7 +100,7 @@ public class DistributedDeviceStore
= new OptionalCacheLoader<>(storeService, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
loadDeviceCache();
loadDevicePortsCache();
......@@ -105,6 +110,8 @@ public class DistributedDeviceStore
@Deactivate
public void deactivate() {
rawDevicePorts.removeEntryListener(portsListener);
rawDevices.removeEntryListener(devicesListener);
log.info("Stopped");
}
......@@ -353,7 +360,7 @@ public class DistributedDeviceStore
}
}
private class RemoteDeviceEventHandler extends RemoteEventHandler<DeviceId, DefaultDevice> {
private class RemoteDeviceEventHandler extends RemoteCacheEventHandler<DeviceId, DefaultDevice> {
public RemoteDeviceEventHandler(LoadingCache<DeviceId, Optional<DefaultDevice>> cache) {
super(cache);
}
......@@ -374,7 +381,7 @@ public class DistributedDeviceStore
}
}
private class RemotePortEventHandler extends RemoteEventHandler<DeviceId, Map<PortNumber, Port>> {
private class RemotePortEventHandler extends RemoteCacheEventHandler<DeviceId, Map<PortNumber, Port>> {
public RemotePortEventHandler(LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> cache) {
super(cache);
}
......
package org.onlab.onos.store.device.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
// FIXME: Code clone in onos-core-trivial, onos-core-hz-net
/**
* Dummy implementation of {@link ClockService}.
*/
@Component(immediate = true)
@Service
public class NoOpClockService implements ClockService {
@Override
public Timestamp getTimestamp(DeviceId deviceId) {
return new Timestamp() {
@Override
public int compareTo(Timestamp o) {
throw new IllegalStateException("Never expected to be used.");
}
};
}
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
}
}
package org.onlab.onos.store.flow.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Collections;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
/**
* Manages inventory of flow rules using trivial in-memory implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
// store entries as a pile of rules, no info about device tables
private final Multimap<DeviceId, FlowRule> flowEntries =
ArrayListMultimap.<DeviceId, FlowRule>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public synchronized FlowRule getFlowRule(FlowRule rule) {
for (FlowRule f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
}
return null;
}
@Override
public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
Collection<FlowRule> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptyList();
}
return ImmutableSet.copyOf(rules);
}
@Override
public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
if (rules == null) {
return Collections.emptyList();
}
return ImmutableSet.copyOf(rules);
}
@Override
public synchronized void storeFlowRule(FlowRule rule) {
FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
DeviceId did = f.deviceId();
if (!flowEntries.containsEntry(did, f)) {
flowEntries.put(did, f);
flowEntriesById.put(rule.appId(), f);
}
}
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
DeviceId did = f.deviceId();
/*
* find the rule and mark it for deletion.
* Ultimately a flow removed will come remove it.
*/
if (flowEntries.containsEntry(did, f)) {
//synchronized (flowEntries) {
flowEntries.remove(did, f);
flowEntries.put(did, f);
flowEntriesById.remove(rule.appId(), rule);
//}
}
}
@Override
public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
if (flowEntries.containsEntry(did, rule)) {
//synchronized (flowEntries) {
// Multimaps support duplicates so we have to remove our rule
// and replace it with the current version.
flowEntries.remove(did, rule);
flowEntries.put(did, rule);
//}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
flowEntries.put(did, rule);
return new FlowRuleEvent(RULE_ADDED, rule);
}
@Override
public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
//synchronized (this) {
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
//}
}
}
package org.onlab.onos.store.host.impl;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultHost;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.host.HostEvent;
import org.onlab.onos.net.host.HostStore;
import org.onlab.onos.net.host.HostStoreDelegate;
import org.onlab.onos.net.host.PortAddresses;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* Manages inventory of end-station hosts using trivial in-memory
* implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedHostStore
extends AbstractStore<HostEvent, HostStoreDelegate>
implements HostStore {
private final Logger log = getLogger(getClass());
// Host inventory
private final Map<HostId, Host> hosts = new ConcurrentHashMap<>();
// Hosts tracked by their location
private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
private final Map<ConnectPoint, PortAddresses> portAddresses =
new ConcurrentHashMap<>();
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
HostDescription hostDescription) {
Host host = hosts.get(hostId);
if (host == null) {
return createHost(providerId, hostId, hostDescription);
}
return updateHost(providerId, host, hostDescription);
}
// creates a new host and sends HOST_ADDED
private HostEvent createHost(ProviderId providerId, HostId hostId,
HostDescription descr) {
DefaultHost newhost = new DefaultHost(providerId, hostId,
descr.hwAddress(),
descr.vlan(),
descr.location(),
descr.ipAddresses());
synchronized (this) {
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
}
return new HostEvent(HOST_ADDED, newhost);
}
// checks for type of update to host, sends appropriate event
private HostEvent updateHost(ProviderId providerId, Host host,
HostDescription descr) {
DefaultHost updated;
HostEvent event;
if (!host.location().equals(descr.location())) {
updated = new DefaultHost(providerId, host.id(),
host.mac(),
host.vlan(),
descr.location(),
host.ipAddresses());
event = new HostEvent(HOST_MOVED, updated);
} else if (!(host.ipAddresses().equals(descr.ipAddresses()))) {
updated = new DefaultHost(providerId, host.id(),
host.mac(),
host.vlan(),
descr.location(),
descr.ipAddresses());
event = new HostEvent(HOST_UPDATED, updated);
} else {
return null;
}
synchronized (this) {
hosts.put(host.id(), updated);
locations.remove(host.location(), host);
locations.put(updated.location(), updated);
}
return event;
}
@Override
public HostEvent removeHost(HostId hostId) {
synchronized (this) {
Host host = hosts.remove(hostId);
if (host != null) {
locations.remove((host.location()), host);
return new HostEvent(HOST_REMOVED, host);
}
return null;
}
}
@Override
public int getHostCount() {
return hosts.size();
}
@Override
public Iterable<Host> getHosts() {
return Collections.unmodifiableSet(new HashSet<>(hosts.values()));
}
@Override
public Host getHost(HostId hostId) {
return hosts.get(hostId);
}
@Override
public Set<Host> getHosts(VlanId vlanId) {
Set<Host> vlanset = new HashSet<>();
for (Host h : hosts.values()) {
if (h.vlan().equals(vlanId)) {
vlanset.add(h);
}
}
return vlanset;
}
@Override
public Set<Host> getHosts(MacAddress mac) {
Set<Host> macset = new HashSet<>();
for (Host h : hosts.values()) {
if (h.mac().equals(mac)) {
macset.add(h);
}
}
return macset;
}
@Override
public Set<Host> getHosts(IpPrefix ip) {
Set<Host> ipset = new HashSet<>();
for (Host h : hosts.values()) {
if (h.ipAddresses().contains(ip)) {
ipset.add(h);
}
}
return ipset;
}
@Override
public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
return ImmutableSet.copyOf(locations.get(connectPoint));
}
@Override
public Set<Host> getConnectedHosts(DeviceId deviceId) {
Set<Host> hostset = new HashSet<>();
for (ConnectPoint p : locations.keySet()) {
if (p.deviceId().equals(deviceId)) {
hostset.addAll(locations.get(p));
}
}
return hostset;
}
@Override
public void updateAddressBindings(PortAddresses addresses) {
synchronized (portAddresses) {
PortAddresses existing = portAddresses.get(addresses.connectPoint());
if (existing == null) {
portAddresses.put(addresses.connectPoint(), addresses);
} else {
Set<IpPrefix> union = Sets.union(existing.ips(), addresses.ips())
.immutableCopy();
MacAddress newMac = (addresses.mac() == null) ? existing.mac()
: addresses.mac();
PortAddresses newAddresses =
new PortAddresses(addresses.connectPoint(), union, newMac);
portAddresses.put(newAddresses.connectPoint(), newAddresses);
}
}
}
@Override
public void removeAddressBindings(PortAddresses addresses) {
synchronized (portAddresses) {
PortAddresses existing = portAddresses.get(addresses.connectPoint());
if (existing != null) {
Set<IpPrefix> difference =
Sets.difference(existing.ips(), addresses.ips()).immutableCopy();
// If they removed the existing mac, set the new mac to null.
// Otherwise, keep the existing mac.
MacAddress newMac = existing.mac();
if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
newMac = null;
}
PortAddresses newAddresses =
new PortAddresses(addresses.connectPoint(), difference, newMac);
portAddresses.put(newAddresses.connectPoint(), newAddresses);
}
}
}
@Override
public void clearAddressBindings(ConnectPoint connectPoint) {
synchronized (portAddresses) {
portAddresses.remove(connectPoint);
}
}
@Override
public Set<PortAddresses> getAddressBindings() {
synchronized (portAddresses) {
return new HashSet<>(portAddresses.values());
}
}
@Override
public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
PortAddresses addresses;
synchronized (portAddresses) {
addresses = portAddresses.get(connectPoint);
}
if (addresses == null) {
addresses = new PortAddresses(connectPoint, null, null);
}
return addresses;
}
}
......@@ -10,6 +10,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashSet;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -24,9 +25,9 @@ import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.OptionalCacheLoader;
import org.slf4j.Logger;
import com.google.common.base.Optional;
......@@ -43,7 +44,7 @@ import com.hazelcast.core.IMap;
@Component(immediate = true)
@Service
public class DistributedLinkStore
extends AbstractDistributedStore<LinkEvent, LinkStoreDelegate>
extends AbstractHazelcastStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
private final Logger log = getLogger(getClass());
......@@ -57,6 +58,8 @@ public class DistributedLinkStore
private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
private String linksListener;
@Override
@Activate
public void activate() {
......@@ -70,7 +73,7 @@ public class DistributedLinkStore
= new OptionalCacheLoader<>(storeService, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
loadLinkCache();
......@@ -79,7 +82,7 @@ public class DistributedLinkStore
@Deactivate
public void deactivate() {
super.activate();
rawLinks.removeEntryListener(linksListener);
log.info("Stopped");
}
......@@ -232,7 +235,7 @@ public class DistributedLinkStore
}
}
private class RemoteLinkEventHandler extends RemoteEventHandler<LinkKey, DefaultLink> {
private class RemoteLinkEventHandler extends RemoteCacheEventHandler<LinkKey, DefaultLink> {
public RemoteLinkEventHandler(LoadingCache<LinkKey, Optional<DefaultLink>> cache) {
super(cache);
}
......
package org.onlab.onos.store.topology.impl;
import org.onlab.graph.AdjacencyListsGraph;
import org.onlab.onos.net.topology.TopologyEdge;
import org.onlab.onos.net.topology.TopologyGraph;
import org.onlab.onos.net.topology.TopologyVertex;
import java.util.Set;
/**
* Default implementation of an immutable topology graph based on a generic
* implementation of adjacency lists graph.
*/
public class DefaultTopologyGraph
extends AdjacencyListsGraph<TopologyVertex, TopologyEdge>
implements TopologyGraph {
/**
* Creates a topology graph comprising of the specified vertexes and edges.
*
* @param vertexes set of graph vertexes
* @param edges set of graph edges
*/
public DefaultTopologyGraph(Set<TopologyVertex> vertexes, Set<TopologyEdge> edges) {
super(vertexes, edges);
}
}
package org.onlab.onos.store.topology.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.Event;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.ClusterId;
import org.onlab.onos.net.topology.GraphDescription;
import org.onlab.onos.net.topology.LinkWeight;
import org.onlab.onos.net.topology.Topology;
import org.onlab.onos.net.topology.TopologyCluster;
import org.onlab.onos.net.topology.TopologyEvent;
import org.onlab.onos.net.topology.TopologyGraph;
import org.onlab.onos.net.topology.TopologyStore;
import org.onlab.onos.net.topology.TopologyStoreDelegate;
import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
/**
* Manages inventory of topology snapshots using trivial in-memory
* structures implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedTopologyStore
extends AbstractStore<TopologyEvent, TopologyStoreDelegate>
implements TopologyStore {
private final Logger log = getLogger(getClass());
private volatile DefaultTopology current;
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public Topology currentTopology() {
return current;
}
@Override
public boolean isLatest(Topology topology) {
// Topology is current only if it is the same as our current topology
return topology == current;
}
@Override
public TopologyGraph getGraph(Topology topology) {
return defaultTopology(topology).getGraph();
}
@Override
public Set<TopologyCluster> getClusters(Topology topology) {
return defaultTopology(topology).getClusters();
}
@Override
public TopologyCluster getCluster(Topology topology, ClusterId clusterId) {
return defaultTopology(topology).getCluster(clusterId);
}
@Override
public Set<DeviceId> getClusterDevices(Topology topology, TopologyCluster cluster) {
return defaultTopology(topology).getClusterDevices(cluster);
}
@Override
public Set<Link> getClusterLinks(Topology topology, TopologyCluster cluster) {
return defaultTopology(topology).getClusterLinks(cluster);
}
@Override
public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst) {
return defaultTopology(topology).getPaths(src, dst);
}
@Override
public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst,
LinkWeight weight) {
return defaultTopology(topology).getPaths(src, dst, weight);
}
@Override
public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) {
return defaultTopology(topology).isInfrastructure(connectPoint);
}
@Override
public boolean isBroadcastPoint(Topology topology, ConnectPoint connectPoint) {
return defaultTopology(topology).isBroadcastPoint(connectPoint);
}
@Override
public TopologyEvent updateTopology(ProviderId providerId,
GraphDescription graphDescription,
List<Event> reasons) {
// First off, make sure that what we're given is indeed newer than
// what we already have.
if (current != null && graphDescription.timestamp() < current.time()) {
return null;
}
// Have the default topology construct self from the description data.
DefaultTopology newTopology =
new DefaultTopology(providerId, graphDescription);
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
}
}
// Validates the specified topology and returns it as a default
private DefaultTopology defaultTopology(Topology topology) {
if (topology instanceof DefaultTopology) {
return (DefaultTopology) topology;
}
throw new IllegalArgumentException("Topology class " + topology.getClass() +
" not supported");
}
}
package org.onlab.onos.store.topology.impl;
import org.onlab.onos.net.DeviceId;
import java.util.Objects;
/**
* Key for filing pre-computed paths between source and destination devices.
*/
class PathKey {
private final DeviceId src;
private final DeviceId dst;
/**
* Creates a path key from the given source/dest pair.
* @param src source device
* @param dst destination device
*/
PathKey(DeviceId src, DeviceId dst) {
this.src = src;
this.dst = dst;
}
@Override
public int hashCode() {
return Objects.hash(src, dst);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof PathKey) {
final PathKey other = (PathKey) obj;
return Objects.equals(this.src, other.src) && Objects.equals(this.dst, other.dst);
}
return false;
}
}
......@@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
......@@ -32,15 +33,18 @@ import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.impl.StoreManager;
import org.onlab.onos.store.impl.TestStoreManager;
import org.onlab.onos.store.common.TestStoreManager;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast based distributed DeviceStore implementation.
*/
public class DistributedDeviceStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
......@@ -326,6 +330,7 @@ public class DistributedDeviceStoreTest {
}
// TODO add test for Port events when we have them
@Ignore("Ignore until Delegate spec. is clear.")
@Test
public final void testEvents() throws InterruptedException {
final CountDownLatch addLatch = new CountDownLatch(1);
......
......@@ -15,6 +15,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
......@@ -26,24 +27,22 @@ import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.impl.StoreManager;
import org.onlab.onos.store.impl.TestStoreManager;
import org.onlab.onos.store.common.TestStoreManager;
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast based distributed LinkStore implementation.
*/
public class DistributedLinkStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
// private static final String MFR = "whitebox";
// private static final String HW = "1.1.x";
// private static final String SW1 = "3.8.1";
// private static final String SW2 = "3.9.5";
// private static final String SN = "43311-12345";
private static final PortNumber P1 = PortNumber.portNumber(1);
private static final PortNumber P2 = PortNumber.portNumber(2);
......@@ -302,6 +301,7 @@ public class DistributedLinkStoreTest {
assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1));
}
@Ignore("Ignore until Delegate spec. is clear.")
@Test
public final void testEvents() throws InterruptedException {
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-store</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-hz</artifactId>
<packaging>pom</packaging>
<description>ONOS Core Hazelcast Store subsystem</description>
<modules>
<module>common</module>
<module>cluster</module>
<module>net</module>
</modules>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-junit</artifactId>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
......@@ -12,34 +10,41 @@
</parent>
<artifactId>onos-core-store</artifactId>
<packaging>bundle</packaging>
<packaging>pom</packaging>
<description>ONOS distributed store subsystems</description>
<description>ONOS Core Store subsystem</description>
<modules>
<module>trivial</module>
<module>dist</module>
<module>hz</module>
<module>serializers</module>
</modules>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-api</artifactId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-junit</artifactId>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<artifactId>maven-bundle-plugin</artifactId>
</plugin>
</plugins>
</build>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-store</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-serializers</artifactId>
<packaging>bundle</packaging>
<description>Serializers for ONOS classes</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
......@@ -6,7 +6,7 @@
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core</artifactId>
<artifactId>onos-core-store</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
......
package org.onlab.onos.net.trivial.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
//FIXME: Code clone in onos-core-trivial, onos-core-hz-net
/**
* Dummy implementation of {@link ClockService}.
*/
@Component(immediate = true)
@Service
public class NoOpClockService implements ClockService {
@Override
public Timestamp getTimestamp(DeviceId deviceId) {
return new Timestamp() {
@Override
public int compareTo(Timestamp o) {
throw new IllegalStateException("Never expected to be used.");
}
};
}
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
}
}
......@@ -20,7 +20,7 @@ import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure DEVICES using trivial in-memory
* Manages inventory of infrastructure devices using trivial in-memory
* structures implementation.
*/
@Component(immediate = true)
......@@ -68,6 +68,11 @@ public class SimpleClusterStore
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
return null;
}
@Override
public void removeNode(NodeId nodeId) {
}
......
......@@ -101,9 +101,6 @@ public class SimpleDeviceStore
synchronized (this) {
devices.put(deviceId, device);
availableDevices.add(deviceId);
// For now claim the device as a master automatically.
// roles.put(deviceId, MastershipRole.MASTER);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
}
......@@ -189,7 +186,7 @@ public class SimpleDeviceStore
new DefaultPort(device, portDescription.portNumber(),
portDescription.isEnabled());
ports.put(port.number(), updatedPort);
return new DeviceEvent(PORT_UPDATED, device, port);
return new DeviceEvent(PORT_UPDATED, device, updatedPort);
}
return null;
}
......
......@@ -51,8 +51,6 @@ public class SimpleLinkStore
private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
private static final Set<Link> EMPTY = ImmutableSet.of();
@Activate
public void activate() {
log.info("Started");
......
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.