
Connecting Spring Boot to Kafka, with an Explanation of Kafka Parameters

Source: 图灵教育 (Tuling Education)
Date: 2023-10-13 17:20:29

I. Adding the Kafka Dependency with Maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.2.1</version>
</dependency>
II. Configuring Log4j2 Logging

Maven configuration

If you use the Spring Boot framework, first exclude the spring-boot-starter-logging dependency from spring-boot-starter:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

If you use the facade pattern (SLF4J), only the following dependencies are needed, because log4j-slf4j-impl already depends on log4j-api and log4j-core:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
</dependency>

If you do not use the facade pattern, add the following dependencies instead:

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
</dependency>

No version numbers are given here because the Spring Boot framework manages them.

The log4j2.xml configuration file must be placed in the resources directory.

<?xml version="1.0" encoding="utf-8" ?>
<Configuration status="error">
    <!-- Global properties -->
    <properties>
        <!-- Project name -->
        <Property name="PROJECT_NAME">test</Property>
        <!-- Path where log files are stored -->
        <property name="LOG_FILE_PATH" value="${PROJECT_NAME}/logs"/>
        <!-- Output pattern: %date is the date, %thread the thread name, %-5level pads the level to 5 characters,
             %msg is the log message, %n a newline; %logger{36} limits the logger name to 36 characters -->
        <property name="LOG_PATTERN" value="%date{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{36} - %msg%n"/>
    </properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %t %-5level %logger{36} - %msg%n" />
        </Console>
        <!-- RollingFile could be used here as well; RollingRandomAccessFile gives roughly 20-200% better performance -->
        <RollingRandomAccessFile name="RollingRandomAccessFile" fileName="${LOG_FILE_PATH}/info.log" filePattern="${LOG_FILE_PATH}/$${date:yyyyMMdd}/$${date:HH}/info-%d{yyyyMMddHH}-%i.log">
            <!-- Filter: accept events at or above the given level (onMatch), let the rest pass on (onMismatch) -->
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="NEUTRAL"/>
            <!-- Log message format -->
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <!-- Rollover policies -->
            <Policies>
                <!-- Roll over to a new log file when the application starts -->
                <OnStartupTriggeringPolicy />
                <!-- The interval attribute controls how often to roll over, relative to filePattern -->
                <TimeBasedTriggeringPolicy modulate="true" interval="1"/>
                <!-- Roll over once the file reaches the given size -->
                <SizeBasedTriggeringPolicy size="100 MB"/>
            </Policies>
            <!-- Maximum file index (%i) within the same directory; beyond it old files are overwritten. The default is 7 -->
            <DefaultRolloverStrategy max="30">
                <!-- Base path and directory depth to scan for deletion -->
                <Delete basePath="${LOG_FILE_PATH}/${date:yyyyMMdd}/" maxDepth="1">
                    <!-- Match the given files -->
                    <IfFileName glob="*.log">
                        <!-- Delete logs older than the given age -->
                        <IfLastModified age="1d">
                            <!--<IfAny>-->
                            <!--<IfAccumulatedFileSize exceeds="300 KB" />-->
                            <!--<IfAccumulatedFileCount exceeds="10" />-->
                            <!--</IfAny>-->
                        </IfLastModified>
                    </IfFileName>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console" />
            <AppenderRef ref="RollingRandomAccessFile" />
        </Root>
    </Loggers>
</Configuration>
III. Sending Kafka Messages

1. Producer configuration parameters
private static final Properties producerProp = new Properties();
static {
    producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");
    producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
    producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // acks=0: do not wait for the leader broker's response after sending, i.e. the callback carries no broker result
    // acks=all/-1: the broker responds only after the leader has written to its local log and all replicas have done so as well
    // acks=1: the broker responds as soon as the leader has written to its local log, a compromise between the two options above
    producerProp.put(ProducerConfig.ACKS_CONFIG, "1");
    // Conditions for sending an asynchronous batch: 1. linger.ms elapses (default 0); 2. batch.size is reached (default 16384, i.e. 16 KB)
    producerProp.put(ProducerConfig.LINGER_MS_CONFIG, 50000);
    producerProp.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840);
}
2. Synchronous send
/**
 * Synchronous send
 * @param topic
 * @param msg
 */
private static void sendMsg(String topic, String msg) {
    try {
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProp);
        LOGGER.info("============= start sync send msg =================");
        int i = 0;
        // Synchronous send: block on the returned Future until the broker acknowledges the record
        while (true) {
            String message = "the msg is sync" + msg + " " + i++;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, message);
            producer.send(record).get();
            LOGGER.info("the msg send success {}", message);
            Thread.sleep(2000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
3. Asynchronous send
/**
 * Asynchronous send
 * @param topic
 * @param msg
 */
private static void asyncSendMsg(String topic, String msg) {
    try {
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProp);
        int i = 0;
        LOGGER.info("============= start async send msg =================");
        while (true) {
            String message = "the msg is async" + msg + " " + i++;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, message);
            LOGGER.info("send msg success {}", message);
            // Asynchronous send: the callback is invoked once the broker responds
            producer.send(record, (recordMetadata, e) -> {
                if (e == null) {
                    LOGGER.info("send msg success {}, topic is {}", message, recordMetadata.topic());
                } else {
                    LOGGER.error("send msg failed", e);
                }
            });
            Thread.sleep(1000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
4. Test
public static void main(String[] args) {
    String message = "send my message";
    String topicName = "topicDemo";
    asyncSendMsg(topicName, message);
//  sendMsg(topicName, message);
}
IV. Consuming Kafka Messages

1. Consumer configuration parameters
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static final Properties consumerProp = new Properties();
static {
    consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
    consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
    consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
    consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
2. Consuming from Kafka
private static void consumerMsg(String topic) {
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProp);
    consumer.subscribe(Collections.singleton(topic));
    LOGGER.info("==================  start consumer msg =======================");
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> {
            LOGGER.info("the msg is {}", record.value());
        });
    }
}
3. Test
public static void main(String[] args) {
    String topicName = "topicDemo";
    consumerMsg(topicName);
}
V. Key Producer Parameters

1. bootstrap.servers

The Kafka broker addresses used to connect to the cluster. If the cluster has many machines, only a few brokers need to be listed; the producer discovers all brokers from this parameter. The reason for listing several brokers is that if one broker goes down, or the cluster restarts, the producer can still reach the cluster through the other brokers.
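For illustration only (the additional addresses below are made up), several brokers can be listed in a single bootstrap.servers value:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical broker addresses; the producer discovers the rest of the cluster from whichever it can reach.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "192.168.10.200:9092,192.168.10.201:9092,192.168.10.202:9092");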

2. key.serializer

Keys and values sent to the broker must be byte arrays, so messages must be serialized. To use a custom serializer, simply implement the Serializer interface.

3. value.serializer

The serializer used for the message value.
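As a rough sketch of what such a custom serializer might look like (this class is hypothetical and not part of the original article), implementing org.apache.kafka.common.serialization.Serializer is enough:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example: a serializer that writes String values as UTF-8 bytes,
// equivalent in spirit to the built-in StringSerializer.
public class MyStringSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(String topic, String data) {
        // Kafka passes the topic name in case serialization depends on it; here it is ignored.
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

It would then be registered the same way as the built-in serializers, e.g. via producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyStringSerializer.class.getName()).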

4. acks

The producer has three options when sending a message: (a) send the message to the broker and immediately move on to the next one, which gives high performance and throughput but risks data loss; (b) send the message to the broker and wait until the leader has written it to its local log and all replicas have written it as well before the broker responds, which gives lower throughput but no data loss; (c) the compromise between the two, where the broker responds as soon as the leader has written the message to its local log. The acks setting controls which of these is used, with the corresponding values 0, all/-1, and 1.

5. buffer.memory

When sending asynchronously, messages are first written to a buffer; this parameter sets the size of that buffer. The default is 33554432, i.e. 32 MB. The larger the value, the higher the throughput, because fewer I/O operations are needed.

6. retries

The number of automatic retries when a send fails.

7. batch.size

When sending asynchronously, messages for the same partition are first collected into a batch; once the batch is full, all messages in it are sent together. This value should be neither too large nor too small: too large puts heavy pressure on memory, too small hurts throughput. The default is 16384, i.e. 16 KB. A batch can also be sent before it is full, which is controlled by the linger.ms parameter below.

8. linger.ms

This parameter determines how long the producer waits before sending a batch. The default is 0, i.e. send immediately, which reduces the producer's throughput.

9. max.request.size

The maximum size of a message the producer may send. The default is 1048576, i.e. 1 MB.

10. request.timeout.ms

The broker must return a response to the producer within this time; if it does not, a TimeoutException is thrown in the callback. The default is 30 seconds.

VI. Producer Partitioning Strategy

1. Default partitioning strategy
a. If a key is specified, messages with the same key are sent to the same partition as far as possible; however, messages without a key may also land on that partition. If you want certain data to go to a partition of its own, with nothing else mixed in, you need a custom partitioner.
b. If no key is specified, messages are spread evenly across all partitions in round-robin fashion.
2. Custom partitioning strategy
a. Create a partitioner class that implements the org.apache.kafka.clients.producer.Partitioner interface.
b. Set the partitioner.class parameter in the Properties used to build the producer object.
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;

public class MyPartitionerDemo implements Partitioner {

    private Random random;

    /**
     * Compute the partition that the given message is sent to
     * @param s       topic name
     * @param o       key
     * @param bytes   serialized key bytes
     * @param o1      value
     * @param bytes1  serialized value bytes
     * @param cluster cluster metadata
     * @return partition index
     */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        String key = (String) o;
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(s);
        int size = partitionInfos.size();
        int auditPartitioner = size - 1;
        // Messages whose key contains "audit" go to the last partition; all others are spread randomly over the rest
        return key == null || key.isEmpty() || !key.contains("audit") ? random.nextInt(auditPartitioner) : auditPartitioner;
    }

    /**
     * Close the partitioner
     */
    @Override
    public void close() {
    }

    /**
     * Initialize resources
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {
        random = new Random();
    }
}

Sending messages

private static final Properties producerProp = new Properties();
static {
    producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");
    producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // acks=0: do not wait for the leader broker's response after sending, i.e. the callback carries no broker result
    // acks=all/-1: the broker responds only after the leader has written to its local log and all replicas have done so as well
    // acks=1: the broker responds as soon as the leader has written to its local log, a compromise between the two options above
    producerProp.put(ProducerConfig.ACKS_CONFIG, "1");
    // Conditions for sending an asynchronous batch: 1. linger.ms elapses (default 0); 2. batch.size is reached (default 16384, i.e. 16 KB)
//  producerProp.put(ProducerConfig.LINGER_MS_CONFIG, 50000);
//  producerProp.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840);
}

public static void main(String[] args) {
    String message = "send my message";
    String topicName = "topicDemo";
    customPartitioner(topicName);
//  asyncSendMsg(topicName, message);
//  sendMsg(topicName, message);
}

/**
 * Send to partitions chosen by the custom partitioner
 * @param topic
 */
private static void customPartitioner(String topic) {
    try {
        producerProp.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, com.example.kafka.MyPartitionerDemo.class);
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProp);
        LOGGER.info("============= start sync send msg =================");
        while (true) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message without a key");
            ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "audit", "message with key \"audit\", routed to the dedicated partition");
            ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "other", "message with key \"other\"");
            producer.send(record);
            producer.send(record1);
            producer.send(record2);
            Thread.sleep(60000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Consuming data

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTest.class);
    private static final Properties consumerProp = new Properties();
    static {
        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
        consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        String topicName = "topicDemo";
        consumerMsg(topicName);
    }

    private static void consumerMsg(String topic) {
        // Keys are deserialized with StringDeserializer, so the key type parameter is String
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProp);
        consumer.subscribe(Collections.singleton(topic));
        LOGGER.info("==================  start consumer msg =======================");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> {
                LOGGER.info("the msg is {}", record.value());
            });
        }
    }
}

Consumption results: (screenshot omitted)

Message serialization

Default serialization: (screenshot omitted). Custom serialization: ...

Producer interceptors

Before a message is sent and before the callback logic runs, an interceptor can apply custom logic to it, for example modifying the message. Users may specify several interceptors in a chain. To implement one, implement the ProducerInterceptor interface.
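A minimal sketch of such an interceptor (a hypothetical class, not part of the article's code) might look like this:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that prefixes every value before it is sent.
public class PrefixProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called before the record is serialized and partitioned; the returned record is what actually gets sent.
        return new ProducerRecord<>(record.topic(), record.key(), "intercepted-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record or the send fails; runs before the user callback.
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

It would then be registered via producerProp.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PrefixProducerInterceptor.class.getName()).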

No-message-loss configuration

Asynchronous sending has two problems:
a. Messages still sitting in a batch that has not been sent yet are lost if the producer crashes at that moment.
b. Network jitter combined with the retry mechanism can cause messages to arrive out of order.
The recommended "no message loss" configuration is:

block.on.buffer.full = true
acks = all (or -1)
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
Send with the callback-based send(), and close the producer explicitly inside the callback logic
unclean.leader.election.enable = false
replication.factor = 3
min.insync.replicas = 2
replication.factor > min.insync.replicas
enable.auto.commit = false
Producer-side configuration

block.on.buffer.full = true: when the in-memory buffer fills up, the producer blocks and stops accepting new messages instead of throwing an exception; otherwise the producer's send rate could exhaust the buffer.
acks = all: a write only counts as committed once all replicas have written it successfully.
retries = Integer.MAX_VALUE: retry up to Integer's maximum value, i.e. practically forever.
max.in.flight.requests.per.connection = 1: prevents message reordering within a topic partition; when set to 1, the producer cannot send another request to the broker before the previous one has been answered.
Use send() with a callback: when a send fails, the callback receives the exception. To handle possible reordering, close the producer immediately inside the callback logic with close(0). Without close(0), the producer is by default allowed to finish sending the messages that are still pending, which can lead to reordering.
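Put together, a producer configured along these lines might look roughly as follows. This is a sketch assuming a recent kafka-clients version, where block.on.buffer.full has been removed and the time send() may block on a full buffer is governed by max.block.ms instead; broker address and topic name are the illustrative ones used earlier.

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topicDemo", "no-loss demo");
// Send with a callback, and close the producer immediately on failure
// so that no further (possibly reordered) records are sent.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        producer.close(Duration.ZERO);
    }
});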

Broker-side configuration

unclean.leader.election.enable = false: disable unclean leader election, i.e. replicas that are not in the ISR may not be elected leader; this avoids message loss caused by log truncation to the high watermark on the broker.
replication.factor >= 3: keep at least three replicas of each partition's messages.
min.insync.replicas > 1: controls how many ISR replicas a message must be written to before it counts as successfully committed.
replication.factor > min.insync.replicas: if the two are equal, the partition stops working as soon as a single replica goes down; the recommended setting is replication.factor = min.insync.replicas + 1.
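As an illustration only (topic name, partition count and broker address are assumptions, not from the article), a topic satisfying these constraints could be created programmatically with the AdminClient:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
        try (AdminClient admin = AdminClient.create(adminProps)) {
            // 3 partitions, replication factor 3, min.insync.replicas 2, unclean elections disabled
            NewTopic topic = new NewTopic("topicDemo", 3, (short) 3)
                    .configs(Map.of(
                            "min.insync.replicas", "2",
                            "unclean.leader.election.enable", "false"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}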

Message compression

Compression reduces disk usage and I/O at the cost of CPU cycles. Supported algorithms are GZIP, Snappy and LZ4; by default no compression is applied. Performance comparison: LZ4 > Snappy > GZIP.
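Compression is switched on with the producer's compression.type setting, for example:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Enable LZ4 compression on the producer; valid values include "none", "gzip", "snappy" and "lz4".
Properties props = new Properties();
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");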

Multi-threaded processing

Multiple threads with a single KafkaProducer instance: one producer is built and shared by all threads; since KafkaProducer is thread-safe, this approach is also thread-safe. The alternative is multiple threads, each with its own KafkaProducer instance.
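A minimal sketch of the first approach, with one thread-safe producer shared by several sending threads (thread count, topic and broker address are illustrative):

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaProducer is thread-safe, so a single instance can be shared by all sending threads.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            final int threadNo = i;
            pool.submit(() -> producer.send(
                    new ProducerRecord<>("topicDemo", "message from thread " + threadNo)));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close();
    }
}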

VII. Key Consumer Parameters

1. Building a consumer

bootstrap.servers: the Kafka broker addresses
group.id: the consumer group name
key.deserializer: deserializer for the key
value.deserializer: deserializer for the value

a. Build the Properties object
private static final Properties consumerProp = new Properties();
static {
    consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
    consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
    consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
b. Construct the KafkaConsumer object
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProp);
c. Subscribe to the topic
consumer.subscribe(Collections.singleton(topic));
d. Poll for messages in a loop and process the ConsumerRecord objects
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    records.forEach(record -> {
        LOGGER.info("the msg is {}", record.value());
    });
}
e. Close the KafkaConsumer
finally {
    consumer.close();
//  consumer.close(Duration.ofMillis(5000));
}
1.1. Console consumer script
[kafka@kafka200 bin]$ ./kafka-console-consumer.sh --help
This tool helps to read data from Kafka topics and outputs it to standard output.

--bootstrap-server <String: server to connect to>    REQUIRED: The server(s) to connect to.
--consumer-property <String: consumer_prop>          A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>              Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events                              Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>                          The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning                                     If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group <String: consumer group id>                  The consumer group id of the consumer.
--help                                               Print usage information.
--isolation-level <String>                           Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>               The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset <String: consume offset>                    The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
--partition <Integer: partition>                     The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop>                            The properties to initialize the message formatter. Default properties include: print.timestamp=true|false, print.key=true|false, print.offset=true|false, print.partition=true|false, print.headers=true|false, print.value=true|false, key.separator=<key.separator>, line.separator=<line.separator>, headers.separator=<line.separator>, null.literal=<null.literal>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer>, header.deserializer=<header.deserializer>. Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.
--skip-message-on-error                              If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>                   If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>                              The topic id to consume on.
--value-deserializer <String: deserializer for values>
--version                                            Display Kafka version.
--whitelist <String: whitelist>                      Regular expression specifying whitelist of topics to include for consumption.
1.2. Key consumer parameters

bootstrap.servers: the Kafka broker addresses
group.id: the consumer group name
key.deserializer: deserializer for the key
value.deserializer: deserializer for the value
session.timeout.ms: how long the group coordinator needs to detect that a group member has crashed; when a consumer in the group crashes, the coordinator may need up to session.timeout.ms to notice it, so this value should not be set too high (the default is 10 seconds)
max.poll.interval.ms: the maximum time the consumer's message-processing logic may take between two polls
auto.offset.reset: what Kafka does when there is no committed offset or the offset is out of range, i.e. when the offset the consumer wants to read is not within the current log. Three values are possible:
  earliest: start from the earliest offset, which is not necessarily 0; for a group running for the first time (a new group) this means consuming from the beginning, while a group that has already committed offsets resumes from the committed offset
  latest: start from the latest offset
  none: throw an exception if no offset information is found or the offset is out of range
enable.auto.commit: whether offsets are committed automatically
fetch.max.bytes: the maximum number of bytes the consumer fetches in a single request
max.poll.records: the maximum number of records returned by a single poll() call, 500 by default
heartbeat.interval.ms: controls how quickly the other members of the group are notified that a rebalance has started; it must be smaller than session.timeout.ms
connections.max.idle.ms: Kafka periodically closes idle socket connections, by default after 9 minutes; if you do not care about this overhead, set the value to -1
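For illustration, the parameters above could be set explicitly like this (all values are examples, not recommendations; the broker address is the one used earlier):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustrative values for the consumer parameters described above.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);        // detect crashed group members after 10 s
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);     // max time allowed between poll() calls
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    // where to start when no committed offset exists
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");      // commit offsets manually instead of automatically
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);        // max bytes per fetch
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);            // max records returned by one poll()
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);      // how often heartbeats are sent
props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, -1);      // never close idle connections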

2. Topic subscription

2.1. Subscribing to a list of topics
consumer.subscribe(Arrays.asList(topic1, topic2, topic3));
2.2. Subscribing to topics with a regular expression
consumer.subscribe(Pattern.compile("kafka-.*"));

// React when the set of assigned partitions changes
consumer.subscribe(Pattern.compile("kafka-.*"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
    }
});

// With auto commit enabled, the listener does not need to do anything, so the no-op listener can be used
consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());
3. Message polling

The poll method returns data when either of the following two conditions is met:

  • enough data has been accumulated
  • the wait exceeds the specified timeout