当前位置: 首页 > 图灵资讯 > 技术篇> 【MQ】kafka(三)——如何保证消息不丢失?如何解决?

【MQ】kafka(三)——如何保证消息不丢失?如何解决?

来源:图灵教育
时间:2023-05-10 17:28:35

  一、前言

  在之前的博客中,我们介绍了为什么生产者发送消息的吞吐量如此之大,因为生产者提供了内存缓冲区,包装和发送消息,从而增加了吞吐量。

  所以,消息发送过去,到了broker就成功了吗?会丢失吗?这个博客,给大家介绍一下 在什么情况下,kafka会出现消息丢失和解决方案。二、消息会在什么情况下丢失?

  首先要看kafka的架构图:

【MQ】kafka(三)——如何保证消息不丢失?如何解决?_发送消息

  因为我们有三个角色:生产者,broker,消费者。

  消费者是消费消息,一般不会丢失,所以消息丢失会出现 生产者和 broker之间。三、生产者消息丢失和解决丢失原因分为两种 消息未发送成功丢失,通过重试进行补偿 消息发送了,但broker挂了,没有收到。通过ack机制确认。 解决方案

  当我们配置生产者时,我们将有一个记录broker集群地址、权利鉴定和发送批次的配置文件: import com.ctrip.framework.apollo.ConfigService;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class BigDataKafkaConfig { private String innerServersProd = ConfigService.getConfig(MsgCommonConstant.TECH_BIGDATA_KAFKA).getProperty("spring.kafka.bootstrap-servers.prod", "").toString(); private String retries = ConfigService.getConfig(MsgCommonConstant.TECH_BIGDATA_KAFKA).getProperty("spring.kafka.producer.retries", "").toString(); private String batchSize = ConfigService.getConfig(MsgCommonConstant.TECH_BIGDATA_KAFKA).getProperty("spring.kafka.producer.batch-size", "").toString(); private String bufferMemory = ConfigService.getConfig(MsgCommonConstant.TECH_BIGDATA_KAFKA).getProperty("spring.kafka.producer.buffer-memory", "").toString(); private String innerServersTest = ConfigService.getConfig(MsgCommonConstant.TECH_BIGDATA_KAFKA).getProperty("spring.kafka.bootstrap-servers.test", "").toString(); private String ack = ConfigService.getConfig(MsgCommonConstant.TECH_BIGDATA_KAFKA).getProperty("spring.kafka.producer.ack", "").toString(); @Bean("ProducerFactoryTest") ///厂家配置 public ProducerFactory producerFactoryTest(String env) { return new DefaultKafkaProducerFactory<>(senderProps(env)); } @Bean("ProducerFactoryProd") ///厂家配置 public ProducerFactory producerFactoryProd(String env) { return new DefaultKafkaProducerFactory<>(senderProps(env)); } /** * 测试环境-kafkaka * * @return */ @Bean("bigDataKafkaTempletTest") public KafkaTemplate kafkaTemplateTest() { String env = "TEST"; return new KafkaTemplate(producerFactoryTest(env)); } /** * kafkakakaka发送生产环境 * * @return */ @Bean("bigDataKafkaTempletProd") public KafkaTemplate kafkaTemplateProd() { String env = "PROD"; return new KafkaTemplate(producerFactoryProd(env)); } /** * 生产者配置方法 *

* 生产者有三个必要的属性 *

* 1.bootstrap.servers broker地址清单,列表不包含所有broker地址, * 生产者会从给定的broker中找到其他broker的信息。但是,一旦建议至少提供两个broker信息 其中一个停机,生产者仍然可以连接到集群。但是,一旦建议至少提供两个broker信息 其中一个停机,生产者仍然可以连接到集群。 *

*

* 2.key.serializer broker希望收到的信息的键和值都是字节数组。 生产者将相应的类把键对象序列化为字节数组。 *

*

* 3.value.serializer 值得序列化 *

* * @return */ private Map senderProps(String env) { Map props = new HashMap<>(); if ("TEST".equals(env)) { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServersTest); } else { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServersProd); } /** * 当broker收到临时可恢复的异常时,生产者会向broker重发消息,但不能无限 * 制重发,如果重发次数达到限制值,生产者不会重试并返回错误。 * 通过retries属性设置。默认情况下,生产者会在重试后等待100ms,可以通过 retries.backoff.修改ms属性 */ props.put(ProducerConfig.RETRIES_CONFIG, retries); /** * 在考虑完成请求之前,生产者要求leader确认数量。这可以控制发送记录的持久性。这可以控制发送记录的耐久性。允许以下设置: *
  • *
  • * acks = 0 如果设置为零,制造商将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,不能保证服务器已经收到记录, * retries 配置不会生效(因为客户端通常不知道任何故障)。每个记录返回的偏移总是设置为-1。 *
  • acks = 1 * 这意味着leader将记录写入其本地日志,但不需要等待所有follower的完全确认。在这种情况下, * 如果leader确认了记忆记录立即失败,但在关注者复制之前,记录将丢失。 *
  • acks = all * 这意味着leader将等待完整的同步副本集来确认记录。这保证了记录不会丢失,只要至少一个同步副本仍然存活。这是最有力的保证。 * 这相当于acks = -1设置 */ props.put(ProducerConfig.ACKS_CONFIG, ack); /** * 当多条消息发送到统一分区时,生产者将把它们放在统一批中。kafka通过批次概念 增加吞吐量,但也会增加延迟。 */ // 当缓存量达到16kb时,以下配置将触发网络请求并发送信息 props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); // 缓存中每条消息的最长时间,如果超过这个时间,batch将被忽略.size的限制由客户端立即发送 ///设置kafka消息大小,msg超过设定大小不能发送到kafka props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; }}

     

      我们有配置 acks 这个属性。这个ack表示broker的收据方式。发送者可以根据broker的回复确认收到后,以确保准确发送消息。 props.put(ProducerConfig.ACKS_CONFIG, ack);

      ack包括三种情况: ack = 0 ,由于默认发送成功,生产者不确认消息,发送后拉倒,此时重试参数无效。 ack = 1 , 制作人发送消息,只等leader写入成功后再返回,follower是否同步成功。 ack = -1 或 all ,生产者发送消息,等待leader 接收 且 follower 同步完 返回,如有异常或超时,将重新测试。影响性能。

      另一个参数:retries,生产者发送失败重试次数。 props.put(ProducerConfig.RETRIES_CONFIG, retries);

      综上所述,无疑是为了保证信息的准确性和可接触性,无疑是为了重新测试 + 回执确认。

      这里再补充一下 生产者写入的流程:

    【MQ】kafka(三)——如何保证消息不丢失?如何解决?_分布式_02

      producer制作新闻时,将使用pwrite()系统调用[对应Java FileChanel在NIO中.write() API】根据偏移量写入数据,并先写入page cache里。当consumer消费消息时,将使用sendfile()系统调用相应的filechanel.transferTo() API】,从pagee中复制数据 将cache传输到broker的Socketet buffer,然后通过网络传输。

      图中没有画的是leader和follower之间的同步,这和consumer是一样的:只要follower在ISR中,数据就可以通过零拷贝机制从leader所在的broker中提取。 page 将cache传输到follower的broker。

      同时,page 随着内核中flusher线程的调度和sync()/fsync()调用写回磁盘,即使过程崩溃,也不用担心数据丢失。此外,如果consumer想要消费的消息不在page中 只有在cache中,才会去磁盘读取,顺便预读一些相邻的块,放入page中 cache,为了方便下一次阅读。

      如果Kafka,我们可以得出一个重要的结论: producer的生产率与consumer的消费率相差不大,因此只能依靠broker page cache的读写完成了整个生产-消费过程,磁盘访问非常少。这一结论俗称“读写空中接力”。此外,当Kafka将持久消息传递到每个topic的partition文件时,它只附加顺序,充分利用磁盘顺序访问快的特点,效率高。四、broker丢失的消息丢失场景 没有收到消息 broker挂了,不能接受 broker收到消息,broker在刷盘到硬盘之前就挂了

      解释一下刷盘,解释一下刷盘。

      当消息到达broker时,它将首先写入 pageCache,然后系统根据配置策略进行配置 ,异步批量刷盘到磁盘,持久。

      常用的刷盘策略有:

      og.flush.interval.messages //多少条消息一次刷盘?

      log.flush.interval.ms ///每隔多长时间刷一次?

      log.flush.scheduler.interval.ms //周期性刷盘。

    【MQ】kafka(三)——如何保证消息不丢失?如何解决?_kafka_03

    解决方案 使用副本机制

      正如我们前面所说,broker中有ISR列表和OSR列表。事实上,partition的副本是否可用:每个partition都有相应的对应性 一个leader + 多个 follower,leaer专门处理事务类型的请求,follower负责同步拉取leader的数据,也从leader的pagecache中拉取。

    【MQ】kafka(三)——如何保证消息不丢失?如何解决?_发送消息_04

    优化刷盘时间 通过调整刷盘策略,可以保证刷盘到磁盘。这肯定会降低性能。 五、小结

      毫无疑问,我们应该考虑三个组件: 生产者 重试 + acks 确认回执 broker ISR副本机制 + 调整刷盘策略 消费者 一般没有问题