Madan Jampani

Removed usage of deprecated ClusterCommunicationService APIs

Change-Id: Id306dadad48d1bad7b3fbde3a40ba3e0fdac4cbc
......@@ -17,6 +17,7 @@ package org.onosproject.store.app;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -39,8 +40,6 @@ import org.onosproject.core.ApplicationIdStore;
import org.onosproject.core.DefaultApplication;
import org.onosproject.core.Permission;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
......@@ -48,10 +47,12 @@ import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
......@@ -126,7 +127,17 @@ public class GossipApplicationStore extends ApplicationArchive
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/app", "message-handler"));
clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
name -> {
try {
return toByteArray(getApplicationInputStream(name));
} catch (IOException e) {
throw new StorageException(e);
}
},
Function.identity(),
messageHandlingExecutor);
// FIXME: Consider consolidating into a single map.
......@@ -394,21 +405,6 @@ public class GossipApplicationStore extends ApplicationArchive
}
/**
* Responder to requests for application bits.
*/
private class InternalBitServer implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
String name = new String(message.payload(), Charsets.UTF_8);
try {
message.respond(toByteArray(getApplicationInputStream(name)));
} catch (Exception e) {
log.debug("Unable to read bits for application {}", name);
}
}
}
/**
* Prunes applications which are not in the map, but are on disk.
*/
private void pruneUninstalledApps() {
......
......@@ -60,8 +60,6 @@ import org.onosproject.net.group.StoredGroupBucketEntry;
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.DeviceIdSerializer;
import org.onosproject.store.serializers.KryoNamespaces;
......@@ -198,10 +196,11 @@ public class DistributedGroupStore
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/group",
"message-handlers"));
clusterCommunicator.
addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
new ClusterGroupMsgHandler(),
messageHandlingExecutor);
clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
kryoBuilder.build()::deserialize,
this::process,
messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
......@@ -970,45 +969,27 @@ public class DistributedGroupStore
}
}
}
/**
* Message handler to receive messages from group subsystems of
* other cluster members.
*/
private final class ClusterGroupMsgHandler
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
if (message.subject().equals(
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
GroupStoreMessage groupOp = kryoBuilder.
build().deserialize(message.payload());
log.debug("received remote group operation {} request for device {}",
groupOp.type(),
groupOp.deviceId());
if (mastershipService.
getLocalRole(groupOp.deviceId()) !=
MastershipRole.MASTER) {
log.warn("ClusterGroupMsgHandler: This node is not "
+ "MASTER for device {}", groupOp.deviceId());
return;
}
if (groupOp.type() == GroupStoreMessage.Type.ADD) {
storeGroupDescriptionInternal(groupOp.groupDesc());
} else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
updateGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie(),
groupOp.updateType(),
groupOp.updateBuckets(),
groupOp.newAppCookie());
} else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
deleteGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie());
}
} else {
log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
message.subject());
}
}
private void process(GroupStoreMessage groupOp) {
log.debug("Received remote group operation {} request for device {}",
groupOp.type(),
groupOp.deviceId());
if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
log.warn("This node is not MASTER for device {}", groupOp.deviceId());
return;
}
if (groupOp.type() == GroupStoreMessage.Type.ADD) {
storeGroupDescriptionInternal(groupOp.groupDesc());
} else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
updateGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie(),
groupOp.updateType(),
groupOp.updateBuckets(),
groupOp.newAppCookie());
} else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
deleteGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie());
}
}
/**
......
......@@ -35,8 +35,6 @@ import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
......@@ -104,9 +102,10 @@ public class DistributedPacketStore
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/packet", "message-handlers"));
communicationService.addSubscriber(PACKET_OUT_SUBJECT,
new InternalClusterMessageHandler(),
messageHandlingExecutor);
communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
SERIALIZER::decode,
packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
messageHandlingExecutor);
tracker = new PacketRequestTracker();
......@@ -134,9 +133,12 @@ public class DistributedPacketStore
return;
}
// TODO check unicast return value
communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
// error log: log.warn("Failed to send packet-out to {}", master);
communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
.whenComplete((r, error) -> {
if (error != null) {
log.warn("Failed to send packet-out to {}", master, error);
}
});
}
@Override
......@@ -154,21 +156,6 @@ public class DistributedPacketStore
return tracker.requests();
}
/**
* Handles incoming cluster messages.
*/
private class InternalClusterMessageHandler implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
log.warn("Received message with wrong subject: {}", message);
}
OutboundPacket packet = SERIALIZER.decode(message.payload());
notifyDelegate(new PacketEvent(Type.EMIT, packet));
}
}
private class PacketRequestTracker {
private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
......