Message Queue Selection and Application: A Comprehensive Comparison of RabbitMQ, Kafka, and RocketMQ

This article analyzes the technical characteristics, typical use cases, and selection strategy of the mainstream message queues, and uses real-world examples to illustrate best practices for decoupling, asynchronous processing, and peak shaving.

1. Why Do We Need a Message Queue?

1.1 Core Application Scenarios

graph TB
A[Core value of MQ] --> B[Decoupling]
A --> C[Asynchrony]
A --> D[Peak shaving]
A --> E[Ordering guarantees]
A --> F[Transactional messages]

B --> B1[Loose coupling between systems]
C --> C1[Faster response times]
D --> D1[Smoothing traffic spikes]
E --> E1[Preserving business order]
F --> F1[Distributed transactions]

1.2 Scenarios in Detail

Scenario 1: System Decoupling

Problem: the order system calls the inventory, logistics, and notification systems directly, creating tight coupling.

graph TB
subgraph Before
A[Order system] --> B[Inventory system]
A --> C[Logistics system]
A --> D[Notification system]
end

subgraph After
E[Order system] --> F[Message queue]
F --> G[Inventory system]
F --> H[Logistics system]
F --> I[Notification system]
end

Before:

// Order service - tightly coupled
@Service
public class OrderService {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private LogisticsService logisticsService;

    @Autowired
    private NotificationService notificationService;

    public Order createOrder(CreateOrderRequest request) {
        Order order = orderRepository.save(request.toOrder());

        // Synchronous calls: if any one of them fails, order creation fails
        inventoryService.deduct(order.getItems());
        logisticsService.createDelivery(order);
        notificationService.sendOrderCreated(order);

        return order;
    }
}

After:

// Order service - decoupled
@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public Order createOrder(CreateOrderRequest request) {
        Order order = orderRepository.save(request.toOrder());

        // Publish an event and return immediately
        OrderCreatedEvent event = new OrderCreatedEvent(order);
        rabbitTemplate.convertAndSend("order.events", event);

        return order;
    }
}

// Inventory service - consumes independently
@Component
public class InventoryListener {

    @RabbitListener(queues = "inventory.queue")
    public void handleOrderCreated(OrderCreatedEvent event) {
        inventoryService.deduct(event.getItems());
    }
}

// Logistics service - consumes independently
@Component
public class LogisticsListener {

    @RabbitListener(queues = "logistics.queue")
    public void handleOrderCreated(OrderCreatedEvent event) {
        logisticsService.createDelivery(event.getOrder());
    }
}

Benefits:

  • Order-system response time drops from 500ms to 50ms
  • An inventory or logistics outage no longer blocks order creation
  • New consumers (e.g. analytics) can be added without touching the order system

Scenario 2: Asynchronous Processing

// User registration - synchronous (slow)
public User register(RegisterRequest request) {
    User user = userRepository.save(request.toUser());

    // The following calls are slow and not on the critical path
    emailService.sendWelcomeEmail(user);      // 200ms
    smsService.sendVerifySms(user);           // 300ms
    analyticsService.trackUserRegister(user); // 100ms
    couponService.grantWelcomeCoupon(user);   // 150ms

    return user; // total: 750ms+
}

// User registration - asynchronous (fast)
public User register(RegisterRequest request) {
    User user = userRepository.save(request.toUser());

    // Publish an event and return immediately
    eventPublisher.publish(new UserRegisteredEvent(user));

    return user; // total: ~50ms
}

Scenario 3: Peak Shaving

graph LR
A[Traffic peak: 10000 QPS] --> B[Message queue]
B --> C[Backend service: 2000 QPS]

style A fill:#ff6b6b
style B fill:#4ecdc4
style C fill:#ffe66d

Flash-sale scenario:

// Flash-sale request - peak shaving
@Service
public class SeckillService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public SeckillResult seckill(SeckillRequest request) {
        // 1. Fast validation
        if (!seckillValidator.validate(request)) {
            return SeckillResult.failed("validation failed");
        }

        // 2. Enqueue the request and return immediately
        rabbitTemplate.convertAndSend("seckill.queue", request);

        return SeckillResult.processing("queued");
    }
}

// Background consumer - processes at a steady rate
@Component
public class SeckillConsumer {

    @RabbitListener(queues = "seckill.queue")
    public void consume(SeckillRequest request) {
        // Process at a stable rate (e.g. 2000 QPS)
        seckillProcessor.process(request);
    }
}
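The "stable rate" in the consumer above is usually enforced with a rate limiter such as a token bucket. Below is a minimal, self-contained sketch of that idea; it is not the article's code, the class and method names are illustrative, and the clock is passed in explicitly so the behavior is deterministic:

```java
// Token bucket: tokens refill at a fixed rate; a message is processed only
// when a token is available, smoothing a burst into a steady QPS.
class TokenBucket {
    private final long capacity;
    private final double refillPerMillis; // tokens added per millisecond
    private double tokens;
    private long lastRefillMillis;

    TokenBucket(long capacity, long permitsPerSecond, long nowMillis) {
        this.capacity = capacity;
        this.refillPerMillis = permitsPerSecond / 1000.0;
        this.tokens = capacity;          // start full
        this.lastRefillMillis = nowMillis;
    }

    // nowMillis is injected (instead of reading the system clock) so the
    // refill logic is easy to test
    synchronized boolean tryAcquire(long nowMillis) {
        tokens = Math.min(capacity,
                tokens + (nowMillis - lastRefillMillis) * refillPerMillis);
        lastRefillMillis = nowMillis;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

A consumer would call `tryAcquire` before processing and requeue (or briefly sleep) when it returns false; production code would typically use a library limiter such as Guava's `RateLimiter` instead.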

2. Comparing the Mainstream Message Queues

2.1 Product Overview

| Feature | RabbitMQ | Kafka | RocketMQ | ActiveMQ |
|---|---|---|---|---|
| Language | Erlang | Scala/Java | Java | Java |
| First release | 2007 | 2011 | 2012 | 2004 |
| Steward | VMware | Apache | Apache | Apache |
| Protocol | AMQP | Custom | Custom | JMS |
| Messaging model | Queue | Topic | Topic/Queue | Queue |
| Persistence | Memory/Disk | Disk | Disk | Disk |
| Throughput | Tens of thousands msg/s | Hundreds of thousands msg/s | Hundreds of thousands msg/s | Tens of thousands msg/s |
| Latency | Microseconds | Milliseconds | Milliseconds | Milliseconds |
| Reliability | High | High | High | Medium |
| Community activity | High | Very high | High | Low |

2.2 Architecture Comparison

RabbitMQ Architecture

graph TB
A[Producer] --> B[Exchange]
B --> C[Queue1]
B --> D[Queue2]
B --> E[Queue3]
C --> F[Consumer1]
D --> G[Consumer2]
E --> H[Consumer3]

subgraph RabbitMQ Server
B
C
D
E
end

Core concepts:

  • Producer: publishes messages
  • Exchange: routes messages (Direct/Fanout/Topic/Headers)
  • Queue: stores messages
  • Consumer: consumes messages
  • Binding: the routing relationship between an Exchange and a Queue

Kafka Architecture

graph TB
A[Producer] --> B[Kafka Cluster]
B --> C[Topic: Partition 0]
B --> D[Topic: Partition 1]
B --> E[Topic: Partition 2]
C --> F[Consumer Group 1]
D --> F
E --> F
C --> G[Consumer Group 2]
D --> G
E --> G

subgraph Kafka Cluster
C
D
E
end

Core concepts:

  • Topic: a named message stream
  • Partition: a physical shard of a topic
  • Broker: a Kafka server node
  • Consumer Group: consumers that share a topic's partitions
  • Offset: a message's position within a partition
  • Zookeeper: metadata management (replaced by KRaft in newer Kafka versions)

RocketMQ Architecture

graph TB
A[Producer] --> B[NameServer]
A --> C[Broker A]
A --> D[Broker B]
C --> E[Consumer]
D --> E
B -.-> C
B -.-> D

subgraph RocketMQ Cluster
B
C
D
end

Core concepts:

  • NameServer: stateless coordination node
  • Broker: message storage and forwarding
  • Topic: a named message stream
  • Queue: a partition of a topic
  • Consumer Group: consumer group
  • Producer Group: producer group

2.3 Performance Comparison

Throughput Benchmark

# Test environment
- Servers: 3 × 4-core 8GB
- Network: gigabit LAN
- Message size: 1KB

# RabbitMQ
Throughput: ~20,000 msg/s
Latency: ~1ms

# Kafka
Throughput: ~100,000 msg/s
Latency: ~5ms

# RocketMQ
Throughput: ~80,000 msg/s
Latency: ~3ms

Reliability Comparison

| Capability | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| Message persistence | Yes | Yes | Yes |
| Transactional messages | Limited (AMQP tx/confirms) | Yes (since 0.11) | Yes (native, with check-back) |
| Ordered messages | Per queue | Per partition | Per partition |
| Message retry | Yes (requeue/DLX) | Client-side | Yes (built-in) |
| Dead letter queue | Yes | No (build your own) | Yes |
| Message tracing | Via plugin | No | Yes |

3. RabbitMQ in Practice

3.1 Installation and Configuration

# Docker Compose deployment
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.11-management
    container_name: rabbitmq
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 30s
      timeout: 10s
      retries: 5

volumes:
  rabbitmq_data:

3.2 Exchange Types in Detail

graph TB
A[Exchange types] --> B[Direct]
A --> C[Fanout]
A --> D[Topic]
A --> E[Headers]

B --> B1[Exact routing-key match]
C --> C1[Broadcast to all bound queues]
D --> D1[Wildcard match]
E --> E1[Header match]

Direct Exchange

@Configuration
public class DirectExchangeConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("order.direct");
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
                .withArgument("x-message-ttl", 60000) // message TTL: 60 seconds
                .build();
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(orderQueue)
                .to(directExchange)
                .with("order.created"); // routing key
    }
}

// Producer
rabbitTemplate.convertAndSend("order.direct", "order.created", message);

// Consumer - receives only "order.created" messages
@RabbitListener(queues = "order.queue")
public void handleOrderCreated(Message message) {
    // handle the created order
}

Fanout Exchange

@Configuration
public class FanoutExchangeConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("order.fanout");
    }

    @Bean
    public Queue inventoryQueue() {
        return QueueBuilder.durable("inventory.queue").build();
    }

    @Bean
    public Queue logisticsQueue() {
        return QueueBuilder.durable("logistics.queue").build();
    }

    @Bean
    public Queue notificationQueue() {
        return QueueBuilder.durable("notification.queue").build();
    }

    @Bean
    public Binding inventoryBinding(Queue inventoryQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(inventoryQueue).to(fanoutExchange);
    }

    @Bean
    public Binding logisticsBinding(Queue logisticsQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logisticsQueue).to(fanoutExchange);
    }

    @Bean
    public Binding notificationBinding(Queue notificationQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(notificationQueue).to(fanoutExchange);
    }
}

// Producer - the message is broadcast to every bound queue
rabbitTemplate.convertAndSend("order.fanout", "", message);

Topic Exchange

@Configuration
public class TopicExchangeConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("order.topic");
    }

    @Bean
    public Queue allOrdersQueue() {
        return QueueBuilder.durable("order.all").build();
    }

    @Bean
    public Queue createdOrdersQueue() {
        return QueueBuilder.durable("order.created").build();
    }

    @Bean
    public Queue paidOrdersQueue() {
        return QueueBuilder.durable("order.paid").build();
    }

    @Bean
    public Binding allBinding(Queue allOrdersQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(allOrdersQueue)
                .to(topicExchange)
                .with("order.#"); // '#' matches zero or more words: every order event
    }

    @Bean
    public Binding createdBinding(Queue createdOrdersQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(createdOrdersQueue)
                .to(topicExchange)
                .with("order.created"); // exact match
    }

    @Bean
    public Binding paidBinding(Queue paidOrdersQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(paidOrdersQueue)
                .to(topicExchange)
                .with("order.paid.*"); // '*' matches exactly one word after order.paid
    }
}

// Producer
rabbitTemplate.convertAndSend("order.topic", "order.created.vip", message);
rabbitTemplate.convertAndSend("order.topic", "order.paid.normal", message);
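The wildcard rules used by these bindings ('*' matches exactly one dot-separated word, '#' matches zero or more) can be sketched in plain Java. This is only an illustration of the matching semantics, not RabbitMQ's actual implementation:

```java
class TopicMatcher {
    // Returns true if the routing key matches the binding pattern,
    // following topic-exchange rules: words are dot-separated,
    // '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) return j == k.length;     // pattern exhausted
        if (p[i].equals("#")) {
            // '#' may consume zero or more words
            for (int skip = j; skip <= k.length; skip++) {
                if (match(p, i + 1, k, skip)) return true;
            }
            return false;
        }
        if (j == k.length) return false;             // key exhausted first
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

For example, `order.*` accepts `order.created` but rejects `order.paid.normal` (two words after the dot), while `order.#` accepts both.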

3.3 Message Reliability Guarantees

Publisher Confirms

@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin123");

        // Enable publisher confirms and returns
        // (setPublisherConfirms(true) is deprecated since Spring AMQP 2.2)
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        factory.setPublisherReturns(true);

        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // Required for the returns callback to fire on unroutable messages
        rabbitTemplate.setMandatory(true);

        // Confirm callback: did the broker accept the message?
        rabbitTemplate.setConfirmCallback((correlationData, acknowledged, cause) -> {
            if (acknowledged) {
                log.info("message confirmed: {}", correlationData);
            } else {
                log.error("message rejected: {}", cause);
                // record the failure for a later retry
            }
        });

        // Returns callback: the broker accepted the message but could not route it
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("message returned: {}", returnedMessage.getMessage());
            // handle the returned message
        });

        return rabbitTemplate;
    }
}

Manual Consumer Acknowledgement

@Configuration
public class ConsumerConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // Manual ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // Concurrent consumers
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);

        // Prefetch count
        factory.setPrefetchCount(10);

        return factory;
    }
}

// Consumer - manual ACK
@Component
public class OrderConsumer {

    @RabbitListener(queues = "order.queue")
    public void handleMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // Process the message
            OrderCreatedEvent event = parseEvent(message);
            orderService.process(event);

            // Acknowledge it
            channel.basicAck(deliveryTag, false);

        } catch (Exception e) {
            log.error("failed to process message", e);

            // Decide whether to retry
            Integer retryCount = getRetryCount(message);
            if (retryCount < 3) {
                // Reject and requeue
                channel.basicNack(deliveryTag, false, true);
            } else {
                // Reject without requeue and route to the dead letter queue
                channel.basicNack(deliveryTag, false, false);
                sendToDeadLetterQueue(message);
            }
        }
    }
}
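The `getRetryCount(...)` helper above is left undefined, and note that RabbitMQ does not count requeues for you; applications typically carry a retry counter in a message header and increment it on each republish. A sketch of that decision with a plain header map, where the `x-retry-count` header name and the threshold are illustrative assumptions:

```java
import java.util.Map;

class RetryPolicy {
    static final int MAX_RETRIES = 3;

    // Decide whether a failed delivery should be requeued or dead-lettered,
    // based on an application-managed retry counter carried in the headers.
    // (Header name "x-retry-count" is an assumption for this sketch.)
    static boolean shouldRequeue(Map<String, Object> headers) {
        Object count = headers.getOrDefault("x-retry-count", 0);
        return ((Integer) count) < MAX_RETRIES;
    }
}
```

With a real Spring AMQP `Message`, the same lookup would read `message.getMessageProperties().getHeaders()`.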

Dead Letter Queue (DLQ)

@Configuration
public class DeadLetterConfig {

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dead.letter.queue").build();
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead.letter.exchange");
    }

    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue)
                .to(deadLetterExchange)
                .with("dead.letter");
    }

    @Bean
    public Queue orderQueueWithDLQ() {
        return QueueBuilder.durable("order.queue")
                .withArgument("x-dead-letter-exchange", "dead.letter.exchange")
                .withArgument("x-dead-letter-routing-key", "dead.letter")
                .withArgument("x-message-ttl", 60000)
                .build();
    }
}

3.4 Delay Queues

@Configuration
public class DelayQueueConfig {

    // TTL queue for delayed messages
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("delay.queue")
                .withArgument("x-message-ttl", 60000) // 60 seconds
                .withArgument("x-dead-letter-exchange", "delay.exchange")
                .withArgument("x-dead-letter-routing-key", "order.timeout")
                .build();
    }

    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange("delay.exchange");
    }

    @Bean
    public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue)
                .to(delayExchange)
                .with("order.delay");
    }

    // Timed-out order queue (the dead-letter target)
    @Bean
    public Queue timeoutOrderQueue() {
        return QueueBuilder.durable("order.timeout.queue").build();
    }

    @Bean
    public Binding timeoutBinding(Queue timeoutOrderQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(timeoutOrderQueue)
                .to(delayExchange)
                .with("order.timeout");
    }
}

// Send a delayed message
public void sendDelayOrder(Order order) {
    rabbitTemplate.convertAndSend(
            "delay.exchange",
            "order.delay",
            order,
            message -> {
                // Per-message TTL in milliseconds (30 minutes). Caveat:
                // RabbitMQ only expires messages at the head of the queue,
                // so mixed TTLs can be delayed longer than expected; for
                // arbitrary per-message delays use the
                // rabbitmq_delayed_message_exchange plugin and setDelay().
                message.getMessageProperties().setExpiration(String.valueOf(30 * 60 * 1000));
                return message;
            }
    );
}

// Consume timed-out orders
@RabbitListener(queues = "order.timeout.queue")
public void handleTimeoutOrder(Order order) {
    orderService.cancelOrder(order.getId());
}

4. Kafka in Practice

4.1 Installation and Configuration

# Docker Compose deployment
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

4.2 Core Concepts

graph TB
A[Topic] --> B[Partition 0]
A --> C[Partition 1]
A --> D[Partition 2]

B --> E[Offset 0]
B --> F[Offset 1]
B --> G[Offset 2]

H[Consumer Group] --> I[Consumer 1]
H --> J[Consumer 2]

I --> B
J --> C
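The diagram's rule, each partition is consumed by exactly one consumer within a group, can be illustrated with a tiny round-robin assignment sketch. Kafka's real assignors (range, round-robin, sticky) are more involved; this is only to show the ownership invariant:

```java
import java.util.*;

class PartitionAssignor {
    // Round-robin sketch: partition p goes to consumer (p mod consumerCount),
    // so every partition has exactly one owner within the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumers.get(p % consumers.size());
            result.get(owner).add(p);
        }
        return result;
    }
}
```

With two consumers and three partitions, one consumer ends up with two partitions; adding a fourth consumer to a three-partition topic would leave it idle, which is why partition count caps a group's parallelism.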

4.3 Producer Configuration

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // Basics
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Reliability
        config.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        config.put(ProducerConfig.RETRIES_CONFIG, 3); // retries
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer

        // Performance
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB batches
        config.put(ProducerConfig.LINGER_MS_CONFIG, 10); // wait up to 10ms to fill a batch
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compression

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());

        // Send callbacks
        template.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> record,
                                  RecordMetadata metadata) {
                log.info("send ok: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }

            @Override
            public void onError(ProducerRecord<String, String> record,
                                RecordMetadata metadata, Exception exception) {
                log.error("send failed", exception);
            }
        });

        return template;
    }
}

4.4 Consumer Configuration

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        // Basics
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Consumer group
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");

        // Offset handling
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // commit manually
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // start from the latest offset

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Concurrent consumers
        factory.setConcurrency(3);

        // Required so the Acknowledgment parameter below is injected
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }
}

// Consumer - manual offset commit
@Component
public class OrderConsumer {

    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            log.info("received: key={}, value={}", record.key(), record.value());

            // Process the message
            orderService.process(record.value());

            // Commit the offset manually
            ack.acknowledge();

        } catch (Exception e) {
            log.error("failed to process message", e);
            // Offset not committed: the message will be consumed again
            throw e;
        }
    }
}

4.5 Ordered Messages

// Producer - partition-level ordering
public void sendOrderMessage(Order order) {
    // Use the order id as the key so all messages for the same order
    // go to the same partition
    String key = String.valueOf(order.getId());
    kafkaTemplate.send("order-topic", key, order.toJson());
}

// Consumer - single-threaded consumption preserves order
@Component
public class OrderConsumer {

    @KafkaListener(
            topics = "order-topic",
            groupId = "order-consumer-group",
            containerFactory = "singleThreadKafkaListenerContainerFactory"
    )
    public void consume(ConsumerRecord<String, String> record) {
        // Single-threaded processing preserves per-partition order
        orderService.process(record.value());
    }
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> singleThreadKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1); // single thread
    return factory;
}
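Why the same key preserves order: the partition is a pure function of the key, so every message for one order lands in the same partition and is read back in write order. A simplified sketch of that mapping; Kafka's default partitioner actually uses murmur2, `hashCode` is used here only for illustration:

```java
class KeyPartitioner {
    // Deterministic key -> partition mapping: identical keys always map
    // to the same partition (masking keeps the hash non-negative).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

The flip side is that ordering only holds per key: messages for different orders may still interleave across partitions, and changing the partition count remaps keys.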

4.6 Exactly-Once Semantics

@Configuration
public class KafkaExactlyOnceConfig {

    @Bean
    public ProducerFactory<String, String> exactlyOnceProducerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1"); // transactional id
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotence
        config.put(ProducerConfig.ACKS_CONFIG, "all");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(exactlyOnceProducerFactory());
    }
}

// Transactional sends
@Service
public class OrderService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void createOrder(Order order) {
        // 1. Save the order to the database
        orderRepository.save(order);

        // 2. Send both messages in one Kafka transaction: either both become
        //    visible to read_committed consumers, or neither does. Note that
        //    the DB write above is NOT covered by the Kafka transaction;
        //    end-to-end atomicity needs an outbox pattern or a chained
        //    transaction manager.
        kafkaTemplate.executeInTransaction(template -> {
            template.send("order-topic", order.toJson());
            template.send("inventory-topic", order.getItems().toJson());
            return null;
        });
    }
}
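Kafka's transactions cover the Kafka sends only; on the consuming side, "effectively once" is usually achieved by combining at-least-once delivery with idempotent processing. A minimal sketch of an idempotent consumer; in production the in-memory seen-set would be a database table keyed by message id:

```java
import java.util.*;

class IdempotentConsumer {
    private final Set<String> processed = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    // Apply each message id at most once; redeliveries are ignored.
    // Returns true if the payload was applied, false for a duplicate.
    boolean process(String messageId, String payload) {
        if (!processed.add(messageId)) return false; // already seen
        applied.add(payload);
        return true;
    }

    List<String> applied() { return applied; }
}
```

The dedup check and the business effect must be committed atomically (e.g. in one DB transaction), otherwise a crash between them reintroduces duplicates.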

5. RocketMQ in Practice

5.1 Installation and Configuration

# Docker Compose deployment
version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:4.9.4
    container_name: rmqnamesrv
    ports:
      - "9876:9876"
    command: sh mqnamesrv
    environment:
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m"

  broker:
    image: apache/rocketmq:4.9.4
    container_name: rmqbroker
    depends_on:
      - namesrv
    ports:
      - "10911:10911"
      - "10909:10909"
    command: sh mqbroker -n namesrv:9876
    environment:
      JAVA_OPT_EXT: "-server -Xms256m -Xmx256m"
      BROKER_CONF: |
        brokerClusterName=DefaultCluster
        brokerName=broker-a
        brokerId=0
        autoCreateTopicEnable=true

5.2 Producer Configuration

@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Bean
    public DefaultMQProducer producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("order-producer-group");
        producer.setNamesrvAddr(nameServer);
        producer.setInstanceName(UUID.randomUUID().toString());

        // Retries
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(3);

        // Send timeout (ms)
        producer.setSendMsgTimeout(3000);

        producer.start();
        return producer;
    }
}

// Sending messages
@Service
public class OrderMessageService {

    @Autowired
    private DefaultMQProducer producer;

    // Synchronous send
    public void sendSync(Order order) throws Exception {
        Message msg = new Message(
                "order-topic",
                "order-created",
                order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        SendResult result = producer.send(msg);
        log.info("send ok: msgId={}", result.getMsgId());
    }

    // Asynchronous send
    public void sendAsync(Order order) throws Exception {
        Message msg = new Message(
                "order-topic",
                "order-created",
                order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("send ok: msgId={}", sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                log.error("send failed", e);
            }
        });
    }

    // One-way send (fire and forget)
    public void sendOneway(Order order) throws Exception {
        Message msg = new Message(
                "order-topic",
                "order-created",
                order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        producer.sendOneway(msg);
    }
}

5.3 Consumer Configuration

@Configuration
public class RocketMQConsumerConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Bean
    public DefaultMQPushConsumer consumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
        consumer.setNamesrvAddr(nameServer);
        consumer.setInstanceName(UUID.randomUUID().toString());

        // Where to start consuming
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // Consumer thread pool
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(20);

        // Messages per callback (1 = one at a time)
        consumer.setConsumeMessageBatchMaxSize(1);

        // Retry limit before the message is dead-lettered
        consumer.setMaxReconsumeTimes(3);

        consumer.subscribe("order-topic", "*");

        // Message listener
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    log.info("received: msgId={}, body={}", msg.getMsgId(), body);

                    // Process the message
                    orderService.process(body);

                } catch (Exception e) {
                    log.error("failed to process message", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        return consumer;
    }
}

5.4 Transactional Messages

// Transaction listener
@Component
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderService orderService;

    @Autowired
    private LocalTransactionTable transactionTable;

    // Execute the local transaction after the half message is sent
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Order order = parseOrder(msg);

            // 1. Run the local transaction
            orderService.createOrder(order);

            // 2. Record its outcome for later check-backs
            transactionTable.record(msg.getMsgId(), TransactionStatus.COMMITTED);

            return LocalTransactionState.COMMIT_MESSAGE;

        } catch (Exception e) {
            log.error("local transaction failed", e);
            transactionTable.record(msg.getMsgId(), TransactionStatus.ROLLBACK);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // Broker check-back when the transaction outcome is unknown
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String msgId = msg.getMsgId();
        TransactionStatus status = transactionTable.getStatus(msgId);

        log.info("transaction check-back: msgId={}, status={}", msgId, status);

        switch (status) {
            case COMMITTED:
                return LocalTransactionState.COMMIT_MESSAGE;
            case ROLLBACK:
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.UNKNOW; // RocketMQ's enum really is spelled UNKNOW
        }
    }
}

// Sending a transactional message
@Service
public class OrderTransactionService {

    @Autowired
    private TransactionMQProducer producer;

    public void sendTransactionMessage(Order order) throws Exception {
        Message msg = new Message(
                "order-topic",
                "order-created",
                order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        // Send as a transactional (half) message
        TransactionSendResult result = producer.sendMessageInTransaction(msg, order);

        log.info("transactional message sent: msgId={}, state={}",
                result.getMsgId(), result.getLocalTransactionState());
    }
}

6. Selection Guide

6.1 Decision Tree

graph TD
A[Start] --> B{Need high throughput?}
B -->|Yes| C{Need ordered messages?}
B -->|No| D{Need flexible routing?}

C -->|Yes| E[Kafka]
C -->|No| F{Need transactional messages?}

D -->|Yes| G[RabbitMQ]
D -->|No| H{Java stack?}

F -->|Yes| I[RocketMQ]
F -->|No| E

H -->|Yes| I
H -->|No| G

6.2 Recommendations by Scenario

| Scenario | Recommendation | Rationale |
|---|---|---|
| Log collection | Kafka | High throughput, ordering guarantees |
| Real-time computation | Kafka | Mature stream-processing ecosystem |
| Order processing | RocketMQ | Transactional messages, high reliability |
| Notification system | RabbitMQ | Flexible routing, low latency |
| Financial transactions | RocketMQ | Transactional messages, exactly-once |
| IoT | Kafka | Massive data volume, high throughput |
| Microservice decoupling | RabbitMQ | Standard protocol, easy integration |

6.3 Mixing Multiple Queues

graph TB
A[Business systems] --> B[RabbitMQ]
A --> C[Kafka]

B --> D[Order processing]
B --> E[Notifications]
B --> F[Task scheduling]

C --> G[Log collection]
C --> H[User behavior]
C --> I[Real-time monitoring]

7. Summary

Key points for message queue selection:

  1. Know your requirements: throughput, latency, reliability, ordering
  2. Match your stack: team skills and operational capacity
  3. Let the scenario decide: different scenarios call for different queues
  4. Mix when needed: multiple MQs can coexist in one architecture

There is no best message queue, only the most suitable one. Remember: the right architecture is the best architecture.

