- Netty / Vert.x / All things NIO
- Author of Netty in Action
- @normanmaurer
- github.com/normanmaurer
public class HttpHandler extends SimpleChannelInboundHandler<HttpRequest> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, HttpRequest req) {
        ChannelFuture future = ctx.writeAndFlush(createResponse(req)); (1)
        if (!isKeepAlive(req)) {
            future.addListener(ChannelFutureListener.CLOSE); (2)
        }
    }
}
1 | Write to the Channel and flush out to the Socket. |
2 | After the write completes, close the Socket. |
public class HttpPipeliningHandler extends SimpleChannelInboundHandler<HttpRequest> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, HttpRequest req) {
        ChannelFuture future = ctx.write(createResponse(req)); (1)
        if (!isKeepAlive(req)) {
            future.addListener(ChannelFutureListener.CLOSE); (2)
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush(); (3)
    }
}
1 | Write to the Channel (no syscall!) but do not flush yet |
2 | After the write completes, close the Socket |
3 | Flush out to the Socket. |
write(msg) ⇒ pass through pipeline
flush() ⇒ gathering write of previously written msgs
writeAndFlush() ⇒ short-cut for write(msg) and flush()
Limit flushes as much as possible, as syscalls are quite expensive. |
But also limit write(...) as much as possible, as it needs to traverse the whole pipeline. |
http://25.media.tumblr.com/tumblr_me2eq0PnBx1rtu0cpo1_1280.jpg
Use VoidChannelPromise
if possible
Channel.write(msg) |
Channel.write(msg, Channel.voidPromise()) |
⇒ Only do this if you are not interested in the ChannelFuture and no ChannelOutboundHandler needs to add a ChannelFutureListener to it! |
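A minimal sketch of using the void promise from inside a handler (the handler name is illustrative, not from the original deck):
public class FireAndForgetHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Not interested in the outcome of the write and no listener will ever be attached,
        // so pass the void promise instead of allocating a new ChannelPromise per write.
        ctx.writeAndFlush(msg, ctx.voidPromise());
    }
}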
public class BlindlyWriteHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        while (needsToWrite) { (1)
            ctx.writeAndFlush(createMessage());
        }
    }
}
1 | Writes until needsToWrite returns false. |
Risk of OutOfMemoryError if you write too fast to a slow receiver! |
public class GracefulWriteHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        writeIfPossible(ctx.channel());
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        writeIfPossible(ctx.channel());
    }

    private void writeIfPossible(Channel channel) {
        while (needsToWrite && channel.isWritable()) { (1)
            channel.writeAndFlush(createMessage());
        }
    }
}
1 | Make proper use of Channel.isWritable() to prevent OutOfMemoryError |
Set sane WRITE_BUFFER_HIGH_WATER_MARK and WRITE_BUFFER_LOW_WATER_MARK |
Server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
Client
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
ChannelPipeline
Your custom events
public enum CustomEvents {
    MyCustomEvent
}
public class CustomEventHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt == CustomEvents.MyCustomEvent) {
            // do something
        }
    }
}
ChannelPipeline pipeline = channel.pipeline();
pipeline.fireUserEventTriggered(CustomEvents.MyCustomEvent);
Good fit for handshake notifications and more |
ReplayingDecoder
allows easy writing of decoders but is slower than ByteToMessageDecoder, as it may need to call decode(...) multiple times for the same message and uses an internal Error (ReplayingError) to signal that not enough data is ready yet.
Rule of thumb ⇒ Use ByteToMessageDecoder if it is possible without making things too complicated |
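As a hedged illustration of that rule of thumb, a simple line decoder written directly against ByteToMessageDecoder only has to check whether a full line is available before reading (the framing logic here is illustrative):
public class LineDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        int length = in.bytesBefore((byte) '\n'); // readable bytes before the next line feed
        if (length == -1) {
            return; // not enough data yet, decode(...) will be called again later
        }
        out.add(in.readSlice(length).retain()); // hand the line (without '\n') to the next handler
        in.skipBytes(1); // skip the '\n' itself
    }
}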
Use unpooled buffers with caution! |
Use pooled buffers! |
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Contention because of not enough arenas |
Configure the arenas
java -Dio.netty.allocator.numDirectArenas=... -Dio.netty.allocator.numHeapArenas=...
This will no longer be an issue in the next release, thanks to Thread-caches in PooledByteBufAllocator. |
Pooling pays off for direct and heap buffers! |
https://blog.twitter.com/2013/netty-4-at-twitter-reduced-gc-overhead
Prefer direct buffers ⇒ if you hand a heap buffer to the Socket, OpenJDK and Oracle JDK will copy it to a direct buffer by themselves anyway! |
Only use heap buffers if you need to operate on a byte[] in a ChannelOutboundHandler.
By default a direct ByteBuf will be returned by ByteBufAllocator.buffer(...).
Take this as a rule of thumb.
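A small sketch of what that rule of thumb looks like in practice (assuming a ChannelHandlerContext ctx is in scope):
// Default: let the allocator hand out a (direct) buffer for data that goes out to the Socket.
ByteBuf payload = ctx.alloc().buffer(256);

// Only ask explicitly for a heap buffer if you really must work on the backing byte[].
ByteBuf heap = ctx.alloc().heapBuffer(256);
byte[] array = heap.array();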
SlowSearch :(
ByteBuf buf = ...;
int index = -1;
for (int i = buf.readerIndex(); index == -1 && i < buf.writerIndex(); i++) {
    if (buf.getByte(i) == '\n') {
        index = i;
    }
}
FastSearch :)
ByteBuf buf = ...;
int index = buf.forEachByte(new ByteBufProcessor() {
    @Override
    public boolean process(byte value) {
        return value != '\n';
    }
});
ByteBufProcessor
is faster because it can eliminate the range checks that every getByte(int) call has to perform.
Use it whenever you need to find some pattern in a ByteBuf |
Prefer ByteBufHolder for messages that carry a ByteBuf payload ⇒ extend DefaultByteBufHolder
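A minimal sketch of such a message type (the Ping name and id field are made up for illustration):
public class Ping extends DefaultByteBufHolder {
    private final long id;

    public Ping(long id, ByteBuf payload) {
        super(payload); // DefaultByteBufHolder implements content(), retain(), release(), ...
        this.id = id;
    }

    public long id() {
        return id;
    }
}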
http://www.flickr.com/photos/za3tooor/65911648/
Use zero-memory-copy for efficient transfer of raw file content |
Channel channel = ...;
FileChannel fc = ...;
channel.writeAndFlush(new DefaultFileRegion(fc, 0, fileLength));
This only works if you do not need to modify the data on the fly. If you do, use ChunkedWriteHandler and ChunkedNioFile. |
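A short sketch of the ChunkedWriteHandler variant, for the case where the content still needs to be touched on the way out (pipeline name and file path are placeholders):
// In your ChannelInitializer:
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

// Later, write the file in chunks so e.g. an SslHandler in the pipeline can process them:
FileChannel fc = FileChannel.open(Paths.get("/path/to/file"), StandardOpenOption.READ);
channel.writeAndFlush(new ChunkedNioFile(fc));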
Never block the EventLoop! Avoid:
Thread.sleep() |
CountDownLatch.await() or any other blocking operation from java.util.concurrent |
Long-lived computationally intensive operations |
Blocking operations that might take a while (e.g. a DB query) |
http://www.flickr.com/photos/za3tooor/65911648/
Schedule and execute tasks via the EventLoop; this reduces the number of Threads needed and also keeps things thread-safe. |
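For example, a heartbeat can be scheduled on the Channel's own EventLoop instead of an extra Timer Thread (a sketch; createHeartbeat() is a hypothetical helper and ctx is assumed to be final):
ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        // Runs on the same Thread that handles the Channel's IO,
        // so no extra synchronization is needed here.
        ctx.writeAndFlush(createHeartbeat());
    }
}, 30, TimeUnit.SECONDS);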
http://www.flickr.com/photos/mbiddulph/3203780308/
Bootstrap bootstrap = new Bootstrap().group(new NioEventLoopGroup());
Bootstrap bootstrap2 = new Bootstrap().group(new NioEventLoopGroup());
Share EventLoopGroup between different Bootstraps
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap().group(group);
Bootstrap bootstrap2 = new Bootstrap().group(group);
Sharing the same EventLoopGroup allows you to keep resource usage (like the number of Threads) to a minimum. |
public class ProxyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) { (1)
        final Channel inboundChannel = ctx.channel();
        Bootstrap b = new Bootstrap();
        b.group(new NioEventLoopGroup()); (2)
        ...
        ChannelFuture f = b.connect(remoteHost, remotePort);
        ...
    }
}
1 | Called once a new connection was accepted |
2 | Use a new EventLoopGroup instance to handle the connection to the remote peer |
Don’t do this! This will tie up more resources than needed and introduce extra context-switching overhead. |
public class ProxyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) { (1)
        final Channel inboundChannel = ctx.channel();
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop()); (2)
        ...
        ChannelFuture f = b.connect(remoteHost, remotePort);
        ...
    }
}
1 | Called once a new connection was accepted |
2 | Share the same EventLoop between both Channels. This means all IO for both connected Channels is handled by the same Thread. |
Always share the EventLoop in proxy-like applications |
EventLoop
Not recommended!
channel.write(msg1);
channel.writeAndFlush(msg3);
Combine for the WIN!
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        channel.write(msg1);
        channel.writeAndFlush(msg3);
    }
});
Always combine operations if possible when acting on the Channel from outside the EventLoop, to reduce the overhead of wakeups and object creation! |
public class YourHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // BAD (most of the time)
        ctx.channel().writeAndFlush(msg); (1)

        // GOOD
        ctx.writeAndFlush(msg); (2)
    }
}
1 | Channel.* methods ⇒ the operation will start at the tail of the ChannelPipeline |
2 | ChannelHandlerContext.* methods ⇒ the operation will start from this ChannelHandler and flow through the rest of the ChannelPipeline |
Use the shortest path possible to get the maximal performance. |
@ChannelHandler.Sharable (1)
public class StatelessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.debug("New client from " + ctx.channel().remoteAddress());
    }
}
public class MyInitializer extends ChannelInitializer<Channel> {
    private static final ChannelHandler INSTANCE = new StatelessHandler();

    @Override
    public void initChannel(Channel ch) {
        ch.pipeline().addLast(INSTANCE);
    }
}
1 | Annotate ChannelHandlers that are stateless with @ChannelHandler.Sharable and use the same instance across Channels to reduce GC. |
public class OneTimeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        doOneTimeAction();
        ctx.channel().pipeline().remove(this); (1)
    }
}
1 | Remove the ChannelHandler once it is not needed anymore. |
This keeps the ChannelPipeline as short as possible and so eliminates the overhead of traversing it as much as possible. |
public class EncodeActsOnByteArray extends MessageToByteEncoder<YourMessage> {
    public EncodeActsOnByteArray() { super(false); } (1)

    @Override
    public void encode(ChannelHandlerContext ctx, YourMessage msg, ByteBuf out) {
        byte[] array = out.array(); (2)
        int offset = out.arrayOffset() + out.writerIndex();
        out.writerIndex(out.writerIndex() + encode(msg, array, offset, out.writableBytes())); (3)
    }

    private int encode(YourMessage msg, byte[] array, int offset, int len) { ... }
}
1 | Ensure heap buffers are used when passed into the encode(...) method. This way you can access the backing array directly. |
2 | Access the backing array and also calculate the offset |
3 | Update the writerIndex to reflect the written bytes |
This saves extra byte copies.
By default Netty will keep on reading data from the Channel as soon as something is ready.
Need more fine-grained control?
channel.config().setAutoRead(false); (1)
channel.read(); (2)
channel.config().setAutoRead(true); (3)
1 | Disable auto read ⇒ no more data will be read automatically from this Channel. |
2 | Tell the Channel to perform one read operation once new data is ready |
3 | Enable auto read again ⇒ Netty will automatically read again |
This can also be quite useful when writing proxy like applications! |
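A hedged sketch of that proxy pattern, roughly following Netty's proxy example (inbound and outbound are the two connected Channel instances, msg the data being forwarded):
// Both Channels were created with .option(ChannelOption.AUTO_READ, false)
outbound.writeAndFlush(msg).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            inbound.read(); // the peer drained the write, pull the next chunk from the other side
        } else {
            inbound.close();
        }
    }
});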
The JDK's SSLEngine is just too slow and produces too much GC :( |
Use Twitter's OpenSSL-based SSLEngine. For details on how to compile it, check the Finagle native module. |
import com.twitter.finagle.ssl.Ssl;
...
SSLEngine engine = Ssl.server("/path/to/cert.pem", "/path/to/key.pem", "/path/to/cachain.pem", null, null).self();
pipeline.addLast("ssl", new SslHandler(engine));
Netty will ship an OpenSSL-based SslHandler of its own soon. |
You never know when the ChannelFuture will be notified, so an anonymous ChannelFutureListener can keep a reference to its outer class alive for a long time. |
Bad
public class OuterClass {
    private final HeavyObject instance = ....;

    public void write() {
        channel.writeAndFlush(msg).addListener(new ChannelFutureListener() { ... });
    }
}
Good
public class OuterClass {
    private final HeavyObject instance = ....;

    public void write() {
        channel.writeAndFlush(msg).addListener(new ListenerImpl());
    }

    private static final class ListenerImpl implements ChannelFutureListener { ... }
}
Only works on Linux, as only epoll is supported at the moment. This may change in the future |
Using NIO transport
Bootstrap bootstrap = new Bootstrap().group(new NioEventLoopGroup());
bootstrap.channel(NioSocketChannel.class);
Using native transport
Bootstrap bootstrap = new Bootstrap().group(new EpollEventLoopGroup());
bootstrap.channel(EpollSocketChannel.class);
Buy my book Netty in Action and make me RICH. |
http://www.manning.com/maurer
$ KA-CHING $
Netty - http://netty.io |
Slides generated with Asciidoctor and DZSlides backend |
Original slide template - Dan Allen & Sarah White |
All pictures licensed with Creative Commons Attribution or Creative Commons Attribution-Share Alike |