pankaj

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

......@@ -18,10 +18,10 @@ import com.google.common.collect.ImmutableSet;
* Suggest to the sender about the more up-to-date data this node has,
* and request for more recent data that the receiver has.
*/
public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
private final NodeId sender;
private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion;
private final ImmutableMap<ID, V> suggestion;
private final ImmutableSet<ID> request;
/**
......@@ -32,7 +32,7 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
* @param request Collection of identifiers
*/
public AntiEntropyReply(NodeId sender,
Map<ID, VersionedValue<VALUE>> suggestion,
Map<ID, V> suggestion,
Set<ID> request) {
super(AE_REPLY);
this.sender = sender;
......@@ -44,14 +44,34 @@ public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
return sender;
}
public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() {
/**
* Returns collection of values, which the recipient of this reply is likely
* to be missing or has outdated version.
*
* @return
*/
public ImmutableMap<ID, V> suggestion() {
return suggestion;
}
/**
* Returns collection of identifier to request.
*
* @return collection of identifier to request
*/
public ImmutableSet<ID> request() {
return request;
}
/**
* Checks if reply contains any suggestion or request.
*
* @return true if nothing is suggested and requested
*/
public boolean isEmpty() {
return suggestion.isEmpty() && request.isEmpty();
}
// Default constructor for serializer
protected AntiEntropyReply() {
super(AE_REPLY);
......
package org.onlab.onos.store.device.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
// TODO: Handle Port as part of these messages, or separate messages for Ports?
public class DeviceAntiEntropyAdvertisement
extends AntiEntropyAdvertisement<DeviceId> {
public DeviceAntiEntropyAdvertisement(NodeId sender,
Map<DeviceId, Timestamp> advertisement) {
super(sender, advertisement);
}
// May need to add ProviderID, etc.
public static DeviceAntiEntropyAdvertisement create(
NodeId self,
Collection<VersionedValue<Device>> localValues) {
Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
for (VersionedValue<Device> e : localValues) {
ads.put(e.entity().id(), e.timestamp());
}
return new DeviceAntiEntropyAdvertisement(self, ads);
}
// For serializer
protected DeviceAntiEntropyAdvertisement() {}
}
package org.onlab.onos.store.device.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class DeviceAntiEntropyReply
extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
public DeviceAntiEntropyReply(NodeId sender,
Map<DeviceId, VersionedValue<Device>> suggestion,
Set<DeviceId> request) {
super(sender, suggestion, request);
}
/**
* Creates a reply to Anti-Entropy advertisement.
*
* @param advertisement to respond to
* @param self node identifier representing local node
* @param localValues local values held on this node
* @return reply message
*/
public static DeviceAntiEntropyReply reply(
DeviceAntiEntropyAdvertisement advertisement,
NodeId self,
Collection<VersionedValue<Device>> localValues
) {
ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
sug = ImmutableMap.builder();
Set<DeviceId> req = new HashSet<>(ads.keySet());
for (VersionedValue<Device> e : localValues) {
final DeviceId id = e.entity().id();
final Timestamp local = e.timestamp();
final Timestamp theirs = ads.get(id);
if (theirs == null) {
// they don't have it, suggest
sug.put(id, e);
// don't need theirs
req.remove(id);
} else if (local.compareTo(theirs) < 0) {
// they got older one, suggest
sug.put(id, e);
// don't need theirs
req.remove(id);
} else if (local.equals(theirs)) {
// same, don't need theirs
req.remove(id);
}
}
return new DeviceAntiEntropyReply(self, sug.build(), req);
}
/**
* Creates a reply to request for values held locally.
*
* @param requests message containing the request
* @param self node identifier representing local node
* @param localValues local valeds held on this node
* @return reply message
*/
public static DeviceAntiEntropyReply reply(
DeviceAntiEntropyReply requests,
NodeId self,
Map<DeviceId, VersionedValue<Device>> localValues
) {
Set<DeviceId> reqs = requests.request();
Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
for (DeviceId id : reqs) {
final VersionedValue<Device> value = localValues.get(id);
if (value != null) {
requested.put(id, value);
}
}
Set<DeviceId> empty = ImmutableSet.of();
return new DeviceAntiEntropyReply(self, requested, empty);
}
// For serializer
protected DeviceAntiEntropyReply() {}
}
......@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
......@@ -59,8 +60,8 @@ public class OnosDistributedDeviceStore
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices;
private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
......
package org.onlab.onos.store.device.impl;
import java.util.Objects;
import org.onlab.onos.store.Timestamp;
/**
......@@ -44,6 +46,29 @@ public class VersionedValue<T> {
}
@Override
public int hashCode() {
return Objects.hash(entity, timestamp, isUp);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
@SuppressWarnings("unchecked")
VersionedValue<T> that = (VersionedValue<T>) obj;
return Objects.equals(this.entity, that.entity) &&
Objects.equals(this.timestamp, that.timestamp) &&
Objects.equals(this.isUp, that.isUp);
}
// Default constructor for serializer
protected VersionedValue() {
this.entity = null;
......
package org.onlab.onos.store.serializers;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -100,4 +101,14 @@ public class KryoSerializationManager implements KryoSerializationService {
return serializerPool.deserialize(bytes);
}
@Override
public void serialize(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
@Override
public <T> T deserialize(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
}
......
package org.onlab.onos.store.serializers;
import java.nio.ByteBuffer;
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
......@@ -16,6 +18,15 @@ public interface KryoSerializationService {
public byte[] serialize(final Object obj);
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @param buffer to write serialized bytes
*/
public void serialize(final Object obj, ByteBuffer buffer);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
......@@ -24,4 +35,12 @@ public interface KryoSerializationService {
*/
public <T> T deserialize(final byte[] bytes);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param buffer bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final ByteBuffer buffer);
}
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.net.MastershipRole;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link org.onlab.onos.net.MastershipRole}.
*/
public class MastershipRoleSerializer extends Serializer<MastershipRole> {
@Override
public MastershipRole read(Kryo kryo, Input input, Class<MastershipRole> type) {
final String role = kryo.readObject(input, String.class);
return MastershipRole.valueOf(role);
}
@Override
public void write(Kryo kryo, Output output, MastershipRole object) {
kryo.writeObject(output, object.toString());
}
}
package org.onlab.onos.store.serializers;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link org.onlab.onos.cluster.MastershipTerm}.
*/
public class MastershipTermSerializer extends Serializer<MastershipTerm> {
@Override
public MastershipTerm read(Kryo kryo, Input input, Class<MastershipTerm> type) {
final NodeId node = new NodeId(kryo.readObject(input, String.class));
final int term = input.readInt();
return MastershipTerm.of(node, term);
}
@Override
public void write(Kryo kryo, Output output, MastershipTerm object) {
output.writeString(object.master().toString());
output.writeInt(object.termNumber());
}
}
......@@ -12,6 +12,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
......@@ -21,6 +22,7 @@ import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
......@@ -81,6 +83,9 @@ public class KryoSerializerTests {
.register(DefaultDevice.class)
.register(URI.class, new URISerializer())
.register(MastershipRole.class, new MastershipRoleSerializer())
.register(MastershipTerm.class, new MastershipTermSerializer())
.build();
}
......
......@@ -9,7 +9,6 @@ import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.provider.ProviderId;
import com.google.common.collect.Sets;
......@@ -24,8 +23,6 @@ import static org.onlab.onos.cluster.MastershipEvent.Type.*;
*/
public class SimpleMastershipStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID1 = DeviceId.deviceId("of:01");
private static final DeviceId DID2 = DeviceId.deviceId("of:02");
private static final DeviceId DID3 = DeviceId.deviceId("of:03");
......