工程实践 分布式系统

RabbitMQ 消息队列:与 Kafka 的选型对比及银行实战

从 RabbitMQ 核心概念(Exchange/Queue/Binding)到 Spring AMQP 实战,详解 RabbitMQ 与 Kafka 的选型决策、银行支付场景应用,以及常见问题与避坑指南。

发布于 2026/03/22 更新于 2026/03/22 1 分钟

“Kafka 是日志系统发家的流平台,RabbitMQ 是以消息路由为核心的消息代理。两者都能做消息队列,但适用场景完全不同。选错技术栈的代价,是整个异步架构的重新设计。“

前言

在 HSBC 银行系统里,Kafka 承担了日志收集、事件流、审计追踪等大数据场景。但 RabbitMQ 仍然有它不可替代的场景:可靠消息传递复杂路由事务性消息

Kafka 的核心优势是高吞吐(百万级/秒),RabbitMQ 的核心优势是灵活路由可靠性。两者不是竞争关系,而是互补关系。

1. RabbitMQ 核心概念

RabbitMQ 的模型比 Kafka 复杂,但概念非常清晰:

Producer → Exchange → Binding → Queue → Consumer

           Routing Key

核心概念:
  Exchange(交换机):接收 Producer 消息,按规则路由
  Queue(队列):存储消息,消费者从中取消息
  Binding(绑定):Exchange 和 Queue 之间的路由规则
  Routing Key(路由键):Producer 发送消息时指定,Exchange 依据此路由

1.1 四种 Exchange 类型

# 1. Direct Exchange:精确匹配路由键
# 生产者:exchange=payments, routing_key=payment.created
# 绑定:queue=payment-created-queue, binding_key=payment.created
# → 消息精确路由到 payment-created-queue

# 2. Fanout Exchange:广播,所有绑定的队列都收到
# 生产者:exchange=notifications, routing_key=(任意)
# 绑定:queue=email-queue + queue=sms-queue + queue=push-queue
# → 消息同时发往三个队列

# 3. Topic Exchange:通配符匹配
# 绑定键格式:<word>.<word>.<word>
# * 匹配一个单词
# # 匹配零个或多个单词
# 示例:
#   binding_key: payment.*.created   → payment.refund.created ✓, payment.completed.created ✓
#   binding_key: payment.#             → payment.anything.anything ✓

# 4. Headers Exchange:基于消息头匹配(较少用)
# 按消息头的键值对匹配,而非路由键

2. Spring AMQP 实战

2.1 依赖与配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# application.yml
spring:
  rabbitmq:
    host: rabbitmq.prod.internal
    port: 5672
    username: ${RABBITMQ_USER}
    password: ${RABBITMQ_PASSWORD}
    virtual-host: /banking

    # 连接池配置
    connection-timeout: 10000
    requested-heartbeat: 60
    listener:
      simple:
        acknowledge-mode: manual    # 手动 ACK(生产环境必须)
        prefetch: 10                # 每次预取 10 条消息
        concurrency: 5               # 最小消费者数
        max-concurrency: 20          # 最大消费者数
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
          multiplier: 2.0

2.2 配置类:声明 Exchange、Queue、Binding

@Configuration
@RequiredArgsConstructor
public class RabbitMQConfig {

    public static final String PAYMENT_EXCHANGE = "payment.exchange";
    public static final String PAYMENT_QUEUE = "payment.processing.queue";
    public static final String PAYMENT_ROUTING_KEY = "payment.created";
    public static final String DLX_EXCHANGE = "payment.dlx.exchange";
    public static final String DLQ_QUEUE = "payment.dlq";

    // 1. 声明死信交换机(消息处理失败后进入这里)
    @Bean
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder
            .directExchange(DLX_EXCHANGE)
            .durable(true)
            .build();
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_QUEUE).build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder
            .bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with("payment.dead");
    }

    // 2. 声明主交换机
    @Bean
    public DirectExchange paymentExchange() {
        return ExchangeBuilder
            .directExchange(PAYMENT_EXCHANGE)
            .durable(true)
            .build();
    }

    // 3. 声明主队列(配置死信路由)
    @Bean
    public Queue paymentQueue() {
        return QueueBuilder.durable(PAYMENT_QUEUE)
            .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
            .withArgument("x-dead-letter-routing-key", "payment.dead")
            .withArgument("x-message-ttl", 300000)  // 队列级 TTL:5 分钟
            .build();
    }

    // 4. 绑定
    @Bean
    public Binding paymentBinding() {
        return BindingBuilder
            .bind(paymentQueue())
            .to(paymentExchange())
            .with(PAYMENT_ROUTING_KEY);
    }
}

2.3 生产者:可靠消息发送

@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentEventPublisher {

    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper;

    // 普通发送(fire-and-forget)
    public void publishPaymentCreated(Payment payment) {
        PaymentEvent event = new PaymentEvent(
            "PAYMENT_CREATED",
            payment.getTransactionId(),
            payment.getAmount(),
            Instant.now()
        );

        rabbitTemplate.convertAndSend(
            RabbitMQConfig.PAYMENT_EXCHANGE,
            RabbitMQConfig.PAYMENT_ROUTING_KEY,
            event,
            message -> {
                // 消息属性:持久化 + 内容类型
                message.getMessageProperties().setDeliveryMode(
                    MessageDeliveryMode.PERSISTENT);  // 持久化到磁盘
                message.getMessageProperties().setContentType("application/json");
                message.getMessageProperties().setMessageId(
                    payment.getTransactionId());
                message.getMessageProperties().setTimestamp(Date.from(Instant.now()));
                return message;
            }
        );

        log.info("支付事件已发布: txnId={}, eventType={}",
            payment.getTransactionId(), event.type());
    }

    // 可靠发送:Confirm 模式(Broker 确认收到)
    public void publishWithConfirm(Payment payment) {
        // 开启 Publisher Confirms
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.error("消息发送失败: txnId={}, cause={}",
                    payment.getTransactionId(), cause);
                // 发送到备用队列或重试
                publishToRetryQueue(payment);
            } else {
                log.debug("消息发送确认: txnId={}",
                    payment.getTransactionId());
            }
        });

        PaymentEvent event = new PaymentEvent(
            "PAYMENT_CREATED",
            payment.getTransactionId(),
            payment.getAmount(),
            Instant.now()
        );

        rabbitTemplate.convertAndSend(
            RabbitMQConfig.PAYMENT_EXCHANGE,
            RabbitMQConfig.PAYMENT_ROUTING_KEY,
            event);
    }
}

2.4 消费者:手动 ACK + 重试

@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentEventConsumer {

    private static final int MAX_RETRY = 3;

    private final RabbitTemplate rabbitTemplate;
    private final PaymentService paymentService;

    @RabbitListener(queues = RabbitMQConfig.PAYMENT_QUEUE)
    public void handlePaymentCreated(
            Message message,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
            Channel channel) {

        String messageId = message.getMessageProperties().getMessageId();
        int retryCount = getRetryCount(message);

        try {
            // 1. 解析消息
            PaymentEvent event = parseMessage(message);

            log.info("收到支付事件: txnId={}, retry={}",
                event.transactionId(), retryCount);

            // 2. 业务处理(幂等)
            paymentService.processPayment(event);

            // 3. 手动 ACK:确认消息已处理
            channel.basicAck(deliveryTag, false);
            log.info("支付处理成功: txnId={}", event.transactionId());

        } catch (RetryableException e) {
            // 4a. 可重试异常:重试或发送到 DLQ
            handleRetry(message, channel, deliveryTag, retryCount, e);

        } catch (Exception e) {
            // 4b. 不可重试异常:直接发送到 DLQ
            log.error("支付处理失败(不可重试): messageId={}", messageId, e);
            channel.basicReject(deliveryTag, false);  // 不重试,直接入 DLQ
        }
    }

    private void handleRetry(Message message, Channel channel,
                             long deliveryTag, int retryCount, Exception e) {
        String messageId = message.getMessageProperties().getMessageId();

        if (retryCount >= MAX_RETRY) {
            // 重试次数耗尽,进入 DLQ
            log.warn("重试次数耗尽,发送到 DLQ: messageId={}, retry={}",
                messageId, retryCount);
            try {
                channel.basicReject(deliveryTag, false);
            } catch (IOException ioException) {
                log.error("拒绝消息失败", ioException);
            }
        } else {
            // 重新入队(延迟重试)
            try {
                channel.basicNack(deliveryTag, false, true);
                log.info("消息重试入队: messageId={}, retry={}",
                    messageId, retryCount + 1);
            } catch (IOException ioException) {
                log.error("NACK 失败", ioException);
            }
        }
    }

    private int getRetryCount(Message message) {
        Object count = message.getMessageProperties()
            .getHeaders().get("x-retry-count");
        return count != null ? (int) count : 0;
    }

    private PaymentEvent parseMessage(Message message) throws IOException {
        return new ObjectMapper().readValue(
            message.getBody(), PaymentEvent.class);
    }
}

3. RabbitMQ vs Kafka:选型决策

维度RabbitMQKafka
消息模型队列(Queue)流(Log)
消费模式竞争消费(单播)发布订阅(广播)
消息顺序单 Queue 内有序Partition 内有序
消息持久化可选,默认不持久化默认持久化(磁盘顺序写)
吞吐量1-10 万/秒100 万+/秒
消息 TTL支持(队列/消息级)仅消息级
延迟极低(微秒级)毫秒级(批量吞吐优先)
死信队列原生支持需手动设计
事务消息支持支持(Exactly-once)
运维复杂度中(Erlang VM)高(ZooKeeper/KRaft)
选型口诀:

Kafka:日志、事件流、大数据、审计、CDC
RabbitMQ:任务队列、复杂路由、可靠消息、延迟队列

3.1 银行场景选型

适合 RabbitMQ 的银行场景:
 支付回调处理(需要 ACK、死信队列)
 异步通知(邮件/短信/Push)→ Fanout 广播
 任务队列(异步执行业务)→ 竞争消费
 延迟队列(订单超时取消)→ 插件 delayed-message
 RPC 远程调用(RabbitMQ RPC 模式)

适合 Kafka 的银行场景:
 交易日志审计(高吞吐、长期保留)
 实时风控规则引擎(事件流处理)
 跨系统事件总线(微服务事件驱动)
 用户行为分析(高吞吐、离线处理)

4. 银行实战:支付回调异步处理

@Service
@Slf4j
public class PaymentCallbackService {

    private final RabbitTemplate rabbitTemplate;

    // 接收第三方支付回调,立即返回 + 消息队列异步处理
    @PostMapping("/callback/alipay")
    public ResponseEntity<?> alipayCallback(
            @RequestBody AlipayCallbackRequest request) {

        // 1. 快速幂等校验
        if (idempotentService.isProcessed(request.getTradeNo())) {
            return ResponseEntity.ok(Map.of("code", "SUCCESS"));
        }

        // 2. 立即返回给支付宝(避免超时)
        // 回调必须在 3 秒内响应,否则支付宝会重试
        ResponseEntity.ok(Map.of("code", "SUCCESS"));

        // 3. 异步发送到 RabbitMQ(不阻塞回调)
        PaymentCallbackEvent event = new PaymentCallbackEvent(
            request.getTradeNo(),
            request.getAmount(),
            "ALIPAY",
            Instant.now()
        );

        rabbitTemplate.convertAndSend(
            "payment.callback.exchange",
            "callback.alipay",
            event,
            m -> {
                m.getMessageProperties().setPriority(5);  // 高优先级
                m.getMessageProperties().setExpiration(
                    String.valueOf(Duration.ofHours(1).toMillis()));
                return m;
            }
        );

        return ResponseEntity.ok(Map.of("code", "SUCCESS"));
    }
}

4.1 延迟队列:订单超时取消

// 使用 RabbitMQ 延迟插件(rabbitmq_delayed_message_exchange)
@Configuration
public class DelayQueueConfig {

    public static final String ORDER_EXCHANGE = "order.delay.exchange";
    public static final String ORDER_QUEUE = "order.timeout.queue";
    public static final String ORDER_CANCEL_ROUTING_KEY = "order.timeout";

    @Bean
    public CustomExchange orderDelayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(ORDER_EXCHANGE, "x-delayed-message",
            true, false, args);
    }

    @Bean
    public Queue orderTimeoutQueue() {
        return QueueBuilder.durable(ORDER_QUEUE).build();
    }

    @Bean
    public Binding orderTimeoutBinding() {
        return BindingBuilder
            .bind(orderTimeoutQueue())
            .to(orderDelayExchange())
            .with(ORDER_CANCEL_ROUTING_KEY)
            .noargs();
    }
}

// 发送延迟消息(订单创建后 30 分钟未支付,自动取消)
public void scheduleOrderTimeout(String orderId) {
    rabbitTemplate.convertAndSend(
        DelayQueueConfig.ORDER_EXCHANGE,
        DelayQueueConfig.ORDER_CANCEL_ROUTING_KEY,
        Map.of("orderId", orderId, "action", "CANCEL"),
        message -> {
            // 延迟 30 分钟
            message.getMessageProperties().setDelay(30 * 60 * 1000);
            return message;
        }
    );
}

5. 高可用配置

# RabbitMQ 集群配置(3 节点)
# /etc/rabbitmq/rabbitmq.conf
listeners.tcp.default = 5672
management.tcp.port = 15672

# 开启 HA 队列
queue_master_locator = min-masters
ha-mode = exactly
ha-params = 3
ha-sync-mode = automatic

# 开启持久化
queue durability_enabled = true
// 消费者端:连接多个节点,自动故障转移
@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    // 连接字符串:逗号分隔多个节点
    factory.setUri("amqp://user:pass@node1:5672,user:pass@node2:5672,user:pass@node3:5672");
    factory.setRequestedHeartBeat(60);
    factory.setConnectionTimeout(10000);
    return factory;
}

相关阅读:Kafka 消息队列 · [Redis 实现消息队列](/coding/Redis/Redis 实现消息队列) · 分布式事务与 Saga 模式实战