RabbitMQ的基础配置及使用
注意,以下所有内容均为个人理解,且建立在具体的场景上,可能不适应所有情况
RabbitMQ
可以在允许最终一致性的场景下,将高并发请求简单化,也就是将高并发的请求处理成一条有序的消息队列,每一个请求都可以看作是一条消息,服务接口会绑定这条队列,按照消息进队列顺序依次处理消息
用在订单服务这种用户不需要立刻拿到反馈的场景简直绝配。
工作流程
1.生产者(Publisher)生产消息,通过一条长连接将消息送到消息代理(Broker)
2.消息由消息头和消息体组成,其中消息体中包含了一个路由(route-key),决定着该消息会进入哪一个队列
3.消息代理先接收消息,再将将消息送到交换机
4.交换机通过消息的消息头包含的key-route,将消息送入对应的队列
5.消费者(Consumer)通过一条长连接与消息代理连接
6.消费者中的不同服务通过不同的信道,监听并接收不同队列的消息
几种交换机
1.director:点对点模式,使用route-key绑定队列,并且route-key精确匹配
2.headers:点对点模式,属于JMS的一种,已淘汰
3.fanout:广播模式,交换机中所有队列都会收到
4.topic:发布订阅模式,同样是通过route-key匹配,但是允许使用通配符部分匹配,匹配到的队列即是订阅了生产者的队列
director和fanout没什么好说的,topic需要注意,该种交换机绑定队列时同样需要声明route-key,例如:
1 | #.aaa (队列1) |
其中通配符#表示该处可以是任意个任何字符,但是必须要有字符。除此之外路由键还完全需要满足对应的形式。
使用docker安装RabbitMQ
直接启动rabbitmq:management
1 | docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management |
没有镜像不用慌,docker会自动联网下载镜像,请确保网络通畅。
启动时,必须按照相同的端口映射,启动后访问本机(linux设备)的15672端口即可抵达rabbitmq的控制台。
SpringBoot整合RabbitMQ
添加依赖
在要使用RabbitMQ的服务中直接引入其启动依赖
1 | <dependency> |
配置RabbitMQ
在服务的application中添加上:
1 | spring: |
分别表示配置RabbitMQ所在的ip、端口,和使用的虚拟主机(默认应为/)
启动类上开启RabbitMQ功能
也即是加上注解
1 |
至此配置完毕
在SpringBoot中使用RabbitMQ
声明各种MQ组件
在SpringBoot中,可以使用AmpqAdmin类来进行MQ组件的创建,一般使用其自带的declare系列方法进行创建。
声明一个交换机
测试方法为:
1 |
|
先创建一个交换机,名称为spring.test01.directExchange,设定为持久化、不自动删除。amqpAdmin的declareExchange方法,用于“宣称”也即是创建一个交换机。
声明一个队列
测试方法为:
1 |
|
Queue的对象构造方法参数有四个:名字、是否持久化、是否排他、是否自动删除
声明一个绑定关系
测试方法为:
1 |
|
首先明确,在RabbitMQ中,绑定一定有交换机参与的,可以是交换机之间绑定,也可以是交换机与队列的绑定,但是不允许发生队列与队列之间的绑定,也可以理解,毕竟任何消息要从交换机发送出去,队列本身是消息接收者,在没有交换机的前提下自身是不可能产生消息的,队列间的绑定因此是没有意义的
发送消息
简单的消息发送
使用RabbitTemplate的方法,我们可以使用创建并发送消息的convertAndSend方法。
测试方法如下:
1 |
|
参数先后是:交换机、路由键、消息
序列化输出Obejct对象
上面的例子看出,rabbit发送消息可以发送String类型的数据,但是也仅限于String这种较为通用和特殊的数据类型了。一般的Object对象,rabbit是无法发送的,我们必须使该类实现一个Serializeable接口,告诉rabbit可以序列化输出对象,就用java自带的序列化方法就行
例如我有一个Entity类,如下:
1 |
|
此时我们便可以将该类的对象输出为序列化形式,测试方法为:
1 |
|
发送成功后,可以在客户端得知队列收到了一条消息:
1 | rO0ABXNyAC1jb20ua2F0emVueWFzYXgubWFsbC5vcmRlci5lbnRpdHkuT3JkZXJFbnRpdHkAAAAAAAAAAQIAKkwADmF1dG9Db25maXJtRGF5dAATTGphdmEv |
总之是可以输出任意Object类了,且用的是java自带的序列化方法
序列化Obejct为JSON格式
上面也看到了,用java自带的序列化方法很不友好,能不能序列化成JSON格式呢?
还真能,我们可以自定义。写一个配置类:
1 |
|
什么意思呢?相当于重写了rabbit默认的消息转换器,因为实例化一个rabbitTemplate单例时,其会调用一个构造方法,恰好就名叫MessageConverter,它的返回值就是一个默认的序列化方式。
而默认的消息转换器的逻辑是,如果是String类型消息就直接输出原本的消息;如果不是就参照java默认的序列化机制。
而我们在配置类中加入了同名的组件MessgaeConverter,系统配置时会直接调用且优先调用这个同名方法,这一举动会使该方法传递给rabbitTemplate的默认序列化方式被顶替为JSON序列化方式,也即是new的一个Jackson2JsonMessageConverter()。
总之我们再次测试时,消息会变成:
1 | {"VerbI":"I","VerbII":"II"} |
接收消息
简单的消息接收
需要使用@RabbitListener注解,测试方法:
1 | /** |
注意一下,该注解只有在容器内才能生效,也就是在各种组件里面@Service、@Controller、@Component等才行。
开启服务,运行一下上面发送Object类的测试方法,看看打印结果:
1 | 监听到消息:(Body:'[B@1f06c07c(byte[820])' MessageProperties [headers={__TypeId__=Entity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=spring.test01.directExchange, receivedRoutingKey=testRK, deliveryTag=1, consumerTag=amq.ctag-zOPjexiGSPI21zDUGAr-mA, consumerQueue=spring.test01.queue01]),类型为:class org.springframework.amqp.core.Message |
接收特定类型消息
接收消息时也可以直接把对象类型放上去,测试方法:
1 | public void listenMessage(Message message,Entity body){ |
看看结果:
1 | 监听到消息:Entity(VerbI="I",VerbII="II") |
甚至可以拿到信道
测试方法:
1 | public void listenMessage(Message message, Entity body, Channel channel){} |
具体有什么用我就不清楚了
注意
@RabbitListener只能支持同一个消息被仅仅一个服务监听到,但是当队列中碰到很多条消息时,消息可以被负载均衡到不同的服务。
更极端的情况,例如消息队列里消息太多,比服务器数量还多怎么办呢?那就只有在一台服务器处理完一个消息时,才能接收下一个消息,未被接收的消息就会阻塞在消息代理中等待被处理
@RabbitHandler
还有一个注解是@RabbitHandler,它可以和@RabbitListener注解搭配使用,就可以区分队列消息的类型
将@RabbitListener放在类上,@RabbitHandler放在方法上,例如拿一个业务层接口Service开刀:
1 |
|
再次分别调用上面的两个发送消息的测试方法,查看结果:
1 | 接受到String对象消息:ttttest |
也就是说通过配合使用@RabbitListener和@RabbitHandler,可以做到相当于方法的重载,根据参数类型的不同而采取不同的处理方式
可靠投递
防止消息在发送、接收过程中因为一系列原因导致的的消息丢失。
一种解决方法是使用事务,但是据rabbitMQ官方说法,这可能会使性能下降250倍,因此不推荐使用。
而另一种解决方案是使用确认机制,具体工作流程如下:
1 | 1.Publisher发送消息到Broker,若Broker收到了消息则调用一个方法confirmCallback进行确认 |
生产者到消息代理确认
配置application
1 | spring: |
自定义COnfirmCallback
需要写一个配置类,如下:
1 |
|
首先明确,让Spring自动配置的一个单例rabbitTemplate,在整个服务器运行的期间都是同一个,因此只要我们找到它并且将其部分配置修改,就能让其在整个服务器期间都遵从我们的配置。
因此,我们将rabbitTemplate的ConfirmCallback修改为我们所需要的处理方法,也即是重写。
注意这里有坑,这个配置组件不应该和上面的RabbitSerializeConfiguration写在一起,会循环依赖,具体原因就是RabbitTemplate依赖于RabbitSerializeConfiguration中自定义序列化转换器,而该配置类有引入了RabbitTemplate的单例实例化对象,二者会造成了循环依赖
测试
还是用发送消息的测试方法发送一个消息,反馈消息会反馈到消息发送者,也就是临时客户端,结果为:
1 | CorrelationData: null , ack: true reason: null |
注意这里的ack参数,只要消息从Publisher传到Broker,都是true,他不会参考后续进展。事实上,ack的作用就是用来表示消息是否被删除,在这里就可以看作是,Publisher将消息复制了一份交给Broker,Broker收到后就给一个反馈ConfirmCallback告诉Publisher有没有收到消息,如果收到了,ack就为true,消息就会从Publisher删除;没有收到ack就为false,该消息可能会重发,也可能会直接丢弃,依照具体配置执行。
注意,上述过程可以看作是整个rabbitMQ中,两点间消息传递确认机制的模板,基本上大差不差,区别可能仅在于任何处理发送失败的消息。
消息代理到队列确认
配置application
1 | spring: |
publisher-returns表示开启消息代理到队列的确认,而template.mandatory表示以异步(新开一个线程执行)方式回调return。
写配置类
在上面的配置类中setConfirmCallback后面加上setReturnCallback
1 | /** |
测试
为了人为造成发送失败,将测试方法的路由键暂时修改,随后测试.同样是在发送消息的客户端中,但是没有在临时客户端的test结果中特别展示,要在完整终端查看:
1 | 发送失败的消息内容: (Body:'"ttttest"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 回复的状态码: 312 回复的文本内容: NO_ROUTE 发送该消息的交换机: spring.test01.directExchange 路由键: ttttestRK |
确实是有错误提示,而且会给出错误的原因:
1 | 回复的状态码: 312 |
原因就是NO_ROUTE,即是找不到路由键
队列到消费者确认
RabbitMQ自带一个自动ack的机制,一旦接收到数据就会回复ack给队列,队列接到反馈后就会删除消息
但是一个很严重的缺陷是,该机制允许服务器一接收到消息就回复ack,也就是我只管接收到,我一接收到,队列就删除该消息。
但是从逻辑上讲,服务器接收到消息后还应该进行处理,若是在这段处理的时间内服务器宕机,那么此时服务器丢失了消息,而队列那边早就把消息删了,这就导致消息的丢失。
更严重的是,服务器一宕机,队列那边剩余阻塞的消息也会全部清空。
唯一的解决方案是手动确认
配置application
1 | spring: |
此时,除非我们手动确认消息,否则消息在队列中是不会删除的,且会一直处于ready状态
手动确认
在接收消息的方法后面手动确认ack,加上channel.basicAck()方法,前提是参数表中获取了channel
1 | /** |
其中第一个参数是消息的tag,根据消息进入channel的顺序,从1开始的自增id。
第二个参数表示是否连带确认后面进入该channel的所有消息,建议false
而同样,basicNack()方法就是不确认ack,但是相比basicAck会多出一个参数requeue,表示是否将消息退回队列重新ready,例如:
1 | channel.basicNack( |
延时队列
死信路由
死信路由算是原版rabbitmq实现延时队列的核心,它能够使消息在broker内滞留,不立刻发出,利用这一点就能实现延时队列
死信路由的相关组件,死信交换机和死信队列,作用是专门接收和发送死信,不应该承担任何实际业务功能
死信路由的组件本质上就是普通的交换机和队列,只是官方提供了一系列参数使组件,能够声明一个组件是死信组件,并在逻辑上实现死信路由的功能,因此死信组件只是打了mod的普通组件,并未和普通组件做出区分,实际使用上和普通组件是差不多的。
场景引入
从一个现实场景入手,比较好理解
业务需求
设想一个订单超时场景,要求订单在30分钟内未处理,就要进行订单删除并解锁库存。该功能包含了订单服务order和库存服务ware,主服务order,业务为删除订单;远程调用服务为ware,业务为解锁库存。
如果是一般情况下,直接让order调用ware就行了,但是考虑到一致性问题和远程调用的一些问题,且库存的释放并不需要用户立刻感知,所以仅需要保证最终一致性就行了,在这里应该使用延时队列。
架构分析
所以设想架构,订单创建成功后,对应商品的库存也已经锁成功了,那么锁库存成功后立刻向一个交换机stock.exchange.top发出消息,路由键为stock.key.locked,消息应该讲明哪个订单号、哪些仓库、哪个商品sku、几件商品。
交换机拿到该消息后,消息进入等待,等待时间为50min(订单30min内有效),消息死亡后,要发往死信队列stcok.queue.delay,路由键仍为stock.key.locked。
死信队列收到消息后,需要让后端判断该消息的订单是否过期未支付,若是,则将该消息以路由键stock.key.unlock.unpay再次发往交换机;否则不再进行处理,将该消息删除。
如果需要解锁,则交换机拿到该消息后就将该消息以stock.key.unlock.unpay路由键发往ware服务订阅的一个队列stock.queue.unlock,ware服务接收消息并自行处理。
架构如图所示:
红色的是定义死信队列时应该指明的,黑色的是要手动指明的。
创建组件
首先,应该创建一个交换机,因为消息最先从服务发往交换机
1 | /** |
应该明确的是,如果一个消息以路由键stock.key.locked发来交换机时,该消息应当在交换机内滞留一段时间,随后才会发往死信队列
但是滞留多长时间呢?怎么发往死信队列?这些并不是交换机来决定的,而是死信队列来决定
直接上定义
1 | /** |
可以看到的是,事实上死信队列的定义和其他队列的定义真没什么不同,区别在于它用了官方给的自定义参数,使用map将这些参数封装起来交给队列的构造方法,就能定义一个死信队列了,这些参数是:
1.x-dead-letter-exchange,这个参数表面的是死信队列应该接收哪个交换机的死信,而这个交换机也就成为了死信交换机
2.x-dead-letter-routing-key,这个参数表面的是死信队列拿到死信后,将消息以什么路由键发往死信交换机
3.x-message-ttl,表明的是死信交换机内,消息的存活时间
总之这个死信队列定义后,交换机也在事实上成为了死信交换机。
但是需要注意的是,上面不是说消息要以stock.key.locked的路由键发到交换机,才会滞留并发到死信队列吗?这一点不是没有定义吗?而且照这样,其他路由键的消息就也会被死信队列盯上,就走不了正常路由了。
别急,这一点需要手动绑定死信交换机和死信队列才行
1 | /** |
只有手动创建绑定关系,死信交换机内路由键为stock.key.locked的消息才会进入死信队列。
这还没完,刚说了死信队列里的消息会以stock.key.unlock.unpay的路由键发回死信交换机,死信交换机再发往队列stock.queue.unlock,ware服务才能监听该服务,进而完成业务。
所以直接创建unlock队列和交换机的绑定关系
1 | /** |
注意unlock队列和交换机的绑定中,路由键以stock.key.unlock.#,通配符形式,因为还有其他可能的路由键发到unlock队列,比如订单异常但是库存已锁stock.key.unlock.exception等
创建监听者
创建完监听者还没完,因为就这样定义组件,springboot是不会在mq中自动创建组件的,因为你还没有连上mq
怎么连上mq呢?弄一个监听者,监听mq中的队列就行了,这样以来ware服务就连上mq了
1 | /** |