当前位置: 首页 > 图灵资讯 > 技术篇> netty4.0.x源码分析—write和flush

netty4.0.x源码分析—write和flush

来源:图灵教育
时间:2023-06-13 09:17:45

一般开发者write和flush数据都是基于channelhandlercontextex ctx,然后调用相应的write和flush方法。这两种方法的代码分析如下。(顺便说一句,这两种方法都是从应用程序发送到底层的数据,属于outboundhandler类型。如果没有特殊需求,我们不需要定义自己的handler,可以使用默认的handler,这将反映在后面的分析中。)

1、write方法

以下是ChanelHandlerContext中write的相关代码,ChanelHandlerContext中的chanelHantext

1. @Override  2. public ChannelFuture write(Object msg) {  3. return write(msg, newPromise()); //构建newpromise对象,调用带promise的write方法,  4. }  5.   6. @Override  7. public ChannelFuture write(final Object msg, final ChannelPromise promise) {  8. if (msg == null) {  9. throw new NullPointerException("msg");  10.     }  11.   12. true);///验证promise  13.   14. false, promise);///再次调用write方法,flush参数设置为false,更重要。真实的数据发送过程是flush,以后会提到  15.   16. return promise;  17. }  18.   19. private void write(Object msg, boolean flush, ChannelPromise promise) {  20.   21.     DefaultChannelHandlerContext next = findContextOutbound();  22.     EventExecutor executor = next.executor();  23. if (executor.inEventLoop()) {  24. ///调用invokeWrite函数  25. if (flush) {  26.             next.invokeFlush();  27.         }  28. else {  29. int size = channel.estimatorHandle().size(msg);  30. if (size > 0) {  31.             ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();  32. // Check for null as it may be set to null if the channel is closed already  33. if (buffer != null) {  34. false);  35.             }  36.         }  37.         executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));  38.     }  39. }  40.   41. pre name="code" class="java">    private void invokeWrite(Object msg, ChannelPromise promise) {  42. try {  43. this, msg, promise);///调用handler的write方法,这里我们使用headhandler默认的handler方法。  44. catch (Throwable t) {  45.         notifyOutboundHandlerException(t, promise);  46.     }  47. }

让我们来看看HeadHandlerwrite方法

1. @Override  2. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  3.     unsafe.write(msg, promise);  4. }

还记得我们之前提到的unsafe对象吗?从这个代码中,我们可以看到unsafe对象的重要性。真正的“苦力”仍然是unsafe对象,因此我们不能忽视它。让我们快速看看它的实现。以下是unsafe的write方法,主要unsafe是一个接口,以下是多个特定类别的扩展,这里的write方法是protected abstract class Abstractunsafe中的定义,请注意,它是Abstractchanel的内部类别。以下是write的定义。以下是write的定义。

1. @Override  2. public void write(Object msg, ChannelPromise promise) {  3. if (!isActive()) {  4. // Mark the write request as failure if the channel is inactive.  5. if (isOpen()) {  6.             promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);  7. else {  8.             promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);  9.         }  10. // release message now to prevent resource-leak  11.         ReferenceCountUtil.release(msg);  12. else {  13.         outboundBuffer.addMessage(msg, promise);  14.     }  15. }

通过这个代码,我们可以清楚地看到,事实上,write的过程只是将数据写入buffer中,而不是真正的发送。真正的发送过程是我们以后要分析的flush过程。好吧,看看flush过程。

2、flush方法

上面提到的flush是发送数据的真实过程,所以这种方法仍然是关键。让我们来看看defaultchanelhandlercontext中的相关代码:

1. @Override  2. public ChannelHandlerContext flush() {  3. final DefaultChannelHandlerContext next = findContextOutbound();  4.     EventExecutor executor = next.executor();  5. if (executor.inEventLoop()) {  6.         next.invokeFlush();  7. else {  8.         Runnable task = next.invokeFlushTask;  9. if (task == null) {  10. new Runnable() {  11. @Override  12. public void run() {  13.                     next.invokeFlush();  14.                 }  15.             };  16.         }  17.         executor.execute(task);  18.     }  19.   20. return this;  21. }  22.   23. private void invokeFlush() {  24. try {  25. this);  26. catch (Throwable t) {  27.         notifyHandlerException(t);  28.     }  29. }

这个代码逻辑还是比较清晰的,或者我们之前分析的最后调用默认的HeadHandler来处理flush。以下是HeadHandler的flush定义。

1. @Override  2. public void flush(ChannelHandlerContext ctx) throws Exception {  3.     unsafe.flush();  4. }

这是unsafe的苦力,在做事。让我们来看看unsafe中flush的定义。它仍然在protected上 abstract class Abstractunsafe中定义的,请注意,它是Abstractchanel的内部类别。

1. @Override  2. public void flush() {  3. this.outboundBuffer;  4. if (outboundBuffer == null) {  5. return;  6.     }  7.   8.     outboundBuffer.addFlush();  9.     flush0();  10. }  11.   12. protected void flush0() {  13. if (inflush0) {  14. // Avoid re-entrance  15. return;  16.     }  17.   18. final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;  19. if (outboundBuffer == null || outboundBuffer.isEmpty()) {  20. return;  21.     }  22.   23. true;  24.   25. // Mark all pending write requests as failure if the channel is inactive.  26. if (!isActive()) {  27. try {  28. if (isOpen()) {  29.                 outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);  30. else {  31.                 outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);  32.             }  33. finally {  34. false;  35.         }  36. return;  37.     }  38.   39. try {  40.         doWrite(outboundBuffer);  41. catch (Throwable t) {  42.         outboundBuffer.failFlushed(t);  43. if (t instanceof IOException) {  44.             close(voidPromise());  45.         }  46. finally {  47. false;  48.     }  49. }

逻辑还是很简单的。我们可以看到,它最终调用了dowrite函数,并以buffer为参数。这个buffer实际上是write时数据存储的地方。write的过程即将与channel打交道。还记得我们之前分析过的channel吗?write定义在abstractniobytechanel类中(server端以字节的形式发送数据)。以下是具体代码:

1. @Override  2. protected void doWrite(ChannelOutboundBuffer in) throws Exception {  3. int writeSpinCount = -1;  4.   5. for (;;) {  6.         Object msg = in.current();  7. if (msg == null) {  8. // Wrote all messages.  9.             clearOpWrite();  10. break;  11.         }  12.   13. if (msg instanceof ByteBuf) {  14.             ByteBuf buf = (ByteBuf) msg;  15. int readableBytes = buf.readableBytes();  16. if (readableBytes == 0) {  17.                 in.remove();  18. continue;  19.             }  20. if (!buf.isDirect()) {  21.                 ByteBufAllocator alloc = alloc();  22. if (alloc.isDirectBufferPooled()) {  23. // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.  24. // We can do a better job by using our pooled allocator. If the current allocator does not  25. // pool a direct buffer, we rely on JDK's direct buffer pool.  26.                     buf = alloc.directBuffer(readableBytes).writeBytes(buf);  27.                     in.current(buf);  28.                 }  29.             }  30. boolean done = false;  31. long flushedAmount = 0;  32. if (writeSpinCount == -1) {  33.                 writeSpinCount = config().getWriteSpinCount();  34.             }  35. for (int i = writeSpinCount - 1; i >= 0; i --) {  36. int localFlushedAmount = doWriteBytes(buf);  37. if (localFlushedAmount == 0) {  38. break;  39.                 }  40.   41.                 flushedAmount += localFlushedAmount;  42. if (!buf.isReadable()) {  43. true;  44. break;  45.                 }  46.             }  47.   48.             in.progress(flushedAmount);  49.   50. if (done) {  51.                 in.remove();  52. else {  53. // Did not write completely.  54.                 setOpWrite();  55. break;  56.             }  57. else if (msg instanceof FileRegion) {  58.             FileRegion region = (FileRegion) msg;  59. boolean done = false;  60. long flushedAmount = 0;  61. if (writeSpinCount == -1) {  62.                 writeSpinCount = config().getWriteSpinCount();  63.             }  64. for (int i = writeSpinCount - 1; i >= 0; i --) {  65. long localFlushedAmount = doWriteFileRegion(region);  66. if (localFlushedAmount == 0) {  67. break;  68.                 }  69.   70.                 flushedAmount += localFlushedAmount;  71. if (region.transfered() >= region.count()) {  72. true;  73. break;  74.                 }  75.             }  76.   77.             in.progress(flushedAmount);  78.   79. if (done) {  80.                 in.remove();  81. else {  82. // Did not write completely.  83.                 setOpWrite();  84. break;  85.             }  86. else {  87. throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));  88.         }  89.     }  90. }

从上面的代码逻辑可以看出,write最终可能会调用两个函数,一个是dowritebytes,另一个是dowritefileregion。这里先介绍dowritebytes,有时间再介绍dowritefileregion。好了,让我们继续看看dowriteBytes函数的实现。它被定义为NiosocketChanel,这是我们用来处理客户端连接的常用Chanel。

1. @Override  2. protected int doWriteBytes(ByteBuf buf) throws Exception {  3. final int expectedWrittenBytes = buf.readableBytes();  4. final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);  5. return writtenBytes;  6. }

上面的代码最终调用了buff.readBytes(channel, bytes)事实上,真正的JDKSocketet已经开始在这里调用 Channel开始发送数据。让我们简单地看看这个函数,然后有时间详细介绍ByteBuf的整个结构。在Abstractbytebuf类和Readonlybytebufferbuf中直接定义这两个函数,可以看出最终是通过JDK的chanel。.数据发送write函数。

1.  @Override  2. public int readBytes(GatheringByteChannel out, int length)  3. throws IOException {  4.      checkReadableBytes(length);  5. int readBytes = getBytes(readerIndex, out, length);  6.      readerIndex += readBytes;  7. return readBytes;  8.  }  9.   10. @Override  11. public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {  12.      ensureAccessible();  13. if (length == 0) {  14. return 0;  15.      }  16.   17.      ByteBuffer tmpBuf = internalNioBuffer();  18.      tmpBuf.clear().position(index).limit(index + length);  19. return out.write(tmpBuf);  20.  }

3、总结

本文简要分析了write和flush的过程,从上到下整理了write和flush数据的完整过程。此外,在最终发送数据时,我们提到了bytebuf,我稍后会写一篇文章详细介绍。