Madan Jampani

Added a messaging service implementation on top of IOLoop. Added ability to easi…

…ly switch between netty and io loop (default is netty)

Change-Id: Id9af0756bf0a542f832f3611b486b2ac680b91e4
Showing 25 changed files with 483 additions and 403 deletions
......@@ -15,7 +15,6 @@
*/
package org.onosproject.store.cluster.messaging;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
......@@ -35,6 +34,7 @@ public class ClusterMessage {
private final NodeId sender;
private final MessageSubject subject;
private final byte[] payload;
private transient byte[] response;
/**
* Creates a cluster message.
......@@ -77,13 +77,21 @@ public class ClusterMessage {
}
/**
* Sends a response to the sender.
* Records the response to be sent to the sender.
*
* @param data payload response.
* @throws IOException when I/O exception of some sort has occurred
* @param data response payload
*/
public void respond(byte[] data) throws IOException {
throw new IllegalStateException("One can only respond to message received from others.");
public void respond(byte[] data) {
response = data;
}
/**
* Returns the response to be sent to the sender.
*
* @return response bytes
*/
public byte[] response() {
return response;
}
@Override
......
......@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
package org.onosproject.store.cluster.messaging;
import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
......
......@@ -13,16 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
package org.onosproject.store.cluster.messaging;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Interface for low level messaging primitives.
*/
public interface MessagingService {
/**
* Sends a message asynchronously to the specified communication end point.
* The message is specified using the type and payload.
......@@ -31,7 +34,7 @@ public interface MessagingService {
* @param payload message payload bytes.
* @throws IOException when I/O exception of some sort has occurred
*/
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Sends a message synchronously and waits for a response.
......@@ -40,7 +43,7 @@ public interface MessagingService {
* @param payload message payload.
* @return a response future
*/
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
/**
* Registers a new message handler for message type.
......@@ -48,19 +51,19 @@ public interface MessagingService {
* @param handler message handler
* @param executor executor to use for running message handler logic.
*/
public void registerHandler(String type, MessageHandler handler, Executor executor);
void registerHandler(String type, Consumer<byte[]> handler, Executor executor);
/**
* Registers a new message handler for message type.
* @param type message type.
* @param handler message handler
* @param executor executor to use for running message handler logic.
*/
@Deprecated
public void registerHandler(String type, MessageHandler handler);
void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor);
/**
* Unregister current handler, if one exists for message type.
* @param type message type
*/
public void unregisterHandler(String type);
void unregisterHandler(String type);
}
......
......@@ -53,6 +53,12 @@
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
<version>${project.version}</version>
</dependency>
......
......@@ -19,14 +19,12 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
......@@ -38,6 +36,7 @@ import org.onosproject.cluster.ControllerNode.State;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.onosproject.store.serializers.KryoNamespaces;
......@@ -56,6 +55,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -108,7 +108,7 @@ public class DistributedClusterStore
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private NettyMessagingService messagingService = new NettyMessagingService();
private NettyMessagingService messagingService;
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
......@@ -149,7 +149,6 @@ public class DistributedClusterStore
establishSelfIdentity();
messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
try {
messagingService.activate();
} catch (InterruptedException e) {
......@@ -376,10 +375,10 @@ public class DistributedClusterStore
throw new IllegalStateException("Unable to determine local ip");
}
private class HeartbeatMessageHandler implements MessageHandler {
private class HeartbeatMessageHandler implements Consumer<byte[]> {
@Override
public void handle(Message message) throws IOException {
HeartbeatMessage hb = SERIALIZER.decode(message.payload());
public void accept(byte[] message) {
HeartbeatMessage hb = SERIALIZER.decode(message);
failureDetector.report(hb.source().id());
hb.knownPeers().forEach(node -> {
allNodes.put(node.id(), node);
......
......@@ -21,18 +21,17 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
import org.onlab.nio.service.IOLoopMessagingService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -64,17 +63,28 @@ public class ClusterCommunicationManager
// TODO: This probably should not be a OSGi service.
private MessagingService messagingService;
private final boolean useNetty = true;
@Activate
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
netty.activate();
} catch (Exception e) {
log.error("NettyMessagingService#activate", e);
if (useNetty) {
NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
try {
netty.activate();
messagingService = netty;
} catch (Exception e) {
log.error("NettyMessagingService#activate", e);
}
} else {
IOLoopMessagingService ioLoop = new IOLoopMessagingService(localNode.ip(), localNode.tcpPort());
try {
ioLoop.activate();
messagingService = ioLoop;
} catch (Exception e) {
log.error("IOLoopMessagingService#activate", e);
}
}
messagingService = netty;
log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
}
......@@ -83,9 +93,13 @@ public class ClusterCommunicationManager
// TODO: cleanup messageingService if needed.
// FIXME: workaround until it becomes a service.
try {
((NettyMessagingService) messagingService).deactivate();
if (useNetty) {
((NettyMessagingService) messagingService).deactivate();
} else {
((IOLoopMessagingService) messagingService).deactivate();
}
} catch (Exception e) {
log.error("NettyMessagingService#deactivate", e);
log.error("MessagingService#deactivate", e);
}
log.info("Stopped");
}
......@@ -232,7 +246,9 @@ public class ClusterCommunicationManager
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
messagingService.registerHandler(subject.value(),
new InternalClusterMessageHandler(subscriber),
executor);
}
@Override
......@@ -240,31 +256,6 @@ public class ClusterCommunicationManager
messagingService.unregisterHandler(subject.value());
}
private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
public InternalClusterMessageHandler(ClusterMessageHandler handler) {
this.handler = handler;
}
@Override
public void handle(Message message) {
final ClusterMessage clusterMessage;
try {
clusterMessage = ClusterMessage.fromBytes(message.payload());
} catch (Exception e) {
log.error("Failed decoding {}", message, e);
throw e;
}
try {
handler.handle(new InternalClusterMessage(clusterMessage, message));
} catch (Exception e) {
log.trace("Failed handling {}", clusterMessage, e);
throw e;
}
}
}
@Override
public <M, R> void addSubscriber(MessageSubject subject,
......@@ -287,7 +278,22 @@ public class ClusterCommunicationManager
executor);
}
private class InternalMessageResponder<M, R> implements MessageHandler {
private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
private ClusterMessageHandler handler;
public InternalClusterMessageHandler(ClusterMessageHandler handler) {
this.handler = handler;
}
@Override
public byte[] apply(byte[] bytes) {
ClusterMessage message = ClusterMessage.fromBytes(bytes);
handler.handle(message);
return message.response();
}
}
private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
private final Function<M, R> handler;
......@@ -299,14 +305,15 @@ public class ClusterCommunicationManager
this.encoder = encoder;
this.handler = handler;
}
@Override
public void handle(Message message) throws IOException {
R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
message.respond(encoder.apply(response));
public byte[] apply(byte[] bytes) {
R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
return encoder.apply(reply);
}
}
private class InternalMessageConsumer<M> implements MessageHandler {
private class InternalMessageConsumer<M> implements Consumer<byte[]> {
private final Function<byte[], M> decoder;
private final Consumer<M> consumer;
......@@ -314,24 +321,10 @@ public class ClusterCommunicationManager
this.decoder = decoder;
this.consumer = consumer;
}
@Override
public void handle(Message message) throws IOException {
consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
}
}
public static final class InternalClusterMessage extends ClusterMessage {
private final Message rawMessage;
public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
this.rawMessage = rawMessage;
}
@Override
public void respond(byte[] response) throws IOException {
rawMessage.respond(response);
public void accept(byte[] bytes) {
consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
}
}
}
......
......@@ -77,7 +77,6 @@ import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......@@ -278,11 +277,7 @@ public class DistributedFlowRuleStore
FlowRule rule = SERIALIZER.decode(message.payload());
log.trace("received get flow entry request for {}", rule);
FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
try {
message.respond(SERIALIZER.encode(flowEntry));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
message.respond(SERIALIZER.encode(flowEntry));
}
}, executor);
......@@ -293,11 +288,7 @@ public class DistributedFlowRuleStore
DeviceId deviceId = SERIALIZER.decode(message.payload());
log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
} catch (IOException e) {
log.error("Failed to respond to peer's getFlowEntries request", e);
}
message.respond(SERIALIZER.encode(flowEntries));
}
}, executor);
......@@ -308,11 +299,7 @@ public class DistributedFlowRuleStore
FlowEntry rule = SERIALIZER.decode(message.payload());
log.trace("received get flow entry request for {}", rule);
FlowRuleEvent event = removeFlowRuleInternal(rule);
try {
message.respond(SERIALIZER.encode(event));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
message.respond(SERIALIZER.encode(event));
}
}, executor);
}
......@@ -691,11 +678,7 @@ public class DistributedFlowRuleStore
// TODO: we might want to wrap response in envelope
// to distinguish sw programming failure and hand over
// it make sense in the latter case to retry immediately.
try {
message.respond(SERIALIZER.encode(allFailed));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
message.respond(SERIALIZER.encode(allFailed));
return;
}
......
......@@ -51,7 +51,6 @@ import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
......@@ -143,11 +142,7 @@ public class DefaultFlowRuleExtRouter
@Override
public void run() {
FlowExtCompletedOperation result = Futures.getUnchecked(f);
try {
message.respond(SERIALIZER.encode(result));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
message.respond(SERIALIZER.encode(result));
}
}, futureListeners);
}
......
......@@ -22,7 +22,6 @@ import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -300,11 +299,7 @@ public class ConsistentDeviceMastershipStore
@Override
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
try {
message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
} catch (IOException e) {
log.error("Failed to responsd to role query", e);
}
message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
}
}
......@@ -318,11 +313,7 @@ public class ConsistentDeviceMastershipStore
@Override
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
try {
message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
} catch (IOException e) {
log.error("Failed to relinquish role.", e);
}
message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
}
}
......@@ -371,4 +362,4 @@ public class ConsistentDeviceMastershipStore
return m.matches();
}
}
\ No newline at end of file
}
......
......@@ -43,7 +43,6 @@ import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
......@@ -118,11 +117,7 @@ public class DistributedStatisticStore implements StatisticStore {
@Override
public void handle(ClusterMessage message) {
ConnectPoint cp = SERIALIZER.decode(message.payload());
try {
message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
}
}, messageHandlingExecutor);
......@@ -131,11 +126,7 @@ public class DistributedStatisticStore implements StatisticStore {
@Override
public void handle(ClusterMessage message) {
ConnectPoint cp = SERIALIZER.decode(message.payload());
try {
message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
}
}, messageHandlingExecutor);
log.info("Started");
......
......@@ -39,6 +39,10 @@
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//FIXME: Should be move out to test or app
/**
* Message handler that echos the message back to the sender.
*/
public class EchoHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) throws IOException {
log.info("Received message. Echoing it back to the sender.");
message.respond(message.payload());
}
}
......@@ -15,9 +15,8 @@
*/
package org.onlab.netty;
import java.io.IOException;
import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.base.MoreObjects;
......@@ -25,20 +24,14 @@ import com.google.common.base.MoreObjects;
* Internal message representation with additional attributes
* for supporting, synchronous request/reply behavior.
*/
public final class InternalMessage implements Message {
public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
public final class InternalMessage {
private long id;
private Endpoint sender;
private String type;
private byte[] payload;
private transient NettyMessagingService messagingService;
private final long id;
private final Endpoint sender;
private final String type;
private final byte[] payload;
// Must be created using the Builder.
private InternalMessage() {}
InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
this.id = id;
this.sender = sender;
this.type = type;
......@@ -57,26 +50,10 @@ public final class InternalMessage implements Message {
return sender;
}
@Override
public byte[] payload() {
return payload;
}
protected void setMessagingService(NettyMessagingService messagingService) {
this.messagingService = messagingService;
}
@Override
public void respond(byte[] data) throws IOException {
Builder builder = new Builder(messagingService);
InternalMessage message = builder.withId(this.id)
.withSender(messagingService.localEp())
.withPayload(data)
.withType(REPLY_MESSAGE_TYPE)
.build();
messagingService.sendAsync(sender, message);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
......@@ -86,39 +63,4 @@ public final class InternalMessage implements Message {
.add("payload", ByteArraySizeHashPrinter.of(payload))
.toString();
}
/**
* Builder for InternalMessages.
*/
public static final class Builder {
private InternalMessage message;
public Builder(NettyMessagingService messagingService) {
message = new InternalMessage();
message.messagingService = messagingService;
}
public Builder withId(long id) {
message.id = id;
return this;
}
public Builder withType(String type) {
message.type = type;
return this;
}
public Builder withSender(Endpoint sender) {
message.sender = sender;
return this;
}
public Builder withPayload(byte[] payload) {
message.payload = payload;
return this;
}
public InternalMessage build() {
return message;
}
}
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A MessageHandler that simply logs the information.
*/
public class LoggingHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) {
log.info("Received message. Payload has {} bytes", message.payload().length);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
import java.io.IOException;
/**
* A unit of communication.
* Has a payload. Also supports a feature to respond back to the sender.
*/
public interface Message {
/**
* Returns the payload of this message.
* @return message payload.
*/
public byte[] payload();
/**
* Sends a reply back to the sender of this message.
* @param data payload of the response.
* @throws IOException if there is a communication error.
*/
public void respond(byte[] data) throws IOException;
}
......@@ -24,6 +24,7 @@ import java.util.List;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -36,8 +37,6 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final NettyMessagingService messagingService;
private long messageId;
private Version ipVersion;
private IpAddress senderIp;
......@@ -46,9 +45,8 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private String messageType;
private int contentLength;
public MessageDecoder(NettyMessagingService messagingService) {
public MessageDecoder() {
super(DecoderState.READ_MESSAGE_ID);
this.messagingService = messagingService;
}
@Override
......@@ -91,7 +89,6 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
new Endpoint(senderIp, senderPort),
messageType,
payload);
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_MESSAGE_ID);
break;
......
......@@ -24,6 +24,7 @@ import java.io.IOException;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
import java.io.IOException;
/**
* Handler for a message.
*/
public interface MessageHandler {
/**
* Handles the message.
*
* @param message message.
* @throws IOException if an error is encountered handling the message
*/
public void handle(Message message) throws IOException;
}
......@@ -46,10 +46,14 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -59,17 +63,18 @@ import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
/**
* A Netty based implementation of MessagingService.
* Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
*/
public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterWrite(10, TimeUnit.SECONDS)
.removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
@Override
......@@ -124,6 +129,7 @@ public class NettyMessagingService implements MessagingService {
}
public void activate() throws InterruptedException {
channels.setLifo(false);
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
initEventLoopGroup();
......@@ -146,12 +152,10 @@ public class NettyMessagingService implements MessagingService {
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
.withType(type)
.withPayload(payload)
.build();
InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
sendAsync(ep, message);
}
......@@ -164,7 +168,7 @@ public class NettyMessagingService implements MessagingService {
try {
try {
channel = channels.borrowObject(ep);
channel.eventLoop().execute(new WriteTask(channel, message));
channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} finally {
channels.returnObject(ep, channel);
}
......@@ -173,7 +177,6 @@ public class NettyMessagingService implements MessagingService {
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
......@@ -181,12 +184,7 @@ public class NettyMessagingService implements MessagingService {
CompletableFuture<byte[]> response = new CompletableFuture<>();
Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, response);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
.withType(type)
.withPayload(payload)
.build();
InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
try {
sendAsync(ep, message);
} catch (Exception e) {
......@@ -197,24 +195,26 @@ public class NettyMessagingService implements MessagingService {
}
@Override
public void registerHandler(String type, MessageHandler handler) {
handlers.put(type, handler);
public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
}
@Override
public void registerHandler(String type, MessageHandler handler, Executor executor) {
handlers.put(type, new MessageHandler() {
@Override
public void handle(Message message) throws IOException {
executor.execute(() -> {
try {
handler.handle(message);
} catch (Exception e) {
log.debug("Failed to process message of type {}", type, e);
}
});
public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> {
byte[] responsePayload = handler.apply(message.payload());
if (responsePayload != null) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
try {
sendAsync(message.sender(), response);
} catch (IOException e) {
log.debug("Failed to respond", e);
}
}
});
}));
}
@Override
......@@ -222,14 +222,12 @@ public class NettyMessagingService implements MessagingService {
handlers.remove(type);
}
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
b.option(ChannelOption.SO_RCVBUF, 1048576);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(serverGroup, clientGroup)
.channel(serverChannelClass)
......@@ -258,8 +256,9 @@ public class NettyMessagingService implements MessagingService {
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
bootstrap.group(clientGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
......@@ -268,6 +267,7 @@ public class NettyMessagingService implements MessagingService {
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
log.info("Established a new connection to {}", ep);
return f.channel();
}
......@@ -291,27 +291,11 @@ public class NettyMessagingService implements MessagingService {
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(NettyMessagingService.this))
.addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
private static class WriteTask implements Runnable {
private final InternalMessage message;
private final Channel channel;
public WriteTask(Channel channel, InternalMessage message) {
this.channel = channel;
this.message = message;
}
@Override
public void run() {
channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
@ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
......@@ -329,10 +313,10 @@ public class NettyMessagingService implements MessagingService {
private void dispatchLocally(InternalMessage message) throws IOException {
String type = message.type();
if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
CompletableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.complete(message.payload());
} else {
......@@ -341,13 +325,13 @@ public class NettyMessagingService implements MessagingService {
+ " request handle", message.id(), message.sender());
}
} finally {
NettyMessagingService.this.responseFutures.invalidate(message.id());
responseFutures.invalidate(message.id());
}
return;
}
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
Consumer<InternalMessage> handler = handlers.get(type);
if (handler != null) {
handler.handle(message);
handler.accept(message);
} else {
log.debug("No handler registered for {}", type);
}
......
......@@ -18,13 +18,17 @@ package org.onlab.netty;
import static org.junit.Assert.assertArrayEquals;
import java.net.InetAddress;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.util.concurrent.MoreExecutors;
/**
* Simple ping-pong test that exercises NettyMessagingService.
......@@ -39,9 +43,9 @@ public class PingPongTest {
try {
pinger.activate();
ponger.activate();
ponger.registerHandler("echo", new EchoHandler());
ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
byte[] payload = RandomUtils.nextBytes(100);
Future<byte[]> responseFuture =
CompletableFuture<byte[]> responseFuture =
pinger.sendAndReceive(
new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
......
......@@ -37,6 +37,14 @@
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.List;
import java.util.function.Consumer;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
/**
* IOLoop for transporting DefaultMessages.
*/
public class DefaultIOLoop extends IOLoop<DefaultMessage, DefaultMessageStream> {
public static final int SELECT_TIMEOUT_MILLIS = 500;
private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000;
private static final int BUFFER_SIZE = 1024 * 1024;
private final Consumer<DefaultMessage> consumer;
public DefaultIOLoop(Consumer<DefaultMessage> consumer) throws IOException {
this(SELECT_TIMEOUT_MILLIS, consumer);
}
public DefaultIOLoop(long timeout, Consumer<DefaultMessage> consumer) throws IOException {
super(timeout);
this.consumer = consumer;
}
@Override
protected DefaultMessageStream createStream(ByteChannel byteChannel) {
return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS);
}
@Override
protected void processMessages(List<DefaultMessage> messages, MessageStream<DefaultMessage> stream) {
messages.forEach(consumer);
}
@Override
protected void connect(SelectionKey key) throws IOException {
DefaultMessageStream stream = (DefaultMessageStream) key.attachment();
try {
super.connect(key);
stream.connected();
} catch (Exception e) {
stream.connectFailed(e);
}
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.nio.AbstractMessage;
import org.onlab.packet.IpAddress;
import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.base.Charsets;
import com.google.common.base.MoreObjects;
/**
* Default message.
*/
public class DefaultMessage extends AbstractMessage {
private long id;
private Endpoint sender;
private String type;
private byte[] payload;
/**
* Creates a new message with the specified data.
*
* @param id message id
* @param type message type
* @param sender sender endpoint
* @param payload message payload
*/
DefaultMessage(long id, Endpoint sender, String type, byte[] payload) {
this.id = id;
this.type = checkNotNull(type, "Type cannot be null");
this.sender = checkNotNull(sender, "Sender cannot be null");
this.payload = checkNotNull(payload, "Payload cannot be null");
byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8);
IpAddress senderIp = sender.host();
byte[] ipOctets = senderIp.toOctets();
length = 25 + ipOctets.length + messageTypeBytes.length + payload.length;
}
/**
* Returns message id.
*
* @return message id
*/
public long id() {
return id;
}
/**
* Returns message sender.
*
* @return message sender
*/
public Endpoint sender() {
return sender;
}
/**
* Returns message type.
*
* @return message type
*/
public String type() {
return type;
}
/**
* Returns message payload.
*
* @return payload
*/
public byte[] payload() {
return payload;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("type", type)
.add("sender", sender)
.add("payload", ByteArraySizeHashPrinter.of(payload))
.toString();
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.base.Charsets;
/**
* Default bi-directional message stream for transferring messages to &amp; from the
* network via two byte buffers.
*/
public class DefaultMessageStream extends MessageStream<DefaultMessage> {
private final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
public DefaultMessageStream(
IOLoop<DefaultMessage, ?> loop,
ByteChannel byteChannel,
int bufferSize,
int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
}
public CompletableFuture<DefaultMessageStream> connectedFuture() {
return connectFuture.thenApply(v -> this);
}
private final AtomicInteger messageLength = new AtomicInteger(-1);
@Override
protected DefaultMessage read(ByteBuffer buffer) {
if (messageLength.get() == -1) {
// check if we can read the message length.
if (buffer.remaining() < Integer.BYTES) {
return null;
} else {
messageLength.set(buffer.getInt());
}
}
if (buffer.remaining() < messageLength.get()) {
return null;
}
long id = buffer.getLong();
Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6;
byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
buffer.get(octects);
IpAddress senderIp = IpAddress.valueOf(ipVersion, octects);
int senderPort = buffer.getInt();
int messageTypeByteLength = buffer.getInt();
byte[] messageTypeBytes = new byte[messageTypeByteLength];
buffer.get(messageTypeBytes);
String messageType = new String(messageTypeBytes, Charsets.UTF_8);
int payloadLength = buffer.getInt();
byte[] payloadBytes = new byte[payloadLength];
buffer.get(payloadBytes);
// reset for next message
messageLength.set(-1);
return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes);
}
@Override
protected void write(DefaultMessage message, ByteBuffer buffer) {
Endpoint sender = message.sender();
byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
IpAddress senderIp = sender.host();
byte[] ipOctets = senderIp.toOctets();
byte[] payload = message.payload();
int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length;
buffer.putInt(messageLength);
buffer.putLong(message.id());
if (senderIp.version() == Version.INET) {
buffer.put((byte) 0x0);
} else {
buffer.put((byte) 0x1);
}
buffer.put(ipOctets);
// write sender port
buffer.putInt(sender.port());
// write length of message type
buffer.putInt(messageTypeBytes.length);
// write message type bytes
buffer.put(messageTypeBytes);
// write payload length
buffer.putInt(payload.length);
// write payload.
buffer.put(payload);
}
/**
* Callback invoked when the stream is successfully connected.
*/
public void connected() {
connectFuture.complete(null);
}
/**
* Callback invoked when the stream fails to connect.
* @param cause failure cause
*/
public void connectFailed(Throwable cause) {
connectFuture.completeExceptionally(cause);
}
}
\ No newline at end of file