Rabbitmq 简介
Rabbitmq 是一个开源的消息代理,实现了高级消息队列协议(AMQP)。它具有高可用性、可扩展性和可靠性的特点,被广泛应用于分布式系统、微服务架构、事件驱动的应用程序等场景。
应用场景
-
异步处理:将耗时操作异步化,提高系统响应速度。例如,电商平台下单后,订单处理、库存更新、物流通知等操作可以异步进行。
-
系统解耦:各个服务之间通过消息队列通信,降低系统间的直接依赖。例如,用户服务和订单服务可以通过RabbitMQ进行通信,而不是直接调用。
-
流量削峰:在高并发场景下,通过消息队列缓存请求,平滑处理峰值流量。例如,秒杀系统中,大量的下单请求可以先写入消息队列,然后逐步处理。
-
日志处理:收集分布式系统的日志信息,统一处理和分析。例如,多个服务的日志可以发送到RabbitMQ,然后由专门的日志服务进行处理。
-
事件驱动架构:基于事件的通信模式,实现系统间的松耦合。例如,用户注册成功后,发送一个事件,触发邮件服务发送欢迎邮件、积分服务发放初始积分等操作。
实际案例
案例1:电商订单处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// 生产者:下单服务
public void createOrder(Order order) {
// 保存订单基本信息
orderRepository.save(order);
// 发送消息到RabbitMQ
OrderMessage message = new OrderMessage(order.getId(), order.getUserId(), order.getItems());
rabbitTemplate.convertAndSend("order.exchange", "order.created", message);
return "订单创建成功,订单号:" + order.getId();
}
// 消费者:库存服务
@RabbitListener(queues = "order.inventory.queue")
public void processInventory(OrderMessage message) {
// 处理库存逻辑
for (OrderItem item : message.getItems()) {
inventoryService.reduceStock(item.getProductId(), item.getQuantity());
}
}
// 消费者:物流服务
@RabbitListener(queues = "order.logistics.queue")
public void processLogistics(OrderMessage message) {
// 处理物流逻辑
logisticsService.createShipment(message.getOrderId(), message.getUserId());
}
|
案例2:用户注册事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// 生产者:用户服务
public void registerUser(User user) {
// 保存用户信息
userRepository.save(user);
// 发送用户注册事件
UserRegisteredEvent event = new UserRegisteredEvent(user.getId(), user.getEmail(), user.getUsername());
rabbitTemplate.convertAndSend("user.exchange", "user.registered", event);
return "注册成功";
}
// 消费者:邮件服务
@RabbitListener(queues = "user.email.queue")
public void sendWelcomeEmail(UserRegisteredEvent event) {
// 发送欢迎邮件
emailService.sendWelcomeEmail(event.getEmail(), event.getUsername());
}
// 消费者:积分服务
@RabbitListener(queues = "user.points.queue")
public void assignInitialPoints(UserRegisteredEvent event) {
// 分配初始积分
pointsService.assignPoints(event.getUserId(), 100); // 新用户赠送100积分
}
|
Rabbitmq 架构设计
Rabbitmq 的核心架构包含以下几个关键组件:
Broker
Broker 是 Rabbitmq 集群中的服务器节点,负责接收和处理客户端请求,存储消息数据。每个 Broker 都有一个唯一的 ID,可以独立运行。
实际应用:在高可用部署中,通常会部署多个Broker节点组成集群,并通过负载均衡器分发客户端连接。例如,一个生产环境可能有3个Broker节点,通过HAProxy进行负载均衡,当一个节点故障时,客户端可以自动连接到其他可用节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# RabbitMQ集群配置示例(Docker Compose)
version: '3'
services:
rabbitmq1:
image: rabbitmq:3.9-management
hostname: rabbitmq1
environment:
- RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
ports:
- "5672:5672"
- "15672:15672"
rabbitmq2:
image: rabbitmq:3.9-management
hostname: rabbitmq2
environment:
- RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
depends_on:
- rabbitmq1
rabbitmq3:
image: rabbitmq:3.9-management
hostname: rabbitmq3
environment:
- RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
depends_on:
- rabbitmq1
|
Producer
Producer 是消息生产者,负责将消息发送到 Rabbitmq 集群中的特定 Exchange。Producer 可以选择同步或异步的方式发送消息。
实际应用:生产者通常是业务系统中的一部分,负责在特定业务事件发生时发送消息。例如,支付服务在用户完成支付后,发送支付成功消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
// Spring Boot中的Producer示例
@Service
public class PaymentService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void processPayment(Payment payment) {
// 处理支付逻辑
paymentRepository.save(payment);
// 发送支付成功消息
PaymentCompletedEvent event = new PaymentCompletedEvent(
payment.getId(),
payment.getOrderId(),
payment.getAmount(),
payment.getTimestamp()
);
// 同步发送
rabbitTemplate.convertAndSend("payment.exchange", "payment.completed", event);
// 异步发送(带确认回调)
rabbitTemplate.convertAndSend("payment.exchange", "payment.completed", event, new CorrelationData(payment.getId()));
}
// 确认回调
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return (correlationData, ack, cause) -> {
if (ack) {
log.info("消息发送成功: {}", correlationData.getId());
} else {
log.error("消息发送失败: {}, 原因: {}", correlationData.getId(), cause);
// 处理失败逻辑,如重试或记录日志
}
};
}
}
|
Consumer
Consumer 是消息消费者,负责从 Rabbitmq 集群中消费消息。Consumer 可以选择手动确认或自动确认消息。
实际应用:消费者通常是独立的服务或进程,专门处理特定类型的消息。例如,通知服务可以消费各种需要发送通知的事件消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
// Spring Boot中的Consumer示例
@Service
public class NotificationService {
@Autowired
private EmailService emailService;
@Autowired
private SmsService smsService;
// 自动确认模式
@RabbitListener(queues = "notification.email.queue")
public void processEmailNotification(NotificationEvent event) {
try {
emailService.sendEmail(event.getRecipient(), event.getSubject(), event.getContent());
log.info("邮件发送成功: {}", event.getId());
} catch (Exception e) {
log.error("邮件发送失败: {}", event.getId(), e);
// 自动确认模式下,即使处理失败,消息也会被确认
// 可以将失败消息记录到数据库,后续重试
}
}
// 手动确认模式
@RabbitListener(queues = "notification.sms.queue", ackMode = "MANUAL")
public void processSmsNotification(NotificationEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
smsService.sendSms(event.getRecipient(), event.getContent());
log.info("短信发送成功: {}", event.getId());
// 手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("短信发送失败: {}", event.getId(), e);
try {
// 消息重新入队
channel.basicNack(tag, false, true);
} catch (IOException ex) {
log.error("消息拒绝失败", ex);
}
}
}
}
|
Exchange
Exchange 是消息路由和转发的组件,根据消息的路由键(Routing Key)将消息转发到一个或多个 Queue。常见的 Exchange 类型包括 Direct、Topic、Fanout 等。
实际应用:不同类型的Exchange适用于不同的消息分发场景。例如,在微服务架构中,可以使用Topic Exchange根据消息的不同属性将其路由到不同的服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
// Spring Boot中配置不同类型的Exchange
@Configuration
public class RabbitMQConfig {
// Direct Exchange配置
@Bean
public DirectExchange orderDirectExchange() {
return new DirectExchange("order.direct");
}
@Bean
public Queue orderProcessQueue() {
return new Queue("order.process");
}
@Bean
public Binding orderProcessBinding(Queue orderProcessQueue, DirectExchange orderDirectExchange) {
return BindingBuilder.bind(orderProcessQueue).to(orderDirectExchange).with("order.new");
}
// Topic Exchange配置
@Bean
public TopicExchange notificationTopicExchange() {
return new TopicExchange("notification.topic");
}
@Bean
public Queue emailNotificationQueue() {
return new Queue("notification.email");
}
@Bean
public Queue smsNotificationQueue() {
return new Queue("notification.sms");
}
@Bean
public Binding emailNotificationBinding(Queue emailNotificationQueue, TopicExchange notificationTopicExchange) {
// 匹配所有email相关的通知
return BindingBuilder.bind(emailNotificationQueue).to(notificationTopicExchange).with("notification.*.email");
}
@Bean
public Binding smsNotificationBinding(Queue smsNotificationQueue, TopicExchange notificationTopicExchange) {
// 匹配所有sms相关的通知
return BindingBuilder.bind(smsNotificationQueue).to(notificationTopicExchange).with("notification.*.sms");
}
// Fanout Exchange配置
@Bean
public FanoutExchange auditFanoutExchange() {
return new FanoutExchange("audit.fanout");
}
@Bean
public Queue auditLogQueue() {
return new Queue("audit.log");
}
@Bean
public Queue auditArchiveQueue() {
return new Queue("audit.archive");
}
@Bean
public Binding auditLogBinding(Queue auditLogQueue, FanoutExchange auditFanoutExchange) {
return BindingBuilder.bind(auditLogQueue).to(auditFanoutExchange);
}
@Bean
public Binding auditArchiveBinding(Queue auditArchiveQueue, FanoutExchange auditFanoutExchange) {
return BindingBuilder.bind(auditArchiveQueue).to(auditFanoutExchange);
}
}
|
Queue
Queue 是消息存储的组件,用于存储消息数据。每个 Queue 都有一个唯一的名称,可以绑定多个 Exchange。
实际应用:队列可以配置多种属性以满足不同的业务需求,如持久化、消息TTL、死信队列等。例如,对于重要的业务消息,可以配置持久化队列和死信队列,确保消息不会丢失。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
// 队列配置示例
@Configuration
public class QueueConfig {
// 普通队列
@Bean
public Queue standardQueue() {
return new Queue("standard.queue", true); // 第二个参数为true表示持久化
}
// 带TTL的队列(消息过期时间)
@Bean
public Queue ttlQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息60秒后过期
return new Queue("ttl.queue", true, false, false, args);
}
// 死信队列配置
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead.letter.exchange");
}
@Bean
public Queue deadLetterQueue() {
return new Queue("dead.letter.queue", true);
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
}
// 主队列(配置死信交换机)
@Bean
public Queue mainQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead.letter.exchange");
args.put("x-dead-letter-routing-key", "dead.letter");
args.put("x-message-ttl", 30000); // 30秒未消费则进入死信队列
return new Queue("main.queue", true, false, false, args);
}
}
|
Rabbitmq 消息模型
Rabbitmq 消息模型主要包括以下几个部分:
消息
消息是 Rabbitmq 中最小的传输单元,包含消息头和消息体。消息头包含消息的元数据信息,如消息的路由键、消息的优先级等。消息体包含实际的数据内容。
消息队列
消息队列是 Rabbitmq 中消息的存储和转发组件。每个消息队列都有一个唯一的名称,可以绑定多个 Exchange。消息队列可以设置为持久化或非持久化,以保证消息的可靠性。
消息路由
消息路由是 Rabbitmq 中消息的转发组件。根据消息的路由键,消息会被转发到一个或多个 Queue。消息路由可以设置为直接路由、通配符路由、主题路由等。
消息确认
消息确认是 Rabbitmq 中消息的确认机制。当消息被消费者消费后,消费者需要向 Rabbitmq 发送确认消息,以保证消息的可靠性。
Rabbitmq 如何保证消息的可靠性
Rabbitmq 如何保证消息的可靠性主要包括以下几个方面:
消息持久化
Rabbitmq 可以将消息持久化到磁盘上,以保证消息的可靠性。当 Broker 重启后,消息不会丢失。
实际应用:对于重要的业务消息,如订单、支付等,通常需要配置消息持久化,确保系统故障后消息不会丢失。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// 消息持久化配置示例
@Configuration
public class RabbitPersistenceConfig {
// 1. 配置持久化的交换机
@Bean
public DirectExchange persistentExchange() {
return new DirectExchange("persistent.exchange", true, false);
// 第二个参数true表示持久化,第三个参数false表示不自动删除
}
// 2. 配置持久化的队列
@Bean
public Queue persistentQueue() {
return new Queue("persistent.queue", true); // true表示持久化
}
// 3. 绑定关系
@Bean
public Binding persistentBinding() {
return BindingBuilder.bind(persistentQueue()).to(persistentExchange()).with("persistent.key");
}
// 4. 配置消息属性
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置消息属性
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setMandatory(true); // 开启强制性标志
// 设置消息持久化
template.setBeforePublishPostProcessors(message -> {
MessageProperties props = message.getMessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
return message;
});
return template;
}
}
|
持久化的三个层面:
- Exchange持久化:交换机持久化,重启后交换机不会丢失
- Queue持久化:队列持久化,重启后队列不会丢失
- Message持久化:消息持久化,重启后消息不会丢失(需要设置消息的delivery_mode=2)
消息确认
Rabbitmq 可以将消息确认机制设置为手动确认或自动确认。手动确认可以保证消息的可靠性,自动确认可以提高消息的吞吐量。
实际应用:在处理重要业务消息时,通常使用手动确认模式,确保消息被正确处理后再确认;对于非关键消息,可以使用自动确认提高处理速度。
生产者确认机制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// 生产者确认机制配置
@Configuration
public class ProducerConfirmConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 开启发布者确认
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息已成功发送到交换机, id: {}",
correlationData != null ? correlationData.getId() : "");
} else {
log.error("消息发送到交换机失败, id: {}, 原因: {}",
correlationData != null ? correlationData.getId() : "", cause);
// 处理失败逻辑,如重试或记录日志
}
});
// 开启发布者返回(消息从交换机路由到队列的确认)
template.setReturnsCallback(returned -> {
log.error("消息从交换机路由到队列失败: exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
returned.getExchange(), returned.getRoutingKey(),
returned.getReplyCode(), returned.getReplyText());
// 处理失败逻辑
});
return template;
}
}
|
消费者确认机制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
// 消费者手动确认示例
@Component
public class ManualAckConsumer {
@RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 处理消息
String content = new String(message.getBody());
log.info("接收到消息: {}", content);
// 模拟业务处理
processMessage(content);
// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理消息失败", e);
// 判断是否已经重试过
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (redelivered) {
log.error("消息已重试过,拒绝消息: {}", deliveryTag);
// 拒绝消息,不重新入队
channel.basicReject(deliveryTag, false);
// 可以将失败消息记录到数据库或发送到死信队列
} else {
log.warn("消息首次处理失败,重新入队: {}", deliveryTag);
// 拒绝消息,重新入队
channel.basicNack(deliveryTag, false, true);
}
}
}
private void processMessage(String content) {
// 业务处理逻辑
}
}
|
消息重试
Rabbitmq 可以将消息重试机制设置为自动重试或手动重试。自动重试可以保证消息的可靠性,手动重试可以提高消息的可靠性。
实际应用:在处理可能因为临时网络问题或服务不可用而失败的消息时,可以配置重试机制,提高消息处理的成功率。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
// Spring Boot中配置消息重试
@Configuration
public class RetryConfig {
// 配置重试策略
@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3) // 最大重试次数(包括第一次)
.backOffOptions(1000, 2.0, 10000) // 初始间隔、乘数、最大间隔
.recoverer(new RejectAndDontRequeueRecoverer()) // 达到最大重试次数后的处理策略
.build();
}
// 应用重试拦截器
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
RetryOperationsInterceptor retryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 配置重试拦截器
Advice[] adviceChain = new Advice[] {retryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
return factory;
}
}
// 使用重试机制的消费者
@Component
public class RetryableConsumer {
@RabbitListener(queues = "retryable.queue")
public void processMessage(String message) {
log.info("处理消息: {}", message);
// 模拟随机失败
if (Math.random() < 0.5) {
log.warn("处理失败,将进行重试");
throw new RuntimeException("随机失败,触发重试");
}
log.info("处理成功");
}
}
|
自定义重试策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// 自定义重试策略,根据异常类型决定是否重试
@Bean
public RetryOperationsInterceptor customRetryInterceptor() {
return RetryInterceptorBuilder.stateless()
.retryPolicy(new SimpleRetryPolicy(3, Map.of(
TemporaryException.class, true, // 临时异常重试
PermanentException.class, false // 永久异常不重试
)))
.backOffPolicy(new ExponentialBackOffPolicy()) // 指数退避策略
.recoverer((args, cause) -> {
// 达到最大重试次数后的处理
String message = new String((byte[]) args.getArguments()[1]);
log.error("消息重试达到最大次数,放入死信队列: {}", message, cause);
// 可以将消息发送到死信队列或记录到数据库
})
.build();
}
|
消息备份
Rabbitmq 可以将消息备份到多个 Broker 上,以保证消息的可靠性。当某个 Broker 宕机后,消息不会丢失。
实际应用:在金融、支付等对数据一致性要求高的系统中,通常会部署RabbitMQ集群,并配置镜像队列,确保消息在多个节点间同步。
1
2
3
4
5
6
7
8
9
10
|
// 镜像队列配置示例
@Bean
public Queue mirroredQueue() {
Map<String, Object> args = new HashMap<>();
// 配置镜像策略,all表示所有节点都有完整副本
args.put("x-ha-policy", "all");
// 设置镜像队列同步模式
args.put("x-ha-sync-mode", "automatic");
return new Queue("mirrored.queue", true, false, false, args);
}
|
通过管理命令配置镜像队列:
1
2
3
4
5
|
# 为所有队列设置镜像策略
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}' --apply-to queues
# 为特定队列设置镜像策略
rabbitmqctl set_policy ha-important "^important\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' --apply-to queues
|
集群部署案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# RabbitMQ集群配置(Kubernetes示例)
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rabbitmq
spec:
serviceName: rabbitmq
replicas: 3
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:3.9-management
ports:
- containerPort: 5672
- containerPort: 15672
env:
- name: RABBITMQ_ERLANG_COOKIE
value: "SHARED_SECRET_COOKIE"
- name: RABBITMQ_DEFAULT_USER
value: "admin"
- name: RABBITMQ_DEFAULT_PASS
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: password
volumeMounts:
- name: data
mountPath: /var/lib/rabbitmq
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 10Gi
|
Rabbitmq 如何保证消息不被重复消费
Rabbitmq 如何保证消息不被重复消费主要包括以下几个方面:
消息唯一标识
Rabbitmq 可以为每条消息生成一个唯一的消息 ID,以保证消息的唯一性。
实际应用:在订单系统中,可以使用订单ID作为消息的唯一标识,确保即使消息被重复消费,也能识别出来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// 生产者:设置消息唯一ID
public void sendOrderMessage(Order order) {
// 创建消息
OrderMessage message = new OrderMessage(order);
// 设置消息属性
MessageProperties properties = new MessageProperties();
properties.setMessageId(order.getId()); // 使用订单ID作为消息ID
properties.setTimestamp(new Date()); // 设置时间戳
// 发送消息
Message amqpMessage = new Message(objectMapper.writeValueAsBytes(message), properties);
rabbitTemplate.send("order.exchange", "order.created", amqpMessage);
}
|
消息状态检查
Rabbitmq 可以将消息的状态存储到数据库中,以保证消息的状态一致性。
实际应用:在支付系统中,可以将消息处理状态记录到数据库,在消费消息前先检查状态,避免重复处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// 消费者:检查消息状态
@RabbitListener(queues = "payment.process.queue")
public void processPayment(Message message) {
String messageId = message.getMessageProperties().getMessageId();
// 检查消息是否已处理
if (messageProcessRepository.isProcessed(messageId)) {
log.info("消息已处理,跳过: {}", messageId);
return;
}
try {
// 处理消息
PaymentMessage paymentMessage = objectMapper.readValue(message.getBody(), PaymentMessage.class);
paymentService.processPayment(paymentMessage);
// 标记消息为已处理
messageProcessRepository.markAsProcessed(messageId);
} catch (Exception e) {
log.error("处理支付消息失败", e);
throw e; // 重新抛出异常,触发重试机制
}
}
|
数据库表设计:
1
2
3
4
5
6
7
8
|
CREATE TABLE message_process_record (
message_id VARCHAR(50) PRIMARY KEY,
process_status VARCHAR(20) NOT NULL, -- PROCESSED, FAILED
process_time TIMESTAMP NOT NULL,
retry_count INT DEFAULT 0,
last_retry_time TIMESTAMP,
create_time TIMESTAMP NOT NULL
);
|
分布式锁
Rabbitmq 可以使用分布式锁机制,以保证消息的一致性。
实际应用:在高并发场景下,可以使用Redis或Zookeeper实现分布式锁,确保同一时间只有一个消费者处理特定的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// 使用Redis实现分布式锁
@Component
public class DistributedLockConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private OrderService orderService;
@RabbitListener(queues = "order.payment.queue")
public void processOrderPayment(OrderPaymentMessage message) {
String lockKey = "order:payment:lock:" + message.getOrderId();
boolean locked = false;
try {
// 尝试获取分布式锁,过期时间30秒
locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.SECONDS);
if (locked) {
// 获取锁成功,处理订单支付
orderService.processPayment(message.getOrderId(), message.getAmount());
log.info("订单支付处理成功: {}", message.getOrderId());
} else {
// 获取锁失败,说明消息正在被其他消费者处理
log.info("订单支付消息正在被处理,跳过: {}", message.getOrderId());
}
} finally {
// 释放锁
if (locked) {
redisTemplate.delete(lockKey);
}
}
}
}
|
消息幂等性
Rabbitmq 可以实现消息的幂等性,以保证消息的一致性。
实际应用:在转账系统中,可以通过业务逻辑设计实现幂等性,确保同一笔转账只会执行一次。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
// 幂等性处理示例
@Service
public class TransferService {
@Autowired
private AccountRepository accountRepository;
@Autowired
private TransferRepository transferRepository;
@Transactional
public void processTransfer(String transferId, String fromAccount, String toAccount, BigDecimal amount) {
// 检查转账记录是否已存在
if (transferRepository.existsById(transferId)) {
log.info("转账已处理,跳过: {}", transferId);
return;
}
// 执行转账操作
Account from = accountRepository.findById(fromAccount)
.orElseThrow(() -> new AccountNotFoundException(fromAccount));
Account to = accountRepository.findById(toAccount)
.orElseThrow(() -> new AccountNotFoundException(toAccount));
// 检查余额
if (from.getBalance().compareTo(amount) < 0) {
throw new InsufficientBalanceException(fromAccount);
}
// 更新账户余额
from.setBalance(from.getBalance().subtract(amount));
to.setBalance(to.getBalance().add(amount));
accountRepository.save(from);
accountRepository.save(to);
// 记录转账流水
Transfer transfer = new Transfer();
transfer.setId(transferId);
transfer.setFromAccount(fromAccount);
transfer.setToAccount(toAccount);
transfer.setAmount(amount);
transfer.setStatus("COMPLETED");
transfer.setCreateTime(new Date());
transferRepository.save(transfer);
}
}
|
Rabbitmq 如何解决消息积压问题
Rabbitmq 如何解决消息积压问题主要包括以下几个方面:
增加消费者数量
Rabbitmq 可以增加消费者数量,以提高消息的处理速度。
实际应用:在电商平台的订单处理系统中,当出现订单峰值(如促销活动)时,可以动态扩展消费者实例数量,提高消息处理能力。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// Spring Boot中配置消费者并发数
@Configuration
public class RabbitConsumerConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(5); // 初始并发消费者数量
factory.setMaxConcurrentConsumers(20); // 最大并发消费者数量
factory.setPrefetchCount(10); // 每个消费者预取的消息数量
return factory;
}
}
// 在Kubernetes环境中动态扩展消费者Pod数量
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# 消费者自动扩缩容配置(Kubernetes HPA)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: order-consumer-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: order-consumer
minReplicas: 3
maxReplicas: 20
metrics:
- type: External
external:
metric:
name: rabbitmq_queue_messages
selector:
matchLabels:
queue: order.processing
target:
type: AverageValue
averageValue: 100 # 当每个队列平均消息数超过100时扩容
|
增加 Broker 数量
Rabbitmq 可以增加 Broker 数量,以提高消息的处理速度。
实际应用:在大型分布式系统中,可以部署RabbitMQ集群,并根据负载情况动态调整节点数量,提高系统整体吞吐量。
1
2
3
4
5
6
7
8
|
# 向现有集群添加新节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app
# 查看集群状态
rabbitmqctl cluster_status
|
集群负载均衡:
1
2
3
4
5
6
7
8
9
10
11
12
|
// 客户端连接工厂配置多个地址
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// 配置多个broker地址,实现负载均衡
connectionFactory.setAddresses("rabbit1:5672,rabbit2:5672,rabbit3:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 开启发布确认
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
|
优化消息路由
Rabbitmq 可以优化消息路由,以提高消息的处理速度。
实际应用:在复杂业务系统中,可以根据消息类型和优先级设计不同的交换机和队列,实现消息的高效路由和处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// 优先级队列配置
@Bean
public Queue priorityQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10); // 设置最大优先级为10
return new Queue("priority.queue", true, false, false, args);
}
// 发送带优先级的消息
public void sendWithPriority(String message, int priority) {
MessageProperties properties = new MessageProperties();
properties.setPriority(priority); // 设置消息优先级
Message amqpMessage = new Message(message.getBytes(), properties);
rabbitTemplate.send("priority.exchange", "priority.key", amqpMessage);
}
|
消息分片处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
// 消息分片处理示例
@Configuration
public class ShardingConfig {
// 创建多个分片队列
@Bean
public List<Queue> shardQueues() {
List<Queue> queues = new ArrayList<>();
for (int i = 0; i < 4; i++) {
queues.add(new Queue("order.processing." + i));
}
return queues;
}
// 创建交换机
@Bean
public DirectExchange shardExchange() {
return new DirectExchange("order.shard.exchange");
}
// 绑定队列到交换机,使用不同的路由键
@Bean
public List<Binding> shardBindings(List<Queue> shardQueues, DirectExchange shardExchange) {
List<Binding> bindings = new ArrayList<>();
for (int i = 0; i < shardQueues.size(); i++) {
bindings.add(BindingBuilder.bind(shardQueues.get(i))
.to(shardExchange).with("shard." + i));
}
return bindings;
}
}
// 生产者:根据订单ID进行分片路由
public void sendOrderMessage(Order order) {
int shardIndex = Math.abs(order.getId().hashCode() % 4);
String routingKey = "shard." + shardIndex;
rabbitTemplate.convertAndSend("order.shard.exchange", routingKey, order);
}
|
临时队列转储
实际应用:当出现严重消息积压时,可以创建临时队列,将消息快速转储,然后使用更多的消费者并行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// 消息转储服务
@Service
public class MessageDumpService {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
public void dumpMessages(String sourceQueue, int dumpCount) {
// 创建临时队列
List<String> tempQueues = new ArrayList<>();
for (int i = 0; i < dumpCount; i++) {
String queueName = sourceQueue + ".temp." + i;
Queue queue = new Queue(queueName, false, false, true);
rabbitAdmin.declareQueue(queue);
tempQueues.add(queueName);
}
// 从源队列获取消息并分发到临时队列
int counter = 0;
while (true) {
Message message = rabbitTemplate.receive(sourceQueue, 100);
if (message == null) {
break; // 队列为空,退出循环
}
// 将消息发送到临时队列
String tempQueue = tempQueues.get(counter % tempQueues.size());
rabbitTemplate.send("", tempQueue, message);
counter++;
}
log.info("已将{}条消息从{}队列转储到{}个临时队列", counter, sourceQueue, dumpCount);
}
}
|
Rabbitmq 交换机类型
Rabbitmq 交换机类型主要包括以下几种:
Direct Exchange
Direct Exchange 是最简单的交换机类型,它根据消息的路由键将消息转发到一个或多个 Queue。
实际应用:适用于明确知道消息应该发送到哪个队列的场景,如日志系统中根据日志级别路由消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
// Direct Exchange配置示例
@Configuration
public class DirectExchangeConfig {
@Bean
public DirectExchange logExchange() {
return new DirectExchange("log.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("log.error");
}
@Bean
public Queue warningQueue() {
return new Queue("log.warning");
}
@Bean
public Queue infoQueue() {
return new Queue("log.info");
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange logExchange) {
return BindingBuilder.bind(errorQueue).to(logExchange).with("error");
}
@Bean
public Binding warningBinding(Queue warningQueue, DirectExchange logExchange) {
return BindingBuilder.bind(warningQueue).to(logExchange).with("warning");
}
@Bean
public Binding infoBinding(Queue infoQueue, DirectExchange logExchange) {
return BindingBuilder.bind(infoQueue).to(logExchange).with("info");
}
}
// 生产者:发送日志消息
public void sendLog(String level, String message) {
LogMessage logMessage = new LogMessage(level, message, new Date());
rabbitTemplate.convertAndSend("log.direct", level, logMessage);
}
// 消费者:处理错误日志
@RabbitListener(queues = "log.error")
public void processErrorLogs(LogMessage message) {
log.error("收到错误日志: {}", message.getMessage());
// 处理错误日志,如发送告警通知
alertService.sendAlert(message);
}
|
应用场景:
- 日志路由:根据日志级别(ERROR、WARNING、INFO)将日志消息路由到不同的处理队列
- 工作队列:任务分发系统,根据任务类型路由到专门的处理队列
- 命令处理:根据命令类型将消息路由到对应的命令处理器
Topic Exchange
Topic Exchange 是一种多播交换机类型,它根据消息的路由键将消息转发到一个或多个 Queue。Topic Exchange 的路由键支持通配符,*
匹配一个单词,#
匹配零个或多个单词。
实际应用:适用于消息需要按照多维度分类的场景,如根据地区和业务类型路由消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
// Topic Exchange配置示例
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange notificationExchange() {
return new TopicExchange("notification.topic");
}
// 按地区分类的队列
@Bean
public Queue chinaQueue() {
return new Queue("notification.china");
}
@Bean
public Queue usaQueue() {
return new Queue("notification.usa");
}
// 按业务类型分类的队列
@Bean
public Queue orderQueue() {
return new Queue("notification.order");
}
@Bean
public Queue paymentQueue() {
return new Queue("notification.payment");
}
// 绑定关系
@Bean
public Binding chinaOrderBinding(Queue chinaQueue, TopicExchange notificationExchange) {
// 匹配中国地区的订单消息
return BindingBuilder.bind(chinaQueue).to(notificationExchange).with("china.order.*");
}
@Bean
public Binding chinaAllBinding(Queue chinaQueue, TopicExchange notificationExchange) {
// 匹配中国地区的所有消息
return BindingBuilder.bind(chinaQueue).to(notificationExchange).with("china.#");
}
@Bean
public Binding orderAllBinding(Queue orderQueue, TopicExchange notificationExchange) {
// 匹配所有地区的订单消息
return BindingBuilder.bind(orderQueue).to(notificationExchange).with("*.order.*");
}
}
// 生产者:发送通知消息
public void sendNotification(String region, String business, String action, String message) {
String routingKey = region + "." + business + "." + action;
NotificationMessage notification = new NotificationMessage(message, new Date());
rabbitTemplate.convertAndSend("notification.topic", routingKey, notification);
}
// 使用示例
sendNotification("china", "order", "created", "新订单创建通知");
sendNotification("usa", "payment", "completed", "支付完成通知");
|
应用场景:
- 多维度消息路由:根据地区、业务类型、操作类型等多个维度路由消息
- 事件广播:将事件广播给多个关注特定模式的消费者
- 基于主题的订阅:允许消费者订阅感兴趣的特定主题
Fanout Exchange
Fanout Exchange 是一种广播交换机类型,它将消息转发到所有绑定的 Queue,不考虑路由键。
实际应用:适用于需要将消息广播给多个消费者的场景,如系统公告、配置更新等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// Fanout Exchange配置示例
@Configuration
public class FanoutExchangeConfig {
@Bean
public FanoutExchange systemBroadcastExchange() {
return new FanoutExchange("system.broadcast");
}
@Bean
public Queue userServiceQueue() {
return new Queue("system.broadcast.user");
}
@Bean
public Queue orderServiceQueue() {
return new Queue("system.broadcast.order");
}
@Bean
public Queue inventoryServiceQueue() {
return new Queue("system.broadcast.inventory");
}
@Bean
public Binding userServiceBinding(Queue userServiceQueue, FanoutExchange systemBroadcastExchange) {
return BindingBuilder.bind(userServiceQueue).to(systemBroadcastExchange);
}
@Bean
public Binding orderServiceBinding(Queue orderServiceQueue, FanoutExchange systemBroadcastExchange) {
return BindingBuilder.bind(orderServiceQueue).to(systemBroadcastExchange);
}
@Bean
public Binding inventoryServiceBinding(Queue inventoryServiceQueue, FanoutExchange systemBroadcastExchange) {
return BindingBuilder.bind(inventoryServiceQueue).to(systemBroadcastExchange);
}
}
// 生产者:广播系统消息
public void broadcastSystemMessage(String message, String type) {
SystemMessage systemMessage = new SystemMessage(message, type, new Date());
// 发送到Fanout交换机,路由键被忽略
rabbitTemplate.convertAndSend("system.broadcast", "", systemMessage);
}
// 使用示例
broadcastSystemMessage("系统将在10分钟后进行维护", "MAINTENANCE");
broadcastSystemMessage("系统配置已更新,请重新加载", "CONFIG_UPDATE");
|
应用场景:
- 系统公告:向所有服务广播系统维护、更新通知
- 配置更新:当系统配置变更时,通知所有相关服务刷新配置
- 缓存失效:当数据发生变化时,通知所有缓存服务清除相关缓存
Headers Exchange 是一种根据消息头的属性将消息转发到一个或多个 Queue 的交换机类型,它不使用路由键进行匹配,而是根据消息的头部属性进行匹配。
实际应用:适用于需要根据多个条件进行路由的场景,特别是当路由条件不适合用字符串表示时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
// Headers Exchange配置示例
@Configuration
public class HeadersExchangeConfig {
@Bean
public HeadersExchange documentExchange() {
return new HeadersExchange("document.headers");
}
@Bean
public Queue pdfProcessQueue() {
return new Queue("document.process.pdf");
}
@Bean
public Queue imageProcessQueue() {
return new Queue("document.process.image");
}
@Bean
public Queue highPriorityQueue() {
return new Queue("document.priority.high");
}
@Bean
public Binding pdfBinding(Queue pdfProcessQueue, HeadersExchange documentExchange) {
return BindingBuilder.bind(pdfProcessQueue).to(documentExchange)
.where("format").matches("pdf");
}
@Bean
public Binding imageBinding(Queue imageProcessQueue, HeadersExchange documentExchange) {
return BindingBuilder.bind(imageProcessQueue).to(documentExchange)
.where("format").matches("jpg").or("format").matches("png");
}
@Bean
public Binding highPriorityBinding(Queue highPriorityQueue, HeadersExchange documentExchange) {
// 使用whereAll要求所有条件都匹配
return BindingBuilder.bind(highPriorityQueue).to(documentExchange)
.whereAll(Map.of(
"priority", "high",
"type", "document"
)).match();
}
}
// 生产者:发送带头部属性的消息
public void sendDocument(byte[] content, String format, String priority) {
MessageProperties properties = new MessageProperties();
properties.setHeader("format", format);
properties.setHeader("priority", priority);
properties.setHeader("type", "document");
Message message = new Message(content, properties);
rabbitTemplate.send("document.headers", "", message);
}
// 使用示例
sendDocument(pdfContent, "pdf", "high");
sendDocument(imageContent, "jpg", "normal");
|
应用场景:
- 文档处理:根据文档类型、格式、大小等属性路由到不同的处理队列
- 多条件路由:需要根据多个条件进行路由,且这些条件不适合用路由键表示
- 特殊消息处理:根据消息的特殊属性(如优先级、安全级别)进行路由
Headers Exchange vs Topic Exchange:
- Headers Exchange 基于消息头属性匹配,可以使用多个条件,不限于字符串格式
- Topic Exchange 基于路由键匹配,使用字符串模式匹配,支持通配符
- 当路由条件可以用字符串表示时,Topic Exchange 通常更简单高效
- 当需要基于多个非字符串属性或复杂条件路由时,Headers Exchange 更灵活
交换机类型选择指南
交换机类型 |
适用场景 |
优势 |
示例应用 |
Direct |
精确路由,一对一或一对多 |
简单高效,路由明确 |
日志路由、任务分发 |
Topic |
基于模式的路由,多维度分类 |
灵活的模式匹配,支持通配符 |
多区域通知、事件分发 |
Fanout |
广播消息,一对所有 |
最高效的消息分发,不需要路由计算 |
系统公告、缓存刷新 |
Headers |
基于属性的复杂路由 |
支持多条件匹配,不限于字符串 |
文档处理、多条件筛选 |
RabbitMQ 高级特性
死信队列(Dead Letter Queue)
死信队列用于处理无法被正常消费的消息。当消息被拒绝(reject/nack)且不重新入队、消息过期(TTL)或队列达到最大长度时,消息会被发送到死信队列。
实际应用:在订单处理系统中,如果订单消息处理失败达到最大重试次数,可以将其发送到死信队列进行特殊处理或人工干预。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
// 死信队列配置
@Configuration
public class DeadLetterConfig {
// 死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("order.dead.letter.exchange");
}
// 死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue("order.dead.letter.queue");
}
// 绑定死信队列到死信交换机
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("order.dead.letter");
}
// 业务队列,配置死信交换机
@Bean
public Queue orderProcessQueue() {
Map<String, Object> args = new HashMap<>();
// 设置死信交换机
args.put("x-dead-letter-exchange", "order.dead.letter.exchange");
// 设置死信路由键
args.put("x-dead-letter-routing-key", "order.dead.letter");
// 设置消息过期时间(毫秒)
args.put("x-message-ttl", 60000); // 1分钟
return new Queue("order.process.queue", true, false, false, args);
}
}
// 死信队列消费者
@Component
public class DeadLetterConsumer {
@Autowired
private OrderService orderService;
@RabbitListener(queues = "order.dead.letter.queue")
public void processDeadLetter(Message message) {
try {
// 解析消息
OrderMessage orderMessage = objectMapper.readValue(message.getBody(), OrderMessage.class);
// 记录死信消息
log.error("订单处理失败,进入死信队列: {}", orderMessage.getOrderId());
// 发送告警通知
alertService.sendAlert("订单处理失败", "订单ID: " + orderMessage.getOrderId());
// 尝试特殊处理
orderService.handleFailedOrder(orderMessage.getOrderId());
} catch (Exception e) {
log.error("处理死信消息失败", e);
}
}
}
|
延迟队列(Delayed Message)
延迟队列用于实现消息的延迟投递,常用于定时任务、延迟处理等场景。
实际应用:在电商系统中,订单创建后如果30分钟内未支付,需要自动取消并恢复库存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
// 延迟队列配置(需要安装rabbitmq_delayed_message_exchange插件)
@Configuration
public class DelayedConfig {
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("order.delayed.exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue orderTimeoutQueue() {
return new Queue("order.timeout.queue");
}
@Bean
public Binding orderTimeoutBinding() {
return BindingBuilder.bind(orderTimeoutQueue())
.to(delayedExchange())
.with("order.timeout")
.noargs();
}
}
// 发送延迟消息
public void createOrder(Order order) {
// 保存订单
orderRepository.save(order);
// 发送延迟消息,30分钟后检查订单状态
OrderTimeoutMessage message = new OrderTimeoutMessage(order.getId());
MessageProperties properties = new MessageProperties();
// 设置延迟时间(毫秒)
properties.setHeader("x-delay", 30 * 60 * 1000); // 30分钟
Message amqpMessage = new Message(objectMapper.writeValueAsBytes(message), properties);
rabbitTemplate.send("order.delayed.exchange", "order.timeout", amqpMessage);
}
// 延迟消息消费者
@Component
public class OrderTimeoutConsumer {
@Autowired
private OrderService orderService;
@RabbitListener(queues = "order.timeout.queue")
public void processOrderTimeout(OrderTimeoutMessage message) {
String orderId = message.getOrderId();
// 检查订单状态
Order order = orderService.getOrder(orderId);
if (order != null && "UNPAID".equals(order.getStatus())) {
// 订单未支付,执行取消操作
orderService.cancelOrder(orderId);
log.info("订单{}超时未支付,已自动取消", orderId);
}
}
}
|
优先级队列(Priority Queue)
优先级队列允许根据消息的优先级决定消费顺序,优先级高的消息会被优先消费。
实际应用:在客服系统中,VIP客户的服务请求应该优先处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// 优先级队列配置
@Bean
public Queue supportQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10); // 设置最大优先级为10
return new Queue("support.queue", true, false, false, args);
}
// 发送带优先级的消息
public void createSupportTicket(Ticket ticket) {
// 保存工单
ticketRepository.save(ticket);
// 根据客户等级设置优先级
int priority = switch (ticket.getCustomerLevel()) {
case "VIP" -> 10;
case "GOLD" -> 8;
case "SILVER" -> 5;
default -> 1;
};
// 发送消息
MessageProperties properties = new MessageProperties();
properties.setPriority(priority);
Message message = new Message(objectMapper.writeValueAsBytes(ticket), properties);
rabbitTemplate.send("support.exchange", "support.ticket", message);
}
|
RabbitMQ 最佳实践
生产者最佳实践
-
确认机制:始终使用发布确认(Publisher Confirms)机制确保消息成功发送到RabbitMQ服务器。
-
消息持久化:对于重要业务,确保交换机、队列和消息都设置为持久化(durable)。
-
消息序列化:使用高效的序列化方式(如JSON、Protocol Buffers)并处理好序列化异常。
-
重试机制:实现消息发送失败的重试策略,可以使用指数退避算法。
-
批量发送:在高吞吐量场景下,考虑批量发送消息以提高性能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// 生产者最佳实践示例
@Service
public class BestPracticeProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RetryTemplate retryTemplate;
public void sendWithBestPractice(Object message, String exchange, String routingKey) {
// 使用重试模板
retryTemplate.execute(context -> {
// 创建CorrelationData用于确认回调
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 设置确认回调
correlationData.getFuture().addCallback(
confirm -> {
if (confirm.isAck()) {
log.info("消息已确认: {}", correlationData.getId());
} else {
log.error("消息未确认: {}, 原因: {}", correlationData.getId(), confirm.getReason());
// 可以将未确认的消息保存到数据库,后续重试
}
},
ex -> log.error("消息确认异常", ex)
);
// 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
return null;
});
}
}
|
消费者最佳实践
-
手动确认:对于重要业务,使用手动确认模式,确保消息处理成功后再确认。
-
幂等性处理:实现消费者的幂等性,确保重复消费不会导致业务问题。
-
并发控制:根据业务特性和硬件资源合理设置消费者的并发数。
-
异常处理:妥善处理消费过程中的异常,决定是拒绝还是重新入队。
-
预取数量:合理设置prefetch count,避免单个消费者负载过重。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// 消费者最佳实践示例
@Component
public class BestPracticeConsumer {
@Autowired
private ProcessRecordRepository recordRepository;
@RabbitListener(queues = "best.practice.queue", ackMode = "MANUAL", concurrency = "5-10")
public void consume(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
try {
// 检查是否已处理(幂等性检查)
if (recordRepository.existsByMessageId(messageId)) {
log.info("消息已处理,跳过: {}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
// 处理消息
Object payload = objectMapper.readValue(message.getBody(), TargetType.class);
processMessage(payload);
// 记录处理状态
recordRepository.save(new ProcessRecord(messageId, "SUCCESS"));
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理消息失败: {}", messageId, e);
// 判断是否需要重试
if (isRetryable(e)) {
// 拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
} else {
// 拒绝消息不重新入队,进入死信队列
channel.basicNack(deliveryTag, false, false);
// 记录失败状态
recordRepository.save(new ProcessRecord(messageId, "FAILED", e.getMessage()));
}
}
}
private boolean isRetryable(Exception e) {
// 判断异常是否可重试
return e instanceof TemporaryException || e instanceof IOException;
}
}
|
监控和运维最佳实践
-
健康检查:定期检查RabbitMQ集群的健康状态,包括节点状态、队列长度等。
-
告警机制:设置关键指标的告警阈值,如队列长度、消息积压、消费者数量等。
-
日志记录:记录关键操作和异常情况,便于问题排查。
-
资源隔离:为不同业务系统使用不同的vhost,实现资源隔离。
-
容量规划:根据业务增长预测,提前规划RabbitMQ集群的容量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// 监控示例
@Component
public class RabbitMQMonitor {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void monitorQueueStatus() {
// 获取队列信息
Queue queue = rabbitTemplate.execute(channel -> {
try {
return channel.queueDeclarePassive("important.queue");
} catch (IOException e) {
log.error("获取队列信息失败", e);
return null;
}
});
if (queue != null) {
int messageCount = queue.getMessageCount();
int consumerCount = queue.getConsumerCount();
log.info("队列状态 - 消息数: {}, 消费者数: {}", messageCount, consumerCount);
// 检查队列积压
if (messageCount > 1000 && consumerCount < 5) {
log.warn("队列积压告警 - 消息数: {}, 消费者数: {}", messageCount, consumerCount);
alertService.sendAlert("队列积压", "important.queue队列积压,消息数: " + messageCount);
}
}
}
}
|
总结
RabbitMQ作为一个成熟的消息队列中间件,在分布式系统中扮演着重要角色。通过合理使用其交换机类型、队列特性和消息属性,可以构建高效、可靠的消息传递系统。在实际应用中,需要根据业务场景选择合适的消息模型,并遵循最佳实践,确保系统的稳定性和可靠性。
对于不同的业务场景,RabbitMQ提供了灵活的解决方案:
- 异步处理:通过消息队列实现系统解耦,提高响应速度
- 流量削峰:在高并发场景下缓冲请求,平滑处理峰值流量
- 可靠通信:通过消息持久化、确认机制等保证消息不丢失
- 灵活路由:利用不同类型的交换机实现复杂的消息路由逻辑
- 延时处理:使用延迟队列实现定时任务和延迟处理
在使用RabbitMQ时,应当注意消息的可靠性、幂等性处理、性能优化和监控运维等方面,确保消息队列在系统中发挥最大价值。