一般开发者write和flush数据都是基于channelhandlercontextex ctx,然后调用相应的write和flush方法。这两种方法的代码分析如下。(顺便说一句,这两种方法都是从应用程序发送到底层的数据,属于outboundhandler类型。如果没有特殊需求,我们不需要定义自己的handler,可以使用默认的handler,这将反映在后面的分析中。)
1、write方法以下是ChanelHandlerContext中write的相关代码,ChanelHandlerContext中的chanelHantext
1. @Override 2. public ChannelFuture write(Object msg) { 3. return write(msg, newPromise()); //构建newpromise对象,调用带promise的write方法, 4. } 5. 6. @Override 7. public ChannelFuture write(final Object msg, final ChannelPromise promise) { 8. if (msg == null) { 9. throw new NullPointerException("msg"); 10. } 11. 12. true);///验证promise 13. 14. false, promise);///再次调用write方法,flush参数设置为false,更重要。真实的数据发送过程是flush,以后会提到 15. 16. return promise; 17. } 18. 19. private void write(Object msg, boolean flush, ChannelPromise promise) { 20. 21. DefaultChannelHandlerContext next = findContextOutbound(); 22. EventExecutor executor = next.executor(); 23. if (executor.inEventLoop()) { 24. ///调用invokeWrite函数 25. if (flush) { 26. next.invokeFlush(); 27. } 28. else { 29. int size = channel.estimatorHandle().size(msg); 30. if (size > 0) { 31. ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); 32. // Check for null as it may be set to null if the channel is closed already 33. if (buffer != null) { 34. false); 35. } 36. } 37. executor.execute(WriteTask.newInstance(next, msg, size, flush, promise)); 38. } 39. } 40. 41. pre name="code" class="java"> private void invokeWrite(Object msg, ChannelPromise promise) { 42. try { 43. this, msg, promise);///调用handler的write方法,这里我们使用headhandler默认的handler方法。 44. catch (Throwable t) { 45. notifyOutboundHandlerException(t, promise); 46. } 47. }
让我们来看看HeadHandlerwrite方法
1. @Override 2. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 3. unsafe.write(msg, promise); 4. }
还记得我们之前提到的unsafe对象吗?从这个代码中,我们可以看到unsafe对象的重要性。真正的“苦力”仍然是unsafe对象,因此我们不能忽视它。让我们快速看看它的实现。以下是unsafe的write方法,主要unsafe是一个接口,以下是多个特定类别的扩展,这里的write方法是protected abstract class Abstractunsafe中的定义,请注意,它是Abstractchanel的内部类别。以下是write的定义。以下是write的定义。
1. @Override 2. public void write(Object msg, ChannelPromise promise) { 3. if (!isActive()) { 4. // Mark the write request as failure if the channel is inactive. 5. if (isOpen()) { 6. promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); 7. else { 8. promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); 9. } 10. // release message now to prevent resource-leak 11. ReferenceCountUtil.release(msg); 12. else { 13. outboundBuffer.addMessage(msg, promise); 14. } 15. }
通过这个代码,我们可以清楚地看到,事实上,write的过程只是将数据写入buffer中,而不是真正的发送。真正的发送过程是我们以后要分析的flush过程。好吧,看看flush过程。
2、flush方法上面提到的flush是发送数据的真实过程,所以这种方法仍然是关键。让我们来看看defaultchanelhandlercontext中的相关代码:
1. @Override 2. public ChannelHandlerContext flush() { 3. final DefaultChannelHandlerContext next = findContextOutbound(); 4. EventExecutor executor = next.executor(); 5. if (executor.inEventLoop()) { 6. next.invokeFlush(); 7. else { 8. Runnable task = next.invokeFlushTask; 9. if (task == null) { 10. new Runnable() { 11. @Override 12. public void run() { 13. next.invokeFlush(); 14. } 15. }; 16. } 17. executor.execute(task); 18. } 19. 20. return this; 21. } 22. 23. private void invokeFlush() { 24. try { 25. this); 26. catch (Throwable t) { 27. notifyHandlerException(t); 28. } 29. }
这个代码逻辑还是比较清晰的,或者我们之前分析的最后调用默认的HeadHandler来处理flush。以下是HeadHandler的flush定义。
1. @Override 2. public void flush(ChannelHandlerContext ctx) throws Exception { 3. unsafe.flush(); 4. }
这是unsafe的苦力,在做事。让我们来看看unsafe中flush的定义。它仍然在protected上 abstract class Abstractunsafe中定义的,请注意,它是Abstractchanel的内部类别。
1. @Override 2. public void flush() { 3. this.outboundBuffer; 4. if (outboundBuffer == null) { 5. return; 6. } 7. 8. outboundBuffer.addFlush(); 9. flush0(); 10. } 11. 12. protected void flush0() { 13. if (inflush0) { 14. // Avoid re-entrance 15. return; 16. } 17. 18. final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; 19. if (outboundBuffer == null || outboundBuffer.isEmpty()) { 20. return; 21. } 22. 23. true; 24. 25. // Mark all pending write requests as failure if the channel is inactive. 26. if (!isActive()) { 27. try { 28. if (isOpen()) { 29. outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION); 30. else { 31. outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); 32. } 33. finally { 34. false; 35. } 36. return; 37. } 38. 39. try { 40. doWrite(outboundBuffer); 41. catch (Throwable t) { 42. outboundBuffer.failFlushed(t); 43. if (t instanceof IOException) { 44. close(voidPromise()); 45. } 46. finally { 47. false; 48. } 49. }
逻辑还是很简单的。我们可以看到,它最终调用了dowrite函数,并以buffer为参数。这个buffer实际上是write时数据存储的地方。write的过程即将与channel打交道。还记得我们之前分析过的channel吗?write定义在abstractniobytechanel类中(server端以字节的形式发送数据)。以下是具体代码:
1. @Override 2. protected void doWrite(ChannelOutboundBuffer in) throws Exception { 3. int writeSpinCount = -1; 4. 5. for (;;) { 6. Object msg = in.current(); 7. if (msg == null) { 8. // Wrote all messages. 9. clearOpWrite(); 10. break; 11. } 12. 13. if (msg instanceof ByteBuf) { 14. ByteBuf buf = (ByteBuf) msg; 15. int readableBytes = buf.readableBytes(); 16. if (readableBytes == 0) { 17. in.remove(); 18. continue; 19. } 20. if (!buf.isDirect()) { 21. ByteBufAllocator alloc = alloc(); 22. if (alloc.isDirectBufferPooled()) { 23. // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. 24. // We can do a better job by using our pooled allocator. If the current allocator does not 25. // pool a direct buffer, we rely on JDK's direct buffer pool. 26. buf = alloc.directBuffer(readableBytes).writeBytes(buf); 27. in.current(buf); 28. } 29. } 30. boolean done = false; 31. long flushedAmount = 0; 32. if (writeSpinCount == -1) { 33. writeSpinCount = config().getWriteSpinCount(); 34. } 35. for (int i = writeSpinCount - 1; i >= 0; i --) { 36. int localFlushedAmount = doWriteBytes(buf); 37. if (localFlushedAmount == 0) { 38. break; 39. } 40. 41. flushedAmount += localFlushedAmount; 42. if (!buf.isReadable()) { 43. true; 44. break; 45. } 46. } 47. 48. in.progress(flushedAmount); 49. 50. if (done) { 51. in.remove(); 52. else { 53. // Did not write completely. 54. setOpWrite(); 55. break; 56. } 57. else if (msg instanceof FileRegion) { 58. FileRegion region = (FileRegion) msg; 59. boolean done = false; 60. long flushedAmount = 0; 61. if (writeSpinCount == -1) { 62. writeSpinCount = config().getWriteSpinCount(); 63. } 64. for (int i = writeSpinCount - 1; i >= 0; i --) { 65. long localFlushedAmount = doWriteFileRegion(region); 66. if (localFlushedAmount == 0) { 67. break; 68. } 69. 70. flushedAmount += localFlushedAmount; 71. if (region.transfered() >= region.count()) { 72. true; 73. break; 74. } 75. } 76. 77. in.progress(flushedAmount); 78. 79. if (done) { 80. in.remove(); 81. else { 82. // Did not write completely. 83. setOpWrite(); 84. break; 85. } 86. else { 87. throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); 88. } 89. } 90. }
从上面的代码逻辑可以看出,write最终可能会调用两个函数,一个是dowritebytes,另一个是dowritefileregion。这里先介绍dowritebytes,有时间再介绍dowritefileregion。好了,让我们继续看看dowriteBytes函数的实现。它被定义为NiosocketChanel,这是我们用来处理客户端连接的常用Chanel。
1. @Override 2. protected int doWriteBytes(ByteBuf buf) throws Exception { 3. final int expectedWrittenBytes = buf.readableBytes(); 4. final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes); 5. return writtenBytes; 6. }
上面的代码最终调用了buff.readBytes(channel, bytes)事实上,真正的JDKSocketet已经开始在这里调用 Channel开始发送数据。让我们简单地看看这个函数,然后有时间详细介绍ByteBuf的整个结构。在Abstractbytebuf类和Readonlybytebufferbuf中直接定义这两个函数,可以看出最终是通过JDK的chanel。.数据发送write函数。
1. @Override 2. public int readBytes(GatheringByteChannel out, int length) 3. throws IOException { 4. checkReadableBytes(length); 5. int readBytes = getBytes(readerIndex, out, length); 6. readerIndex += readBytes; 7. return readBytes; 8. } 9. 10. @Override 11. public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { 12. ensureAccessible(); 13. if (length == 0) { 14. return 0; 15. } 16. 17. ByteBuffer tmpBuf = internalNioBuffer(); 18. tmpBuf.clear().position(index).limit(index + length); 19. return out.write(tmpBuf); 20. }
3、总结
本文简要分析了write和flush的过程,从上到下整理了write和flush数据的完整过程。此外,在最终发送数据时,我们提到了bytebuf,我稍后会写一篇文章详细介绍。