1. 消息代理 (Broker)
ActiveMQ 负责处理所有消息流的核心组件。它提供了一个可以连接、发送和接收消息的平台。
BrokerService broker = new BrokerService(); broker.addConnector("tcp://0.0.0.0:61616"); broker.start();
2. 消息队列
在 ActiveMQ 存储信息的逻辑概念容器。消息队列可以从多个生产者那里收到消息,并将其传递给多个消费者。
Queue queue = session.createQueue("my-queue");
3. 主题
发布/订阅模型,消息制作人将消息发布到主题上,感兴趣的消费者可以订阅主题并接收所有发布的消息。
Topic topic = session.createTopic("my-topic");
4. 会话
应用程序与 ActiveMQ 代理之间的通信会话。会话允许应用程序发送和接收信息,以及管理事务。
Session session = broker.createSession();
5. 生产者
将消息发送到消息队列或主题应用程序。
MessageProducer producer = session.createProducer(queue);
6. 消费者
应用程序从消息队列或主题接收消息。
MessageConsumer consumer = session.createConsumer(queue);
7. 事务
一组操作,要么全部执行,要么全部回滚。ActiveMQ 支持事务,确保信息传输的可靠性和一致性。
session.begin(); producer.send(message); session.commit();
8. 持久性
消息的持久性决定了当代理重新启动或出现故障时消息是否会被保留。持久消息将在磁盘上持久,而不是持久消息将丢失。
Message message = session.createTextMessage("Hello world"); message.setPersistent(true); producer.send(message);
9. 桥接
允许未来来自一个 ActiveMQ 代理的消息转发给另一个代理。桥接可用于连接多个代理 ActiveMQ 实例。
<bridge destination="forward.my-topic" source="activemq:topic:my-topic" brokerName="broker-b" />
10. 虚拟机传送
允许在同一个 JVM 内连接 ActiveMQ 两个例子。这是对的在单机环境中进行测试或创建分布式系统非常有用。
BrokerService brokerA = new BrokerService(); BrokerService brokerB = new BrokerService(); brokerA.setVmConnectorURI(brokerB.getVmConnectorURI()); brokerA.setBrokerName("broker-a"); brokerB.setBrokerName("broker-b"); brokerA.start(); brokerB.start();
11. 插件
ActiveMQ 为扩大其功能提供了一种机制。插件可以添加新功能,如新闻存储,安全性或监控。
<plugins> <journalPlugin> <journalDirectory>/tmp/journal</journalDirectory> </journalPlugin> </plugins>
12. 消息转换
ActiveMQ 允许在不同的信息格式之间转换信息。转换器可用于将信息转换为 XML 转换为 JSON,或将文本消息转换为二进制消息。
MessageConverter converter = session.getMessageConverter(); Message message = converter.toMessage("Hello world", session); producer.send(message);
13. 故障转移
使用故障转移代理或集群,确保消息代理故障的可用性。当主代理失败时,备用代理将接管。
<broker cluster="my-cluster"> <networkConnectors> <networkConnector name="tcp" uri="tcp://0.0.0.0:61616" /> </networkConnectors> </broker>
14. 负载均衡
通过将消息负载分布到多个代理,提高可扩展性和性能。ActiveMQ 支持轮询或基于消息大小的轮询。负载均衡策略。
<broker loadBalancingPolicy="round-robin" />
15. 监控
监控 ActiveMQ 为了保证其正常运行和性能,代理非常重要。ActiveMQ 提供了一个 JMX 仪表板和 REST api,用于监控代理状态和信息流。
import org.apache.activemq.broker.jmx.BrokerViewMBean; BrokerViewMBean brokerView = (BrokerViewMBean) MBeanServerFactory.createMBeanServer().getObjectInstance(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")); System.out.println("Message count: " + brokerView.getQueueSize());
16. 安全性
ActiveMQ 提供各种安全机制,包括 SSL、SASL 以及访问控制列表。
<securitySettings> <sslProtocols>TLSv1,TLSv1.1,TLSv1.2</sslProtocols> <requireCredentialsForAllConnections>true</requireCredentialsForAllConnections> <audit>true</audit> </securitySettings>
17. 协议
ActiveMQ 支持各种新闻传递协议,包括 AMQP、JMS 和 STOMP。
BrokerService broker = new BrokerService(); broker.setBrokerName("my-broker"); broker.addConnector("stomp://0.0.0.0:61613"); broker.start();
18. MQTT
MQtT(消息队列遥测传输)是专门为轻量级协议设计的轻量级协议物联网 (IoT) 设备开发。ActiveMQ 支持 MQTT,使其成为连接物联网设备和企业系统的理想选择。
<mqttConnectors> <mqttConnector name="mqtt" persist="true" uri="mqtt://0.0.0.0:1883" /> </mqttConnectors>
19. Web 控制台
ActiveMQ 提供了一个 WEB 控制台允许管理员监控代理状态、管理队列和主题,以及管理插件。
20. 故障排除
ActiveMQ 排除故障涉及检查日志文件,状态 GUI 和 JMX 仪表板。通过仔细分析错误。新闻和日志可以快速诊断和解决问题。