tom

Merge remote-tracking branch 'origin/master'

Showing 19 changed files with 243 additions and 37 deletions
......@@ -13,6 +13,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Device;
......@@ -81,6 +82,7 @@ public class HostMonitorTest {
}
@Test
@Ignore
public void testMonitorHostDoesNotExist() throws Exception {
HostManager hostManager = createMock(HostManager.class);
......
package org.onlab.onos.store.cluster.messaging.impl;
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
public final class ClusterMessageSubjects {
public final class ClusterManagementMessageSubjects {
// avoid instantiation
private ClusterMessageSubjects() {}
private ClusterManagementMessageSubjects() {}
public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
}
......
package org.onlab.onos.store.cluster.messaging.impl;
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.ControllerNode;
......
package org.onlab.onos.store.cluster.messaging.impl;
package org.onlab.onos.store.cluster.impl;
public enum ClusterMembershipEventType {
NEW_MEMBER,
......
......@@ -17,6 +17,8 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
......
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......
package org.onlab.onos.store.cluster.messaging;
package org.onlab.onos.store.common.impl;
import java.util.Map;
import java.util.Set;
......
......@@ -8,7 +8,7 @@ import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
// TODO: Handle Port as part of these messages, or separate messages for Ports?
......
......@@ -10,7 +10,7 @@ import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
import org.onlab.onos.store.common.impl.AntiEntropyReply;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
......
......@@ -4,6 +4,7 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
......@@ -33,10 +34,15 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
......@@ -96,6 +102,9 @@ public class GossipDeviceStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Activate
public void activate() {
log.info("Started");
......@@ -133,8 +142,14 @@ public class GossipDeviceStore
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
if (event != null) {
// FIXME: broadcast deltaDesc, UP
log.debug("broadcast deltaDesc");
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return event;
}
......@@ -298,19 +313,21 @@ public class GossipDeviceStore
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
for (PortDescription e : portDescriptions) {
deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
}
Timestamped<List<PortDescription>> timestampedPortDescriptions =
new Timestamped<>(portDescriptions, newTimestamp);
List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
new Timestamped<>(portDescriptions, newTimestamp));
List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
if (!events.isEmpty()) {
// FIXME: broadcast deltaDesc, UP
log.debug("broadcast deltaDesc");
log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return events;
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
......@@ -437,8 +454,14 @@ public class GossipDeviceStore
final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
if (event != null) {
// FIXME: broadcast deltaDesc
log.debug("broadcast deltaDesc");
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return event;
}
......@@ -749,4 +772,61 @@ public class GossipDeviceStore
return portDescs.put(newOne.value().portNumber(), newOne);
}
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-device-updates"), event);
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-updates"), event);
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-status-updates"), event);
clusterCommunicator.broadcast(message);
}
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
}
}
private class InternalPortEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received port update event from peer: {}", message.sender());
InternalPortEvent event = (InternalPortEvent) message.payload();
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
updatePortsInternal(providerId, deviceId, portDescriptions);
}
}
private class InternalPortStatusEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received port status update event from peer: {}", message.sender());
InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<PortDescription> portDescription = event.portDescription();
updatePortStatusInternal(providerId, deviceId, portDescription);
}
}
}
......
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
public class InternalDeviceEvent {
private final ProviderId providerId;
private final DeviceId deviceId;
private final Timestamped<DeviceDescription> deviceDescription;
protected InternalDeviceEvent(
ProviderId providerId,
DeviceId deviceId,
Timestamped<DeviceDescription> deviceDescription) {
this.providerId = providerId;
this.deviceId = deviceId;
this.deviceDescription = deviceDescription;
}
public DeviceId deviceId() {
return deviceId;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<DeviceDescription> deviceDescription() {
return deviceDescription;
}
}
package org.onlab.onos.store.device.impl;
import java.util.List;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
public class InternalPortEvent {
private final ProviderId providerId;
private final DeviceId deviceId;
private final Timestamped<List<PortDescription>> portDescriptions;
protected InternalPortEvent(
ProviderId providerId,
DeviceId deviceId,
Timestamped<List<PortDescription>> portDescriptions) {
this.providerId = providerId;
this.deviceId = deviceId;
this.portDescriptions = portDescriptions;
}
public DeviceId deviceId() {
return deviceId;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<List<PortDescription>> portDescriptions() {
return portDescriptions;
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
public class InternalPortStatusEvent {
private final ProviderId providerId;
private final DeviceId deviceId;
private final Timestamped<PortDescription> portDescription;
protected InternalPortStatusEvent(
ProviderId providerId,
DeviceId deviceId,
Timestamped<PortDescription> portDescription) {
this.providerId = providerId;
this.deviceId = deviceId;
this.portDescription = portDescription;
}
public DeviceId deviceId() {
return deviceId;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<PortDescription> portDescription() {
return portDescription;
}
}
......@@ -7,6 +7,7 @@ import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.onos.store.cluster.messaging.impl.MessageSerializer;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpPrefix;
......
......@@ -5,6 +5,7 @@ import static org.onlab.onos.net.Device.Type.SWITCH;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
......@@ -37,6 +38,10 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
......@@ -105,7 +110,9 @@ public class GossipDeviceStoreTest {
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
gossipDeviceStore = new TestGossipDeviceStore(clockService);
ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator);
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
}
......@@ -541,8 +548,20 @@ public class GossipDeviceStoreTest {
private static final class TestGossipDeviceStore extends GossipDeviceStore {
public TestGossipDeviceStore(ClockService clockService) {
public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) {
this.clockService = clockService;
this.clusterCommunicator = clusterCommunicator;
}
}
private static final class TestClusterCommunicationService implements ClusterCommunicationService {
@Override
public boolean broadcast(ClusterMessage message) throws IOException { return true; }
@Override
public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
@Override
public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
}
}
......
......@@ -45,12 +45,12 @@ public class KryoSerializer implements Serializer {
}
@Override
public <T> T deserialize(ByteBuffer buffer) {
public <T> T decode(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
@Override
public void serialize(Object obj, ByteBuffer buffer) {
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
}
......
......@@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer());
InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
......
......@@ -24,20 +24,18 @@ public interface Serializer {
public byte[] encode(Object data);
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
* Encodes the specified POJO into a byte buffer.
*
* @param obj object to be serialized
* @param data POJO to be encoded
* @param buffer to write serialized bytes
*/
public void serialize(final Object obj, ByteBuffer buffer);
public void encode(final Object data, ByteBuffer buffer);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
* Decodes the specified byte buffer to a POJO.
*
* @param buffer bytes to be deserialized
* @return deserialized object
* @param buffer bytes to be decoded
* @return POJO
*/
public <T> T deserialize(final ByteBuffer buffer);
public <T> T decode(final ByteBuffer buffer);
}
......