Preliminary checkin for implementing version aware topology state.
Showing
7 changed files
with
137 additions
and
27 deletions
| ... | @@ -16,6 +16,7 @@ import org.onlab.onos.cluster.ClusterService; | ... | @@ -16,6 +16,7 @@ import org.onlab.onos.cluster.ClusterService; |
| 16 | import org.onlab.onos.cluster.MastershipEvent; | 16 | import org.onlab.onos.cluster.MastershipEvent; |
| 17 | import org.onlab.onos.cluster.MastershipListener; | 17 | import org.onlab.onos.cluster.MastershipListener; |
| 18 | import org.onlab.onos.cluster.MastershipService; | 18 | import org.onlab.onos.cluster.MastershipService; |
| 19 | +import org.onlab.onos.cluster.MastershipTerm; | ||
| 19 | import org.onlab.onos.event.AbstractListenerRegistry; | 20 | import org.onlab.onos.event.AbstractListenerRegistry; |
| 20 | import org.onlab.onos.event.EventDeliveryService; | 21 | import org.onlab.onos.event.EventDeliveryService; |
| 21 | import org.onlab.onos.net.Device; | 22 | import org.onlab.onos.net.Device; |
| ... | @@ -36,6 +37,7 @@ import org.onlab.onos.net.device.DeviceStoreDelegate; | ... | @@ -36,6 +37,7 @@ import org.onlab.onos.net.device.DeviceStoreDelegate; |
| 36 | import org.onlab.onos.net.device.PortDescription; | 37 | import org.onlab.onos.net.device.PortDescription; |
| 37 | import org.onlab.onos.net.provider.AbstractProviderRegistry; | 38 | import org.onlab.onos.net.provider.AbstractProviderRegistry; |
| 38 | import org.onlab.onos.net.provider.AbstractProviderService; | 39 | import org.onlab.onos.net.provider.AbstractProviderService; |
| 40 | +import org.onlab.onos.store.common.ClockService; | ||
| 39 | import org.slf4j.Logger; | 41 | import org.slf4j.Logger; |
| 40 | 42 | ||
| 41 | /** | 43 | /** |
| ... | @@ -74,6 +76,9 @@ implements DeviceService, DeviceAdminService, DeviceProviderRegistry { | ... | @@ -74,6 +76,9 @@ implements DeviceService, DeviceAdminService, DeviceProviderRegistry { |
| 74 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 76 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 75 | protected MastershipService mastershipService; | 77 | protected MastershipService mastershipService; |
| 76 | 78 | ||
| 79 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
| 80 | + protected ClockService clockService; | ||
| 81 | + | ||
| 77 | @Activate | 82 | @Activate |
| 78 | public void activate() { | 83 | public void activate() { |
| 79 | store.setDelegate(delegate); | 84 | store.setDelegate(delegate); |
| ... | @@ -250,6 +255,8 @@ implements DeviceService, DeviceAdminService, DeviceProviderRegistry { | ... | @@ -250,6 +255,8 @@ implements DeviceService, DeviceAdminService, DeviceProviderRegistry { |
| 250 | @Override | 255 | @Override |
| 251 | public void event(MastershipEvent event) { | 256 | public void event(MastershipEvent event) { |
| 252 | if (event.master().equals(clusterService.getLocalNode().id())) { | 257 | if (event.master().equals(clusterService.getLocalNode().id())) { |
| 258 | + MastershipTerm term = mastershipService.requestTermService().getMastershipTerm(event.subject()); | ||
| 259 | + clockService.setMastershipTerm(event.subject(), term); | ||
| 253 | applyRole(event.subject(), MastershipRole.MASTER); | 260 | applyRole(event.subject(), MastershipRole.MASTER); |
| 254 | } else { | 261 | } else { |
| 255 | applyRole(event.subject(), MastershipRole.STANDBY); | 262 | applyRole(event.subject(), MastershipRole.STANDBY); | ... | ... |
| 1 | +package org.onlab.onos.store.common; | ||
| 2 | + | ||
| 3 | +import org.onlab.onos.cluster.MastershipTerm; | ||
| 4 | +import org.onlab.onos.net.DeviceId; | ||
| 5 | +import org.onlab.onos.store.Timestamp; | ||
| 6 | + | ||
| 7 | +/** | ||
| 8 | + * Interface for a logical clock service that vends per device timestamps. | ||
| 9 | + */ | ||
| 10 | +public interface ClockService { | ||
| 11 | + | ||
| 12 | + /** | ||
| 13 | + * Returns a new timestamp for the specified deviceId. | ||
| 14 | + * @param deviceId device identifier. | ||
| 15 | + * @return timestamp. | ||
| 16 | + */ | ||
| 17 | + public Timestamp getTimestamp(DeviceId deviceId); | ||
| 18 | + | ||
| 19 | + /** | ||
| 20 | + * Updates the mastership term for the specified deviceId. | ||
| 21 | + * @param deviceId device identifier. | ||
| 22 | + * @param term mastership term. | ||
| 23 | + */ | ||
| 24 | + public void setMastershipTerm(DeviceId deviceId, MastershipTerm term); | ||
| 25 | +} |
| 1 | +package org.onlab.onos.store.device.impl; | ||
| 2 | + | ||
| 3 | +import static org.slf4j.LoggerFactory.getLogger; | ||
| 4 | + | ||
| 5 | +import java.util.concurrent.ConcurrentHashMap; | ||
| 6 | +import java.util.concurrent.ConcurrentMap; | ||
| 7 | +import java.util.concurrent.atomic.AtomicInteger; | ||
| 8 | + | ||
| 9 | +import org.apache.felix.scr.annotations.Activate; | ||
| 10 | +import org.apache.felix.scr.annotations.Component; | ||
| 11 | +import org.apache.felix.scr.annotations.Deactivate; | ||
| 12 | +import org.apache.felix.scr.annotations.Service; | ||
| 13 | +import org.onlab.onos.cluster.MastershipTerm; | ||
| 14 | +import org.onlab.onos.net.DeviceId; | ||
| 15 | +import org.onlab.onos.store.Timestamp; | ||
| 16 | +import org.onlab.onos.store.common.ClockService; | ||
| 17 | +import org.onlab.onos.store.impl.OnosTimestamp; | ||
| 18 | +import org.slf4j.Logger; | ||
| 19 | + | ||
| 20 | +@Component(immediate = true) | ||
| 21 | +@Service | ||
| 22 | +public class OnosClockService implements ClockService { | ||
| 23 | + | ||
| 24 | + private final Logger log = getLogger(getClass()); | ||
| 25 | + | ||
| 26 | + // TODO: Implement per device ticker that is reset to 0 at the beginning of a new term. | ||
| 27 | + private final AtomicInteger ticker = new AtomicInteger(0); | ||
| 28 | + private ConcurrentMap<DeviceId, MastershipTerm> deviceMastershipTerms = new ConcurrentHashMap<>(); | ||
| 29 | + | ||
| 30 | + @Activate | ||
| 31 | + public void activate() { | ||
| 32 | + log.info("Started"); | ||
| 33 | + } | ||
| 34 | + | ||
| 35 | + @Deactivate | ||
| 36 | + public void deactivate() { | ||
| 37 | + log.info("Stopped"); | ||
| 38 | + } | ||
| 39 | + | ||
| 40 | + @Override | ||
| 41 | + public Timestamp getTimestamp(DeviceId deviceId) { | ||
| 42 | + MastershipTerm term = deviceMastershipTerms.get(deviceId); | ||
| 43 | + if (term == null) { | ||
| 44 | + throw new IllegalStateException("Requesting timestamp for a deviceId without mastership"); | ||
| 45 | + } | ||
| 46 | + return new OnosTimestamp(term.termNumber(), ticker.incrementAndGet()); | ||
| 47 | + } | ||
| 48 | + | ||
| 49 | + @Override | ||
| 50 | + public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) { | ||
| 51 | + deviceMastershipTerms.put(deviceId, term); | ||
| 52 | + } | ||
| 53 | +} |
core/store/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
0 → 100644
This diff is collapsed. Click to expand it.
| 1 | +package org.onlab.onos.store.device.impl; | ||
| 2 | + | ||
| 3 | +import org.onlab.onos.store.Timestamp; | ||
| 4 | + | ||
| 5 | +/** | ||
| 6 | + * Wrapper class for a entity that is versioned | ||
| 7 | + * and can either be up or down. | ||
| 8 | + * | ||
| 9 | + * @param <T> type of the value. | ||
| 10 | + */ | ||
| 11 | +public class VersionedValue<T> { | ||
| 12 | + private final T entity; | ||
| 13 | + private final Timestamp timestamp; | ||
| 14 | + private final boolean isUp; | ||
| 15 | + | ||
| 16 | + public VersionedValue(T entity, boolean isUp, Timestamp timestamp) { | ||
| 17 | + this.entity = entity; | ||
| 18 | + this.isUp = isUp; | ||
| 19 | + this.timestamp = timestamp; | ||
| 20 | + } | ||
| 21 | + | ||
| 22 | + /** | ||
| 23 | + * Returns the value. | ||
| 24 | + * @return value. | ||
| 25 | + */ | ||
| 26 | + public T entity() { | ||
| 27 | + return entity; | ||
| 28 | + } | ||
| 29 | + | ||
| 30 | + /** | ||
| 31 | + * Tells whether the entity is up or down. | ||
| 32 | + * @return true if up, false otherwise. | ||
| 33 | + */ | ||
| 34 | + public boolean isUp() { | ||
| 35 | + return isUp; | ||
| 36 | + } | ||
| 37 | + | ||
| 38 | + /** | ||
| 39 | + * Returns the timestamp (version) associated with this entity. | ||
| 40 | + * @return timestamp. | ||
| 41 | + */ | ||
| 42 | + public Timestamp timestamp() { | ||
| 43 | + return timestamp; | ||
| 44 | + } | ||
| 45 | +} |
| 1 | package org.onlab.onos.store.impl; | 1 | package org.onlab.onos.store.impl; |
| 2 | 2 | ||
| 3 | -import static com.google.common.base.Preconditions.checkNotNull; | ||
| 4 | import static com.google.common.base.Preconditions.checkArgument; | 3 | import static com.google.common.base.Preconditions.checkArgument; |
| 5 | 4 | ||
| 6 | import java.util.Objects; | 5 | import java.util.Objects; |
| 7 | 6 | ||
| 8 | -import org.onlab.onos.net.ElementId; | ||
| 9 | import org.onlab.onos.store.Timestamp; | 7 | import org.onlab.onos.store.Timestamp; |
| 10 | 8 | ||
| 11 | import com.google.common.base.MoreObjects; | 9 | import com.google.common.base.MoreObjects; |
| ... | @@ -14,22 +12,20 @@ import com.google.common.collect.ComparisonChain; | ... | @@ -14,22 +12,20 @@ import com.google.common.collect.ComparisonChain; |
| 14 | // If it is store specific, implement serializable interfaces? | 12 | // If it is store specific, implement serializable interfaces? |
| 15 | /** | 13 | /** |
| 16 | * Default implementation of Timestamp. | 14 | * Default implementation of Timestamp. |
| 15 | + * TODO: Better documentation. | ||
| 17 | */ | 16 | */ |
| 18 | public final class OnosTimestamp implements Timestamp { | 17 | public final class OnosTimestamp implements Timestamp { |
| 19 | 18 | ||
| 20 | - private final ElementId id; | ||
| 21 | private final int termNumber; | 19 | private final int termNumber; |
| 22 | private final int sequenceNumber; | 20 | private final int sequenceNumber; |
| 23 | 21 | ||
| 24 | /** | 22 | /** |
| 25 | * Default version tuple. | 23 | * Default version tuple. |
| 26 | * | 24 | * |
| 27 | - * @param id identifier of the element | ||
| 28 | * @param termNumber the mastership termNumber | 25 | * @param termNumber the mastership termNumber |
| 29 | * @param sequenceNumber the sequenceNumber number within the termNumber | 26 | * @param sequenceNumber the sequenceNumber number within the termNumber |
| 30 | */ | 27 | */ |
| 31 | - public OnosTimestamp(ElementId id, int termNumber, int sequenceNumber) { | 28 | + public OnosTimestamp(int termNumber, int sequenceNumber) { |
| 32 | - this.id = checkNotNull(id); | ||
| 33 | this.termNumber = termNumber; | 29 | this.termNumber = termNumber; |
| 34 | this.sequenceNumber = sequenceNumber; | 30 | this.sequenceNumber = sequenceNumber; |
| 35 | } | 31 | } |
| ... | @@ -38,9 +34,6 @@ public final class OnosTimestamp implements Timestamp { | ... | @@ -38,9 +34,6 @@ public final class OnosTimestamp implements Timestamp { |
| 38 | public int compareTo(Timestamp o) { | 34 | public int compareTo(Timestamp o) { |
| 39 | checkArgument(o instanceof OnosTimestamp, "Must be OnosTimestamp", o); | 35 | checkArgument(o instanceof OnosTimestamp, "Must be OnosTimestamp", o); |
| 40 | OnosTimestamp that = (OnosTimestamp) o; | 36 | OnosTimestamp that = (OnosTimestamp) o; |
| 41 | - checkArgument(this.id.equals(that.id), | ||
| 42 | - "Cannot compare version for different element this:%s, that:%s", | ||
| 43 | - this, that); | ||
| 44 | 37 | ||
| 45 | return ComparisonChain.start() | 38 | return ComparisonChain.start() |
| 46 | .compare(this.termNumber, that.termNumber) | 39 | .compare(this.termNumber, that.termNumber) |
| ... | @@ -50,7 +43,7 @@ public final class OnosTimestamp implements Timestamp { | ... | @@ -50,7 +43,7 @@ public final class OnosTimestamp implements Timestamp { |
| 50 | 43 | ||
| 51 | @Override | 44 | @Override |
| 52 | public int hashCode() { | 45 | public int hashCode() { |
| 53 | - return Objects.hash(id, termNumber, sequenceNumber); | 46 | + return Objects.hash(termNumber, sequenceNumber); |
| 54 | } | 47 | } |
| 55 | 48 | ||
| 56 | @Override | 49 | @Override |
| ... | @@ -62,30 +55,19 @@ public final class OnosTimestamp implements Timestamp { | ... | @@ -62,30 +55,19 @@ public final class OnosTimestamp implements Timestamp { |
| 62 | return false; | 55 | return false; |
| 63 | } | 56 | } |
| 64 | OnosTimestamp that = (OnosTimestamp) obj; | 57 | OnosTimestamp that = (OnosTimestamp) obj; |
| 65 | - return Objects.equals(this.id, that.id) && | 58 | + return Objects.equals(this.termNumber, that.termNumber) && |
| 66 | - Objects.equals(this.termNumber, that.termNumber) && | ||
| 67 | Objects.equals(this.sequenceNumber, that.sequenceNumber); | 59 | Objects.equals(this.sequenceNumber, that.sequenceNumber); |
| 68 | } | 60 | } |
| 69 | 61 | ||
| 70 | @Override | 62 | @Override |
| 71 | public String toString() { | 63 | public String toString() { |
| 72 | return MoreObjects.toStringHelper(getClass()) | 64 | return MoreObjects.toStringHelper(getClass()) |
| 73 | - .add("id", id) | ||
| 74 | .add("termNumber", termNumber) | 65 | .add("termNumber", termNumber) |
| 75 | .add("sequenceNumber", sequenceNumber) | 66 | .add("sequenceNumber", sequenceNumber) |
| 76 | .toString(); | 67 | .toString(); |
| 77 | } | 68 | } |
| 78 | 69 | ||
| 79 | /** | 70 | /** |
| 80 | - * Returns the element. | ||
| 81 | - * | ||
| 82 | - * @return element identifier | ||
| 83 | - */ | ||
| 84 | - public ElementId id() { | ||
| 85 | - return id; | ||
| 86 | - } | ||
| 87 | - | ||
| 88 | - /** | ||
| 89 | * Returns the termNumber. | 71 | * Returns the termNumber. |
| 90 | * | 72 | * |
| 91 | * @return termNumber | 73 | * @return termNumber |
| ... | @@ -102,4 +84,4 @@ public final class OnosTimestamp implements Timestamp { | ... | @@ -102,4 +84,4 @@ public final class OnosTimestamp implements Timestamp { |
| 102 | public int sequenceNumber() { | 84 | public int sequenceNumber() { |
| 103 | return sequenceNumber; | 85 | return sequenceNumber; |
| 104 | } | 86 | } |
| 105 | -} | 87 | +} |
| ... | \ No newline at end of file | ... | \ No newline at end of file | ... | ... |
| 1 | package org.onlab.onos.store.serializers; | 1 | package org.onlab.onos.store.serializers; |
| 2 | 2 | ||
| 3 | -import org.onlab.onos.net.ElementId; | ||
| 4 | import org.onlab.onos.store.impl.OnosTimestamp; | 3 | import org.onlab.onos.store.impl.OnosTimestamp; |
| 5 | 4 | ||
| 6 | import com.esotericsoftware.kryo.Kryo; | 5 | import com.esotericsoftware.kryo.Kryo; |
| ... | @@ -20,18 +19,17 @@ public class OnosTimestampSerializer extends Serializer<OnosTimestamp> { | ... | @@ -20,18 +19,17 @@ public class OnosTimestampSerializer extends Serializer<OnosTimestamp> { |
| 20 | // non-null, immutable | 19 | // non-null, immutable |
| 21 | super(false, true); | 20 | super(false, true); |
| 22 | } | 21 | } |
| 22 | + | ||
| 23 | @Override | 23 | @Override |
| 24 | public void write(Kryo kryo, Output output, OnosTimestamp object) { | 24 | public void write(Kryo kryo, Output output, OnosTimestamp object) { |
| 25 | - kryo.writeClassAndObject(output, object.id()); | ||
| 26 | output.writeInt(object.termNumber()); | 25 | output.writeInt(object.termNumber()); |
| 27 | output.writeInt(object.sequenceNumber()); | 26 | output.writeInt(object.sequenceNumber()); |
| 28 | } | 27 | } |
| 29 | 28 | ||
| 30 | @Override | 29 | @Override |
| 31 | public OnosTimestamp read(Kryo kryo, Input input, Class<OnosTimestamp> type) { | 30 | public OnosTimestamp read(Kryo kryo, Input input, Class<OnosTimestamp> type) { |
| 32 | - ElementId id = (ElementId) kryo.readClassAndObject(input); | ||
| 33 | final int term = input.readInt(); | 31 | final int term = input.readInt(); |
| 34 | final int sequence = input.readInt(); | 32 | final int sequence = input.readInt(); |
| 35 | - return new OnosTimestamp(id, term, sequence); | 33 | + return new OnosTimestamp(term, sequence); |
| 36 | } | 34 | } |
| 37 | } | 35 | } | ... | ... |
-
Please register or login to post a comment