Toggle navigation
Toggle navigation
This project
Loading...
Sign in
홍길동
/
onos
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Snippets
Network
Create a new issue
Builds
Commits
Issue Boards
Authored by
pankaj
2014-10-02 16:30:48 -0700
Browse Files
Options
Browse Files
Download
Plain Diff
Commit
f1eab297967de8e610d1f3472c03d5c39091a9fc
f1eab297
2 parents
93838d57
ab6d311b
Merge branch 'master' of
ssh://gerrit.onlab.us:29418/onos-next
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
834 additions
and
0 deletions
utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
utils/netty/src/main/java/org/onlab/netty/Endpoint.java
utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
utils/netty/src/main/java/org/onlab/netty/Message.java
utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
utils/netty/src/main/java/org/onlab/netty/MessagingService.java
utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
utils/netty/src/main/java/org/onlab/netty/Response.java
utils/netty/src/main/java/org/onlab/netty/Serializer.java
utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
/**
* An asynchronous response.
* This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet.
* @param <T> type of response.
*/
public
class
AsyncResponse
<
T
>
implements
Response
<
T
>
{
private
T
value
;
private
boolean
done
=
false
;
private
final
long
start
=
System
.
nanoTime
();
@Override
public
T
get
(
long
timeout
,
TimeUnit
tu
)
throws
TimeoutException
{
timeout
=
tu
.
toNanos
(
timeout
);
boolean
interrupted
=
false
;
try
{
synchronized
(
this
)
{
while
(!
done
)
{
try
{
long
timeRemaining
=
timeout
-
(
System
.
nanoTime
()
-
start
);
if
(
timeRemaining
<=
0
)
{
throw
new
TimeoutException
(
"Operation timed out."
);
}
TimeUnit
.
NANOSECONDS
.
timedWait
(
this
,
timeRemaining
);
}
catch
(
InterruptedException
e
)
{
interrupted
=
true
;
}
}
}
}
finally
{
if
(
interrupted
)
{
Thread
.
currentThread
().
interrupt
();
}
}
return
value
;
}
@Override
public
T
get
()
throws
InterruptedException
{
throw
new
UnsupportedOperationException
();
}
@Override
public
boolean
isReady
()
{
return
done
;
}
/**
* Sets response value and unblocks any thread blocking on the response to become
* available.
* @param data response data.
*/
@SuppressWarnings
(
"unchecked"
)
public
synchronized
void
setResponse
(
Object
data
)
{
if
(!
done
)
{
done
=
true
;
value
=
(
T
)
data
;
this
.
notifyAll
();
}
}
}
utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.io.IOException
;
/**
* Message handler that echos the message back to the sender.
*/
public
class
EchoHandler
implements
MessageHandler
{
@Override
public
void
handle
(
Message
message
)
throws
IOException
{
System
.
out
.
println
(
"Received: "
+
message
.
payload
()
+
". Echoing it back to the sender."
);
message
.
respond
(
message
.
payload
());
}
}
utils/netty/src/main/java/org/onlab/netty/Endpoint.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
/**
* Representation of a TCP/UDP communication end point.
*/
public
class
Endpoint
{
private
final
int
port
;
private
final
String
host
;
public
Endpoint
(
String
host
,
int
port
)
{
this
.
host
=
host
;
this
.
port
=
port
;
}
public
String
host
()
{
return
host
;
}
public
int
port
()
{
return
port
;
}
@Override
public
String
toString
()
{
return
"Endpoint [port="
+
port
+
", host="
+
host
+
"]"
;
}
@Override
public
int
hashCode
()
{
final
int
prime
=
31
;
int
result
=
1
;
result
=
prime
*
result
+
((
host
==
null
)
?
0
:
host
.
hashCode
());
result
=
prime
*
result
+
port
;
return
result
;
}
@Override
public
boolean
equals
(
Object
obj
)
{
if
(
this
==
obj
)
{
return
true
;
}
if
(
obj
==
null
)
{
return
false
;
}
if
(
getClass
()
!=
obj
.
getClass
())
{
return
false
;
}
Endpoint
other
=
(
Endpoint
)
obj
;
if
(
host
==
null
)
{
if
(
other
.
host
!=
null
)
{
return
false
;
}
}
else
if
(!
host
.
equals
(
other
.
host
))
{
return
false
;
}
if
(
port
!=
other
.
port
)
{
return
false
;
}
return
true
;
}
}
utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.io.IOException
;
/**
* Internal message representation with additional attributes
* for supporting, synchronous request/reply behavior.
*/
public
final
class
InternalMessage
implements
Message
{
private
long
id
;
private
Endpoint
sender
;
private
String
type
;
private
Object
payload
;
private
transient
NettyMessagingService
messagingService
;
public
static
final
String
REPLY_MESSAGE_TYPE
=
"NETTY_MESSAGIG_REQUEST_REPLY"
;
// Must be created using the Builder.
private
InternalMessage
()
{}
public
long
id
()
{
return
id
;
}
public
String
type
()
{
return
type
;
}
public
Endpoint
sender
()
{
return
sender
;
}
@Override
public
Object
payload
()
{
return
payload
;
}
@Override
public
void
respond
(
Object
data
)
throws
IOException
{
Builder
builder
=
new
Builder
(
messagingService
);
InternalMessage
message
=
builder
.
withId
(
this
.
id
)
// FIXME: Sender should be messagingService.localEp.
.
withSender
(
this
.
sender
)
.
withPayload
(
data
)
.
withType
(
REPLY_MESSAGE_TYPE
)
.
build
();
messagingService
.
sendAsync
(
sender
,
message
);
}
/**
* Builder for InternalMessages.
*/
public
static
class
Builder
{
private
InternalMessage
message
;
public
Builder
(
NettyMessagingService
messagingService
)
{
message
=
new
InternalMessage
();
message
.
messagingService
=
messagingService
;
}
public
Builder
withId
(
long
id
)
{
message
.
id
=
id
;
return
this
;
}
public
Builder
withType
(
String
type
)
{
message
.
type
=
type
;
return
this
;
}
public
Builder
withSender
(
Endpoint
sender
)
{
message
.
sender
=
sender
;
return
this
;
}
public
Builder
withPayload
(
Object
payload
)
{
message
.
payload
=
payload
;
return
this
;
}
public
InternalMessage
build
()
{
return
message
;
}
}
}
utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
org.onlab.util.KryoPool
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
/**
* Kryo Serializer.
*/
public
class
KryoSerializer
implements
Serializer
{
private
final
Logger
log
=
LoggerFactory
.
getLogger
(
getClass
());
private
KryoPool
serializerPool
;
public
KryoSerializer
()
{
setupKryoPool
();
}
/**
* Sets up the common serialzers pool.
*/
protected
void
setupKryoPool
()
{
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool
=
KryoPool
.
newBuilder
()
.
register
(
ArrayList
.
class
,
HashMap
.
class
,
ArrayList
.
class
)
.
build
()
.
populate
(
1
);
}
@Override
public
Object
decode
(
byte
[]
data
)
{
return
serializerPool
.
deserialize
(
data
);
}
@Override
public
byte
[]
encode
(
Object
payload
)
{
return
serializerPool
.
serialize
(
payload
);
}
}
utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
/**
* A MessageHandler that simply logs the information.
*/
public
class
LoggingHandler
implements
MessageHandler
{
@Override
public
void
handle
(
Message
message
)
{
System
.
out
.
println
(
"Received: "
+
message
.
payload
());
}
}
utils/netty/src/main/java/org/onlab/netty/Message.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.io.IOException
;
/**
* A unit of communication.
* Has a payload. Also supports a feature to respond back to the sender.
*/
public
interface
Message
{
/**
* Returns the payload of this message.
* @return message payload.
*/
public
Object
payload
();
/**
* Sends a reply back to the sender of this messge.
* @param data payload of the response.
* @throws IOException if there is a communication error.
*/
public
void
respond
(
Object
data
)
throws
IOException
;
}
utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.util.Arrays
;
import
java.util.List
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkState
;
import
io.netty.buffer.ByteBuf
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.handler.codec.ByteToMessageDecoder
;
/**
* Decode bytes into a InternalMessage.
*/
public
class
MessageDecoder
extends
ByteToMessageDecoder
{
private
final
NettyMessagingService
messagingService
;
private
final
Serializer
serializer
;
public
MessageDecoder
(
NettyMessagingService
messagingService
,
Serializer
serializer
)
{
this
.
messagingService
=
messagingService
;
this
.
serializer
=
serializer
;
}
@Override
protected
void
decode
(
ChannelHandlerContext
context
,
ByteBuf
in
,
List
<
Object
>
messages
)
throws
Exception
{
byte
[]
preamble
=
in
.
readBytes
(
MessageEncoder
.
PREAMBLE
.
length
).
array
();
checkState
(
Arrays
.
equals
(
MessageEncoder
.
PREAMBLE
,
preamble
),
"Message has wrong preamble"
);
// read message Id.
long
id
=
in
.
readLong
();
// read message type; first read size and then bytes.
String
type
=
new
String
(
in
.
readBytes
(
in
.
readInt
()).
array
());
// read sender host name; first read size and then bytes.
String
host
=
new
String
(
in
.
readBytes
(
in
.
readInt
()).
array
());
// read sender port.
int
port
=
in
.
readInt
();
Endpoint
sender
=
new
Endpoint
(
host
,
port
);
// read message payload; first read size and then bytes.
Object
payload
=
serializer
.
decode
(
in
.
readBytes
(
in
.
readInt
()).
array
());
InternalMessage
message
=
new
InternalMessage
.
Builder
(
messagingService
)
.
withId
(
id
)
.
withSender
(
sender
)
.
withType
(
type
)
.
withPayload
(
payload
)
.
build
();
messages
.
add
(
message
);
}
}
utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
io.netty.buffer.ByteBuf
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.handler.codec.MessageToByteEncoder
;
/**
* Encode InternalMessage out into a byte buffer.
*/
public
class
MessageEncoder
extends
MessageToByteEncoder
<
InternalMessage
>
{
// onosiscool in ascii
public
static
final
byte
[]
PREAMBLE
=
"onosiscool"
.
getBytes
();
private
final
Serializer
serializer
;
public
MessageEncoder
(
Serializer
serializer
)
{
this
.
serializer
=
serializer
;
}
@Override
protected
void
encode
(
ChannelHandlerContext
context
,
InternalMessage
message
,
ByteBuf
out
)
throws
Exception
{
// write preamble
out
.
writeBytes
(
PREAMBLE
);
// write id
out
.
writeLong
(
message
.
id
());
// write type length
out
.
writeInt
(
message
.
type
().
length
());
// write type
out
.
writeBytes
(
message
.
type
().
getBytes
());
// write sender host name size
out
.
writeInt
(
message
.
sender
().
host
().
length
());
// write sender host name.
out
.
writeBytes
(
message
.
sender
().
host
().
getBytes
());
// write port
out
.
writeInt
(
message
.
sender
().
port
());
try
{
serializer
.
encode
(
message
.
payload
());
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
byte
[]
payload
=
serializer
.
encode
(
message
.
payload
());
// write payload length.
out
.
writeInt
(
payload
.
length
);
// write payload bytes
out
.
writeBytes
(
payload
);
}
}
utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.io.IOException
;
/**
* Handler for a message.
*/
public
interface
MessageHandler
{
/**
* Handles the message.
* @param message message.
* @throws IOException.
*/
public
void
handle
(
Message
message
)
throws
IOException
;
}
utils/netty/src/main/java/org/onlab/netty/MessagingService.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.io.IOException
;
/**
* Interface for low level messaging primitives.
*/
public
interface
MessagingService
{
/**
* Sends a message asynchronously to the specified communication end point.
* The message is specified using the type and payload.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
* @throws IOException
*/
public
void
sendAsync
(
Endpoint
ep
,
String
type
,
Object
payload
)
throws
IOException
;
/**
* Sends a message synchronously and waits for a response.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
* @return a response future
* @throws IOException
*/
public
<
T
>
Response
<
T
>
sendAndReceive
(
Endpoint
ep
,
String
type
,
Object
payload
)
throws
IOException
;
/**
* Registers a new message handler for message type.
* @param type message type.
* @param handler message handler
*/
public
void
registerHandler
(
String
type
,
MessageHandler
handler
);
/**
* Unregister current handler, if one exists for message type.
* @param type message type
*/
public
void
unregisterHandler
(
String
type
);
}
utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.io.IOException
;
import
java.net.UnknownHostException
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.TimeUnit
;
import
io.netty.bootstrap.Bootstrap
;
import
io.netty.bootstrap.ServerBootstrap
;
import
io.netty.buffer.PooledByteBufAllocator
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelFuture
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.ChannelInitializer
;
import
io.netty.channel.ChannelOption
;
import
io.netty.channel.EventLoopGroup
;
import
io.netty.channel.SimpleChannelInboundHandler
;
import
io.netty.channel.nio.NioEventLoopGroup
;
import
io.netty.channel.socket.SocketChannel
;
import
io.netty.channel.socket.nio.NioServerSocketChannel
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
org.apache.commons.lang.math.RandomUtils
;
import
org.apache.commons.pool.KeyedObjectPool
;
import
org.apache.commons.pool.KeyedPoolableObjectFactory
;
import
org.apache.commons.pool.impl.GenericKeyedObjectPool
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.google.common.cache.Cache
;
import
com.google.common.cache.CacheBuilder
;
/**
* A Netty based implementation of MessagingService.
*/
public
class
NettyMessagingService
implements
MessagingService
{
private
final
Logger
log
=
LoggerFactory
.
getLogger
(
getClass
());
private
KeyedObjectPool
<
Endpoint
,
Channel
>
channels
=
new
GenericKeyedObjectPool
<
Endpoint
,
Channel
>(
new
OnosCommunicationChannelFactory
());
private
final
int
port
;
private
final
EventLoopGroup
bossGroup
=
new
NioEventLoopGroup
();
private
final
EventLoopGroup
workerGroup
=
new
NioEventLoopGroup
();
private
final
ConcurrentMap
<
String
,
MessageHandler
>
handlers
=
new
ConcurrentHashMap
<>();
private
Cache
<
Long
,
AsyncResponse
<?>>
responseFutures
;
private
final
Endpoint
localEp
;
protected
Serializer
serializer
;
public
NettyMessagingService
()
{
// TODO: Default port should be configurable.
this
(
8080
);
}
// FIXME: Constructor should not throw exceptions.
public
NettyMessagingService
(
int
port
)
{
this
.
port
=
port
;
try
{
localEp
=
new
Endpoint
(
java
.
net
.
InetAddress
.
getLocalHost
().
getHostName
(),
port
);
}
catch
(
UnknownHostException
e
)
{
// bailing out.
throw
new
RuntimeException
(
e
);
}
}
public
void
activate
()
throws
Exception
{
responseFutures
=
CacheBuilder
.
newBuilder
()
.
maximumSize
(
100000
)
.
weakValues
()
// TODO: Once the entry expires, notify blocking threads (if any).
.
expireAfterWrite
(
10
,
TimeUnit
.
MINUTES
)
.
build
();
startAcceptingConnections
();
}
public
void
deactivate
()
throws
Exception
{
channels
.
close
();
bossGroup
.
shutdownGracefully
();
workerGroup
.
shutdownGracefully
();
}
@Override
public
void
sendAsync
(
Endpoint
ep
,
String
type
,
Object
payload
)
throws
IOException
{
InternalMessage
message
=
new
InternalMessage
.
Builder
(
this
)
.
withId
(
RandomUtils
.
nextLong
())
.
withSender
(
localEp
)
.
withType
(
type
)
.
withPayload
(
payload
)
.
build
();
sendAsync
(
ep
,
message
);
}
protected
void
sendAsync
(
Endpoint
ep
,
InternalMessage
message
)
throws
IOException
{
Channel
channel
=
null
;
try
{
channel
=
channels
.
borrowObject
(
ep
);
channel
.
eventLoop
().
execute
(
new
WriteTask
(
channel
,
message
));
}
catch
(
Exception
e
)
{
throw
new
IOException
(
e
);
}
finally
{
try
{
channels
.
returnObject
(
ep
,
channel
);
}
catch
(
Exception
e
)
{
log
.
warn
(
"Error returning object back to the pool"
,
e
);
// ignored.
}
}
}
@Override
public
<
T
>
Response
<
T
>
sendAndReceive
(
Endpoint
ep
,
String
type
,
Object
payload
)
throws
IOException
{
AsyncResponse
<
T
>
futureResponse
=
new
AsyncResponse
<
T
>();
Long
messageId
=
RandomUtils
.
nextLong
();
responseFutures
.
put
(
messageId
,
futureResponse
);
InternalMessage
message
=
new
InternalMessage
.
Builder
(
this
)
.
withId
(
messageId
)
.
withSender
(
localEp
)
.
withType
(
type
)
.
withPayload
(
payload
)
.
build
();
sendAsync
(
ep
,
message
);
return
futureResponse
;
}
@Override
public
void
registerHandler
(
String
type
,
MessageHandler
handler
)
{
// TODO: Is this the right semantics for handler registration?
handlers
.
putIfAbsent
(
type
,
handler
);
}
public
void
unregisterHandler
(
String
type
)
{
handlers
.
remove
(
type
);
}
private
MessageHandler
getMessageHandler
(
String
type
)
{
return
handlers
.
get
(
type
);
}
private
void
startAcceptingConnections
()
throws
InterruptedException
{
ServerBootstrap
b
=
new
ServerBootstrap
();
b
.
option
(
ChannelOption
.
ALLOCATOR
,
PooledByteBufAllocator
.
DEFAULT
);
b
.
group
(
bossGroup
,
workerGroup
)
.
channel
(
NioServerSocketChannel
.
class
)
.
childHandler
(
new
OnosCommunicationChannelInitializer
())
.
option
(
ChannelOption
.
SO_BACKLOG
,
128
)
.
childOption
(
ChannelOption
.
SO_KEEPALIVE
,
true
);
// Bind and start to accept incoming connections.
b
.
bind
(
port
).
sync
();
}
private
class
OnosCommunicationChannelFactory
implements
KeyedPoolableObjectFactory
<
Endpoint
,
Channel
>
{
@Override
public
void
activateObject
(
Endpoint
endpoint
,
Channel
channel
)
throws
Exception
{
}
@Override
public
void
destroyObject
(
Endpoint
ep
,
Channel
channel
)
throws
Exception
{
channel
.
close
();
}
@Override
public
Channel
makeObject
(
Endpoint
ep
)
throws
Exception
{
Bootstrap
b
=
new
Bootstrap
();
b
.
option
(
ChannelOption
.
ALLOCATOR
,
PooledByteBufAllocator
.
DEFAULT
);
b
.
group
(
workerGroup
);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
b
.
channel
(
NioSocketChannel
.
class
);
b
.
option
(
ChannelOption
.
SO_KEEPALIVE
,
true
);
b
.
handler
(
new
OnosCommunicationChannelInitializer
());
// Start the client.
ChannelFuture
f
=
b
.
connect
(
ep
.
host
(),
ep
.
port
()).
sync
();
return
f
.
channel
();
}
@Override
public
void
passivateObject
(
Endpoint
ep
,
Channel
channel
)
throws
Exception
{
}
@Override
public
boolean
validateObject
(
Endpoint
ep
,
Channel
channel
)
{
return
channel
.
isOpen
();
}
}
private
class
OnosCommunicationChannelInitializer
extends
ChannelInitializer
<
SocketChannel
>
{
@Override
protected
void
initChannel
(
SocketChannel
channel
)
throws
Exception
{
channel
.
pipeline
()
.
addLast
(
new
MessageEncoder
(
serializer
))
.
addLast
(
new
MessageDecoder
(
NettyMessagingService
.
this
,
serializer
))
.
addLast
(
new
NettyMessagingService
.
InboundMessageDispatcher
());
}
}
private
class
WriteTask
implements
Runnable
{
private
final
Object
message
;
private
final
Channel
channel
;
public
WriteTask
(
Channel
channel
,
Object
message
)
{
this
.
message
=
message
;
this
.
channel
=
channel
;
}
@Override
public
void
run
()
{
channel
.
writeAndFlush
(
message
);
}
}
private
class
InboundMessageDispatcher
extends
SimpleChannelInboundHandler
<
InternalMessage
>
{
@Override
protected
void
channelRead0
(
ChannelHandlerContext
ctx
,
InternalMessage
message
)
throws
Exception
{
String
type
=
message
.
type
();
if
(
type
.
equals
(
InternalMessage
.
REPLY_MESSAGE_TYPE
))
{
try
{
AsyncResponse
<?>
futureResponse
=
NettyMessagingService
.
this
.
responseFutures
.
getIfPresent
(
message
.
id
());
if
(
futureResponse
!=
null
)
{
futureResponse
.
setResponse
(
message
.
payload
());
}
log
.
warn
(
"Received a reply. But was unable to locate the request handle"
);
}
finally
{
NettyMessagingService
.
this
.
responseFutures
.
invalidate
(
message
.
id
());
}
return
;
}
MessageHandler
handler
=
NettyMessagingService
.
this
.
getMessageHandler
(
type
);
handler
.
handle
(
message
);
}
}
}
utils/netty/src/main/java/org/onlab/netty/Response.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
/**
* Response object returned when making synchronous requests.
* Can you used to check is a response is ready and/or wait for a response
* to become available.
*
* @param <T> type of response.
*/
public
interface
Response
<
T
>
{
/**
* Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out)
* @param tu unit of time.
* @return response
* @throws TimeoutException if the timeout expires before the response arrives.
*/
public
T
get
(
long
timeout
,
TimeUnit
tu
)
throws
TimeoutException
;
/**
* Gets the response waiting for indefinite timeout period.
* @return response
* @throws InterruptedException if the thread is interrupted before the response arrives.
*/
public
T
get
()
throws
InterruptedException
;
/**
* Checks if the response is ready without blocking.
* @return true if response is ready, false otherwise.
*/
public
boolean
isReady
();
}
utils/netty/src/main/java/org/onlab/netty/Serializer.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
/**
* Interface for encoding/decoding message payloads.
*/
public
interface
Serializer
{
/**
* Decodes the specified byte array to a POJO.
*
* @param data byte array.
* @return POJO
*/
Object
decode
(
byte
[]
data
);
/**
* Encodes the specified POJO into a byte array.
*
* @param data POJO to be encoded
* @return byte array.
*/
byte
[]
encode
(
Object
message
);
}
utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
import
java.util.concurrent.TimeUnit
;
public
final
class
SimpleClient
{
private
SimpleClient
()
{}
public
static
void
main
(
String
...
args
)
throws
Exception
{
NettyMessagingService
messaging
=
new
TestNettyMessagingService
(
9081
);
messaging
.
activate
();
messaging
.
sendAsync
(
new
Endpoint
(
"localhost"
,
8080
),
"simple"
,
"Hello World"
);
Response
<
String
>
response
=
messaging
.
sendAndReceive
(
new
Endpoint
(
"localhost"
,
8080
),
"echo"
,
"Hello World"
);
System
.
out
.
println
(
"Got back:"
+
response
.
get
(
2
,
TimeUnit
.
SECONDS
));
}
public
static
class
TestNettyMessagingService
extends
NettyMessagingService
{
public
TestNettyMessagingService
(
int
port
)
throws
Exception
{
super
(
port
);
Serializer
serializer
=
new
KryoSerializer
();
this
.
serializer
=
serializer
;
}
}
}
utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
0 → 100644
View file @
f1eab29
package
org
.
onlab
.
netty
;
public
final
class
SimpleServer
{
private
SimpleServer
()
{}
public
static
void
main
(
String
...
args
)
throws
Exception
{
NettyMessagingService
server
=
new
TestNettyMessagingService
();
server
.
activate
();
server
.
registerHandler
(
"simple"
,
new
LoggingHandler
());
server
.
registerHandler
(
"echo"
,
new
EchoHandler
());
}
public
static
class
TestNettyMessagingService
extends
NettyMessagingService
{
protected
TestNettyMessagingService
()
{
Serializer
serializer
=
new
KryoSerializer
();
this
.
serializer
=
serializer
;
}
}
}
Please
register
or
login
to post a comment