当前位置: 首页 > 图灵资讯 > 技术篇> 浅尝Redis Stream做消息队列

浅尝Redis Stream做消息队列

来源:图灵教育
时间:2023-06-06 09:35:12

众所周知,redis数据结构中的listlistlpush和rpop可以用于传统消息队列,从收集的最左端写入,从最右端弹出消费。并支持多个生产者和多个消费者并发获取数据,数据只能由一个消费者获取。

然而,该方案并不能保证消费者在消费消息后能否成功处理问题(服务挂断或异常处理等)。该机制属于点对点模式,无法进行广播模式(发布/订阅模式)

Pub/sub

因此,redis提供了相应的发布订阅功能,并引入了解除点对点强绑定模式Channel管道

当生产者向管道发布消息时,订阅管道的消费者可以同时收到消息,并提供pattern能力,以简化订阅多个管道,关注多个名称。

浅尝Redis Stream做消息队列_客户端

如果通过名称匹配接收消息的频道wmyskzz.chat,consumer3也会收到消息。

然而,这一计划也受到了极大的批评,即它不会持续下去。如果服务挂断并重新启动数据,则完全丢弃,并且没有提供ack机制,这不能保证数据的可靠性。无论消费是否成功,它都会被遗忘。

Stream

stream的结构与kafka的设计理念非常相似,提供了consumergroup和offset机制,结构感觉与kafka的topic相似,但没有相应的partation副本机制,而是一个额外的信息链表结构。当客户端调用xadd时,它会自动创建stream。每个消息都是持久的,并且有唯一的id识别

浅尝Redis Stream做消息队列_客户端_02

ConsumerGroup

消费者群体的概念与kafka的概念相同。消费者可以使用它XREAD命令独立消费,多个消费者也可以同时加入一个消费者群体。一个消息只能由一个消费者群体中的一个消费者消费。这可以确保信息在分布式系统中的独特性。

事实上,我后来仔细考虑了这个功能。为了确保分布式系统的唯一消息,我制作了redis分布式锁。。。有点鸡肋。显然,消费者组已经确保了数据的独特性。。。只能说,添加锁可以降低资源成本。。。

last_delivered_id

用于识别消费者组在stream上消费位置的游标。每个消费者组都有stream中唯一的名称,消费者组不会自动创建,需要使用XGROUPCREATE显式创建。

pending_ids

每个消费者内部都有一个状态变量。用来表示已经被客户端消费,但没有ack消费。目的是确保客户端至少消费一次消息。(atleastonce)。如果消费者收到消息,但没有回复ack,列表将继续增长。如果有很多消费者组,列表所占用的内存将被放大

curd
  • xadd添加新闻
  • xdel删除消息,这里的删除只设置标志位,不影响消息总长度
  • xrange获取消息列表,将自动过滤已删除的消息
  • xlen消息长度
  • del删除Streamm
pending_ids如何避免消息丢失?

当客户端消费者阅读Stream消息时,Redis服务器将消息回复客户端,客户端突然断开连接,消息丢失。

但是发送的消息ID已经保存在pending_ids中。客户端重新连接后,可以再次收到pending_ids中的消息ID列表。

然而,此时,xreadgroup的初始消息必须是任何有效的消息ID,通常将参数设置为0-0,这意味着读取所有pending_ids消息和last_delivered_id之后的新消息。

将SpringBoot嵌入到Springbott中

虽然redistream仍然有一些缺点,但与kafka等消息组件相比,redis非常适合作为消息队列。

这里简单介绍一下思路,本质上是提供管理信息的一个小功能,定义了创建stream管道的注释

浅尝Redis Stream做消息队列_redis_03

创建注释类,注释类必须继承StreamListener<String,ObjectRecord<String,Object>>并重写onmessage方法。此注释也添加到方法中。

创建config类实现BeanPostProcessor重写bean声明周期postProcessAfterInitializationpostProcessBeforeInitialization方法。该方法将在spring启动过程中的refresh方法加载bean的声明周期中扫描到所有添加注释的bean。

通过线程池逐个创建streamgroup组和streamconsumer监控连接,config类记得继承disposablebean类在destroy方法中关闭连接,以避免oom。

注册redis stream consumer容器api提供的consumer

在这里,一定要注意polltimeout参数。看名字就知道默认拉数据的时间间隔。如果这个参数值很小或者写0,看你的cpu是否高。

@Bean("listenerContainer")@DependsOn(value = "redisConnectionFactory")public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() {   StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>>   options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()         .batchSize(10)         .serializer(new StringRedisSerializer())         .executor(new ForkJoinPool())         .pollTimeout(Duration.ofSeconds(3))         .targetType(Object.class)         .build();   return StreamMessageListenerContainer.create(redisConnectionFactory, options);}
创建消费者

 

private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {   StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream();   if (stringRedisTemplate.hasKey(streamKey)) {      StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey);      AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false);      groups.forEach(groupInfo -> {         if (Objects.equals(group, groupInfo.getRaw().get("name"))) {            groupHasKey.set(true);         }      });      if (groups.isEmpty() || !groupHasKey.get()) {         creatGroup(streamKey, group);      } else {         groups.stream().forEach(g -> {            log.info("XInfoGroups:{}", g);            StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName());            log.info("XInfoConsumers:{}", consumers);         });      }   } else {      creatGroup(streamKey, group);   }   StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());   Consumer consumer = Consumer.from(group, consumerName);   Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener);   listenerContainer.start();   this.containerList.add(listenerContainer);   return subscription;}

具体代码在gitee上。现在一些其他组件已经慢慢扩展。如果你感兴趣,你可以去看看,或者如果你有一些开发建议,你也可以在私人信件下添加一些组件以使其更大。

Gitee新闻流通组件服务