“Kafka 是日志系统发家的流平台,RabbitMQ 是以消息路由为核心的消息代理。两者都能做消息队列,但适用场景完全不同。选错技术栈的代价,是整个异步架构的重新设计。“
前言
在 HSBC 银行系统里,Kafka 承担了日志收集、事件流、审计追踪等大数据场景。但 RabbitMQ 仍然有它不可替代的场景:可靠消息传递、复杂路由、事务性消息。
Kafka 的核心优势是高吞吐(百万级/秒),RabbitMQ 的核心优势是灵活路由和可靠性。两者不是竞争关系,而是互补关系。
1. RabbitMQ 核心概念
RabbitMQ 的模型比 Kafka 复杂,但概念非常清晰:
Producer → Exchange → Binding → Queue → Consumer
↑
Routing Key
核心概念:
Exchange(交换机):接收 Producer 消息,按规则路由
Queue(队列):存储消息,消费者从中取消息
Binding(绑定):Exchange 和 Queue 之间的路由规则
Routing Key(路由键):Producer 发送消息时指定,Exchange 依据此路由
1.1 四种 Exchange 类型
# 1. Direct Exchange:精确匹配路由键
# 生产者:exchange=payments, routing_key=payment.created
# 绑定:queue=payment-created-queue, binding_key=payment.created
# → 消息精确路由到 payment-created-queue
# 2. Fanout Exchange:广播,所有绑定的队列都收到
# 生产者:exchange=notifications, routing_key=(任意)
# 绑定:queue=email-queue + queue=sms-queue + queue=push-queue
# → 消息同时发往三个队列
# 3. Topic Exchange:通配符匹配
# 绑定键格式:<word>.<word>.<word>
# * 匹配一个单词
# # 匹配零个或多个单词
# 示例:
# binding_key: payment.*.created → payment.refund.created ✓, payment.completed.created ✓
# binding_key: payment.# → payment.anything.anything ✓
# 4. Headers Exchange:基于消息头匹配(较少用)
# 按消息头的键值对匹配,而非路由键
2. Spring AMQP 实战
2.1 依赖与配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# application.yml
spring:
rabbitmq:
host: rabbitmq.prod.internal
port: 5672
username: ${RABBITMQ_USER}
password: ${RABBITMQ_PASSWORD}
virtual-host: /banking
# 连接池配置
connection-timeout: 10000
requested-heartbeat: 60
listener:
simple:
acknowledge-mode: manual # 手动 ACK(生产环境必须)
prefetch: 10 # 每次预取 10 条消息
concurrency: 5 # 最小消费者数
max-concurrency: 20 # 最大消费者数
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
max-interval: 10000
multiplier: 2.0
2.2 配置类:声明 Exchange、Queue、Binding
@Configuration
@RequiredArgsConstructor
public class RabbitMQConfig {
public static final String PAYMENT_EXCHANGE = "payment.exchange";
public static final String PAYMENT_QUEUE = "payment.processing.queue";
public static final String PAYMENT_ROUTING_KEY = "payment.created";
public static final String DLX_EXCHANGE = "payment.dlx.exchange";
public static final String DLQ_QUEUE = "payment.dlq";
// 1. 声明死信交换机(消息处理失败后进入这里)
@Bean
public DirectExchange deadLetterExchange() {
return ExchangeBuilder
.directExchange(DLX_EXCHANGE)
.durable(true)
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DLQ_QUEUE).build();
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("payment.dead");
}
// 2. 声明主交换机
@Bean
public DirectExchange paymentExchange() {
return ExchangeBuilder
.directExchange(PAYMENT_EXCHANGE)
.durable(true)
.build();
}
// 3. 声明主队列(配置死信路由)
@Bean
public Queue paymentQueue() {
return QueueBuilder.durable(PAYMENT_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "payment.dead")
.withArgument("x-message-ttl", 300000) // 队列级 TTL:5 分钟
.build();
}
// 4. 绑定
@Bean
public Binding paymentBinding() {
return BindingBuilder
.bind(paymentQueue())
.to(paymentExchange())
.with(PAYMENT_ROUTING_KEY);
}
}
2.3 生产者:可靠消息发送
@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentEventPublisher {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
// 普通发送(fire-and-forget)
public void publishPaymentCreated(Payment payment) {
PaymentEvent event = new PaymentEvent(
"PAYMENT_CREATED",
payment.getTransactionId(),
payment.getAmount(),
Instant.now()
);
rabbitTemplate.convertAndSend(
RabbitMQConfig.PAYMENT_EXCHANGE,
RabbitMQConfig.PAYMENT_ROUTING_KEY,
event,
message -> {
// 消息属性:持久化 + 内容类型
message.getMessageProperties().setDeliveryMode(
MessageDeliveryMode.PERSISTENT); // 持久化到磁盘
message.getMessageProperties().setContentType("application/json");
message.getMessageProperties().setMessageId(
payment.getTransactionId());
message.getMessageProperties().setTimestamp(Date.from(Instant.now()));
return message;
}
);
log.info("支付事件已发布: txnId={}, eventType={}",
payment.getTransactionId(), event.type());
}
// 可靠发送:Confirm 模式(Broker 确认收到)
public void publishWithConfirm(Payment payment) {
// 开启 Publisher Confirms
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("消息发送失败: txnId={}, cause={}",
payment.getTransactionId(), cause);
// 发送到备用队列或重试
publishToRetryQueue(payment);
} else {
log.debug("消息发送确认: txnId={}",
payment.getTransactionId());
}
});
PaymentEvent event = new PaymentEvent(
"PAYMENT_CREATED",
payment.getTransactionId(),
payment.getAmount(),
Instant.now()
);
rabbitTemplate.convertAndSend(
RabbitMQConfig.PAYMENT_EXCHANGE,
RabbitMQConfig.PAYMENT_ROUTING_KEY,
event);
}
}
2.4 消费者:手动 ACK + 重试
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentEventConsumer {
private static final int MAX_RETRY = 3;
private final RabbitTemplate rabbitTemplate;
private final PaymentService paymentService;
@RabbitListener(queues = RabbitMQConfig.PAYMENT_QUEUE)
public void handlePaymentCreated(
Message message,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) {
String messageId = message.getMessageProperties().getMessageId();
int retryCount = getRetryCount(message);
try {
// 1. 解析消息
PaymentEvent event = parseMessage(message);
log.info("收到支付事件: txnId={}, retry={}",
event.transactionId(), retryCount);
// 2. 业务处理(幂等)
paymentService.processPayment(event);
// 3. 手动 ACK:确认消息已处理
channel.basicAck(deliveryTag, false);
log.info("支付处理成功: txnId={}", event.transactionId());
} catch (RetryableException e) {
// 4a. 可重试异常:重试或发送到 DLQ
handleRetry(message, channel, deliveryTag, retryCount, e);
} catch (Exception e) {
// 4b. 不可重试异常:直接发送到 DLQ
log.error("支付处理失败(不可重试): messageId={}", messageId, e);
channel.basicReject(deliveryTag, false); // 不重试,直接入 DLQ
}
}
private void handleRetry(Message message, Channel channel,
long deliveryTag, int retryCount, Exception e) {
String messageId = message.getMessageProperties().getMessageId();
if (retryCount >= MAX_RETRY) {
// 重试次数耗尽,进入 DLQ
log.warn("重试次数耗尽,发送到 DLQ: messageId={}, retry={}",
messageId, retryCount);
try {
channel.basicReject(deliveryTag, false);
} catch (IOException ioException) {
log.error("拒绝消息失败", ioException);
}
} else {
// 重新入队(延迟重试)
try {
channel.basicNack(deliveryTag, false, true);
log.info("消息重试入队: messageId={}, retry={}",
messageId, retryCount + 1);
} catch (IOException ioException) {
log.error("NACK 失败", ioException);
}
}
}
private int getRetryCount(Message message) {
Object count = message.getMessageProperties()
.getHeaders().get("x-retry-count");
return count != null ? (int) count : 0;
}
private PaymentEvent parseMessage(Message message) throws IOException {
return new ObjectMapper().readValue(
message.getBody(), PaymentEvent.class);
}
}
3. RabbitMQ vs Kafka:选型决策
| 维度 | RabbitMQ | Kafka |
|---|---|---|
| 消息模型 | 队列(Queue) | 流(Log) |
| 消费模式 | 竞争消费(单播) | 发布订阅(广播) |
| 消息顺序 | 单 Queue 内有序 | Partition 内有序 |
| 消息持久化 | 可选,默认不持久化 | 默认持久化(磁盘顺序写) |
| 吞吐量 | 1-10 万/秒 | 100 万+/秒 |
| 消息 TTL | 支持(队列/消息级) | 仅消息级 |
| 延迟 | 极低(微秒级) | 毫秒级(批量吞吐优先) |
| 死信队列 | 原生支持 | 需手动设计 |
| 事务消息 | 支持 | 支持(Exactly-once) |
| 运维复杂度 | 中(Erlang VM) | 高(ZooKeeper/KRaft) |
选型口诀:
Kafka:日志、事件流、大数据、审计、CDC
RabbitMQ:任务队列、复杂路由、可靠消息、延迟队列
3.1 银行场景选型
适合 RabbitMQ 的银行场景:
✅ 支付回调处理(需要 ACK、死信队列)
✅ 异步通知(邮件/短信/Push)→ Fanout 广播
✅ 任务队列(异步执行业务)→ 竞争消费
✅ 延迟队列(订单超时取消)→ 插件 delayed-message
✅ RPC 远程调用(RabbitMQ RPC 模式)
适合 Kafka 的银行场景:
✅ 交易日志审计(高吞吐、长期保留)
✅ 实时风控规则引擎(事件流处理)
✅ 跨系统事件总线(微服务事件驱动)
✅ 用户行为分析(高吞吐、离线处理)
4. 银行实战:支付回调异步处理
@Service
@Slf4j
public class PaymentCallbackService {
private final RabbitTemplate rabbitTemplate;
// 接收第三方支付回调,立即返回 + 消息队列异步处理
@PostMapping("/callback/alipay")
public ResponseEntity<?> alipayCallback(
@RequestBody AlipayCallbackRequest request) {
// 1. 快速幂等校验
if (idempotentService.isProcessed(request.getTradeNo())) {
return ResponseEntity.ok(Map.of("code", "SUCCESS"));
}
// 2. 立即返回给支付宝(避免超时)
// 回调必须在 3 秒内响应,否则支付宝会重试
ResponseEntity.ok(Map.of("code", "SUCCESS"));
// 3. 异步发送到 RabbitMQ(不阻塞回调)
PaymentCallbackEvent event = new PaymentCallbackEvent(
request.getTradeNo(),
request.getAmount(),
"ALIPAY",
Instant.now()
);
rabbitTemplate.convertAndSend(
"payment.callback.exchange",
"callback.alipay",
event,
m -> {
m.getMessageProperties().setPriority(5); // 高优先级
m.getMessageProperties().setExpiration(
String.valueOf(Duration.ofHours(1).toMillis()));
return m;
}
);
return ResponseEntity.ok(Map.of("code", "SUCCESS"));
}
}
4.1 延迟队列:订单超时取消
// 使用 RabbitMQ 延迟插件(rabbitmq_delayed_message_exchange)
@Configuration
public class DelayQueueConfig {
public static final String ORDER_EXCHANGE = "order.delay.exchange";
public static final String ORDER_QUEUE = "order.timeout.queue";
public static final String ORDER_CANCEL_ROUTING_KEY = "order.timeout";
@Bean
public CustomExchange orderDelayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(ORDER_EXCHANGE, "x-delayed-message",
true, false, args);
}
@Bean
public Queue orderTimeoutQueue() {
return QueueBuilder.durable(ORDER_QUEUE).build();
}
@Bean
public Binding orderTimeoutBinding() {
return BindingBuilder
.bind(orderTimeoutQueue())
.to(orderDelayExchange())
.with(ORDER_CANCEL_ROUTING_KEY)
.noargs();
}
}
// 发送延迟消息(订单创建后 30 分钟未支付,自动取消)
public void scheduleOrderTimeout(String orderId) {
rabbitTemplate.convertAndSend(
DelayQueueConfig.ORDER_EXCHANGE,
DelayQueueConfig.ORDER_CANCEL_ROUTING_KEY,
Map.of("orderId", orderId, "action", "CANCEL"),
message -> {
// 延迟 30 分钟
message.getMessageProperties().setDelay(30 * 60 * 1000);
return message;
}
);
}
5. 高可用配置
# RabbitMQ 集群配置(3 节点)
# /etc/rabbitmq/rabbitmq.conf
listeners.tcp.default = 5672
management.tcp.port = 15672
# 开启 HA 队列
queue_master_locator = min-masters
ha-mode = exactly
ha-params = 3
ha-sync-mode = automatic
# 开启持久化
queue durability_enabled = true
// 消费者端:连接多个节点,自动故障转移
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
// 连接字符串:逗号分隔多个节点
factory.setUri("amqp://user:pass@node1:5672,user:pass@node2:5672,user:pass@node3:5672");
factory.setRequestedHeartBeat(60);
factory.setConnectionTimeout(10000);
return factory;
}
相关阅读:Kafka 消息队列 · [Redis 实现消息队列](/coding/Redis/Redis 实现消息队列) · 分布式事务与 Saga 模式实战