注意,以下所有内容均为个人理解,且建立在具体的场景上,可能不适应所有情况

RabbitMQ

可以在允许最终一致性的场景下,将高并发请求简单化,也就是将高并发的请求处理成一条有序的消息队列,每一个请求都可以看作是一条消息,服务接口会绑定这条队列,按照消息进队列顺序依次处理消息

用在订单服务这种用户不需要立刻拿到反馈的场景简直绝配。

工作流程

1.生产者(Publisher)生产消息,通过一条长连接将消息送到消息代理(Broker)

2.消息由消息头和消息体组成,其中消息体中包含了一个路由(route-key),决定着该消息会进入哪一个队列

3.消息代理先接收消息,再将将消息送到交换机

4.交换机通过消息的消息头包含的key-route,将消息送入对应的队列

5.消费者(Consumer)通过一条长连接与消息代理连接

6.消费者中的不同服务通过不同的信道,监听并接收不同队列的消息

几种交换机

1.director:点对点模式,使用route-key绑定队列,并且route-key精确匹配

2.headers:点对点模式,属于JMS的一种,已淘汰

3.fanout:广播模式,交换机中所有队列都会收到

4.topic:发布订阅模式,同样是通过route-key匹配,但是允许使用通配符部分匹配,匹配到的队列即是订阅了生产者的队列

director和fanout没什么好说的,topic需要注意,该种交换机绑定队列时同样需要声明route-key,例如:

1
2
3
#.aaa       (队列1)
bbb.# (队列2)
#.ccc.bbb (队列3)

其中通配符#表示该处可以是任意个任何字符,但是必须要有字符。除此之外路由键还完全需要满足对应的形式。

使用docker安装RabbitMQ

直接启动rabbitmq:management

1
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

没有镜像不用慌,docker会自动联网下载镜像,请确保网络通畅。

启动时,必须按照相同的端口映射,启动后访问本机(linux设备)的15672端口即可抵达rabbitmq的控制台。

SpringBoot整合RabbitMQ

添加依赖

在要使用RabbitMQ的服务中直接引入其启动依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ

在服务的application中添加上:

1
2
3
4
5
spring:
rabbitmq:
addresses: 192.168.74.130
port: 5672
virtual-host: /

分别表示配置RabbitMQ所在的ip、端口,和使用的虚拟主机(默认应为/)

启动类上开启RabbitMQ功能

也即是加上注解

1
@EnableRabbit

至此配置完毕

在SpringBoot中使用RabbitMQ

声明各种MQ组件

在SpringBoot中,可以使用AmpqAdmin类来进行MQ组件的创建,一般使用其自带的declare系列方法进行创建。

声明一个交换机

测试方法为:

1
2
3
4
5
6
7
8
9
@Test
void rabbitTest01(){
DirectExchange directExchange=new DirectExchange(
"spring.test01.directExchange" //名称
,true //是否持久化
,false //是否自动删除
);
amqpAdmin.declareExchange(directExchange);
}

先创建一个交换机,名称为spring.test01.directExchange,设定为持久化、不自动删除。amqpAdmin的declareExchange方法,用于“宣称”也即是创建一个交换机。

声明一个队列

测试方法为:

1
2
3
4
5
6
7
8
9
10
11
@Test
void createQueue(){
Queue queue=new Queue(
"spring.test01.queue01"
,true //持久化
,false //排他性
,false //自动删除
);
//声明一个队列
amqpAdmin.declareQueue(queue);
}

Queue的对象构造方法参数有四个:名字、是否持久化、是否排他、是否自动删除

声明一个绑定关系

测试方法为:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
void createBinding(){
Binding binding=new Binding(
"spring.test01.queue01" //目的地
, Binding.DestinationType.QUEUE //目的地的类型
,"spring.test01.directExchange" //交换机名称
,"testRK" //路由键
,null //自定义参数,这里先不讨论
);
//声明一个绑定关系
amqpAdmin.declareBinding(binding);
}

首先明确,在RabbitMQ中,绑定一定有交换机参与的,可以是交换机之间绑定,也可以是交换机与队列的绑定,但是不允许发生队列与队列之间的绑定,也可以理解,毕竟任何消息要从交换机发送出去,队列本身是消息接收者,在没有交换机的前提下自身是不可能产生消息的,队列间的绑定因此是没有意义的

发送消息

简单的消息发送

使用RabbitTemplate的方法,我们可以使用创建并发送消息的convertAndSend方法。

测试方法如下:

1
2
3
4
5
6
7
8
@Test
void rabbitMessageSending(){
//创建并发送消息
rabbitTemplate.convertAndSend(
"spring.test01.directExchange"
,"testRK"
,"ttttest");
}

参数先后是:交换机、路由键、消息

序列化输出Obejct对象

上面的例子看出,rabbit发送消息可以发送String类型的数据,但是也仅限于String这种较为通用和特殊的数据类型了。一般的Object对象,rabbit是无法发送的,我们必须使该类实现一个Serializeable接口,告诉rabbit可以序列化输出对象,就用java自带的序列化方法就行

例如我有一个Entity类,如下:

1
2
3
4
5
@Data
public class Entity implements Serializable{
private String VerbI;
private String VerbII;
}

此时我们便可以将该类的对象输出为序列化形式,测试方法为:

1
2
3
4
5
6
7
8
9
10
11
@Test
void rabbitMessageSending(){
Entity message=new Entity();
message.setVerbI("I");
message.setVerbII("II")
//发送消息
rabbitTemplate.convertAndSend(
"spring.test01.directExchange"
,"testRK"
,message);
}

发送成功后,可以在客户端得知队列收到了一条消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
rO0ABXNyAC1jb20ua2F0emVueWFzYXgubWFsbC5vcmRlci5lbnRpdHkuT3JkZXJFbnRpdHkAAAAAAAAAAQIAKkwADmF1dG9Db25maXJtRGF5dAATTGphdmEv
bGFuZy9JbnRlZ2VyO0wAC2JpbGxDb250ZW50dAASTGphdmEvbGFuZy9TdHJpbmc7TAAKYmlsbEhlYWRlcnEAfgACTAARYmlsbFJlY2VpdmVyRW1haWxxAH4A
AkwAEWJpbGxSZWNlaXZlclBob25lcQB+AAJMAAhiaWxsVHlwZXEAfgABTAALY29tbWVudFRpbWV0ABBMamF2YS91dGlsL0RhdGU7TAANY29uZmlybVN0YXR1
c3EAfgABTAAMY291cG9uQW1vdW50dAAWTGphdmEvbWF0aC9CaWdEZWNpbWFsO0wACGNvdXBvbklkdAAQTGphdmEvbGFuZy9Mb25nO0wACmNyZWF0ZVRpbWVx
AH4AA0wADGRlbGV0ZVN0YXR1c3EAfgABTAAPZGVsaXZlcnlDb21wYW55cQB+AAJMAApkZWxpdmVyeVNucQB+AAJMAAxkZWxpdmVyeVRpbWVxAH4AA0wADmRp
c2NvdW50QW1vdW50cQB+AARMAA1mcmVpZ2h0QW1vdW50cQB+AARMAAZncm93dGhxAH4AAUwAAmlkcQB+AAVMAAtpbnRlZ3JhdGlvbnEAfgABTAARaW50ZWdy
YXRpb25BbW91bnRxAH4ABEwACG1lbWJlcklkcQB+AAVMAA5tZW1iZXJVc2VybmFtZXEAfgACTAAKbW9kaWZ5VGltZXEAfgADTAAEbm90ZXEAfgACTAAHb3Jk
ZXJTbnEAfgACTAAJcGF5QW1vdW50cQB+AARMAAdwYXlUeXBlcQB+AAFMAAtwYXltZW50VGltZXEAfgADTAAPcHJvbW90aW9uQW1vdW50cQB+AARMAAtyZWNl
aXZlVGltZXEAfgADTAAMcmVjZWl2ZXJDaXR5cQB+AAJMABVyZWNlaXZlckRldGFpbEFkZHJlc3NxAH4AAkwADHJlY2VpdmVyTmFtZXEAfgACTAANcmVjZWl2
ZXJQaG9uZXEAfgACTAAQcmVjZWl2ZXJQb3N0Q29kZXEAfgACTAAQcmVjZWl2ZXJQcm92aW5jZXEAfgACTAAOcmVjZWl2ZXJSZWdpb25xAH4AAkwACnNvdXJj
ZVR5cGVxAH4AAUwABnN0YXR1c3EAfgABTAALdG90YWxBbW91bnRxAH4ABEwADnVzZUludGVncmF0aW9ucQB+AAF4cHBwcHBwcHBwcHBwcHBwcHBwcHBwcHNy
AA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAXQABGhkb3dwcHBwcHBw
cHBwcHBwcHBwcHBw

总之是可以输出任意Object类了,且用的是java自带的序列化方法

序列化Obejct为JSON格式

上面也看到了,用java自带的序列化方法很不友好,能不能序列化成JSON格式呢?

还真能,我们可以自定义。写一个配置类:

1
2
3
4
5
6
7
@Configuration
public class RabbitSerializeConfiguration {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}

什么意思呢?相当于重写了rabbit默认的消息转换器,因为实例化一个rabbitTemplate单例时,其会调用一个构造方法,恰好就名叫MessageConverter,它的返回值就是一个默认的序列化方式。

而默认的消息转换器的逻辑是,如果是String类型消息就直接输出原本的消息;如果不是就参照java默认的序列化机制。

而我们在配置类中加入了同名的组件MessgaeConverter,系统配置时会直接调用且优先调用这个同名方法,这一举动会使该方法传递给rabbitTemplate的默认序列化方式被顶替为JSON序列化方式,也即是new的一个Jackson2JsonMessageConverter()。

总之我们再次测试时,消息会变成:

1
{"VerbI":"I","VerbII":"II"}

接收消息

简单的消息接收

需要使用@RabbitListener注解,测试方法:

1
2
3
4
5
6
7
/**
* 监听消息队列,获取消息
*/
@RabbitListener(queues = {"spring.test01.queue01"})
public void listenMessage(Message message){
System.out.println("监听到消息:"+message.toString()+",类型为:"+message.getClass());
}

注意一下,该注解只有在容器内才能生效,也就是在各种组件里面@Service、@Controller、@Component等才行。

开启服务,运行一下上面发送Object类的测试方法,看看打印结果:

1
监听到消息:(Body:'[B@1f06c07c(byte[820])' MessageProperties [headers={__TypeId__=Entity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=spring.test01.directExchange, receivedRoutingKey=testRK, deliveryTag=1, consumerTag=amq.ctag-zOPjexiGSPI21zDUGAr-mA, consumerQueue=spring.test01.queue01]),类型为:class org.springframework.amqp.core.Message

接收特定类型消息

接收消息时也可以直接把对象类型放上去,测试方法:

1
2
3
public void listenMessage(Message message,Entity body){
System.out.println("监听到消息:"+body);
}

看看结果:

1
监听到消息:Entity(VerbI="I",VerbII="II")

甚至可以拿到信道

测试方法:

1
public void listenMessage(Message message, Entity body, Channel channel){}

具体有什么用我就不清楚了

注意

@RabbitListener只能支持同一个消息被仅仅一个服务监听到,但是当队列中碰到很多条消息时,消息可以被负载均衡到不同的服务。

更极端的情况,例如消息队列里消息太多,比服务器数量还多怎么办呢?那就只有在一台服务器处理完一个消息时,才能接收下一个消息,未被接收的消息就会阻塞在消息代理中等待被处理

@RabbitHandler

还有一个注解是@RabbitHandler,它可以和@RabbitListener注解搭配使用,就可以区分队列消息的类型
将@RabbitListener放在类上,@RabbitHandler放在方法上,例如拿一个业务层接口Service开刀:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RabbitListener(queues = {"spring.test01.queue01"})
@Service
public class Service{
/**
* 监听Entity对象消息
*/
@RabbitHandler
//@RabbitListener(queues = {"spring.test01.queue01"})
public void listenEntityMessage(Message message, Entity body, Channel channe{
System.out.println("接收到Entity对象消息:"+body);
/**
* 接收String对象消息
*/
@RabbitHandler
public void listenStringMessage(String message){
System.out.println("接受到String对象消息:"+message);
}
}

再次分别调用上面的两个发送消息的测试方法,查看结果:

1
2
接受到String对象消息:ttttest
接收到Entity对象消息:Entity(VerbI="I",VerbII="II")

也就是说通过配合使用@RabbitListener和@RabbitHandler,可以做到相当于方法的重载,根据参数类型的不同而采取不同的处理方式

可靠投递

防止消息在发送、接收过程中因为一系列原因导致的的消息丢失。

一种解决方法是使用事务,但是据rabbitMQ官方说法,这可能会使性能下降250倍,因此不推荐使用。

而另一种解决方案是使用确认机制,具体工作流程如下:

1
2
3
4
5
1.Publisher发送消息到Broker,若Broker收到了消息则调用一个方法confirmCallback进行确认

2.Broker通过Exchange将消息发往Queue,若Queue未收到消息,则调用一个方法returnCallback表示未收到消息

3.Consumer开始接收Queue的消息时,采用的是ack机制,若Consumer正确地消费到了消息则将消息从Queue中删除,否则退回到Queue或重新投递等

生产者到消息代理确认

配置application

1
2
3
spring:
rabbitmq:
publisher-confirm-type: correlated

自定义COnfirmCallback

需要写一个配置类,如下:

1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class RabbitConfirmConfiguration {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void setRabbitTemplate(){
rabbitTemplate.setConfirmCallback(
(correlationData, b, s) -> System.out.println("CorrelationData: "+correlationDa+" , ack: "+b+" reason: "+s)
);
}
}

首先明确,让Spring自动配置的一个单例rabbitTemplate,在整个服务器运行的期间都是同一个,因此只要我们找到它并且将其部分配置修改,就能让其在整个服务器期间都遵从我们的配置。

因此,我们将rabbitTemplate的ConfirmCallback修改为我们所需要的处理方法,也即是重写。

注意这里有坑,这个配置组件不应该和上面的RabbitSerializeConfiguration写在一起,会循环依赖,具体原因就是RabbitTemplate依赖于RabbitSerializeConfiguration中自定义序列化转换器,而该配置类有引入了RabbitTemplate的单例实例化对象,二者会造成了循环依赖

测试

还是用发送消息的测试方法发送一个消息,反馈消息会反馈到消息发送者,也就是临时客户端,结果为:

1
CorrelationData: null , ack: true reason: null

注意这里的ack参数,只要消息从Publisher传到Broker,都是true,他不会参考后续进展。事实上,ack的作用就是用来表示消息是否被删除,在这里就可以看作是,Publisher将消息复制了一份交给Broker,Broker收到后就给一个反馈ConfirmCallback告诉Publisher有没有收到消息,如果收到了,ack就为true,消息就会从Publisher删除;没有收到ack就为false,该消息可能会重发,也可能会直接丢弃,依照具体配置执行。

注意,上述过程可以看作是整个rabbitMQ中,两点间消息传递确认机制的模板,基本上大差不差,区别可能仅在于任何处理发送失败的消息。

消息代理到队列确认

配置application

1
2
3
4
5
spring:
rabbitmq:
publisher-returns: true
template:
mandatory: true

publisher-returns表示开启消息代理到队列的确认,而template.mandatory表示以异步(新开一个线程执行)方式回调return。

写配置类

在上面的配置类中setConfirmCallback后面加上setReturnCallback

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 2.broker到queue的确认
*/
rabbitTemplate.setReturnsCallback(returnedMessage ->
System.out.println(
"发送失败的消息内容: "+returnedMessage.getMessage()
+" 回复的状态码: "+ returnedMessage.getReplyCode()
+" 回复的文本内容: "+ returnedMessage.getReplyText()
+" 发送该消息的交换机: "+returnedMessage.getExchange()
+" 路由键: "+returnedMessage.getRoutingKey()
)
);

测试

为了人为造成发送失败,将测试方法的路由键暂时修改,随后测试.同样是在发送消息的客户端中,但是没有在临时客户端的test结果中特别展示,要在完整终端查看:

1
发送失败的消息内容: (Body:'"ttttest"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 回复的状态码: 312 回复的文本内容: NO_ROUTE 发送该消息的交换机: spring.test01.directExchange 路由键: ttttestRK

确实是有错误提示,而且会给出错误的原因:

1
2
3
4
回复的状态码: 312 
回复的文本内容: NO_ROUTE
发送该消息的交换机: spring.test01.directExchange
路由键: ttttestRK

原因就是NO_ROUTE,即是找不到路由键

队列到消费者确认

RabbitMQ自带一个自动ack的机制,一旦接收到数据就会回复ack给队列,队列接到反馈后就会删除消息
但是一个很严重的缺陷是,该机制允许服务器一接收到消息就回复ack,也就是我只管接收到,我一接收到,队列就删除该消息。

但是从逻辑上讲,服务器接收到消息后还应该进行处理,若是在这段处理的时间内服务器宕机,那么此时服务器丢失了消息,而队列那边早就把消息删了,这就导致消息的丢失。

更严重的是,服务器一宕机,队列那边剩余阻塞的消息也会全部清空。

唯一的解决方案是手动确认

配置application

1
2
3
4
5
spring:
rabbitmq:
istener:
simple:
acknowledge-mode: manual

此时,除非我们手动确认消息,否则消息在队列中是不会删除的,且会一直处于ready状态

手动确认

在接收消息的方法后面手动确认ack,加上channel.basicAck()方法,前提是参数表中获取了channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 监听Entity对象消息
*
* 加上了手动确认ack
*/
@RabbitHandler
//@RabbitListener(queues = {"spring.test01.queue01"})
public void listenEntityMessage(Message message, Entity body, Channel channel) {
System.out.println("接收到Entity对象消息:"+body);
try {
channel.basicAck(
message.getMessageProperties().getDeliveryTag() //消息的tag
, false //是否批量确认
);
}catch (Exception e){
System.out.println(message.getMessageProperties().getDeliveryTag()+"号消息");
}
}

其中第一个参数是消息的tag,根据消息进入channel的顺序,从1开始的自增id。

第二个参数表示是否连带确认后面进入该channel的所有消息,建议false

而同样,basicNack()方法就是不确认ack,但是相比basicAck会多出一个参数requeue,表示是否将消息退回队列重新ready,例如:

1
2
3
4
5
channel.basicNack(
message.getMessageProperties().getDeliveryTag() //消息的tag
, false //是否批量确认
, true //退回队列,或是删除消息
);

延时队列

死信路由

死信路由算是原版rabbitmq实现延时队列的核心,它能够使消息在broker内滞留,不立刻发出,利用这一点就能实现延时队列

死信路由的相关组件,死信交换机和死信队列,作用是专门接收和发送死信,不应该承担任何实际业务功能

死信路由的组件本质上就是普通的交换机和队列,只是官方提供了一系列参数使组件,能够声明一个组件是死信组件,并在逻辑上实现死信路由的功能,因此死信组件只是打了mod的普通组件,并未和普通组件做出区分,实际使用上和普通组件是差不多的。

场景引入

从一个现实场景入手,比较好理解

业务需求

设想一个订单超时场景,要求订单在30分钟内未处理,就要进行订单删除并解锁库存。该功能包含了订单服务order和库存服务ware,主服务order,业务为删除订单;远程调用服务为ware,业务为解锁库存。

如果是一般情况下,直接让order调用ware就行了,但是考虑到一致性问题和远程调用的一些问题,且库存的释放并不需要用户立刻感知,所以仅需要保证最终一致性就行了,在这里应该使用延时队列。

架构分析

所以设想架构,订单创建成功后,对应商品的库存也已经锁成功了,那么锁库存成功后立刻向一个交换机stock.exchange.top发出消息,路由键为stock.key.locked,消息应该讲明哪个订单号、哪些仓库、哪个商品sku、几件商品。

交换机拿到该消息后,消息进入等待,等待时间为50min(订单30min内有效),消息死亡后,要发往死信队列stcok.queue.delay,路由键仍为stock.key.locked。

死信队列收到消息后,需要让后端判断该消息的订单是否过期未支付,若是,则将该消息以路由键stock.key.unlock.unpay再次发往交换机;否则不再进行处理,将该消息删除。

如果需要解锁,则交换机拿到该消息后就将该消息以stock.key.unlock.unpay路由键发往ware服务订阅的一个队列stock.queue.unlock,ware服务接收消息并自行处理。

架构如图所示:

[https://kaztenyasax-mall.oss-cn-beijing.aliyuncs.com/RabbitMQ%E7%9A%84%E5%9F%BA%E7%A1%80%E9%85%8D%E7%BD%AE%E5%8F%8A%E4%BD%BF%E7%94%A8-1.png]

红色的是定义死信队列时应该指明的,黑色的是要手动指明的。

创建组件

首先,应该创建一个交换机,因为消息最先从服务发往交换机

1
2
3
4
5
6
7
8
9
10
11
12
 /**
* 创建交换机stock.exchange.top
*/
@Bean
public Exchange stockExchangeTop(){
//public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new TopicExchange(
"stock.exchange.top" //交换机名
,true //持久化
,false //自动删除
);
}

应该明确的是,如果一个消息以路由键stock.key.locked发来交换机时,该消息应当在交换机内滞留一段时间,随后才会发往死信队列

但是滞留多长时间呢?怎么发往死信队列?这些并不是交换机来决定的,而是死信队列来决定

直接上定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 创建死信队列stock.queue.delay
*/
@Bean
public Queue stockQueueDelay(){
//自定义参数,有关死信的全部参数
Map<String,Object> arguments=new HashMap<>();
arguments.put("x-dead-letter-exchange","stock.exchange.top"); //死信队列从哪个交换机拿死信
arguments.put("x-dead-letter-routing-key","stock.key.unlock.unpay"); //死信队列拿到死信后,定时发送到交换机时的路由键
arguments.put("x-message-ttl", 5000); //死信定时时间,5000毫秒

// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
return new Queue(
"stock.queue.delay" //交换机名
,true //持久化
,false //排他
,false //自动删除
,arguments //自定义参数
);
}

可以看到的是,事实上死信队列的定义和其他队列的定义真没什么不同,区别在于它用了官方给的自定义参数,使用map将这些参数封装起来交给队列的构造方法,就能定义一个死信队列了,这些参数是:

1.x-dead-letter-exchange,这个参数表面的是死信队列应该接收哪个交换机的死信,而这个交换机也就成为了死信交换机

2.x-dead-letter-routing-key,这个参数表面的是死信队列拿到死信后,将消息以什么路由键发往死信交换机

3.x-message-ttl,表明的是死信交换机内,消息的存活时间

总之这个死信队列定义后,交换机也在事实上成为了死信交换机。

但是需要注意的是,上面不是说消息要以stock.key.locked的路由键发到交换机,才会滞留并发到死信队列吗?这一点不是没有定义吗?而且照这样,其他路由键的消息就也会被死信队列盯上,就走不了正常路由了。

别急,这一点需要手动绑定死信交换机和死信队列才行

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 创建死信delay队列和交换机的绑定
*/
@Bean
public Binding delayExchangeBinding(){
return new Binding(
"stock.queue.delay" //目的地是死信队列
, Binding.DestinationType.QUEUE //死信队列是队列(QUEUE)
,"stock.exchange.top" //中转exchange
,"stock.key.locked" //路由键,该键表明消息死亡时以该键送往死信队列
,null //自定义参数(可以不填)
);
}

只有手动创建绑定关系,死信交换机内路由键为stock.key.locked的消息才会进入死信队列。

这还没完,刚说了死信队列里的消息会以stock.key.unlock.unpay的路由键发回死信交换机,死信交换机再发往队列stock.queue.unlock,ware服务才能监听该服务,进而完成业务。

所以直接创建unlock队列和交换机的绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 创建ware订阅的队列stock.queue.unlock
*/
@Bean
public Queue stockQueueUnlock(){
// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
return new Queue(
"stock.queue.unlock"
,true
,false
,false
);
}
/**
* 创建交换机和unlock队列的绑定
*/
@Bean
public Binding unlockExchangeBinding(){
return new Binding(
"stock.queue.unlock" //目的地是库存队列
, Binding.DestinationType.QUEUE //库存队列是队列(QUEUE)
,"stock.exchange.top" //中转exchange
,"stock.key.unlock.#" //路由键
,null //自定义参数(可以不填)
);
}

注意unlock队列和交换机的绑定中,路由键以stock.key.unlock.#,通配符形式,因为还有其他可能的路由键发到unlock队列,比如订单异常但是库存已锁stock.key.unlock.exception等

创建监听者

创建完监听者还没完,因为就这样定义组件,springboot是不会在mq中自动创建组件的,因为你还没有连上mq

怎么连上mq呢?弄一个监听者,监听mq中的队列就行了,这样以来ware服务就连上mq了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 监听stock.queue.unlock队列,拿取解锁信息
*/
@RabbitListener(queues = "stock.queue.unlock")
public void listenerOrder(Message message,String msg, Channel channel){
System.out.println("收到解锁消息"+message.getMessageProperties().getDeliveryTag()+":"+msg);
try {
channel.basicAck(
message.getMessageProperties().getDeliveryTag() //消息的tag
, false //是否批量确认
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}