众所周知,redis数据结构中的listlistlpush和rpop可以用于传统消息队列,从收集的最左端写入,从最右端弹出消费。并支持多个生产者和多个消费者并发获取数据,数据只能由一个消费者获取。
然而,该方案并不能保证消费者在消费消息后能否成功处理问题(服务挂断或异常处理等)。该机制属于点对点模式,无法进行广播模式(发布/订阅模式)
Pub/sub因此,redis提供了相应的发布订阅功能,并引入了解除点对点强绑定模式Channel管道
。
当生产者向管道发布消息时,订阅管道的消费者可以同时收到消息,并提供pattern能力,以简化订阅多个管道,关注多个名称。
如果通过名称匹配接收消息的频道wmyskzz.chat,consumer3也会收到消息。
然而,这一计划也受到了极大的批评,即它不会持续下去。如果服务挂断并重新启动数据,则完全丢弃,并且没有提供ack机制,这不能保证数据的可靠性。无论消费是否成功,它都会被遗忘。
Streamstream的结构与kafka的设计理念非常相似,提供了consumergroup和offset机制,结构感觉与kafka的topic相似,但没有相应的partation副本机制,而是一个额外的信息链表结构。当客户端调用xadd时,它会自动创建stream。每个消息都是持久的,并且有唯一的id识别
ConsumerGroup消费者群体的概念与kafka的概念相同。消费者可以使用它XREAD
命令独立消费,多个消费者也可以同时加入一个消费者群体。一个消息只能由一个消费者群体中的一个消费者消费。这可以确保信息在分布式系统中的独特性。
事实上,我后来仔细考虑了这个功能。为了确保分布式系统的唯一消息,我制作了redis分布式锁。。。有点鸡肋。显然,消费者组已经确保了数据的独特性。。。只能说,添加锁可以降低资源成本。。。
last_delivered_id用于识别消费者组在stream上消费位置的游标。每个消费者组都有stream中唯一的名称,消费者组不会自动创建,需要使用XGROUPCREATE
显式创建。
每个消费者内部都有一个状态变量。用来表示已经
被客户端消费,但没有ack消费。目的是确保客户端至少消费一次消息。(atleastonce
)。如果消费者收到消息,但没有回复ack,列表将继续增长。如果有很多消费者组,列表所占用的内存将被放大
- xadd添加新闻
- xdel删除消息,这里的删除只设置标志位,不影响消息总长度
- xrange获取消息列表,将自动过滤已删除的消息
- xlen消息长度
- del删除Streamm
当客户端消费者阅读Stream消息时,Redis服务器将消息回复客户端,客户端突然断开连接,消息丢失。
但是发送的消息ID已经保存在pending_ids中。客户端重新连接后,可以再次收到pending_ids中的消息ID列表。
然而,此时,xreadgroup的初始消息必须是任何有效的消息ID,通常将参数设置为0-0,这意味着读取所有pending_ids消息和last_delivered_id之后的新消息。
将SpringBoot嵌入到Springbott中虽然redistream仍然有一些缺点,但与kafka等消息组件相比,redis非常适合作为消息队列。
这里简单介绍一下思路,本质上是提供管理信息的一个小功能,定义了创建stream管道的注释
创建注释类,注释类必须继承StreamListener<String,ObjectRecord<String,Object>>并重写onmessage方法。此注释也添加到方法中。
创建config类实现BeanPostProcessor
重写bean声明周期postProcessAfterInitialization
和postProcessBeforeInitialization
方法。该方法将在spring启动过程中的refresh方法加载bean的声明周期中扫描到所有添加注释的bean。
通过线程池逐个创建streamgroup组和streamconsumer监控连接,config类记得继承disposablebean类在destroy方法中关闭连接,以避免oom。
注册redis stream consumer容器api提供的consumer在这里,一定要注意polltimeout参数。看名字就知道默认拉数据的时间间隔。如果这个参数值很小或者写0,看你的cpu是否高。
@Bean("listenerContainer")@DependsOn(value = "redisConnectionFactory")public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) .serializer(new StringRedisSerializer()) .executor(new ForkJoinPool()) .pollTimeout(Duration.ofSeconds(3)) .targetType(Object.class) .build(); return StreamMessageListenerContainer.create(redisConnectionFactory, options);}
创建消费者
private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) { StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream(); if (stringRedisTemplate.hasKey(streamKey)) { StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey); AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false); groups.forEach(groupInfo -> { if (Objects.equals(group, groupInfo.getRaw().get("name"))) { groupHasKey.set(true); } }); if (groups.isEmpty() || !groupHasKey.get()) { creatGroup(streamKey, group); } else { groups.stream().forEach(g -> { log.info("XInfoGroups:{}", g); StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName()); log.info("XInfoConsumers:{}", consumers); }); } } else { creatGroup(streamKey, group); } StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed()); Consumer consumer = Consumer.from(group, consumerName); Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener); listenerContainer.start(); this.containerList.add(listenerContainer); return subscription;}
具体代码在gitee上。现在一些其他组件已经慢慢扩展。如果你感兴趣,你可以去看看,或者如果你有一些开发建议,你也可以在私人信件下添加一些组件以使其更大。
Gitee新闻流通组件服务