MQ消息队列¶
初识MQ¶
同步调用¶
同步调用:微服务间基于Feign的调用
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步调用¶
异步通信的优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
异步通信的缺点:
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
什么是MQ¶
MessageQueue 消息队列:存放消息的队列。(图中的Broker)
RabbitMQ¶
RabbitMQ概述¶
RabbitMQ是基于Erlang语言开发的开源消息通信中间件。
RabbitMQ的结构:
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
RabbitMQ部署¶
在线拉取下载镜像
安装MQ:执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
- 消息队列端口:5672
- MQ后台端口:15672
SpringAMQP¶
SpringAMQP概述¶
https://spring.io/projects/spring-amqp
AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
简单队列¶
1、引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置mq连接
spring:
rabbitmq:
host: 20.27.55.226 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: admin # 密码
3、消费者配置队列
4、消费者监听
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) {
System.out.println("spring 消费者接收到消息 :" + msg);
}
}
5、生产者发送到队列
convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者在消费者配置中声明一个队列,然后才能convertAndSend
@Test
void testSimpleQueue() {
String queueName = "simple.queue";
String msg = "wmh likes rabbit";
rabbitTemplate.convertAndSend(queueName, msg);
}
工作队列¶
相当于简单队列的负载均衡版本。
消费者会预确认消息再处理,设置preFetch这个值,可以控制预取消息的上限
spring:
rabbitmq:
host: 20.27.55.226 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: admin # 密码
listener:
simple:
# 每次只能获取一条消息,处理完成才能获取下一个消息
prefetch: 1
Fanout 广播¶
消费者:
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.q1"),
exchange = @Exchange(name = "wmh.fanout",
type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String msg) {
System.out.println(msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.q2"),
exchange = @Exchange(name = "wmh.fanout",
type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String msg) {
System.out.println(msg);
}
}
生产者:
for (int i = 0; i < 10; i++) {
String exchange = "wmh.fanout";
String msg = "wmh likes fanout! - " + i;
rabbitTemplate.convertAndSend(exchange, "", msg);
}
Direct 指定路由¶
会将接收到的消息根据规则路由到指定的Queue
消费者:
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.q1"),
exchange = @Exchange(name = "wmh.direct",
type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue1(String msg) {
System.out.println(msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.q2"),
exchange = @Exchange(name = "wmh.direct",
type = ExchangeTypes.DIRECT),
key = {"blue", "yellow"}
))
public void listenDirectQueue2(String msg) {
System.out.println(msg);
}
}
生产者:
@Test
void testDirectQueue() {
String exchange = "wmh.direct";
String msg = "wmh likes direct!";
rabbitTemplate.convertAndSend(exchange, "yellow", msg);
}
Topic 匹配路由¶
生产者:
@Test
void testSimpleQueue() {
String exchange = "wmh.topic";
String msg = "wmh likes direct!";
rabbitTemplate.convertAndSend(exchange, "china.news", msg);
}
消费者:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.q1"),
exchange = @Exchange(name = "wmh.topic",
type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue1(String msg) {
System.out.println(msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.q2"),
exchange = @Exchange(name = "wmh.topic",
type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue2(String msg) {
System.err.println(msg);
}
消息转换器¶
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
Spring的对消息对象的处理默认实现是基于JDK的ObjectOutputStream
完成序列化。
修改序列化方式需要定义一个MessageConverter
类型的Bean
1、引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2、生产者和消费者声明MessageConverter
3、消费者配置队列
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public Queue objectQueue(){
return new Queue("object.queue");
}
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
4、消费者配置监听
@RabbitListener(queues = "object.queue")
public void listenJsonQueue(User user) {
System.out.println(user.toString());
}
5、生产者发送消息