Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging server. It supports industry standard protocols so users get the benefits of client choices across a broad range of languages and platforms. Connectivity from C, C++, Python, .Net, and more is available. Integrate your multi-platform applications using the ubiquitous AMQP protocol. Exchange messages between your web applications using STOMP over websockets. Manage your IoT devices using MQTT. Support your existing JMS infrastructure and beyond. ActiveMQ offers the power and flexibility to support any messaging use-case.
There are currently two “flavors” of ActiveMQ available - the “classic” 5.x broker and the “next generation” Artemis broker. Once Artemis reaches a sufficient level of feature parity with the 5.x code-base it will become ActiveMQ 6. Initial migration documentation is available.
Active MQ 06
Request/Response模型实现
QueueRequestor
同步阻塞
TemporaryQueue
异步监听,当消息过多时会创建响应的临时queue
JMSCorrelationID 消息属性
异步监听,公用queue
调优总结
Topic加强 可追溯消息
http://activemq.apache.org/retroactive-consumer.html
避免topic下错过消息
消费者设置
1 | Destination topic = session.createTopic("tpk?consumer.retroactive=true"); |
Summary of Available Recovery Policies
| Policy Name | Sample Configuration | Description |
|---|---|---|
| FixedSizedSubscriptionRecoveryPolicy | Keep a fixed amount of memory in RAM for message history which is evicted in time order. | |
| FixedCountSubscriptionRecoveryPolicy | Keep a fixed count of last messages. | |
| LastImageSubscriptionRecoveryPolicy | Keep only the last message. | |
| NoSubscriptionRecoveryPolicy | Disables message recovery. | |
| QueryBasedSubscriptionRecoveryPolicy | Perform a user specific query mechanism to load any message they may have missed. Details on message selectors are available here | |
| TimedSubscriptionRecoveryPolicy | Keep a timed buffer of messages around in memory and use that to recover new subscriptions. Recovery time is in milliseconds. | |
| RetainedMessageSubscriptionRecoveryPolicy | Keep the last message with ActiveMQ.Retain property set to true |
保留固定字节的消息
1 | <policyEntry topic=">"> |
保留固定数量的消息
1 | <policyEntry topic=">"> |
保留时间
1 | <subscriptionRecoveryPolicy> |
保留最后一条
1 | <subscriptionRecoveryPolicy> |
慢速消费
SlowConsumerStrategy
对于慢消费者,broker会启动一个后台线程用来检测所有的慢速消费者,并定期的关闭慢消费者。
AbortSlowConsumerStrategy abortConnection:中断慢速消费者,慢速消费将会被关闭。
1 | <slowConsumerStrategy> |
AbortSlowConsumerStrategy maxTimeSinceLastAck:如果慢速消费者最后一个ACK距离现在的时间间隔超过阀值,则中断慢速消费者。
1 | <slowConsumerStrategy> |
PendingMessageLimitStrategy:消息限制策略(面向慢消费者)
http://activemq.apache.org/slow-consumer-handling
此策略只对Topic有效,只对未持久化订阅者有效,当通道中有大量的消息积压时,broker可以保留的消息量。为了防止Topic中有慢速消费者,导致整个通道消息积压。
ConstantPendingMessageLimitStrategy:保留固定条数的消息,如果消息量超过limit,将使用消息剔除策略移除消息。
1 | <policyEntry topic="ORDERS.>"> |
PrefetchRatePendingMessageLimitStrategy:保留prefetchSize倍数条消息。
1 | <!-- 若prefetchSize为100,则保留2.5 * 100条消息 --> |
消息堆积内存上涨
- 检查消息是否持久化
- 检查消息 消费速度与生产速度
- 调整xms xmx参数
磁盘满
当非持久化消息堆积到一定程度,ActiveMQ会将非持久化消息写入临时文件,但是在重启的时候不会恢复
当存储持久化数据的磁盘满了的时候
持久化消息
生产者阻塞,消费正常,当消费一部分消息后,腾出空间,生产者继续
非持久化消息
由于临时文件造成磁盘满了,生产者阻塞,消费异常,无法提供服务
开启事务
在发送非持久化消息的时候,可以有效防止消息丢失
prefetchSize影响消费倾斜
慢速消费的时候可以将prefetchSize设为1,每次取一条
prefetchSize造成消费者内存溢出
AUTO_ACKNOWLEDGE造成消息丢失/乱序
消息消费失败后,无法复原消息,可以手动ack 避免broker把消息自动确认删除
receive()方法接受到消息后立即确认
listener 的onmessage方法执行完毕才会确认
手动ack的时候要等connection断开 才会重新推送给其他的consumer,所以有可能会导致消费顺序错乱