Rabbitmq知识整理

Rabbitmq 简介

Rabbitmq 是一个开源的消息代理,实现了高级消息队列协议(AMQP)。它具有高可用性、可扩展性和可靠性的特点,被广泛应用于分布式系统、微服务架构、事件驱动的应用程序等场景。

应用场景

  1. 异步处理:将耗时操作异步化,提高系统响应速度。例如,电商平台下单后,订单处理、库存更新、物流通知等操作可以异步进行。

  2. 系统解耦:各个服务之间通过消息队列通信,降低系统间的直接依赖。例如,用户服务和订单服务可以通过RabbitMQ进行通信,而不是直接调用。

  3. 流量削峰:在高并发场景下,通过消息队列缓存请求,平滑处理峰值流量。例如,秒杀系统中,大量的下单请求可以先写入消息队列,然后逐步处理。

  4. 日志处理:收集分布式系统的日志信息,统一处理和分析。例如,多个服务的日志可以发送到RabbitMQ,然后由专门的日志服务进行处理。

  5. 事件驱动架构:基于事件的通信模式,实现系统间的松耦合。例如,用户注册成功后,发送一个事件,触发邮件服务发送欢迎邮件、积分服务发放初始积分等操作。

实际案例

案例1:电商订单处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 生产者:下单服务
public void createOrder(Order order) {
    // 保存订单基本信息
    orderRepository.save(order);
    
    // 发送消息到RabbitMQ
    OrderMessage message = new OrderMessage(order.getId(), order.getUserId(), order.getItems());
    rabbitTemplate.convertAndSend("order.exchange", "order.created", message);
    
    return "订单创建成功,订单号:" + order.getId();
}

// 消费者:库存服务
@RabbitListener(queues = "order.inventory.queue")
public void processInventory(OrderMessage message) {
    // 处理库存逻辑
    for (OrderItem item : message.getItems()) {
        inventoryService.reduceStock(item.getProductId(), item.getQuantity());
    }
}

// 消费者:物流服务
@RabbitListener(queues = "order.logistics.queue")
public void processLogistics(OrderMessage message) {
    // 处理物流逻辑
    logisticsService.createShipment(message.getOrderId(), message.getUserId());
}

案例2:用户注册事件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 生产者:用户服务
public void registerUser(User user) {
    // 保存用户信息
    userRepository.save(user);
    
    // 发送用户注册事件
    UserRegisteredEvent event = new UserRegisteredEvent(user.getId(), user.getEmail(), user.getUsername());
    rabbitTemplate.convertAndSend("user.exchange", "user.registered", event);
    
    return "注册成功";
}

// 消费者:邮件服务
@RabbitListener(queues = "user.email.queue")
public void sendWelcomeEmail(UserRegisteredEvent event) {
    // 发送欢迎邮件
    emailService.sendWelcomeEmail(event.getEmail(), event.getUsername());
}

// 消费者:积分服务
@RabbitListener(queues = "user.points.queue")
public void assignInitialPoints(UserRegisteredEvent event) {
    // 分配初始积分
    pointsService.assignPoints(event.getUserId(), 100); // 新用户赠送100积分
}

Rabbitmq 架构设计

Rabbitmq 的核心架构包含以下几个关键组件:

Broker

Broker 是 Rabbitmq 集群中的服务器节点,负责接收和处理客户端请求,存储消息数据。每个 Broker 都有一个唯一的 ID,可以独立运行。

实际应用:在高可用部署中,通常会部署多个Broker节点组成集群,并通过负载均衡器分发客户端连接。例如,一个生产环境可能有3个Broker节点,通过HAProxy进行负载均衡,当一个节点故障时,客户端可以自动连接到其他可用节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# RabbitMQ集群配置示例(Docker Compose)
version: '3'

services:
  rabbitmq1:
    image: rabbitmq:3.9-management
    hostname: rabbitmq1
    environment:
      - RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
    ports:
      - "5672:5672"
      - "15672:15672"

  rabbitmq2:
    image: rabbitmq:3.9-management
    hostname: rabbitmq2
    environment:
      - RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
    depends_on:
      - rabbitmq1

  rabbitmq3:
    image: rabbitmq:3.9-management
    hostname: rabbitmq3
    environment:
      - RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
    depends_on:
      - rabbitmq1

Producer

Producer 是消息生产者,负责将消息发送到 Rabbitmq 集群中的特定 Exchange。Producer 可以选择同步或异步的方式发送消息。

实际应用:生产者通常是业务系统中的一部分,负责在特定业务事件发生时发送消息。例如,支付服务在用户完成支付后,发送支付成功消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Spring Boot中的Producer示例
@Service
public class PaymentService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void processPayment(Payment payment) {
        // 处理支付逻辑
        paymentRepository.save(payment);
        
        // 发送支付成功消息
        PaymentCompletedEvent event = new PaymentCompletedEvent(
            payment.getId(),
            payment.getOrderId(),
            payment.getAmount(),
            payment.getTimestamp()
        );
        
        // 同步发送
        rabbitTemplate.convertAndSend("payment.exchange", "payment.completed", event);
        
        // 异步发送(带确认回调)
        rabbitTemplate.convertAndSend("payment.exchange", "payment.completed", event, new CorrelationData(payment.getId()));
    }
    
    // 确认回调
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息发送成功: {}", correlationData.getId());
            } else {
                log.error("消息发送失败: {}, 原因: {}", correlationData.getId(), cause);
                // 处理失败逻辑,如重试或记录日志
            }
        };
    }
}

Consumer

Consumer 是消息消费者,负责从 Rabbitmq 集群中消费消息。Consumer 可以选择手动确认或自动确认消息。

实际应用:消费者通常是独立的服务或进程,专门处理特定类型的消息。例如,通知服务可以消费各种需要发送通知的事件消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Spring Boot中的Consumer示例
@Service
public class NotificationService {
    
    @Autowired
    private EmailService emailService;
    
    @Autowired
    private SmsService smsService;
    
    // 自动确认模式
    @RabbitListener(queues = "notification.email.queue")
    public void processEmailNotification(NotificationEvent event) {
        try {
            emailService.sendEmail(event.getRecipient(), event.getSubject(), event.getContent());
            log.info("邮件发送成功: {}", event.getId());
        } catch (Exception e) {
            log.error("邮件发送失败: {}", event.getId(), e);
            // 自动确认模式下,即使处理失败,消息也会被确认
            // 可以将失败消息记录到数据库,后续重试
        }
    }
    
    // 手动确认模式
    @RabbitListener(queues = "notification.sms.queue", ackMode = "MANUAL")
    public void processSmsNotification(NotificationEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            smsService.sendSms(event.getRecipient(), event.getContent());
            log.info("短信发送成功: {}", event.getId());
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("短信发送失败: {}", event.getId(), e);
            try {
                // 消息重新入队
                channel.basicNack(tag, false, true);
            } catch (IOException ex) {
                log.error("消息拒绝失败", ex);
            }
        }
    }
}

Exchange

Exchange 是消息路由和转发的组件,根据消息的路由键(Routing Key)将消息转发到一个或多个 Queue。常见的 Exchange 类型包括 Direct、Topic、Fanout 等。

实际应用:不同类型的Exchange适用于不同的消息分发场景。例如,在微服务架构中,可以使用Topic Exchange根据消息的不同属性将其路由到不同的服务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Spring Boot中配置不同类型的Exchange
@Configuration
public class RabbitMQConfig {
    
    // Direct Exchange配置
    @Bean
    public DirectExchange orderDirectExchange() {
        return new DirectExchange("order.direct");
    }
    
    @Bean
    public Queue orderProcessQueue() {
        return new Queue("order.process");
    }
    
    @Bean
    public Binding orderProcessBinding(Queue orderProcessQueue, DirectExchange orderDirectExchange) {
        return BindingBuilder.bind(orderProcessQueue).to(orderDirectExchange).with("order.new");
    }
    
    // Topic Exchange配置
    @Bean
    public TopicExchange notificationTopicExchange() {
        return new TopicExchange("notification.topic");
    }
    
    @Bean
    public Queue emailNotificationQueue() {
        return new Queue("notification.email");
    }
    
    @Bean
    public Queue smsNotificationQueue() {
        return new Queue("notification.sms");
    }
    
    @Bean
    public Binding emailNotificationBinding(Queue emailNotificationQueue, TopicExchange notificationTopicExchange) {
        // 匹配所有email相关的通知
        return BindingBuilder.bind(emailNotificationQueue).to(notificationTopicExchange).with("notification.*.email");
    }
    
    @Bean
    public Binding smsNotificationBinding(Queue smsNotificationQueue, TopicExchange notificationTopicExchange) {
        // 匹配所有sms相关的通知
        return BindingBuilder.bind(smsNotificationQueue).to(notificationTopicExchange).with("notification.*.sms");
    }
    
    // Fanout Exchange配置
    @Bean
    public FanoutExchange auditFanoutExchange() {
        return new FanoutExchange("audit.fanout");
    }
    
    @Bean
    public Queue auditLogQueue() {
        return new Queue("audit.log");
    }
    
    @Bean
    public Queue auditArchiveQueue() {
        return new Queue("audit.archive");
    }
    
    @Bean
    public Binding auditLogBinding(Queue auditLogQueue, FanoutExchange auditFanoutExchange) {
        return BindingBuilder.bind(auditLogQueue).to(auditFanoutExchange);
    }
    
    @Bean
    public Binding auditArchiveBinding(Queue auditArchiveQueue, FanoutExchange auditFanoutExchange) {
        return BindingBuilder.bind(auditArchiveQueue).to(auditFanoutExchange);
    }
}

Queue

Queue 是消息存储的组件,用于存储消息数据。每个 Queue 都有一个唯一的名称,可以绑定多个 Exchange。

实际应用:队列可以配置多种属性以满足不同的业务需求,如持久化、消息TTL、死信队列等。例如,对于重要的业务消息,可以配置持久化队列和死信队列,确保消息不会丢失。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 队列配置示例
@Configuration
public class QueueConfig {
    
    // 普通队列
    @Bean
    public Queue standardQueue() {
        return new Queue("standard.queue", true); // 第二个参数为true表示持久化
    }
    
    // 带TTL的队列(消息过期时间)
    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 60000); // 消息60秒后过期
        return new Queue("ttl.queue", true, false, false, args);
    }
    
    // 死信队列配置
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead.letter.exchange");
    }
    
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead.letter.queue", true);
    }
    
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
    }
    
    // 主队列(配置死信交换机)
    @Bean
    public Queue mainQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dead.letter.exchange");
        args.put("x-dead-letter-routing-key", "dead.letter");
        args.put("x-message-ttl", 30000); // 30秒未消费则进入死信队列
        return new Queue("main.queue", true, false, false, args);
    }
}

Rabbitmq 消息模型

Rabbitmq 消息模型主要包括以下几个部分:

消息

消息是 Rabbitmq 中最小的传输单元,包含消息头和消息体。消息头包含消息的元数据信息,如消息的路由键、消息的优先级等。消息体包含实际的数据内容。

消息队列

消息队列是 Rabbitmq 中消息的存储和转发组件。每个消息队列都有一个唯一的名称,可以绑定多个 Exchange。消息队列可以设置为持久化或非持久化,以保证消息的可靠性。

消息路由

消息路由是 Rabbitmq 中消息的转发组件。根据消息的路由键,消息会被转发到一个或多个 Queue。消息路由可以设置为直接路由、通配符路由、主题路由等。

消息确认

消息确认是 Rabbitmq 中消息的确认机制。当消息被消费者消费后,消费者需要向 Rabbitmq 发送确认消息,以保证消息的可靠性。

Rabbitmq 如何保证消息的可靠性

Rabbitmq 如何保证消息的可靠性主要包括以下几个方面:

消息持久化

Rabbitmq 可以将消息持久化到磁盘上,以保证消息的可靠性。当 Broker 重启后,消息不会丢失。

实际应用:对于重要的业务消息,如订单、支付等,通常需要配置消息持久化,确保系统故障后消息不会丢失。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 消息持久化配置示例
@Configuration
public class RabbitPersistenceConfig {
    
    // 1. 配置持久化的交换机
    @Bean
    public DirectExchange persistentExchange() {
        return new DirectExchange("persistent.exchange", true, false);
        // 第二个参数true表示持久化,第三个参数false表示不自动删除
    }
    
    // 2. 配置持久化的队列
    @Bean
    public Queue persistentQueue() {
        return new Queue("persistent.queue", true); // true表示持久化
    }
    
    // 3. 绑定关系
    @Bean
    public Binding persistentBinding() {
        return BindingBuilder.bind(persistentQueue()).to(persistentExchange()).with("persistent.key");
    }
    
    // 4. 配置消息属性
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 设置消息属性
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setMandatory(true); // 开启强制性标志
        
        // 设置消息持久化
        template.setBeforePublishPostProcessors(message -> {
            MessageProperties props = message.getMessageProperties();
            props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
            return message;
        });
        
        return template;
    }
}

持久化的三个层面

  1. Exchange持久化:交换机持久化,重启后交换机不会丢失
  2. Queue持久化:队列持久化,重启后队列不会丢失
  3. Message持久化:消息持久化,重启后消息不会丢失(需要设置消息的delivery_mode=2)

消息确认

Rabbitmq 可以将消息确认机制设置为手动确认或自动确认。手动确认可以保证消息的可靠性,自动确认可以提高消息的吞吐量。

实际应用:在处理重要业务消息时,通常使用手动确认模式,确保消息被正确处理后再确认;对于非关键消息,可以使用自动确认提高处理速度。

生产者确认机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 生产者确认机制配置
@Configuration
public class ProducerConfirmConfig {
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        // 开启发布者确认
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息已成功发送到交换机, id: {}", 
                    correlationData != null ? correlationData.getId() : "");
            } else {
                log.error("消息发送到交换机失败, id: {}, 原因: {}", 
                    correlationData != null ? correlationData.getId() : "", cause);
                // 处理失败逻辑,如重试或记录日志
            }
        });
        
        // 开启发布者返回(消息从交换机路由到队列的确认)
        template.setReturnsCallback(returned -> {
            log.error("消息从交换机路由到队列失败: exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
                returned.getExchange(), returned.getRoutingKey(),
                returned.getReplyCode(), returned.getReplyText());
            // 处理失败逻辑
        });
        
        return template;
    }
}

消费者确认机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 消费者手动确认示例
@Component
public class ManualAckConsumer {
    
    @RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 处理消息
            String content = new String(message.getBody());
            log.info("接收到消息: {}", content);
            
            // 模拟业务处理
            processMessage(content);
            
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("处理消息失败", e);
            
            // 判断是否已经重试过
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if (redelivered) {
                log.error("消息已重试过,拒绝消息: {}", deliveryTag);
                // 拒绝消息,不重新入队
                channel.basicReject(deliveryTag, false);
                // 可以将失败消息记录到数据库或发送到死信队列
            } else {
                log.warn("消息首次处理失败,重新入队: {}", deliveryTag);
                // 拒绝消息,重新入队
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
    
    private void processMessage(String content) {
        // 业务处理逻辑
    }
}

消息重试

Rabbitmq 可以将消息重试机制设置为自动重试或手动重试。自动重试可以保证消息的可靠性,手动重试可以提高消息的可靠性。

实际应用:在处理可能因为临时网络问题或服务不可用而失败的消息时,可以配置重试机制,提高消息处理的成功率。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Spring Boot中配置消息重试
@Configuration
public class RetryConfig {
    
    // 配置重试策略
    @Bean
    public RetryOperationsInterceptor retryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateless()
            .maxAttempts(3) // 最大重试次数(包括第一次)
            .backOffOptions(1000, 2.0, 10000) // 初始间隔、乘数、最大间隔
            .recoverer(new RejectAndDontRequeueRecoverer()) // 达到最大重试次数后的处理策略
            .build();
    }
    
    // 应用重试拦截器
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            RetryOperationsInterceptor retryOperationsInterceptor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        
        // 配置重试拦截器
        Advice[] adviceChain = new Advice[] {retryOperationsInterceptor};
        factory.setAdviceChain(adviceChain);
        
        return factory;
    }
}

// 使用重试机制的消费者
@Component
public class RetryableConsumer {
    
    @RabbitListener(queues = "retryable.queue")
    public void processMessage(String message) {
        log.info("处理消息: {}", message);
        
        // 模拟随机失败
        if (Math.random() < 0.5) {
            log.warn("处理失败,将进行重试");
            throw new RuntimeException("随机失败,触发重试");
        }
        
        log.info("处理成功");
    }
}

自定义重试策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 自定义重试策略,根据异常类型决定是否重试
@Bean
public RetryOperationsInterceptor customRetryInterceptor() {
    return RetryInterceptorBuilder.stateless()
        .retryPolicy(new SimpleRetryPolicy(3, Map.of(
            TemporaryException.class, true, // 临时异常重试
            PermanentException.class, false // 永久异常不重试
        )))
        .backOffPolicy(new ExponentialBackOffPolicy()) // 指数退避策略
        .recoverer((args, cause) -> {
            // 达到最大重试次数后的处理
            String message = new String((byte[]) args.getArguments()[1]);
            log.error("消息重试达到最大次数,放入死信队列: {}", message, cause);
            // 可以将消息发送到死信队列或记录到数据库
        })
        .build();
}

消息备份

Rabbitmq 可以将消息备份到多个 Broker 上,以保证消息的可靠性。当某个 Broker 宕机后,消息不会丢失。

实际应用:在金融、支付等对数据一致性要求高的系统中,通常会部署RabbitMQ集群,并配置镜像队列,确保消息在多个节点间同步。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 镜像队列配置示例
@Bean
public Queue mirroredQueue() {
    Map<String, Object> args = new HashMap<>();
    // 配置镜像策略,all表示所有节点都有完整副本
    args.put("x-ha-policy", "all");
    // 设置镜像队列同步模式
    args.put("x-ha-sync-mode", "automatic");
    return new Queue("mirrored.queue", true, false, false, args);
}

通过管理命令配置镜像队列

1
2
3
4
5
# 为所有队列设置镜像策略
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}' --apply-to queues

# 为特定队列设置镜像策略
rabbitmqctl set_policy ha-important "^important\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' --apply-to queues

集群部署案例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# RabbitMQ集群配置(Kubernetes示例)
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
      - name: rabbitmq
        image: rabbitmq:3.9-management
        ports:
        - containerPort: 5672
        - containerPort: 15672
        env:
        - name: RABBITMQ_ERLANG_COOKIE
          value: "SHARED_SECRET_COOKIE"
        - name: RABBITMQ_DEFAULT_USER
          value: "admin"
        - name: RABBITMQ_DEFAULT_PASS
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password
        volumeMounts:
        - name: data
          mountPath: /var/lib/rabbitmq
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi

Rabbitmq 如何保证消息不被重复消费

Rabbitmq 如何保证消息不被重复消费主要包括以下几个方面:

消息唯一标识

Rabbitmq 可以为每条消息生成一个唯一的消息 ID,以保证消息的唯一性。

实际应用:在订单系统中,可以使用订单ID作为消息的唯一标识,确保即使消息被重复消费,也能识别出来。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 生产者:设置消息唯一ID
public void sendOrderMessage(Order order) {
    // 创建消息
    OrderMessage message = new OrderMessage(order);
    
    // 设置消息属性
    MessageProperties properties = new MessageProperties();
    properties.setMessageId(order.getId()); // 使用订单ID作为消息ID
    properties.setTimestamp(new Date()); // 设置时间戳
    
    // 发送消息
    Message amqpMessage = new Message(objectMapper.writeValueAsBytes(message), properties);
    rabbitTemplate.send("order.exchange", "order.created", amqpMessage);
}

消息状态检查

Rabbitmq 可以将消息的状态存储到数据库中,以保证消息的状态一致性。

实际应用:在支付系统中,可以将消息处理状态记录到数据库,在消费消息前先检查状态,避免重复处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 消费者:检查消息状态
@RabbitListener(queues = "payment.process.queue")
public void processPayment(Message message) {
    String messageId = message.getMessageProperties().getMessageId();
    
    // 检查消息是否已处理
    if (messageProcessRepository.isProcessed(messageId)) {
        log.info("消息已处理,跳过: {}", messageId);
        return;
    }
    
    try {
        // 处理消息
        PaymentMessage paymentMessage = objectMapper.readValue(message.getBody(), PaymentMessage.class);
        paymentService.processPayment(paymentMessage);
        
        // 标记消息为已处理
        messageProcessRepository.markAsProcessed(messageId);
    } catch (Exception e) {
        log.error("处理支付消息失败", e);
        throw e; // 重新抛出异常,触发重试机制
    }
}

数据库表设计

1
2
3
4
5
6
7
8
CREATE TABLE message_process_record (
    message_id VARCHAR(50) PRIMARY KEY,
    process_status VARCHAR(20) NOT NULL, -- PROCESSED, FAILED
    process_time TIMESTAMP NOT NULL,
    retry_count INT DEFAULT 0,
    last_retry_time TIMESTAMP,
    create_time TIMESTAMP NOT NULL
);

分布式锁

Rabbitmq 可以使用分布式锁机制,以保证消息的一致性。

实际应用:在高并发场景下,可以使用Redis或Zookeeper实现分布式锁,确保同一时间只有一个消费者处理特定的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 使用Redis实现分布式锁
@Component
public class DistributedLockConsumer {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private OrderService orderService;
    
    @RabbitListener(queues = "order.payment.queue")
    public void processOrderPayment(OrderPaymentMessage message) {
        String lockKey = "order:payment:lock:" + message.getOrderId();
        boolean locked = false;
        
        try {
            // 尝试获取分布式锁,过期时间30秒
            locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.SECONDS);
            
            if (locked) {
                // 获取锁成功,处理订单支付
                orderService.processPayment(message.getOrderId(), message.getAmount());
                log.info("订单支付处理成功: {}", message.getOrderId());
            } else {
                // 获取锁失败,说明消息正在被其他消费者处理
                log.info("订单支付消息正在被处理,跳过: {}", message.getOrderId());
            }
        } finally {
            // 释放锁
            if (locked) {
                redisTemplate.delete(lockKey);
            }
        }
    }
}

消息幂等性

Rabbitmq 可以实现消息的幂等性,以保证消息的一致性。

实际应用:在转账系统中,可以通过业务逻辑设计实现幂等性,确保同一笔转账只会执行一次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 幂等性处理示例
@Service
public class TransferService {
    
    @Autowired
    private AccountRepository accountRepository;
    
    @Autowired
    private TransferRepository transferRepository;
    
    @Transactional
    public void processTransfer(String transferId, String fromAccount, String toAccount, BigDecimal amount) {
        // 检查转账记录是否已存在
        if (transferRepository.existsById(transferId)) {
            log.info("转账已处理,跳过: {}", transferId);
            return;
        }
        
        // 执行转账操作
        Account from = accountRepository.findById(fromAccount)
            .orElseThrow(() -> new AccountNotFoundException(fromAccount));
        Account to = accountRepository.findById(toAccount)
            .orElseThrow(() -> new AccountNotFoundException(toAccount));
        
        // 检查余额
        if (from.getBalance().compareTo(amount) < 0) {
            throw new InsufficientBalanceException(fromAccount);
        }
        
        // 更新账户余额
        from.setBalance(from.getBalance().subtract(amount));
        to.setBalance(to.getBalance().add(amount));
        accountRepository.save(from);
        accountRepository.save(to);
        
        // 记录转账流水
        Transfer transfer = new Transfer();
        transfer.setId(transferId);
        transfer.setFromAccount(fromAccount);
        transfer.setToAccount(toAccount);
        transfer.setAmount(amount);
        transfer.setStatus("COMPLETED");
        transfer.setCreateTime(new Date());
        transferRepository.save(transfer);
    }
}

Rabbitmq 如何解决消息积压问题

Rabbitmq 如何解决消息积压问题主要包括以下几个方面:

增加消费者数量

Rabbitmq 可以增加消费者数量,以提高消息的处理速度。

实际应用:在电商平台的订单处理系统中,当出现订单峰值(如促销活动)时,可以动态扩展消费者实例数量,提高消息处理能力。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Spring Boot中配置消费者并发数
@Configuration
public class RabbitConsumerConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5); // 初始并发消费者数量
        factory.setMaxConcurrentConsumers(20); // 最大并发消费者数量
        factory.setPrefetchCount(10); // 每个消费者预取的消息数量
        return factory;
    }
}

// 在Kubernetes环境中动态扩展消费者Pod数量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 消费者自动扩缩容配置(Kubernetes HPA)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-consumer
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metric:
        name: rabbitmq_queue_messages
        selector:
          matchLabels:
            queue: order.processing
      target:
        type: AverageValue
        averageValue: 100 # 当每个队列平均消息数超过100时扩容

增加 Broker 数量

Rabbitmq 可以增加 Broker 数量,以提高消息的处理速度。

实际应用:在大型分布式系统中,可以部署RabbitMQ集群,并根据负载情况动态调整节点数量,提高系统整体吞吐量。

1
2
3
4
5
6
7
8
# 向现有集群添加新节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app

# 查看集群状态
rabbitmqctl cluster_status

集群负载均衡

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 客户端连接工厂配置多个地址
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    // 配置多个broker地址,实现负载均衡
    connectionFactory.setAddresses("rabbit1:5672,rabbit2:5672,rabbit3:5672");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    // 开启发布确认
    connectionFactory.setPublisherConfirms(true);
    return connectionFactory;
}

优化消息路由

Rabbitmq 可以优化消息路由,以提高消息的处理速度。

实际应用:在复杂业务系统中,可以根据消息类型和优先级设计不同的交换机和队列,实现消息的高效路由和处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 优先级队列配置
@Bean
public Queue priorityQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-max-priority", 10); // 设置最大优先级为10
    return new Queue("priority.queue", true, false, false, args);
}

// 发送带优先级的消息
public void sendWithPriority(String message, int priority) {
    MessageProperties properties = new MessageProperties();
    properties.setPriority(priority); // 设置消息优先级
    Message amqpMessage = new Message(message.getBytes(), properties);
    rabbitTemplate.send("priority.exchange", "priority.key", amqpMessage);
}

消息分片处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 消息分片处理示例
@Configuration
public class ShardingConfig {
    
    // 创建多个分片队列
    @Bean
    public List<Queue> shardQueues() {
        List<Queue> queues = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            queues.add(new Queue("order.processing." + i));
        }
        return queues;
    }
    
    // 创建交换机
    @Bean
    public DirectExchange shardExchange() {
        return new DirectExchange("order.shard.exchange");
    }
    
    // 绑定队列到交换机,使用不同的路由键
    @Bean
    public List<Binding> shardBindings(List<Queue> shardQueues, DirectExchange shardExchange) {
        List<Binding> bindings = new ArrayList<>();
        for (int i = 0; i < shardQueues.size(); i++) {
            bindings.add(BindingBuilder.bind(shardQueues.get(i))
                .to(shardExchange).with("shard." + i));
        }
        return bindings;
    }
}

// 生产者:根据订单ID进行分片路由
public void sendOrderMessage(Order order) {
    int shardIndex = Math.abs(order.getId().hashCode() % 4);
    String routingKey = "shard." + shardIndex;
    rabbitTemplate.convertAndSend("order.shard.exchange", routingKey, order);
}

临时队列转储

实际应用:当出现严重消息积压时,可以创建临时队列,将消息快速转储,然后使用更多的消费者并行处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 消息转储服务
@Service
public class MessageDumpService {
    
    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void dumpMessages(String sourceQueue, int dumpCount) {
        // 创建临时队列
        List<String> tempQueues = new ArrayList<>();
        for (int i = 0; i < dumpCount; i++) {
            String queueName = sourceQueue + ".temp." + i;
            Queue queue = new Queue(queueName, false, false, true);
            rabbitAdmin.declareQueue(queue);
            tempQueues.add(queueName);
        }
        
        // 从源队列获取消息并分发到临时队列
        int counter = 0;
        while (true) {
            Message message = rabbitTemplate.receive(sourceQueue, 100);
            if (message == null) {
                break; // 队列为空,退出循环
            }
            
            // 将消息发送到临时队列
            String tempQueue = tempQueues.get(counter % tempQueues.size());
            rabbitTemplate.send("", tempQueue, message);
            counter++;
        }
        
        log.info("已将{}条消息从{}队列转储到{}个临时队列", counter, sourceQueue, dumpCount);
    }
}

Rabbitmq 交换机类型

Rabbitmq 交换机类型主要包括以下几种:

Direct Exchange

Direct Exchange 是最简单的交换机类型,它根据消息的路由键将消息转发到一个或多个 Queue。

实际应用:适用于明确知道消息应该发送到哪个队列的场景,如日志系统中根据日志级别路由消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Direct Exchange配置示例
@Configuration
public class DirectExchangeConfig {
    
    @Bean
    public DirectExchange logExchange() {
        return new DirectExchange("log.direct");
    }
    
    @Bean
    public Queue errorQueue() {
        return new Queue("log.error");
    }
    
    @Bean
    public Queue warningQueue() {
        return new Queue("log.warning");
    }
    
    @Bean
    public Queue infoQueue() {
        return new Queue("log.info");
    }
    
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange logExchange) {
        return BindingBuilder.bind(errorQueue).to(logExchange).with("error");
    }
    
    @Bean
    public Binding warningBinding(Queue warningQueue, DirectExchange logExchange) {
        return BindingBuilder.bind(warningQueue).to(logExchange).with("warning");
    }
    
    @Bean
    public Binding infoBinding(Queue infoQueue, DirectExchange logExchange) {
        return BindingBuilder.bind(infoQueue).to(logExchange).with("info");
    }
}

// 生产者:发送日志消息
public void sendLog(String level, String message) {
    LogMessage logMessage = new LogMessage(level, message, new Date());
    rabbitTemplate.convertAndSend("log.direct", level, logMessage);
}

// 消费者:处理错误日志
@RabbitListener(queues = "log.error")
public void processErrorLogs(LogMessage message) {
    log.error("收到错误日志: {}", message.getMessage());
    // 处理错误日志,如发送告警通知
    alertService.sendAlert(message);
}

应用场景

  1. 日志路由:根据日志级别(ERROR、WARNING、INFO)将日志消息路由到不同的处理队列
  2. 工作队列:任务分发系统,根据任务类型路由到专门的处理队列
  3. 命令处理:根据命令类型将消息路由到对应的命令处理器

Topic Exchange

Topic Exchange 是一种多播交换机类型,它根据消息的路由键将消息转发到一个或多个 Queue。Topic Exchange 的路由键支持通配符,* 匹配一个单词,# 匹配零个或多个单词。

实际应用:适用于消息需要按照多维度分类的场景,如根据地区和业务类型路由消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Topic Exchange配置示例
@Configuration
public class TopicExchangeConfig {
    
    @Bean
    public TopicExchange notificationExchange() {
        return new TopicExchange("notification.topic");
    }
    
    // 按地区分类的队列
    @Bean
    public Queue chinaQueue() {
        return new Queue("notification.china");
    }
    
    @Bean
    public Queue usaQueue() {
        return new Queue("notification.usa");
    }
    
    // 按业务类型分类的队列
    @Bean
    public Queue orderQueue() {
        return new Queue("notification.order");
    }
    
    @Bean
    public Queue paymentQueue() {
        return new Queue("notification.payment");
    }
    
    // 绑定关系
    @Bean
    public Binding chinaOrderBinding(Queue chinaQueue, TopicExchange notificationExchange) {
        // 匹配中国地区的订单消息
        return BindingBuilder.bind(chinaQueue).to(notificationExchange).with("china.order.*");
    }
    
    @Bean
    public Binding chinaAllBinding(Queue chinaQueue, TopicExchange notificationExchange) {
        // 匹配中国地区的所有消息
        return BindingBuilder.bind(chinaQueue).to(notificationExchange).with("china.#");
    }
    
    @Bean
    public Binding orderAllBinding(Queue orderQueue, TopicExchange notificationExchange) {
        // 匹配所有地区的订单消息
        return BindingBuilder.bind(orderQueue).to(notificationExchange).with("*.order.*");
    }
}

// 生产者:发送通知消息
public void sendNotification(String region, String business, String action, String message) {
    String routingKey = region + "." + business + "." + action;
    NotificationMessage notification = new NotificationMessage(message, new Date());
    rabbitTemplate.convertAndSend("notification.topic", routingKey, notification);
}

// 使用示例
sendNotification("china", "order", "created", "新订单创建通知");
sendNotification("usa", "payment", "completed", "支付完成通知");

应用场景

  1. 多维度消息路由:根据地区、业务类型、操作类型等多个维度路由消息
  2. 事件广播:将事件广播给多个关注特定模式的消费者
  3. 基于主题的订阅:允许消费者订阅感兴趣的特定主题

Fanout Exchange

Fanout Exchange 是一种广播交换机类型,它将消息转发到所有绑定的 Queue,不考虑路由键。

实际应用:适用于需要将消息广播给多个消费者的场景,如系统公告、配置更新等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Fanout Exchange配置示例
@Configuration
public class FanoutExchangeConfig {
    
    @Bean
    public FanoutExchange systemBroadcastExchange() {
        return new FanoutExchange("system.broadcast");
    }
    
    @Bean
    public Queue userServiceQueue() {
        return new Queue("system.broadcast.user");
    }
    
    @Bean
    public Queue orderServiceQueue() {
        return new Queue("system.broadcast.order");
    }
    
    @Bean
    public Queue inventoryServiceQueue() {
        return new Queue("system.broadcast.inventory");
    }
    
    @Bean
    public Binding userServiceBinding(Queue userServiceQueue, FanoutExchange systemBroadcastExchange) {
        return BindingBuilder.bind(userServiceQueue).to(systemBroadcastExchange);
    }
    
    @Bean
    public Binding orderServiceBinding(Queue orderServiceQueue, FanoutExchange systemBroadcastExchange) {
        return BindingBuilder.bind(orderServiceQueue).to(systemBroadcastExchange);
    }
    
    @Bean
    public Binding inventoryServiceBinding(Queue inventoryServiceQueue, FanoutExchange systemBroadcastExchange) {
        return BindingBuilder.bind(inventoryServiceQueue).to(systemBroadcastExchange);
    }
}

// 生产者:广播系统消息
public void broadcastSystemMessage(String message, String type) {
    SystemMessage systemMessage = new SystemMessage(message, type, new Date());
    // 发送到Fanout交换机,路由键被忽略
    rabbitTemplate.convertAndSend("system.broadcast", "", systemMessage);
}

// 使用示例
broadcastSystemMessage("系统将在10分钟后进行维护", "MAINTENANCE");
broadcastSystemMessage("系统配置已更新,请重新加载", "CONFIG_UPDATE");

应用场景

  1. 系统公告:向所有服务广播系统维护、更新通知
  2. 配置更新:当系统配置变更时,通知所有相关服务刷新配置
  3. 缓存失效:当数据发生变化时,通知所有缓存服务清除相关缓存

Headers Exchange

Headers Exchange 是一种根据消息头的属性将消息转发到一个或多个 Queue 的交换机类型,它不使用路由键进行匹配,而是根据消息的头部属性进行匹配。

实际应用:适用于需要根据多个条件进行路由的场景,特别是当路由条件不适合用字符串表示时。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Headers Exchange配置示例
@Configuration
public class HeadersExchangeConfig {
    
    @Bean
    public HeadersExchange documentExchange() {
        return new HeadersExchange("document.headers");
    }
    
    @Bean
    public Queue pdfProcessQueue() {
        return new Queue("document.process.pdf");
    }
    
    @Bean
    public Queue imageProcessQueue() {
        return new Queue("document.process.image");
    }
    
    @Bean
    public Queue highPriorityQueue() {
        return new Queue("document.priority.high");
    }
    
    @Bean
    public Binding pdfBinding(Queue pdfProcessQueue, HeadersExchange documentExchange) {
        return BindingBuilder.bind(pdfProcessQueue).to(documentExchange)
            .where("format").matches("pdf");
    }
    
    @Bean
    public Binding imageBinding(Queue imageProcessQueue, HeadersExchange documentExchange) {
        return BindingBuilder.bind(imageProcessQueue).to(documentExchange)
            .where("format").matches("jpg").or("format").matches("png");
    }
    
    @Bean
    public Binding highPriorityBinding(Queue highPriorityQueue, HeadersExchange documentExchange) {
        // 使用whereAll要求所有条件都匹配
        return BindingBuilder.bind(highPriorityQueue).to(documentExchange)
            .whereAll(Map.of(
                "priority", "high",
                "type", "document"
            )).match();
    }
}

// 生产者:发送带头部属性的消息
public void sendDocument(byte[] content, String format, String priority) {
    MessageProperties properties = new MessageProperties();
    properties.setHeader("format", format);
    properties.setHeader("priority", priority);
    properties.setHeader("type", "document");
    
    Message message = new Message(content, properties);
    rabbitTemplate.send("document.headers", "", message);
}

// 使用示例
sendDocument(pdfContent, "pdf", "high");
sendDocument(imageContent, "jpg", "normal");

应用场景

  1. 文档处理:根据文档类型、格式、大小等属性路由到不同的处理队列
  2. 多条件路由:需要根据多个条件进行路由,且这些条件不适合用路由键表示
  3. 特殊消息处理:根据消息的特殊属性(如优先级、安全级别)进行路由

Headers Exchange vs Topic Exchange

  • Headers Exchange 基于消息头属性匹配,可以使用多个条件,不限于字符串格式
  • Topic Exchange 基于路由键匹配,使用字符串模式匹配,支持通配符
  • 当路由条件可以用字符串表示时,Topic Exchange 通常更简单高效
  • 当需要基于多个非字符串属性或复杂条件路由时,Headers Exchange 更灵活

交换机类型选择指南

交换机类型 适用场景 优势 示例应用
Direct 精确路由,一对一或一对多 简单高效,路由明确 日志路由、任务分发
Topic 基于模式的路由,多维度分类 灵活的模式匹配,支持通配符 多区域通知、事件分发
Fanout 广播消息,一对所有 最高效的消息分发,不需要路由计算 系统公告、缓存刷新
Headers 基于属性的复杂路由 支持多条件匹配,不限于字符串 文档处理、多条件筛选

RabbitMQ 高级特性

死信队列(Dead Letter Queue)

死信队列用于处理无法被正常消费的消息。当消息被拒绝(reject/nack)且不重新入队、消息过期(TTL)或队列达到最大长度时,消息会被发送到死信队列。

实际应用:在订单处理系统中,如果订单消息处理失败达到最大重试次数,可以将其发送到死信队列进行特殊处理或人工干预。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 死信队列配置
@Configuration
public class DeadLetterConfig {
    
    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("order.dead.letter.exchange");
    }
    
    // 死信队列
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("order.dead.letter.queue");
    }
    
    // 绑定死信队列到死信交换机
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with("order.dead.letter");
    }
    
    // 业务队列,配置死信交换机
    @Bean
    public Queue orderProcessQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置死信交换机
        args.put("x-dead-letter-exchange", "order.dead.letter.exchange");
        // 设置死信路由键
        args.put("x-dead-letter-routing-key", "order.dead.letter");
        // 设置消息过期时间(毫秒)
        args.put("x-message-ttl", 60000); // 1分钟
        return new Queue("order.process.queue", true, false, false, args);
    }
}

// 死信队列消费者
@Component
public class DeadLetterConsumer {
    
    @Autowired
    private OrderService orderService;
    
    @RabbitListener(queues = "order.dead.letter.queue")
    public void processDeadLetter(Message message) {
        try {
            // 解析消息
            OrderMessage orderMessage = objectMapper.readValue(message.getBody(), OrderMessage.class);
            
            // 记录死信消息
            log.error("订单处理失败,进入死信队列: {}", orderMessage.getOrderId());
            
            // 发送告警通知
            alertService.sendAlert("订单处理失败", "订单ID: " + orderMessage.getOrderId());
            
            // 尝试特殊处理
            orderService.handleFailedOrder(orderMessage.getOrderId());
        } catch (Exception e) {
            log.error("处理死信消息失败", e);
        }
    }
}

延迟队列(Delayed Message)

延迟队列用于实现消息的延迟投递,常用于定时任务、延迟处理等场景。

实际应用:在电商系统中,订单创建后如果30分钟内未支付,需要自动取消并恢复库存。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 延迟队列配置(需要安装rabbitmq_delayed_message_exchange插件)
@Configuration
public class DelayedConfig {
    
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("order.delayed.exchange", "x-delayed-message", true, false, args);
    }
    
    @Bean
    public Queue orderTimeoutQueue() {
        return new Queue("order.timeout.queue");
    }
    
    @Bean
    public Binding orderTimeoutBinding() {
        return BindingBuilder.bind(orderTimeoutQueue())
            .to(delayedExchange())
            .with("order.timeout")
            .noargs();
    }
}

// 发送延迟消息
public void createOrder(Order order) {
    // 保存订单
    orderRepository.save(order);
    
    // 发送延迟消息,30分钟后检查订单状态
    OrderTimeoutMessage message = new OrderTimeoutMessage(order.getId());
    
    MessageProperties properties = new MessageProperties();
    // 设置延迟时间(毫秒)
    properties.setHeader("x-delay", 30 * 60 * 1000); // 30分钟
    
    Message amqpMessage = new Message(objectMapper.writeValueAsBytes(message), properties);
    rabbitTemplate.send("order.delayed.exchange", "order.timeout", amqpMessage);
}

// 延迟消息消费者
@Component
public class OrderTimeoutConsumer {
    
    @Autowired
    private OrderService orderService;
    
    @RabbitListener(queues = "order.timeout.queue")
    public void processOrderTimeout(OrderTimeoutMessage message) {
        String orderId = message.getOrderId();
        
        // 检查订单状态
        Order order = orderService.getOrder(orderId);
        if (order != null && "UNPAID".equals(order.getStatus())) {
            // 订单未支付,执行取消操作
            orderService.cancelOrder(orderId);
            log.info("订单{}超时未支付,已自动取消", orderId);
        }
    }
}

优先级队列(Priority Queue)

优先级队列允许根据消息的优先级决定消费顺序,优先级高的消息会被优先消费。

实际应用:在客服系统中,VIP客户的服务请求应该优先处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 优先级队列配置
@Bean
public Queue supportQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-max-priority", 10); // 设置最大优先级为10
    return new Queue("support.queue", true, false, false, args);
}

// 发送带优先级的消息
public void createSupportTicket(Ticket ticket) {
    // 保存工单
    ticketRepository.save(ticket);
    
    // 根据客户等级设置优先级
    int priority = switch (ticket.getCustomerLevel()) {
        case "VIP" -> 10;
        case "GOLD" -> 8;
        case "SILVER" -> 5;
        default -> 1;
    };
    
    // 发送消息
    MessageProperties properties = new MessageProperties();
    properties.setPriority(priority);
    
    Message message = new Message(objectMapper.writeValueAsBytes(ticket), properties);
    rabbitTemplate.send("support.exchange", "support.ticket", message);
}

RabbitMQ 最佳实践

生产者最佳实践

  1. 确认机制:始终使用发布确认(Publisher Confirms)机制确保消息成功发送到RabbitMQ服务器。

  2. 消息持久化:对于重要业务,确保交换机、队列和消息都设置为持久化(durable)。

  3. 消息序列化:使用高效的序列化方式(如JSON、Protocol Buffers)并处理好序列化异常。

  4. 重试机制:实现消息发送失败的重试策略,可以使用指数退避算法。

  5. 批量发送:在高吞吐量场景下,考虑批量发送消息以提高性能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 生产者最佳实践示例
@Service
public class BestPracticeProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private RetryTemplate retryTemplate;
    
    public void sendWithBestPractice(Object message, String exchange, String routingKey) {
        // 使用重试模板
        retryTemplate.execute(context -> {
            // 创建CorrelationData用于确认回调
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            
            // 设置确认回调
            correlationData.getFuture().addCallback(
                confirm -> {
                    if (confirm.isAck()) {
                        log.info("消息已确认: {}", correlationData.getId());
                    } else {
                        log.error("消息未确认: {}, 原因: {}", correlationData.getId(), confirm.getReason());
                        // 可以将未确认的消息保存到数据库,后续重试
                    }
                },
                ex -> log.error("消息确认异常", ex)
            );
            
            // 发送消息
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            return null;
        });
    }
}

消费者最佳实践

  1. 手动确认:对于重要业务,使用手动确认模式,确保消息处理成功后再确认。

  2. 幂等性处理:实现消费者的幂等性,确保重复消费不会导致业务问题。

  3. 并发控制:根据业务特性和硬件资源合理设置消费者的并发数。

  4. 异常处理:妥善处理消费过程中的异常,决定是拒绝还是重新入队。

  5. 预取数量:合理设置prefetch count,避免单个消费者负载过重。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 消费者最佳实践示例
@Component
public class BestPracticeConsumer {
    
    @Autowired
    private ProcessRecordRepository recordRepository;
    
    @RabbitListener(queues = "best.practice.queue", ackMode = "MANUAL", concurrency = "5-10")
    public void consume(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        
        try {
            // 检查是否已处理(幂等性检查)
            if (recordRepository.existsByMessageId(messageId)) {
                log.info("消息已处理,跳过: {}", messageId);
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            // 处理消息
            Object payload = objectMapper.readValue(message.getBody(), TargetType.class);
            processMessage(payload);
            
            // 记录处理状态
            recordRepository.save(new ProcessRecord(messageId, "SUCCESS"));
            
            // 确认消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("处理消息失败: {}", messageId, e);
            
            // 判断是否需要重试
            if (isRetryable(e)) {
                // 拒绝消息并重新入队
                channel.basicNack(deliveryTag, false, true);
            } else {
                // 拒绝消息不重新入队,进入死信队列
                channel.basicNack(deliveryTag, false, false);
                // 记录失败状态
                recordRepository.save(new ProcessRecord(messageId, "FAILED", e.getMessage()));
            }
        }
    }
    
    private boolean isRetryable(Exception e) {
        // 判断异常是否可重试
        return e instanceof TemporaryException || e instanceof IOException;
    }
}

监控和运维最佳实践

  1. 健康检查:定期检查RabbitMQ集群的健康状态,包括节点状态、队列长度等。

  2. 告警机制:设置关键指标的告警阈值,如队列长度、消息积压、消费者数量等。

  3. 日志记录:记录关键操作和异常情况,便于问题排查。

  4. 资源隔离:为不同业务系统使用不同的vhost,实现资源隔离。

  5. 容量规划:根据业务增长预测,提前规划RabbitMQ集群的容量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 监控示例
@Component
public class RabbitMQMonitor {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Scheduled(fixedRate = 60000) // 每分钟执行一次
    public void monitorQueueStatus() {
        // 获取队列信息
        Queue queue = rabbitTemplate.execute(channel -> {
            try {
                return channel.queueDeclarePassive("important.queue");
            } catch (IOException e) {
                log.error("获取队列信息失败", e);
                return null;
            }
        });
        
        if (queue != null) {
            int messageCount = queue.getMessageCount();
            int consumerCount = queue.getConsumerCount();
            
            log.info("队列状态 - 消息数: {}, 消费者数: {}", messageCount, consumerCount);
            
            // 检查队列积压
            if (messageCount > 1000 && consumerCount < 5) {
                log.warn("队列积压告警 - 消息数: {}, 消费者数: {}", messageCount, consumerCount);
                alertService.sendAlert("队列积压", "important.queue队列积压,消息数: " + messageCount);
            }
        }
    }
}

总结

RabbitMQ作为一个成熟的消息队列中间件,在分布式系统中扮演着重要角色。通过合理使用其交换机类型、队列特性和消息属性,可以构建高效、可靠的消息传递系统。在实际应用中,需要根据业务场景选择合适的消息模型,并遵循最佳实践,确保系统的稳定性和可靠性。

对于不同的业务场景,RabbitMQ提供了灵活的解决方案:

  1. 异步处理:通过消息队列实现系统解耦,提高响应速度
  2. 流量削峰:在高并发场景下缓冲请求,平滑处理峰值流量
  3. 可靠通信:通过消息持久化、确认机制等保证消息不丢失
  4. 灵活路由:利用不同类型的交换机实现复杂的消息路由逻辑
  5. 延时处理:使用延迟队列实现定时任务和延迟处理

在使用RabbitMQ时,应当注意消息的可靠性、幂等性处理、性能优化和监控运维等方面,确保消息队列在系统中发挥最大价值。

使用 Hugo 构建
主题 StackJimmy 设计