当前位置: 首页 > 图灵资讯 > 技术篇> 设计模式之订阅发布模式

设计模式之订阅发布模式

来源:图灵教育
时间:2023-05-28 09:32:00

一、简介

订阅发布模式(Publish-Subscribe Pattern)解耦框架和业务逻辑是一种有效的方式,也是一种常见的观察者设计模式,广泛应用于事件驱动架构。

在这种模式下,出版商(或主题)不直接向订阅者发送信息,而是通过调度中心(或消息代理)发送信息。 出版商(或主题)不知道订阅者的存在,订阅者也不知道出版商的存在。他们之间唯一的关系是在调度中心注册为订阅者或出版商。

当出版商有新消息时,将消息发布到调度中心。调度中心将通知所有订阅者。这实现了出版商和订阅者之间的解耦,出版商和订阅者不再直接依赖对方,他们可以独立扩展自己。

在具体实现中,调度中心可以通过消息队列、事件总线等机制实现。不同的语言和平台都有库和框架,比如 Java 中的 ActiveMQ、RabbitMQ、Kafka等。

订阅发布模式有以下优点:

  1. 性能好,发布者发送消息后直接返回,不需要等待消费者处理。
  2. 解耦性强,出版商与订阅者之间没有直接依赖,满足高内聚低耦合的设计理念。
  3. 它可以支持一对多、多对多的信息通信模型,并提供更灵活的信息传输方式。
  4. 可动态增加或删除出版商和订阅者,扩展性好。
二、Java实现发布订阅模式
  1. 为接收消息通知,创建订阅者接口。
interface Subscriber {    void update(String message);}
  1. 为发布新闻创建出版商。增加、删除和发布功能,并维护订阅列表,
class Publisher {    private Map<String, List<Subscriber>> subscribers = new HashMap<>();    public void subscribe(String topic, Subscriber subscriber) {        List<Subscriber> subscriberList = subscribers.get(topic);        if (subscriberList == null) {            subscriberList = new ArrayList<>();            subscribers.put(topic, subscriberList);        }        subscriberList.add(subscriber);    }    public void unsubscribe(String topic, Subscriber subscriber) {        List<Subscriber> subscriberList = subscribers.get(topic);        if (subscriberList != null) {            subscriberList.remove(subscriber);        }    }    public void publish(String topic, String message) {        List<Subscriber> subscriberList = subscribers.get(topic);        if (subscriberList != null) {            for (Subscriber subscriber : subscriberList) {                subscriber.update(message);            }        }    }}
  1. 我们也实现了两种不同之处 Subscriber 实现,一个是 EmailSubscriber,另一个是 SMSSubscriber,用于接收发布者的信息,并将其发送到电子邮件和手机上。
class EmailSubscriber implements Subscriber {    private String email;    public EmailSubscriber(String email) {        this.email = email;    }    public void update(String message) {        System.out.println("Send email to " + email + ": " + message);    }}class SMSSubscriber implements Subscriber {    private String phoneNumber;    public SMSSubscriber(String phoneNumber) {        this.phoneNumber = phoneNumber;    }    public void update(String message) {        System.out.println("Send SMS to " + phoneNumber + ": " + message);    }}
  1. 在 Main 在类中,我们创建了一个类 Publisher 添加两个对象 EmailSubscriber 和两个 SMSSubscriber,分别订阅了 news 主题更新。我们先给主题发消息,然后取消 news 其中一个主题是订阅者,最后我们再次给出 news 主题发送消息。
public class Main {    public static void main(String[] args) {        Publisher publisher = new Publisher();        Subscriber emailSubscriber = new EmailSubscriber("foo@example.com");        Subscriber sssubscriber = new SMSSubscriber("1234567890");        publisher.subscribe("news", emailSubscriber1;        publisher.subscribe("news", smssubscriber1;        publisher.publish("news", "发布新消息1";        publisher.unsubscribe("news", smssubscriber1;        publisher.publish("news", "发布新消息2";    }}

打印输出如下:

Send email to foo@example.com: 发布新消息1Senddd新消息dd新消息 SMS to 1234567890: 发布新消息1Senddd新消息dd新消息 email to foo@example.com: 发布新消息2
三、Spring自带的订阅发布模式

Spring的订阅发布模式是通过发布事件、事件监听器和事件发布器三个部分完成的

我们在这里通过 newbee-mall-pro 已在项目中实现订阅发布模式的订单流程向大家讲解,项目地址:https://github.com/wayn11///newbee-mall-pro

  1. 自定义订单发布事件,继承 ApplicationEvent
public class OrderEvent extends ApplicationEvent {  void onApplicationEvent(Object event) {    ...  }}
  1. 定义订单监听器,实现 ApplicationListener
@Componentpublic class OrderListener implements ApplicationListener<OrderEvent> {    @Override    public void onApplicationEvent(OrderEvent event) {    // 生成订单,删除购物车,扣除库存    ...    }}
  1. 通过事件发布器下订单流程 applicationEventPublisher 发布订单事件,然后在订单监听器中处理订单保存逻辑。
@Resourceprivate ApplicationEventPublisher applicationEventPublisher;private void saveOrder(MallUserVO mallUserVO, Long couponUserId, List<ShopCatVO> shopcatVOList, String orderNo) {    // 订单检查    ...    // 生成订单号    String orderNo = NumberUtil.genOrderNo();    // 发布订单事件,在事件监控中处理订单逻辑    applicationEventPublisher.publishEvent(new OrderEvent(orderNo, mallUserVO, couponUserId, shopcatVOList));    // 所有操作成功后,返回订单号    return orderNo;    ...}

通过事件监控机制,将订单逻辑分为以下步骤:

  1. 订单检查
  2. 生成订单号
  3. 发布订单事件,处理事件监控中的订单保存逻辑
  4. 所有操作成功后,订单号返回的每一步都是独立的,不相互影响

以上代码已经实现了订阅发布模式,并成功地解耦了订单逻辑。但由于性能没有得到优化,因此没有得到优化 Spring Boot 在项目中,默认情况下,事件监听器同步处理,即此处的订单流程将在事件监听器处理后返回,最终影响接口响应时间。

四、使用异步事件监控发布类别

Spring Boot 项目中事件监督发布的类别是由项目中事件监督发布的 SimpleApplicationEventMulticaster 实现此类,源代码中通知的订阅者代码如下:

设计模式之订阅发布模式_事件监听

可以看出,代码中有判断。 getTaskExecutor() 如果方法返回不是空的,就交由 executor 执行,负责同步执行。这个时候大家都会问,异步通知订阅者不是有线程池吗?别担心,博主会带你继续查看源代码。

设计模式之订阅发布模式_解耦_02

可以看到 getTaskExecutor() 该方法返回成员属性,该成员属性在 SimpleApplicationEventMulticaster 类中是通过 setTaskExecutor(@Nullable Executor taskExecutor) 设置方法。我们通过。 ctrl + f7 查一下 setTaskExecutor(...) 该方法在哪里被调用,

设计模式之订阅发布模式_List_03

Ok,到此水落石出,SimpleApplicationEventMulticaster 类的 taskExecutor 成员属性一直是 null,因此,在通过订阅者时,总是同步处理,等待订阅者完成处理。

对异步处理,我们可以从两个方面入手:

  1. 从事件监听器开始,将事件监听器的事件触发方法改为异步执行,如生成订单、删除购物车、扣除库存逻辑进入线程池异步执行,或在订阅者的通知方法中 onApplicationEvent 上加上@Async注意,表示该方法异步执行。
  2. 从事件监控发布开始,设置默认事件监控发布taskExecutor通过源码可以知道属性,也可以解决。

这里博主介绍如何修改事件监听发布类别来解决。

/** * 在系统启动时执行 */@Componentpublic class SpringBeanStartupRunner implements ApplicationRunner {    @Override    public void run(ApplicationArguments args) throws Exception {        // 设置spring默认事件监控为异步执行        SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class);        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(                5,                10,                60L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<>(500),                new CustomizableThreadFactory("newbee—event-task"),                new ThreadPoolExecutor.CallerRunsPolicy()        );        multicaster.setTaskExecutor(threadPoolExecutor);    }}

当系统启动时反射修改SimpleApplicationEventMulticaster类的taskExecutor属性,从而让SimpleApplicationEventMulticaster类别支持异步事件通知。

总结

建议您考虑哪些业务流程可以应用于日常开发。例如,当订单支付成功后,您需要通知用户、商品、活动和其他服务时,您可以考虑使用订阅发布模式。对于解耦发布者和订阅者,发布者只需发布消息,不需要知道订阅者是什么,也不需要知道订阅者的具体实现。订阅者只需要关注他们感兴趣的信息。这种松耦设计使系统更容易扩展和维护。