博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ深入理解及运用
阅读量:4281 次
发布时间:2019-05-27

本文共 36376 字,大约阅读时间需要 121 分钟。

  • 为什么要用MQ

  1. 解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
  2. 冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化直到它们完全被处理。扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
  3. 削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃。
  4. 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理。
  5. 顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性。
  6. 缓冲:消息中间件提供一定能力的缓冲数据作用,主要是用以消息的发布和订阅异步的进行通行,其运用了持久化于中间件中,对消息进行缓冲作用。
  7. 异步通信:通过把消息发送给消息中间件,消息中间件并不立即处理。
  • RabbitMQ概述

AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

AMQP 0-9-1 is a programmable protocol in the sense that AMQP 0-9-1 entities and routing schemes are primarily defined by applications themselves, not a broker administrator. Accordingly, provision is made for protocol operations that declare queues and exchanges, define bindings between them, subscribe to queues and so on.

 根据官网的解释,AMQP 0-9-1(Advanced Message Queuing Protocol)是一种消息传递协议且可编程,它支持符合标准的客户端请求程序与符合标准的消息中间件代理进行通信。AMQP的实体和路由规则都是自定义的,只是规定了queues、exchanges、两者之间的bindings关系以及订阅此queues等,所以这样的方式就会很自由的去发挥AMQP中的这些特性。

以下是AMQP的整体流程图:

Publish path from publisher to consumer via                              exchange and queue

  1. 启动RabbitMQ服务,并且启动Client与之建立Connection连接
  2. 连接建立之后会根据每个Consumer建立Channel通道,用以Publisher和Consumer传送消息
  3. 消息发布者Publisher将message发布至Exchange 
  4. Exchange通过bindings的规则分别Routes至不同的Queue
  5. AMQP的broker传递消息给与Queue关联的Consumers、或者Consumers可以根据不同需求主动去Exchange中拉取消息
  • 重点释义 

 Exchanges

 Exchange其实跟ActiveMQ以及RocketMQ中的Broker中心有点类似,但是更多的侧重于它的字面意义--交换器。生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange再将消息路由到一个或多个Queue,其中根据配置不同的路由规则进行过滤,具体根据什么规则筛选,则是后面要提到的Exchange Types。 

既然消息是通过Exchange推送给Queue的,那怎么准确的定位一个Queue呢,这其中最重要的环节就是Bindings。

Bindings

Bindings是 Exchange用来路由消息到Queues的规则,Bindings可能有一个选择性的Routing Key属性,被某些类型的exchanges使用。Queue必须绑定到Exchange,建立绑定关系,这样Exchange就可以准确无误的定位到Queues。

 Exchange Types

Name Default pre-declared names
Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)

Default exchange

Default exchange是一个没有名称的(空字符串)被broker预先申明的Direct exchange。它所拥有的一个特殊属性就是使它用于简单的应用程序:每个创建的queue会与它自动绑定,使用queue名称作为Routing key。

public final static String queueName = "hello_queue";    @Bean    public Queue helloQueue() {        return new Queue(queueName);    }

当申明一个名称为“hello_queue”的queue时,AMQP broker使用“hello_queue”作为routing key将它绑定到default exchange。因此,一条被发布到default exchange并且routing key为"hello_queue"将被路由到名称为"hello_queue"的queue。换句话说,default exchange可以直接传送消息到queue,一般项目中不建议这么简单粗暴。

Direct exchange

Direct exchange根据Routing key完全匹配发送消息。其中两个要素:

1,将routing key的queue与exchange绑定

2,如果新queue中的routing key与之前绑定的routing key(bingding key)相等,exchange则将路由至新的queue中

@Configurationpublic class AdvanceRabbitConfig {    public final static String queueName = "advance_queue";    public final static String routingKey = "advance_routing_key";    public final static String exchangeName = "advance_exchange";    @Bean    public Queue advanceQueue() {        return new Queue(queueName);    }    @Bean    DirectExchange advanceExchange() {        return new DirectExchange(exchangeName);    }    @Bean    Binding bindingDirectExchangeMessage(Queue advanceQueue, DirectExchange advanceExchange) {        return BindingBuilder.bind(advanceQueue).to(advanceExchange).with(routingKey);    }}

 适用场景:如果需要一个message以一种循环的方式发送给Queue消费者,可以使用这种类型,需要注意的是这里分发给不同的Queue不是对Queue进行的负载均衡,而是对消费者Consumer进行的选择,Queue是同一个。

 Fanout exchange

Fanout exchange所有发送到该Exchange的消息路由到所有与它绑定的Queue中,忽略routing key。Fanout exchange是广播路由的最佳选择。

@Configurationpublic class FanoutRabbitConfig {    public final static String fanoutQueueName = "fanout_queue";    public final static String fanoutExchangeName = "fanout_exchange";    @Bean    public Queue fanoutQueue() {        return new Queue(fanoutQueueName);    }    @Bean    public FanoutExchange fanoutExchange(){        return new FanoutExchange(fanoutExchangeName);    }    @Bean Binding bindingFanoutExchangeMessage(Queue fanoutQueue, FanoutExchange fanoutExchange){        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);    }}

适用场景

  • 大量的多用户在线(multi-player online MMO)游戏使用它更新排行榜或者其他的全体事件,王者荣耀和CFM的VIP用户的大喇叭功能
  • CMS属性的门户网站及APP客户端的新闻推送
  • 分布式系统可以广播各种状态与配置更新
  • IM实时群聊也是其引用场景

Topic exchange

Topic exchange将消息路由到一个或者多个Queue,如果说Direct是精确匹配的话,那Topic就是模糊匹配,可以通过通配符满足一部分规则就可以传送,命名一般有如下约定:

 

当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue1中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。 

  1. routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“topic.notice”、“sxf.routing.topic”、“top.sxf.rabbit”
  2. binding key与routing key一样也是句点号“. ”分隔的字符串
  3. binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
@Configurationpublic class TopicRabbitConfig {    public final static String topicQueueName = "sxf.topic.notice";    public final static String topicRoutingKey = "topic.routing.key";    public final static String topicExchangeName = "topic.exchange";    @Bean    public Queue topicQueue() {        return new Queue(topicQueueName);    }    @Bean    public TopicExchange topicExchange(){        return new TopicExchange(topicExchangeName);    }    @Bean Binding bindingTopicExchangeMessage(Queue topicQueue, TopicExchange topicExchange) {        return BindingBuilder.bind(topicQueue).to(topicExchange).with(topicRoutingKey);    }}

适用场景: 

  • 针对部分区域性数据进行的更新,如地域性的销售网点或者分公司的调整等
  • 由多个工作者完成的后台任务处理,每个都能够负责处理指定的任务
  • 库存价格更新(更新其他的财务数据)
  • 包含分类与标签的新闻更新(例如只针对某一个特定的运动或团队)
  • 不同种类的云服务编制
  • 分布式结构/特定操作系统软件的构建与包装,每个处理者只能处理一个结构或者系统

Headers exchange

 Headers exchange为在多个属性进行路由而设计的,这些属性更容易描述为消息头,而不是routing key。headers exchanges忽略routing key属性,相反用于路由的属性是从headers属性中获取的。如果消息头的值等于指定的绑定值,则认为消息是匹配的。

当“x-match”参数的值被设为“any”,只要一个匹配的header值就足够了。相反的,设置“x-match”的值为“all”需要所有的headers值匹配。

@Configurationpublic class HeadersRabbitConfig {    public final static String headersQueueName = "headers_queue";    public final static String headersExchangeName = "headers_exchange";    @Bean    public Queue headersQueue() {        return new Queue(headersQueueName);    }    @Bean    public HeadersExchange headersExchange(){        return new HeadersExchange(headersExchangeName);    }    @Bean Binding bindingHeadersExchangeMessage(Queue headersQueue, HeadersExchange headersExchange) {        Map
headers = new HashMap<>(); headers.put("key1", "sxf_header1"); headers.put("key2", "sxf_header2"); //whereAny表示部分匹配,whereAll表示全部匹配 Binding binding = BindingBuilder.bind(headersQueue).to(headersExchange).whereAny(headers).match(); return binding; }}

综上所述,总结下特点:

Exchange Type 特点描述
Direct routing key == binding key,严格直接消费
Fanout 把所有Exchange接收的消息分发到所有与之匹配的Queue上
Topic 根据rout规则模糊匹配到不同的Queue
Headers 跟routing key和binding key无关,根据消息的headers属性进行匹配

Queue 

Queue 队列用以存储应用的消息实体,在使用之前必须先声明,如果不存在可以创建一个Queue,如果已经存在某个Queue,且跟将要创建的Queue属性相同,则不进行再次创建,相同的Channel中,使用空字符串通过以下声明队列名会新创建一个队列绑定到当前通道。

String queueName = channel.queueDeclare().getQueue();

以下是没有声明Queue的前提下去监听这个【654】的队列,就会报没有找到该队列错误。 

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method
(reply-code=404, reply-text=NOT_FOUND - no queue '654' in vhost '/', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)

 Queue 有几个附加属性:

1,Name:开发者可以为Queue自定义名称也可以使用broker自动分配名称,名称最多可达255字节,自动分配的是以“amp.”开头的,建议取有规则意义的名称命名,且使用“.”和“_”连接符,识别度高。

2,Durable:Queue的持久化是写入磁盘文件中的,所以broker重启后消息还存在,非持久的消息是暂存在内存中的,需要注意的一点不是所有的消息都要持久化,持久化必然会带来性能上的影响,根据实际需求而定。

3,Exclusive:排他性。排他队列仅对首次声明它的连接可见,并在连接断开后自动删除。需要注意以下几点:

3.1,连接。排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。

3.2,独有。如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。

3.3,自我。即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。

4,Auto delete:如果设置为Yes,则该队列在所有消费者都没有订阅后会自动删除。一般用于临时性的队列。

5,Arguments:实现ttl的一些附加属性,可以单独设置。

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间:

AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);

Auto Expire(x-expires):当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

Max Length(x-max-length):限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim

Max Length Bytes(x-max-length-bytes):限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小,Features=Lim B

Dead letter exchange(x-dead-letter-exchange):当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费

Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上不放在内存中,当消费者开始消费的时候才加载到内存中

Master locator(x-queue-master-locator):主节点队列,用以保证消息的FIFO排序所设置的分布策略,主要有三种方式,min-masters(最小主机数量)、client-local(客户端节点)以及random(随机选取)

 Consumer And Message

消息订阅者通过两种方式进行消费消息,一个是监听队列后被动接收推送的消息,一个是主动去拉取消息。提到消息的消费有几种处理方式需要了解下:

如果有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息依次被不同的订阅者循环接收。

[INFO ] 2019-01-04 12:32:56,975 --http-nio-1111-exec-1-- [com.jikeh.topic.TopicSender] Sender : topicMessage Fri Jan 04 12:32:56 CST 2019 [INFO ] 2019-01-04 12:32:56,989 --SimpleAsyncTaskExecutor-1-- [com.jikeh.topic.TopicReceiver1] TopicReceiver1  : topicMessage Fri Jan 04 12:32:56 CST 2019 [INFO ] 2019-01-04 12:32:59,327 --http-nio-1111-exec-2-- [com.jikeh.topic.TopicSender] Sender : topicMessage Fri Jan 04 12:32:59 CST 2019 [INFO ] 2019-01-04 12:32:59,330 --SimpleAsyncTaskExecutor-1-- [com.jikeh.topic.TopicReceiver2] TopicReceiver2  : topicMessage Fri Jan 04 12:32:59 CST 2019 [INFO ] 2019-01-04 12:33:02,378 --http-nio-1111-exec-3-- [com.jikeh.topic.TopicSender] Sender : topicMessage Fri Jan 04 12:33:02 CST 2019 [INFO ] 2019-01-04 12:33:02,383 --SimpleAsyncTaskExecutor-1-- [com.jikeh.topic.TopicReceiver3] TopicReceiver3  : topicMessage Fri Jan 04 12:33:02 CST 2019 [INFO ] 2019-01-04 12:33:06,805 --http-nio-1111-exec-4-- [com.jikeh.topic.TopicSender] Sender : topicMessage Fri Jan 04 12:33:06 CST 2019 [INFO ] 2019-01-04 12:33:06,809 --SimpleAsyncTaskExecutor-1-- [com.jikeh.topic.TopicReceiver1] TopicReceiver1  : topicMessage Fri Jan 04 12:33:06 CST 2019

消息的acknowledged属性,可以隐式的配置自动确认antoAck为true,也可以显式的调用baseack手动确认,这个确认ack仅仅是通知Server可以安全的删除该消息,而不是通知生产者,生产者可以调用ConfirmCallback获取是否已经发送到Exchange,如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息。

消息发送者:

@Slf4j@Componentpublic class FanoutSender {    @Autowired    private RabbitTemplate rabbitTemplate;    public void send(String queueStr) {        String context = queueStr + " " + new Date();        log.info("Sender : {}", context);        // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中        rabbitTemplate.setConfirmCallback(confirmCallback);        Object o = this.rabbitTemplate.convertSendAndReceive(queueStr, context);        if(null != o) log.info("ask====={}", o.toString());    }    //回调函数: confirm确认    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {        @Override        public void confirm(CorrelationData correlationData, boolean ack, String cause) {            log.info("correlationData: {}", correlationData);            if(ack){                //如果confirm返回成功 则进行更新                log.info("confirm返回成功");            } else {                //失败则进行具体的后续操作:重试 或者补偿等手段                log.info("confirm返回失败");            }        }    };}

消息订阅者:

@Slf4j@Component@RabbitListener(queues = FanoutRabbitConfig.fanoutQueueName)public class FanoutReceiver1 {    @RabbitHandler    public void process(String messageStr, Message message, Channel channel) throws IOException {        //如果出现异常,会由另一个Receiver处理//        int a = 1/0;        log.info("FanoutReceiver1  : {}", messageStr);        //ACK,确认一条消息已经被消费。不然的话,在rabbitmq首页会有Unacked显示为未处理数1.//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        log.info("receiver success");    }}

 当打开订阅者的ACK确认时,输出信息完整

[INFO ] 2019-01-04 14:11:30,666 --http-nio-1111-exec-1-- [com.jikeh.fanout.FanoutSender] Sender : fanout_queue Fri Jan 04 14:11:30 CST 2019 [INFO ] 2019-01-04 14:11:30,761 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-04 14:11:30,761 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-04 14:11:30,768 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] FanoutReceiver1  : fanout_queue Fri Jan 04 14:11:30 CST 2019 [INFO ] 2019-01-04 14:11:30,768 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] receiver success

当注释掉订阅者的ACK确认时,发送消息后并没有消费掉消息也就是说没有得到确认结果,该消息挂起 

[INFO ] 2019-01-04 14:17:24,813 --http-nio-1111-exec-1-- [com.jikeh.fanout.FanoutSender] Sender : fanout_queue Fri Jan 04 14:17:24 CST 2019 [INFO ] 2019-01-04 14:17:24,853 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-04 14:17:24,853 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功

 重启服务以后,所有没有确认的消息都要进行再次的消费,这也是持久性的一个特点:

[INFO ] 2019-01-04 14:22:01,209 --SimpleAsyncTaskExecutor-1-- [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Created new connection: rabbitConnectionFactory#5f40d1e:0/SimpleConnection@349171ca [delegate=amqp://guest@172.16.133.138:5672/, localPort= 57767] [INFO ] 2019-01-04 14:22:01,399 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] FanoutReceiver1  : fanout_queue Fri Jan 04 14:16:00 CST 2019 [INFO ] 2019-01-04 14:22:01,399 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] receiver success [INFO ] 2019-01-04 14:22:01,403 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] FanoutReceiver1  : fanout_queue Fri Jan 04 14:17:24 CST 2019 [INFO ] 2019-01-04 14:22:01,403 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] receiver success

那么问题来了,如果消费者没有确认,也没有订阅者,那就需要消息的Rejecting属性了,Client可以发起拒绝命令,当拒绝一条消息时,可以设置一个requeue的属性,如果为true,则消息服务器会重传该消息给下一个订阅者;如果为false,则会直接删除该消息。当然也可以通过ack,让消息服务器直接删除该消息并且不会重传,如果想无限循环消费,要确保没有重复的拒绝并且要将消息重新传入下一个订阅者,也就是requeue=true。

Connections

AMQP的流程中第一步建立连接,这样的Connection是长期存活的,这个长期针对RabbitMQ的服务而言,AMQP是一个应用级协议,它使用TCP保持稳定传输。AMQP连接使用身份认证并且可以使用TLS(SSL)来保护连接。当应用程序不再需要连接到AMQP broker时,它应该优雅的关闭AMQP连接而不是突然关闭底层的TCP连接。所以在管理页面我们通常看到一个IP段的Connection,并不是随着生产者订阅者的断开连接而中断,这其中最重要的关系就是建立连接后的通道Channel。

Channels

Channels 是为了避免AMQP频繁的多连接而生,TCP的多个连接或者频繁调用会消耗系统资源并且让配置防火墙更加繁琐,所以Channels 的多通路复用就很好的解决了这种问题,共享单个TCP连接的轻量级连接。

Virtual Hosts

Vhosts也是AMQP的一个基础概念,连接到RabbitMQ默认就有一个名为"/"的vhost可用,本地调试的时候可以直接使用这个默认的vhost.这个"/"的访问可以使用guest用户名(密码guest)访问。可以使用rabbitmqctl工具修改这个账户的权限和密码,这在生产环境是必须要关注的。出于安全和可移植性的考虑,一个vhost内的exchange不能绑定到其他的vhost。

可以按照业务功能组来规划vhost,在集群环境中只要在某个节点创建vhost就会在整个集群内的节点都创建该vhost。VHost和权限都不能通过AMQP协议创建,在RabbitMQ中都是使用rabbitmqctl进行创建、管理。

 

  • 深度剖析 

@RabbitListener 

@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。

有两种方式配置RabbitListener,一种使用RabbitListener标注类且使用RabbitHandler标注方法,另一种使用RabbitListener标注方法,两种模式同时存在也可以。使用RabbitHandler标注的方法和RabbitListener标注的方法都会被当做消费者的消息监听方法

@Slf4j@Componentpublic class FanoutReceiver3 {    @RabbitListener(queues = FanoutRabbitConfig.fanoutQueueName)    public void process(String message) {        log.info("FanoutReceiver3  : {}", message);    }}
@Slf4j@Component@RabbitListener(bindings = {@QueueBinding (value = @Queue(value = TopicRabbitConfig.topicQueueName, durable = "true"),        exchange = @Exchange(value = TopicRabbitConfig.topicExchangeName, durable = "true", type = "topic"),        key = "*.routing.*")})public class TopicReceiver2 {    @RabbitHandler    public void process(String messageStr, Channel channel) throws IOException {        log.info("TopicReceiver2  : {}", messageStr);        String queueName = channel.queueDeclare().getQueue();        log.info("=======queue:{}", queueName);    }}

RabbitListenerAnnotationBeanPostProcessor类用于解析RabbitListener注解,该类实现了BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton,具有以下特性: 

  1. 基于Spring,@RabbitListeners必须可以被实例化,可以通过@Bean或者@Component注入
  2. 可以获取BeanFactory的Bean对象和环境变量等
  3.  Ordered的使用说明可以配置优先级priority
  4. SmartInitializingSingleton单例Bean创建后执行
  5. BeanPostProcessor说明解析的入口是postProcessAfterInitialization
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {        Class
targetClass = AopUtils.getTargetClass(bean); RabbitListenerAnnotationBeanPostProcessor.TypeMetadata metadata = (RabbitListenerAnnotationBeanPostProcessor.TypeMetadata)this.typeCache.get(targetClass); if (metadata == null) { //获取需要监听的Method metadata = this.buildMetadata(targetClass); this.typeCache.putIfAbsent(targetClass, metadata); } RabbitListenerAnnotationBeanPostProcessor.ListenerMethod[] var5 = metadata.listenerMethods; int var6 = var5.length; for(int var7 = 0; var7 < var6; ++var7) { RabbitListenerAnnotationBeanPostProcessor.ListenerMethod lm = var5[var7]; RabbitListener[] var9 = lm.annotations; int var10 = var9.length; for(int var11 = 0; var11 < var10; ++var11) { RabbitListener rabbitListener = var9[var11]; //处理监听数据 this.processAmqpListener(rabbitListener, lm.method, bean, beanName); } } if (metadata.handlerMethods.length > 0) { this.processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName); } return bean; }

以下是解析监听处理,根据源码可以得知,@RabbitListener中可使用SpEL表达式,即可以使用#{queue},对Listener进行的数据解析和声明Exchange以及Binding等

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object adminTarget, String beanName) {        endpoint.setBean(bean);        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);        //设置ID        endpoint.setId(this.getEndpointId(rabbitListener));        //解析队列        endpoint.setQueueNames(this.resolveQueues(rabbitListener));        //分组        String group = rabbitListener.group();        if (StringUtils.hasText(group)) {            Object resolvedGroup = this.resolveExpression(group);            if (resolvedGroup instanceof String) {                endpoint.setGroup((String)resolvedGroup);            }        }        //队列独占        endpoint.setExclusive(rabbitListener.exclusive());        //优先级        String priority = this.resolve(rabbitListener.priority());        if (StringUtils.hasText(priority)) {            try {                endpoint.setPriority(Integer.valueOf(priority));            } catch (NumberFormatException var14) {                throw new BeanInitializationException("Invalid priority value for " + rabbitListener + " (must be an integer)", var14);            }        }        //查找admin        String rabbitAdmin = this.resolve(rabbitListener.admin());        if (StringUtils.hasText(rabbitAdmin)) {            Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");            try {                endpoint.setAdmin((RabbitAdmin)this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));            } catch (NoSuchBeanDefinitionException var13) {                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" + adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" + rabbitAdmin + "' was found in the application context", var13);            }        }        //查找containerFactory        RabbitListenerContainerFactory
factory = null; String containerFactoryBeanName = this.resolve(rabbitListener.containerFactory()); if (StringUtils.hasText(containerFactoryBeanName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); try { factory = (RabbitListenerContainerFactory)this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class); } catch (NoSuchBeanDefinitionException var12) { throw new BeanInitializationException("Could not register rabbit listener endpoint on [" + adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" + containerFactoryBeanName + "' was found in the application context", var12); } } //将endpoint关联到containerFactory中 this.registrar.registerEndpoint(endpoint, factory);}//解析队列private String[] resolveQueues(RabbitListener rabbitListener) { String[] queues = rabbitListener.queues(); QueueBinding[] bindings = rabbitListener.bindings(); //queues和bindings都配置会抛出异常 if (queues.length > 0 && bindings.length > 0) { throw new BeanInitializationException("@RabbitListener can have 'queues' or 'bindings' but not both"); } else { List
result = new ArrayList(); if (queues.length <= 0) { return this.registerBeansForDeclaration(rabbitListener); } else { for(int i = 0; i < queues.length; ++i) { //队列名称可为SpEL表达式,如#{myQueue},注意不能带.,如my.queue(可能会报错) Object resolvedValue = this.resolveExpression(queues[i]); //解析队列名称,resolvedValue可为String[],Queue,String,最终都会解析为String,存放到result中 this.resolveAsString(resolvedValue, result); } return (String[])result.toArray(new String[result.size()]); } }}//声明注册的队列信息private String[] registerBeansForDeclaration(RabbitListener rabbitListener) { List
queues = new ArrayList
(); if (this.beanFactory instanceof ConfigurableBeanFactory) { for (QueueBinding binding : rabbitListener.bindings()) { //声明队列 String queueName = declareQueue(binding); queues.add(queueName); //声明Exchange和Binding declareExchangeAndBinding(binding, queueName); } } return queues.toArray(new String[queues.size()]);}

死信队列、延迟队列

 DLX:Dead-Letter-Exchange,死信队列,是指当消息在一个队列中变成死信(dead message)后,会被重新publish到另外一个Exchange,这个Exchange就是DLX。

产生消息死信有以下三种情况:

  1. 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
  2. 消息TTL过期
  3. 队列达到最大长度

 

 注入DLX死信队列

@Configurationpublic class DLXRabbitConfig {    public final static String DLX_EXCHANGE = "DLX_EXCHANGE";    public final static String DLX_QUEUE = "DLX_QUEUE";    public final static String REDIRECT_QUEUE = "REDIRECT_QUEUE";    public final static String DLX_ROUTING_KEY = "DLX_ROUTING_KEY";    public final static String REDIRECT_ROUTING_KEY = "REDIRECT_ROUTING_KEY";    @Bean    public Queue deadLetterQueue(){        Map
arguments = new HashMap<>(2); //x-dead-letter-exchange 死信交换机 arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //x-dead-letter-routing-key 死信路由键,注意这里是死信队列key以外的路由key arguments.put("x-dead-letter-routing-key", REDIRECT_ROUTING_KEY); return new Queue(DLX_QUEUE, true, false, false, arguments); } @Bean public DirectExchange deadLetterExchange(){ return new DirectExchange(DLX_EXCHANGE); } /** * 死信队列转发队列 * @return */ @Bean public Queue redirectQueue() { return new Queue(REDIRECT_QUEUE); } @Bean Binding bindingDLXExchangeMessage(Queue deadLetterQueue, DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DLX_ROUTING_KEY); } @Bean Binding bindingExchangeMessageRe(Queue redirectQueue, DirectExchange deadLetterExchange) { return BindingBuilder.bind(redirectQueue).to(deadLetterExchange).with(REDIRECT_ROUTING_KEY); }}

消息发送者 

@Slf4j@Componentpublic class DLXSender {    @Autowired    private RabbitTemplate rabbitTemplate;    public void send(){        String context = "deadLetterMessage " + new Date();        log.info("Sender : " + context);        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        //声明消息处理器        MessagePostProcessor messagePostProcessor = message -> {            MessageProperties messageProperties = message.getMessageProperties();            //设置编码            messageProperties.setContentEncoding("utf-8");            //设置过期时间10*1000毫秒            messageProperties.setExpiration("10000");            return message;        };        // 向DLX_QUEUE 发送消息  10*1000毫秒后过期 形成死信        rabbitTemplate.convertAndSend(DLXRabbitConfig.DLX_EXCHANGE, DLXRabbitConfig.DLX_ROUTING_KEY, context, messagePostProcessor, correlationData);    }}

消息订阅者(监听重定向队列) 

@Slf4j@Componentpublic class DLXReceiver {    //订阅重定向的队列    @RabbitListener(queues = DLXRabbitConfig.REDIRECT_QUEUE)    public void process(Message message, Channel channel) throws IOException {//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        log.info("DLXReceiver  10s 后 消费消息 {}", new String(message.getBody()));    }}

 结果

[INFO ] 2019-01-07 16:04:40,836 --http-nio-1111-exec-2-- [com.jikeh.dlx.DLXSender] Sender : deadLetterMessage Mon Jan 07 16:04:40 CST 2019 [INFO ] 2019-01-07 16:04:50,851 --SimpleAsyncTaskExecutor-1-- [com.jikeh.dlx.DLXReceiver] DLXReceiver  10s 后 消费消息 deadLetterMessage Mon Jan 07 16:04:40 CST 2019

 死信队列是相对于当前队列不可达,并不是一定不可消费的,以上的一种重定向队列就是通过REDIRECT_ROUTING_KEY获取到这个死信队列消费

死信队列可以用于对一些特定要求的日志、报警、敏感处理逻辑和超时逻辑进行的监听和确认处理。由于重定向队列其实就是一个延迟队列,所以也可以对一些需要延迟处理的需求进行封装等。

持久化

首先想为什么要持久化?持久化一定是有保障的吗?

带着这两个疑问我们来看,正常情况下,我们使用MQ是为了解决我们的异步处理,而异步处理后的逻辑又需要严格执行,即用MQ实现一个异步的同步流程,但因为各种原因,导致MQ积累消息过多,势必会造成内存不足,如果这时再来个网络波动或者MQdown掉,那结果太美,不敢想象,所以就需要消息一定程度的持久化,来保证我们的消息必达,所以至少有两点来要求我们为什么持久化消息,但是持久化也不一定有保障,因为这涉及到MQ的配置,即便配置无误,因为持久化是要落地的,落地的过程是写磁盘,如果在写盘的时候出现问题,那也会出现丢消息的情况,就要引入集群的可用性部署了,综上所述:

  • 内存紧张,需要将队列中部分消息写入磁盘
  • 业务要求,需要消息尽可能的被有效消费
  • 集群部署,需要对持久化做高可用的支撑

持久化设置

上文提到Queue有个属性是Druable,其实并非只有queue才有这个属性,Exchange和Message也有这个属性。

如果设置queue的druable=true,代表该队列是持久化的,当服务重启后,该队列也存在,是因为持久化到磁盘中,重启后会再次执行该队列,但是队列的持久化并不代表消息会被持久化,如果Message没有被持久化,即便该队列持久了,消息还没有发出去,重启后该消息就可能丢失了;但是如果队列和消息都设置持久化了,这样是不是就可以了?显然是不行的,因为还有个大前提,Exchange,这是队列的依赖,如果Exchange不是持久化的,那队列和消息没意义,所以要都设置为持久化

//声明一个交换器,第三个参数是druablechannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, null);//声明一个队列,第二个参数是druablechannel.queueDeclare(queueName, true, false, false, null);//消息(deliveryMode=1代表不持久化,deliveryMode=2代表持久化)MessageProperties.PERSISTENT_TEXT_PLAIN的BasicProperties类属性(deliveryMode=2)

 丢失消息的几种情况和处理方式

  • 没有正确配置MQ的Druable属性,需要exchange、queue、message都设置持久化,如果其中之一或者两个设置是有可能丢失的
  • 不清楚是发送消息丢失还是消费消息丢失,所以需要引入发送确认回执机制和消费手动确认配置
  • 保证MQ在持久化到磁盘的过程有效,考虑引入MirroredQueue(镜像队列),类似于Redis的master和salve主备策略,也是主从复制,如果master挂掉,推举一个salve为主节点,有效保障HA(高可用)
  • 在生产者确认消息发送成功确认的情况下,可以在消费端进行先入库后操作的处理,也就是将接收到的消息先缓存至业务前置DB中,然后异步处理之后的业务逻辑,可以有效降低因消费链路过长或者异常导致的消息丢失现象

注意:引入以上持久化的可靠性,势必造成性能上的影响,鱼和熊掌不可兼得,关键在于选择和取舍 

限流策略

 正常情况下,当发送若干消息后,会有消息订阅者去消费发送的消息,但如果出现宕机或者N多条消息没有消费到,还有一种情况是打开手动ack机制,且没有去确认ack,导致MQ里持久化了很多条消息,这样如果再次重启服务或者重新有一个订阅者监听到了这个Queue,那么就会出现N条同时涌入该订阅者,导致出现类似于“穿透”的现象以至于挂掉等一系列无法处理过来的问题,那么如何避免这种结果就引入了限流策略

前提是ack要配置为手动确认

# 开启ACKspring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.template.mandatory=true

 

#限流的设置#springboot配置:spring.rabbitmq.listener.simple.prefetch=2//手动配置://参数一: 消息的大小不做任何限制 参数二: 服务器给的最大的消息数,这里是一条一条的消费  参数三: 级别为consumer//prefetCount 为 1, 一次消费一条消息,如果消费者没有确认消费,将不会接受生产者给的消息channel.basicQos(0, 1, false);
[INFO ] 2019-01-10 21:23:05,527 --http-nio-1111-exec-2-- [com.jikeh.fanout.FanoutSender] Sender : fanout_queue Thu Jan 10 21:23:05 CST 2019 [INFO ] 2019-01-10 21:23:05,588 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:05,588 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:10,603 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:10,604 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:10,608 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] FanoutReceiver1  : fanout_queue Thu Jan 10 21:23:05 CST 2019 [INFO ] 2019-01-10 21:23:10,608 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] receiver1 success [INFO ] 2019-01-10 21:23:15,615 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:15,616 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:15,616 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] FanoutReceiver1  : fanout_queue Thu Jan 10 21:23:05 CST 2019 [INFO ] 2019-01-10 21:23:15,616 --SimpleAsyncTaskExecutor-1-- [com.jikeh.fanout.FanoutReceiver1] receiver1 success [INFO ] 2019-01-10 21:23:20,629 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:20,629 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:25,640 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:25,640 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:30,650 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:30,650 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:35,661 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:35,661 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:40,670 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null [INFO ] 2019-01-10 21:23:40,671 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] confirm返回成功 [INFO ] 2019-01-10 21:23:45,681 --AMQP Connection 172.16.133.138:5672-- [com.jikeh.fanout.FanoutSender] correlationData: null

 根据日志可发现,以上是配置了两条预读,以下是配置了一条预读,当没有得到手动确认ack时就会出现各自预读自己的消息后不再接受生产者的消息了,有效避免N条消息同时涌入一个订阅者。

[INFO ] 2019-01-10 21:23:05,597 --pool-1-thread-4-- [com.jikeh.fanout.QpsReceiver] -----------consume message---------- [INFO ] 2019-01-10 21:23:05,607 --pool-1-thread-4-- [com.jikeh.fanout.QpsReceiver] consumerTag: amq.ctag-4eG7zuYrHOzf7lScPfsqHw [INFO ] 2019-01-10 21:23:05,607 --pool-1-thread-4-- [com.jikeh.fanout.QpsReceiver] envelope: Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=fanout_queue) [INFO ] 2019-01-10 21:23:05,608 --pool-1-thread-4-- [com.jikeh.fanout.QpsReceiver] properties: #contentHeader
(content-type=text/plain, content-encoding=UTF-8, headers={spring_listener_return_correlation=51eb6881-0c31-4169-8a17-86036021f616, spring_request_return_correlation=1}, delivery-mode=2, priority=0, correlation-id=null, reply-to=amq.gen-S8q_KY60JxvesVI2BpfnaQ, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) [INFO ] 2019-01-10 21:23:05,608 --pool-1-thread-4-- [com.jikeh.fanout.QpsReceiver] body: fanout_queue Thu Jan 10 21:23:05 CST 2019
  • 版本差异

当前是基于RabbitMQ3.2.2,Erlang / OTP R16B03,RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器,所以在升级MQ的版本同时要留意下Erlang的版本变动,当然这个会在官方日志中体现出来,如: 

升级Erlang之前先升级RabbitMQ

 When upgrading to this release and upgrading Erlang to 21.0 at the same time, extra care has to be taken.

Since CLI tools from RabbitMQ releases older than 3.7.7 will fail on Erlang 21, RabbitMQ must be upgraded before Erlang. 

 每个大的版本都会有一些比较较大的改动,如3.0+后改动了端口15672

move management port out of the ephemeral range (which could result in the management plug-in failing to start). The new default port is 15672, with a redirect in place from 55672.

如3.2的增强性功能

enhancements

25063 support arrays in web interface for arguments, policies and headers
25598 display queue paging information
25711 improve handling of defaults in config file by rabbitmqadmin (thanks to Simon Lundstr枚m)
25747 de-emphasise internal federation queues and exchanges
25778 introduce 'policymaker' tag, permitting policy & parameter operations without being full administrator
25616 more readable number formatting in graph labels
25641 permit turning tracing on/off using the HTTP API
25811 add support for web UI authentication via the initial URI
25792 optimise monitoring of file descriptors on OS X

还有很多版本change的日志可以根据官方文档去查看:

  • Demo

以上示例都是基于Spring的注解发送和消费MQ,以下是原生的MQ处理逻辑:

消费生产者ProducerTest

@Slf4jpublic class ProducerTest {    private final static String EXCHANGE_NAME = "fanout_exchange_test";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setVirtualHost("/");        factory.setPort(AMQP.PROTOCOL.PORT);        factory.setHost("172.16.133.138");        factory.setUsername("guest");        factory.setPassword("guest");        //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();        //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();        //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, null);        //定义一组路由键        String routingKey = "limit.key";        //要发送的消息        String message = "Hello world";        /**         * 发送消息到交换器上         * 参数1:交换器的名字         * 参数2:路由键         * 参数3:BasicProperties         * 参数4:要发送的消息         */        channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());        log.info("Sent {}:{}", routingKey, message);        channel.close();        connection.close();    }}

消息消费者ConsumerTest

@Slf4jpublic class ConsumerTest {    private final static String EXCHANGE_NAME = "fanout_exchange_test";    public static void main(String[] args)throws IOException, Exception {        // 1、创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setVirtualHost("/");        factory.setPort(AMQP.PROTOCOL.PORT);        factory.setHost("172.16.133.138");        factory.setUsername("guest");        factory.setPassword("guest");        // 2、创建连接        Connection connection = factory.newConnection();        // 3、获取通道        Channel channel = connection.createChannel();        // 4、声明        String routingKey = "limit.key";        String queueName = "fanout_queue_test";        // 5、声明一个交换器        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, null);        // 6、声明一个队列        channel.queueDeclare(queueName, true, false, false, null);        // 7、绑定队列到交换器        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);        log.info("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerB = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                    AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                log.info("Accept:{}:{}", envelope.getRoutingKey(), message);            }        };        // TODO 限流的设置        // TODO 参数一: 消息的大小不做任何限制 参数二: 服务器给的最大的消息数,这里是一条一条的消费  参数三: 级别为consumer        // TODO prefetCount 为 1, 一次消费一条消息,如果消费者没有确认消费,将不会接受生产者给的消息        // channel.basicQos(0, 1, false);        // 9、消费者,要想做限流必须将自动ack设置为false//        channel.basicConsume(queueName, false, new QpsReceiver(channel));        //9.自动确认:autoAck参数为true        channel.basicConsume(queueName,true, consumerB);    }}

先启动ConsumerTest用以监听消息,然后启动ProducerTest,运行结果:

--main-- [com.jikeh.fanout.ProducerTest] Sent limit.key:Hello world
--main-- [com.jikeh.fanout.ConsumerTest] Waiting message....... --pool-1-thread-4-- [com.jikeh.fanout.ConsumerTest] Accept:limit.key:Hello world

更多原生demo可参看

以上就是本人对RabbitMQ的一些理解和运用,有错误的地方不吝赐教以使纠正,本文提到的很多都是一些MQ的相对基础的深入理解,很多点都需要摊开着重去深入探究的,如RabbitMQ的事务机制或者Confirm机制,高可用的分布式部署策略,RabbitMQ的任务分发等等。

你可能感兴趣的文章
usb BC1.2的三种端口
查看>>
linux下的"BusHound"——usb_mon非常好
查看>>
linux usb枚举过程分析【host】
查看>>
android之通过USB插拔流程来了解android UEvent
查看>>
[RK3288][Android6.0] USB 枚举过程小结
查看>>
CarPlay简介
查看>>
CarPlay介绍
查看>>
CarPlay wireless(蓝牙+WiFi)连接方案(蓝牙部分)
查看>>
CarPlay wireless(蓝牙+WiFi)连接方案(Wi-Fi部分)
查看>>
CarPlay wired连接与wireless连接相互切换
查看>>
USB linux NCM usbnet驱动详解
查看>>
USB OTG规范的SRP和HNP协议
查看>>
usb驱动的层次结构简述
查看>>
控制Linux内核启动中的打印
查看>>
创建一个简单的debugfs文件系统节点
查看>>
创建一个procfs文件系统的节点
查看>>
高通平台手机开发之Sensor
查看>>
Android4.x 如何处理Power按键
查看>>
创建一个简单的device_create_file文件节点
查看>>
android linux 休眠 深度睡眠 查看 方法 调试
查看>>