Yuta HIGUCHI

moving around Madan's pieces

update features.xml to use hazelcast distributed bundle for now

Change-Id: I806dc7f9f2f1db1fdfa8e16f083025888b237937
Showing 14 changed files with 119 additions and 49 deletions
package org.onlab.onos.store.common;
package org.onlab.onos.store;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
// TODO: Consider renaming to DeviceClockService?
/**
* Interface for a logical clock service that vends per device timestamps.
*/
public interface ClockService {
/**
* Returns a new timestamp for the specified deviceId.
* @param deviceId device identifier.
* @return timestamp.
*/
public Timestamp getTimestamp(DeviceId deviceId);
// TODO: Should this be here or separate as Admin service, etc.?
/**
* Updates the mastership term for the specified deviceId.
* @param deviceId device identifier.
......
......@@ -37,7 +37,7 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.onlab.onos.store.common.ClockService;
import org.onlab.onos.store.ClockService;
import org.slf4j.Logger;
/**
......
......@@ -12,21 +12,21 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.common.ClockService;
import org.onlab.onos.store.impl.OnosTimestamp;
import org.slf4j.Logger;
@Component(immediate = true)
@Service
public class OnosClockService implements ClockService {
private final Logger log = getLogger(getClass());
// TODO: Implement per device ticker that is reset to 0 at the beginning of a new term.
private final AtomicInteger ticker = new AtomicInteger(0);
private ConcurrentMap<DeviceId, MastershipTerm> deviceMastershipTerms = new ConcurrentHashMap<>();
@Activate
public void activate() {
log.info("Started");
......@@ -36,7 +36,7 @@ public class OnosClockService implements ClockService {
public void deactivate() {
log.info("Stopped");
}
@Override
public Timestamp getTimestamp(DeviceId deviceId) {
MastershipTerm term = deviceMastershipTerms.get(deviceId);
......
package org.onlab.onos.store.device.impl;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
......@@ -25,9 +25,9 @@ import org.onlab.onos.net.device.DeviceStore;
import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.common.ClockService;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.slf4j.Logger;
import java.util.ArrayList;
......@@ -52,7 +52,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class OnosDistributedDeviceStore
extends AbstractDistributedStore<DeviceEvent, DeviceStoreDelegate>
extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
implements DeviceStore {
private final Logger log = getLogger(getClass());
......@@ -61,21 +61,19 @@ public class OnosDistributedDeviceStore
private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices;
private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Override
@Activate
public void activate() {
super.activate();
devices = new ConcurrentHashMap<>();
devicePorts = new ConcurrentHashMap<>();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
......@@ -107,12 +105,13 @@ public class OnosDistributedDeviceStore
DeviceDescription deviceDescription) {
Timestamp now = clockService.getTimestamp(deviceId);
VersionedValue<Device> device = devices.get(deviceId);
if (device == null) {
return createDevice(providerId, deviceId, deviceDescription, now);
}
Preconditions.checkState(now.compareTo(device.timestamp()) > 0, "Existing device has a timestamp in the future!");
checkState(now.compareTo(device.timestamp()) > 0,
"Existing device has a timestamp in the future!");
return updateDevice(providerId, device.entity(), deviceDescription, now);
}
......@@ -156,7 +155,8 @@ public class OnosDistributedDeviceStore
desc.swVersion(),
desc.serialNumber());
VersionedValue<Device> oldDevice = devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
VersionedValue<Device> oldDevice = devices.put(device.id(),
new VersionedValue<Device>(updated, true, timestamp));
if (!oldDevice.isUp()) {
return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
} else {
......@@ -168,21 +168,20 @@ public class OnosDistributedDeviceStore
public DeviceEvent markOffline(DeviceId deviceId) {
VersionedValue<Device> device = devices.get(deviceId);
boolean willRemove = device != null && device.isUp();
if (!willRemove) return null;
if (!willRemove) {
return null;
}
Timestamp timestamp = clockService.getTimestamp(deviceId);
if (replaceIfLatest(device.entity(), false, timestamp))
{
if (replaceIfLatest(device.entity(), false, timestamp)) {
return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
}
return null;
}
// Replace existing value if its timestamp is older.
private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp)
{
private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
VersionedValue<Device> existingValue = devices.get(device.id());
if (timestamp.compareTo(existingValue.timestamp()) > 0)
{
if (timestamp.compareTo(existingValue.timestamp()) > 0) {
devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
return true;
}
......@@ -203,8 +202,11 @@ public class OnosDistributedDeviceStore
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) {
VersionedValue<Port> port = ports.get(portDescription.portNumber());
if (port == null) events.add(createPort(device, portDescription, ports, timestamp));
Preconditions.checkState(timestamp.compareTo(port.timestamp()) > 0, "Existing port state has a timestamp in the future!");
if (port == null) {
events.add(createPort(device, portDescription, ports, timestamp));
}
checkState(timestamp.compareTo(port.timestamp()) > 0,
"Existing port state has a timestamp in the future!");
events.add(updatePort(device, port, portDescription, ports, timestamp));
processed.add(portDescription.portNumber());
}
......@@ -304,8 +306,10 @@ public class OnosDistributedDeviceStore
@Override
public List<Port> getPorts(DeviceId deviceId) {
Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
if (versionedPorts == null) return Collections.emptyList();
List<Port> ports = new ArrayList<Port>();
if (versionedPorts == null) {
return Collections.emptyList();
}
List<Port> ports = new ArrayList<>();
for (VersionedValue<Port> port : versionedPorts.values()) {
ports.add(port.entity());
}
......
......@@ -3,7 +3,7 @@ package org.onlab.onos.store.device.impl;
import org.onlab.onos.store.Timestamp;
/**
* Wrapper class for a entity that is versioned
* Wrapper class for a entity that is versioned
* and can either be up or down.
*
* @param <T> type of the value.
......@@ -12,13 +12,13 @@ public class VersionedValue<T> {
private final T entity;
private final Timestamp timestamp;
private final boolean isUp;
public VersionedValue(T entity, boolean isUp, Timestamp timestamp) {
this.entity = entity;
this.isUp = isUp;
this.timestamp = timestamp;
}
/**
* Returns the value.
* @return value.
......@@ -26,7 +26,7 @@ public class VersionedValue<T> {
public T entity() {
return entity;
}
/**
* Tells whether the entity is up or down.
* @return true if up, false otherwise.
......@@ -34,7 +34,7 @@ public class VersionedValue<T> {
public boolean isUp() {
return isUp;
}
/**
* Returns the timestamp (version) associated with this entity.
* @return timestamp.
......
......@@ -84,4 +84,4 @@ public final class OnosTimestamp implements Timestamp {
public int sequenceNumber() {
return sequenceNumber;
}
}
\ No newline at end of file
}
......
......@@ -19,7 +19,7 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.AbstractHazelcastStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.onlab.packet.IpPrefix;
......@@ -38,7 +38,7 @@ import static org.onlab.onos.cluster.ControllerNode.State;
@Component(immediate = true)
@Service
public class DistributedClusterStore
extends AbstractDistributedStore<ClusterEvent, ClusterStoreDelegate>
extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private IMap<byte[], byte[]> rawNodes;
......
......@@ -22,7 +22,7 @@ import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.AbstractHazelcastStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import com.google.common.base.Optional;
......@@ -36,7 +36,7 @@ import com.hazelcast.core.IMap;
@Component(immediate = true)
@Service
public class DistributedMastershipStore
extends AbstractDistributedStore<MastershipEvent, MastershipStoreDelegate>
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
private IMap<byte[], byte[]> rawMasters;
......
package org.onlab.onos.store.common;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
// FIXME: Code clone in onos-core-trivial, onos-core-hz
/**
* Dummy implementation of {@link ClockService}.
*/
@Component(immediate = true)
@Service
public class NoOpClockService implements ClockService {
@Override
public Timestamp getTimestamp(DeviceId deviceId) {
return new Timestamp() {
@Override
public int compareTo(Timestamp o) {
throw new IllegalStateException("Never expected to be used.");
}
};
}
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
}
}
......@@ -27,7 +27,7 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.AbstractHazelcastStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.slf4j.Logger;
......@@ -52,7 +52,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class DistributedDeviceStore
extends AbstractDistributedStore<DeviceEvent, DeviceStoreDelegate>
extends AbstractHazelcastStore<DeviceEvent, DeviceStoreDelegate>
implements DeviceStore {
private final Logger log = getLogger(getClass());
......
......@@ -23,7 +23,7 @@ import static org.slf4j.LoggerFactory.getLogger;
* Abstraction of a distributed store based on Hazelcast.
*/
@Component(componentAbstract = true)
public abstract class AbstractDistributedStore<E extends Event, D extends StoreDelegate<E>>
public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> {
protected final Logger log = getLogger(getClass());
......
......@@ -25,7 +25,7 @@ import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.AbstractHazelcastStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.slf4j.Logger;
......@@ -43,7 +43,7 @@ import com.hazelcast.core.IMap;
@Component(immediate = true)
@Service
public class DistributedLinkStore
extends AbstractDistributedStore<LinkEvent, LinkStoreDelegate>
extends AbstractHazelcastStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
private final Logger log = getLogger(getClass());
......
package org.onlab.onos.net.trivial.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
//FIXME: Code clone in onos-core-trivial, onos-core-hz
/**
* Dummy implementation of {@link ClockService}.
*/
@Component(immediate = true)
@Service
public class NoOpClockService implements ClockService {
@Override
public Timestamp getTimestamp(DeviceId deviceId) {
return new Timestamp() {
@Override
public int compareTo(Timestamp o) {
throw new IllegalStateException("Never expected to be used.");
}
};
}
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
}
}
......@@ -48,7 +48,8 @@
description="ONOS core components">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-store/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-trivial" version="1.0.0"
......