一、简介
订阅发布模式(Publish-Subscribe Pattern)解耦框架和业务逻辑是一种有效的方式,也是一种常见的观察者设计模式,广泛应用于事件驱动架构。
在这种模式下,出版商(或主题)不直接向订阅者发送信息,而是通过调度中心(或消息代理)发送信息。 出版商(或主题)不知道订阅者的存在,订阅者也不知道出版商的存在。他们之间唯一的关系是在调度中心注册为订阅者或出版商。
当出版商有新消息时,将消息发布到调度中心。调度中心将通知所有订阅者。这实现了出版商和订阅者之间的解耦,出版商和订阅者不再直接依赖对方,他们可以独立扩展自己。
在具体实现中,调度中心可以通过消息队列、事件总线等机制实现。不同的语言和平台都有库和框架,比如 Java 中的 ActiveMQ、RabbitMQ、Kafka等。
订阅发布模式有以下优点:
- 性能好,发布者发送消息后直接返回,不需要等待消费者处理。
- 解耦性强,出版商与订阅者之间没有直接依赖,满足高内聚低耦合的设计理念。
- 它可以支持一对多、多对多的信息通信模型,并提供更灵活的信息传输方式。
- 可动态增加或删除出版商和订阅者,扩展性好。
- 为接收消息通知,创建订阅者接口。
interface Subscriber { void update(String message);}
- 为发布新闻创建出版商。增加、删除和发布功能,并维护订阅列表,
class Publisher { private Map<String, List<Subscriber>> subscribers = new HashMap<>(); public void subscribe(String topic, Subscriber subscriber) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList == null) { subscriberList = new ArrayList<>(); subscribers.put(topic, subscriberList); } subscriberList.add(subscriber); } public void unsubscribe(String topic, Subscriber subscriber) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList != null) { subscriberList.remove(subscriber); } } public void publish(String topic, String message) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList != null) { for (Subscriber subscriber : subscriberList) { subscriber.update(message); } } }}
- 我们也实现了两种不同之处 Subscriber 实现,一个是 EmailSubscriber,另一个是 SMSSubscriber,用于接收发布者的信息,并将其发送到电子邮件和手机上。
class EmailSubscriber implements Subscriber { private String email; public EmailSubscriber(String email) { this.email = email; } public void update(String message) { System.out.println("Send email to " + email + ": " + message); }}class SMSSubscriber implements Subscriber { private String phoneNumber; public SMSSubscriber(String phoneNumber) { this.phoneNumber = phoneNumber; } public void update(String message) { System.out.println("Send SMS to " + phoneNumber + ": " + message); }}
- 在 Main 在类中,我们创建了一个类 Publisher 添加两个对象 EmailSubscriber 和两个 SMSSubscriber,分别订阅了 news 主题更新。我们先给主题发消息,然后取消 news 其中一个主题是订阅者,最后我们再次给出 news 主题发送消息。
public class Main { public static void main(String[] args) { Publisher publisher = new Publisher(); Subscriber emailSubscriber = new EmailSubscriber("foo@example.com"); Subscriber sssubscriber = new SMSSubscriber("1234567890"); publisher.subscribe("news", emailSubscriber1; publisher.subscribe("news", smssubscriber1; publisher.publish("news", "发布新消息1"; publisher.unsubscribe("news", smssubscriber1; publisher.publish("news", "发布新消息2"; }}
打印输出如下:
Send email to foo@example.com: 发布新消息1Senddd新消息dd新消息 SMS to 1234567890: 发布新消息1Senddd新消息dd新消息 email to foo@example.com: 发布新消息2
三、Spring自带的订阅发布模式
Spring的订阅发布模式是通过发布事件、事件监听器和事件发布器三个部分完成的
我们在这里通过 newbee-mall-pro 已在项目中实现订阅发布模式的订单流程向大家讲解,项目地址:https://github.com/wayn11///newbee-mall-pro
- 自定义订单发布事件,继承 ApplicationEvent
public class OrderEvent extends ApplicationEvent { void onApplicationEvent(Object event) { ... }}
- 定义订单监听器,实现 ApplicationListener
@Componentpublic class OrderListener implements ApplicationListener<OrderEvent> { @Override public void onApplicationEvent(OrderEvent event) { // 生成订单,删除购物车,扣除库存 ... }}
- 通过事件发布器下订单流程 applicationEventPublisher 发布订单事件,然后在订单监听器中处理订单保存逻辑。
@Resourceprivate ApplicationEventPublisher applicationEventPublisher;private void saveOrder(MallUserVO mallUserVO, Long couponUserId, List<ShopCatVO> shopcatVOList, String orderNo) { // 订单检查 ... // 生成订单号 String orderNo = NumberUtil.genOrderNo(); // 发布订单事件,在事件监控中处理订单逻辑 applicationEventPublisher.publishEvent(new OrderEvent(orderNo, mallUserVO, couponUserId, shopcatVOList)); // 所有操作成功后,返回订单号 return orderNo; ...}
通过事件监控机制,将订单逻辑分为以下步骤:
- 订单检查
- 生成订单号
- 发布订单事件,处理事件监控中的订单保存逻辑
- 所有操作成功后,订单号返回的每一步都是独立的,不相互影响
以上代码已经实现了订阅发布模式,并成功地解耦了订单逻辑。但由于性能没有得到优化,因此没有得到优化 Spring Boot 在项目中,默认情况下,事件监听器同步处理,即此处的订单流程将在事件监听器处理后返回,最终影响接口响应时间。
四、使用异步事件监控发布类别Spring Boot 项目中事件监督发布的类别是由项目中事件监督发布的 SimpleApplicationEventMulticaster
实现此类,源代码中通知的订阅者代码如下:
可以看出,代码中有判断。 getTaskExecutor()
如果方法返回不是空的,就交由 executor 执行,负责同步执行。这个时候大家都会问,异步通知订阅者不是有线程池吗?别担心,博主会带你继续查看源代码。
可以看到 getTaskExecutor()
该方法返回成员属性,该成员属性在 SimpleApplicationEventMulticaster
类中是通过 setTaskExecutor(@Nullable Executor taskExecutor)
设置方法。我们通过。 ctrl + f7
查一下 setTaskExecutor(...)
该方法在哪里被调用,
Ok,到此水落石出,SimpleApplicationEventMulticaster
类的 taskExecutor 成员属性一直是 null,因此,在通过订阅者时,总是同步处理,等待订阅者完成处理。
对异步处理,我们可以从两个方面入手:
- 从事件监听器开始,将事件监听器的事件触发方法改为异步执行,如生成订单、删除购物车、扣除库存逻辑进入线程池异步执行,或在订阅者的通知方法中
onApplicationEvent
上加上@Async
注意,表示该方法异步执行。 - 从事件监控发布开始,设置默认事件监控发布
taskExecutor
通过源码可以知道属性,也可以解决。
这里博主介绍如何修改事件监听发布类别来解决。
/** * 在系统启动时执行 */@Componentpublic class SpringBeanStartupRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { // 设置spring默认事件监控为异步执行 SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(500), new CustomizableThreadFactory("newbee—event-task"), new ThreadPoolExecutor.CallerRunsPolicy() ); multicaster.setTaskExecutor(threadPoolExecutor); }}
当系统启动时反射修改SimpleApplicationEventMulticaster
类的taskExecutor
属性,从而让SimpleApplicationEventMulticaster
类别支持异步事件通知。
建议您考虑哪些业务流程可以应用于日常开发。例如,当订单支付成功后,您需要通知用户、商品、活动和其他服务时,您可以考虑使用订阅发布模式。对于解耦发布者和订阅者,发布者只需发布消息,不需要知道订阅者是什么,也不需要知道订阅者的具体实现。订阅者只需要关注他们感兴趣的信息。这种松耦设计使系统更容易扩展和维护。