消息队列选型与应用:RabbitMQ、Kafka、RocketMQ 全面对比
本文深入分析主流消息队列的技术特点、适用场景和选型策略,结合实际案例讲解消息队列在解耦、异步、削峰等场景的最佳实践。
一、为什么需要消息队列? 1.1 核心应用场景 1 2 3 4 5 6 7 8 9 10 11 12 graph TB A[消息队列核心价值] --> B[解耦] A --> C[异步] A --> D[削峰] A --> E[顺序保证] A --> F[事务消息] B --> B1[系统间松耦合] C --> C1[提升响应速度] D --> D1[平滑流量峰值] E --> E1[保证业务顺序] F --> F1[分布式事务]
1.2 场景详解 场景 1:系统解耦 问题 :订单系统直接调用库存、物流、通知系统,耦合严重。
1 2 3 4 5 6 7 8 9 10 11 12 13 graph TB subgraph 改造前 A[订单系统] --> B[库存系统] A --> C[物流系统] A --> D[通知系统] end subgraph 改造后 E[订单系统] --> F[消息队列] F --> G[库存系统] F --> H[物流系统] F --> I[通知系统] end
改造前代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Service public class OrderService { @Autowired private InventoryService inventoryService; @Autowired private LogisticsService logisticsService; @Autowired private NotificationService notificationService; public Order createOrder (CreateOrderRequest request) { Order order = orderRepository.save(request.toOrder()); inventoryService.deduct(order.getItems()); logisticsService.createDelivery(order); notificationService.sendOrderCreated(order); return order; } }
改造后代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public Order createOrder (CreateOrderRequest request) { Order order = orderRepository.save(request.toOrder()); OrderCreatedEvent event = new OrderCreatedEvent (order); rabbitTemplate.convertAndSend("order.events" , event); return order; } } @Component public class InventoryListener { @RabbitListener(queues = "inventory.queue") public void handleOrderCreated (OrderCreatedEvent event) { inventoryService.deduct(event.getItems()); } } @Component public class LogisticsListener { @RabbitListener(queues = "logistics.queue") public void handleOrderCreated (OrderCreatedEvent event) { logisticsService.createDelivery(event.getOrder()); } }
收益:
订单系统响应时间从 500ms 降至 50ms
库存/物流系统故障不影响订单创建
新增服务(如数据分析)无需修改订单系统
场景 2:异步处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public User register (RegisterRequest request) { User user = userRepository.save(request.toUser()); emailService.sendWelcomeEmail(user); smsService.sendVerifySms(user); analyticsService.trackUserRegister(user); couponService.grantWelcomeCoupon(user); return user; } public User register (RegisterRequest request) { User user = userRepository.save(request.toUser()); eventPublisher.publish(new UserRegisteredEvent (user)); return user; }
场景 3:削峰填谷 1 2 3 4 5 6 7 graph LR A[流量峰值 10000 QPS] --> B[消息队列] B --> C[后端服务 2000 QPS] style A fill:#ff6b6b style B fill:#4ecdc4 style C fill:#ffe66d
秒杀场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Service public class SeckillService { @Autowired private RabbitTemplate rabbitTemplate; public SeckillResult seckill (SeckillRequest request) { if (!seckillValidator.validate(request)) { return SeckillResult.failed("验证失败" ); } rabbitTemplate.convertAndSend("seckill.queue" , request); return SeckillResult.processing("排队中" ); } } @Component public class SeckillConsumer { @RabbitListener(queues = "seckill.queue") public void consume (SeckillRequest request) { seckillProcessor.process(request); } }
二、主流消息队列对比 2.1 产品概览
特性
RabbitMQ
Kafka
RocketMQ
ActiveMQ
开发语言
Erlang
Scala/Java
Java
Java
发布年份
2007
2011
2012
2004
所属组织
VMware
Apache
Apache
Apache
协议支持
AMQP
自定义
自定义
JMS
消息模型
队列
Topic
Topic/队列
队列
持久化
内存/磁盘
磁盘
磁盘
磁盘
吞吐量
万级
十万级
十万级
万级
延迟
微秒级
毫秒级
毫秒级
毫秒级
可靠性
高
中
高
高
社区活跃度
高
极高
高
中
2.2 架构对比 RabbitMQ 架构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 graph TB A[Producer] --> B[Exchange] B --> C[Queue1] B --> D[Queue2] B --> E[Queue3] C --> F[Consumer1] D --> G[Consumer2] E --> H[Consumer3] subgraph RabbitMQ Server B C D E end
核心概念:
Producer :消息生产者
Exchange :消息交换机(Direct/Fanout/Topic/Headers)
Queue :消息队列
Consumer :消息消费者
Binding :Exchange 和 Queue 的绑定关系
Kafka 架构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 graph TB A[Producer] --> B[Kafka Cluster] B --> C[Topic: Partition 0] B --> D[Topic: Partition 1] B --> E[Topic: Partition 2] C --> F[Consumer Group 1] D --> F E --> F C --> G[Consumer Group 2] D --> G E --> G subgraph Kafka Cluster C D E end
核心概念:
Topic :消息主题
Partition :主题分区(物理存储单元)
Broker :Kafka 服务器节点
Consumer Group :消费者组
Offset :消息偏移量
Zookeeper :元数据管理
RocketMQ 架构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 graph TB A[Producer] --> B[NameServer] A --> C[Broker A] A --> D[Broker B] C --> E[Consumer] D --> E B -.-> C B -.-> D subgraph RocketMQ Cluster B C D end
核心概念:
NameServer :无状态协调节点
Broker :消息存储和转发
Topic :消息主题
Queue :队列(分区)
Consumer Group :消费者组
Producer Group :生产者组
2.3 性能对比 吞吐量测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - 服务器:4 核 8G × 3 - 网络:千兆局域网 - 消息大小:1KB 吞吐量:~20,000 msg/s 延迟:~1ms 吞吐量:~100,000 msg/s 延迟:~5ms 吞吐量:~80,000 msg/s 延迟:~3ms
可靠性对比
场景
RabbitMQ
Kafka
RocketMQ
消息持久化
✓
✓
✓
事务消息
✗
✓
✓
顺序消息
队列内有序
分区有序
分区有序
消息重试
✓
✗
✓
死信队列
✓
✓
✓
消息追踪
✗
✗
✓
三、RabbitMQ 深度实践 3.1 安装与配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 version: '3.8' services: rabbitmq: image: rabbitmq:3.11-management container_name: rabbitmq ports: - "5672:5672" - "15672:15672" environment: RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: admin123 volumes: - rabbitmq_data:/var/lib/rabbitmq healthcheck: test: ["CMD" , "rabbitmq-diagnostics" , "-q" , "ping" ] interval: 30s timeout: 10s retries: 5 volumes: rabbitmq_data:
3.2 交换机类型详解 1 2 3 4 5 6 7 8 9 10 graph TB A[Exchange 类型] --> B[Direct] A --> C[Fanout] A --> D[Topic] A --> E[Headers] B --> B1[精确匹配 RoutingKey] C --> C1[广播到所有队列] D --> D1[通配符匹配] E --> E1[Header 匹配]
Direct Exchange(直连交换机) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Configuration public class DirectExchangeConfig { @Bean public DirectExchange directExchange () { return new DirectExchange ("order.direct" ); } @Bean public Queue orderQueue () { return QueueBuilder.durable("order.queue" ) .withArgument("x-message-ttl" , 60000 ) .build(); } @Bean public Binding orderBinding (Queue orderQueue, DirectExchange directExchange) { return BindingBuilder.bind(orderQueue) .to(directExchange) .with("order.created" ); } } rabbitTemplate.convertAndSend("order.direct" , "order.created" , message); @RabbitListener(queues = "order.queue") public void handleOrderCreated (Message message) { }
Fanout Exchange(扇形交换机) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Configuration public class FanoutExchangeConfig { @Bean public FanoutExchange fanoutExchange () { return new FanoutExchange ("order.fanout" ); } @Bean public Queue inventoryQueue () { return QueueBuilder.durable("inventory.queue" ).build(); } @Bean public Queue logisticsQueue () { return QueueBuilder.durable("logistics.queue" ).build(); } @Bean public Queue notificationQueue () { return QueueBuilder.durable("notification.queue" ).build(); } @Bean public Binding inventoryBinding (Queue inventoryQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(inventoryQueue).to(fanoutExchange); } @Bean public Binding logisticsBinding (Queue logisticsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(logisticsQueue).to(fanoutExchange); } @Bean public Binding notificationBinding (Queue notificationQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(notificationQueue).to(fanoutExchange); } } rabbitTemplate.convertAndSend("order.fanout" , "" , message);
Topic Exchange(主题交换机) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Configuration public class TopicExchangeConfig { @Bean public TopicExchange topicExchange () { return new TopicExchange ("order.topic" ); } @Bean public Queue allOrdersQueue () { return QueueBuilder.durable("order.all" ).build(); } @Bean public Queue createdOrdersQueue () { return QueueBuilder.durable("order.created" ).build(); } @Bean public Queue paidOrdersQueue () { return QueueBuilder.durable("order.paid" ).build(); } @Bean public Binding allBinding (Queue allOrdersQueue, TopicExchange topicExchange) { return BindingBuilder.bind(allOrdersQueue) .to(topicExchange) .with("order.*" ); } @Bean public Binding createdBinding (Queue createdOrdersQueue, TopicExchange topicExchange) { return BindingBuilder.bind(createdOrdersQueue) .to(topicExchange) .with("order.created" ); } @Bean public Binding paidBinding (Queue paidOrdersQueue, TopicExchange topicExchange) { return BindingBuilder.bind(paidOrdersQueue) .to(topicExchange) .with("order.paid.*" ); } } rabbitTemplate.convertAndSend("order.topic" , "order.created.vip" , message); rabbitTemplate.convertAndSend("order.topic" , "order.paid.normal" , message);
3.3 消息可靠性保证 生产者确认(Publisher Confirm) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory () { CachingConnectionFactory factory = new CachingConnectionFactory (); factory.setHost("localhost" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin123" ); factory.setPublisherConfirms(true ); factory.setPublisherReturns(true ); return factory; } @Bean public RabbitTemplate rabbitTemplate (ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate (connectionFactory); rabbitTemplate.setConfirmCallback((correlationData, acknowledged, cause) -> { if (acknowledged) { log.info("消息发送成功:{}" , correlationData); } else { log.error("消息发送失败:{}" , cause); } }); rabbitTemplate.setReturnsCallback(returnedMessage -> { log.error("消息被退回:{}" , returnedMessage.getMessage()); }); return rabbitTemplate; } }
消费者手动 ACK 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Configuration public class ConsumerConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory ( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory (); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConcurrentConsumers(5 ); factory.setMaxConcurrentConsumers(10 ); factory.setPrefetchCount(10 ); return factory; } } @Component public class OrderConsumer { @RabbitListener(queues = "order.queue") public void handleMessage (Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { OrderCreatedEvent event = parseEvent(message); orderService.process(event); channel.basicAck(deliveryTag, false ); } catch (Exception e) { log.error("处理消息失败" , e); Integer retryCount = getRetryCount(message); if (retryCount < 3 ) { channel.basicNack(deliveryTag, false , true ); } else { channel.basicNack(deliveryTag, false , false ); sendToDeadLetterQueue(message); } } } }
死信队列(DLQ) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Configuration public class DeadLetterConfig { @Bean public Queue deadLetterQueue () { return QueueBuilder.durable("dead.letter.queue" ).build(); } @Bean public DirectExchange deadLetterExchange () { return new DirectExchange ("dead.letter.exchange" ); } @Bean public Binding deadLetterBinding (Queue deadLetterQueue, DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue) .to(deadLetterExchange) .with("dead.letter" ); } @Bean public Queue orderQueueWithDLQ () { return QueueBuilder.durable("order.queue" ) .withArgument("x-dead-letter-exchange" , "dead.letter.exchange" ) .withArgument("x-dead-letter-routing-key" , "dead.letter" ) .withArgument("x-message-ttl" , 60000 ) .build(); } }
3.4 延迟队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @Configuration public class DelayQueueConfig { @Bean public Queue delayQueue () { return QueueBuilder.durable("delay.queue" ) .withArgument("x-message-ttl" , 60000 ) .withArgument("x-dead-letter-exchange" , "order.exchange" ) .withArgument("x-dead-letter-routing-key" , "order.timeout" ) .build(); } @Bean public DirectExchange delayExchange () { return new DirectExchange ("delay.exchange" ); } @Bean public Binding delayBinding (Queue delayQueue, DirectExchange delayExchange) { return BindingBuilder.bind(delayQueue) .to(delayExchange) .with("order.delay" ); } @Bean public Queue timeoutOrderQueue () { return QueueBuilder.durable("order.timeout.queue" ).build(); } @Bean public Binding timeoutBinding (Queue timeoutOrderQueue, DirectExchange delayExchange) { return BindingBuilder.bind(timeoutOrderQueue) .to(delayExchange) .with("order.timeout" ); } } public void sendDelayOrder (Order order) { rabbitTemplate.convertAndSend( "delay.exchange" , "order.delay" , order, message -> { message.getMessageProperties().setDelay(30 * 60 * 1000 ); return message; } ); } @RabbitListener(queues = "order.timeout.queue") public void handleTimeoutOrder (Order order) { orderService.cancelOrder(order.getId()); }
四、Kafka 深度实践 4.1 安装与配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.4.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:7.4.0 depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
4.2 核心概念 1 2 3 4 5 6 7 8 9 10 11 12 13 14 graph TB A[Topic] --> B[Partition 0] A --> C[Partition 1] A --> D[Partition 2] B --> E[Offset 0] B --> F[Offset 1] B --> G[Offset 2] H[Consumer Group] --> I[Consumer 1] H --> J[Consumer 2] I --> B J --> C
4.3 生产者配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory () { Map<String, Object> config = new HashMap <>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.ACKS_CONFIG, "all" ); config.put(ProducerConfig.RETRIES_CONFIG, 3 ); config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true ); config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 ); config.put(ProducerConfig.LINGER_MS_CONFIG, 10 ); config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy" ); return new DefaultKafkaProducerFactory <>(config); } @Bean public KafkaTemplate<String, String> kafkaTemplate () { KafkaTemplate<String, String> template = new KafkaTemplate <>(producerFactory()); template.setProducerListener(new ProducerListener <String, String>() { @Override public void onSuccess (ProducerRecord<String, String> record, RecordMetadata metadata) { log.info("发送成功:topic={}, partition={}, offset={}" , metadata.topic(), metadata.partition(), metadata.offset()); } @Override public void onError (ProducerRecord<String, String> record, RecordMetadata metadata, Exception exception) { log.error("发送失败" , exception); } }); return template; } }
4.4 消费者配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory () { Map<String, Object> config = new HashMap <>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group" ); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false ); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest" ); return new DefaultKafkaConsumerFactory <>(config); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory () { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory <>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3 ); return factory; } } @Component public class OrderConsumer { @KafkaListener(topics = "order-topic", groupId = "order-consumer-group") public void consume (ConsumerRecord<String, String> record, Acknowledgment ack) { try { log.info("收到消息:key={}, value={}" , record.key(), record.value()); orderService.process(record.value()); ack.acknowledge(); } catch (Exception e) { log.error("处理消息失败" , e); throw e; } } }
4.5 顺序消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void sendOrderMessage (Order order) { String key = String.valueOf(order.getId()); kafkaTemplate.send("order-topic" , key, order.toJson()); } @Component public class OrderConsumer { @KafkaListener( topics = "order-topic", groupId = "order-consumer-group", containerFactory = "singleThreadKafkaListenerContainerFactory" ) public void consume (ConsumerRecord<String, String> record) { orderService.process(record.value()); } } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> singleThreadKafkaListenerContainerFactory () { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory <>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(1 ); return factory; }
4.6 精确一次语义(Exactly-Once) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Configuration public class KafkaExactlyOnceConfig { @Bean public ProducerFactory<String, String> exactlyOnceProducerFactory () { Map<String, Object> config = new HashMap <>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1" ); config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true ); config.put(ProducerConfig.ACKS_CONFIG, "all" ); return new DefaultKafkaProducerFactory <>(config); } @Bean public KafkaTransactionManager<String, String> kafkaTransactionManager () { return new KafkaTransactionManager <>(exactlyOnceProducerFactory()); } } @Service public class OrderService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Transactional public void createOrder (Order order) { orderRepository.save(order); kafkaTemplate.executeInTransaction(template -> { template.send("order-topic" , order.toJson()); template.send("inventory-topic" , order.getItems().toJson()); return null ; }); } }
五、RocketMQ 深度实践 5.1 安装与配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 version: '3.8' services: namesrv: image: apache/rocketmq:4.9.4 container_name: rmqnamesrv ports: - "9876:9876" command: sh mqnamesrv environment: JAVA_OPT_EXT: "-server -Xms128m -Xmx128m" broker: image: apache/rocketmq:4.9.4 container_name: rmqbroker depends_on: - namesrv ports: - "10911:10911" - "10909:10909" command: sh mqbroker -n namesrv:9876 environment: JAVA_OPT_EXT: "-server -Xms256m -Xmx256m" BROKER_CONF: | brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 autoCreateTopicEnable=true
5.2 生产者配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 @Configuration public class RocketMQConfig { @Value("${rocketmq.name-server}") private String nameServer; @Bean public DefaultMQProducer producer () throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer ("order-producer-group" ); producer.setNamesrvAddr(nameServer); producer.setInstanceName(UUID.randomUUID().toString()); producer.setRetryTimesWhenSendFailed(3 ); producer.setRetryTimesWhenSendAsyncFailed(3 ); producer.setSendMsgTimeout(3000 ); producer.start(); return producer; } } @Service public class OrderMessageService { @Autowired private DefaultMQProducer producer; public void sendSync (Order order) throws Exception { Message msg = new Message ( "order-topic" , "order-created" , order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult result = producer.send(msg); log.info("发送成功:msgId={}" , result.getMsgId()); } public void sendAsync (Order order) throws Exception { Message msg = new Message ( "order-topic" , "order-created" , order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET) ); producer.send(msg, new SendCallback () { @Override public void onSuccess (SendResult sendResult) { log.info("发送成功:msgId={}" , sendResult.getMsgId()); } @Override public void onException (Throwable e) { log.error("发送失败" , e); } }); } public void sendOneway (Order order) throws Exception { Message msg = new Message ( "order-topic" , "order-created" , order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET) ); producer.sendOneway(msg); } }
5.3 消费者配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Configuration public class RocketMQConsumerConfig { @Value("${rocketmq.name-server}") private String nameServer; @Bean public DefaultMQPushConsumer consumer () throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("order-consumer-group" ); consumer.setNamesrvAddr(nameServer); consumer.setInstanceName(UUID.randomUUID().toString()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeThreadMin(10 ); consumer.setConsumeThreadMax(20 ); consumer.setConsumeMessageBatchMaxSize(1 ); consumer.setMaxReconsumeTimes(3 ); consumer.subscribe("order-topic" , "*" ); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { try { String body = new String (msg.getBody(), RemotingHelper.DEFAULT_CHARSET); log.info("收到消息:msgId={}, body={}" , msg.getMsgId(), body); orderService.process(body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("处理消息失败" , e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); return consumer; } }
5.4 事务消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 @Component public class OrderTransactionListener implements TransactionListener { @Autowired private OrderService orderService; @Autowired private LocalTransactionTable transactionTable; @Override public LocalTransactionState executeLocalTransaction (Message msg, Object arg) { try { Order order = parseOrder(msg); orderService.createOrder(order); transactionTable.record(msg.getMsgId(), TransactionStatus.COMMITTED); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("执行本地事务失败" , e); transactionTable.record(msg.getMsgId(), TransactionStatus.ROLLBACK); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction (MessageExt msg) { String msgId = msg.getMsgId(); TransactionStatus status = transactionTable.getStatus(msgId); log.info("事务回查:msgId={}, status={}" , msgId, status); switch (status) { case COMMITTED: return LocalTransactionState.COMMIT_MESSAGE; case ROLLBACK: return LocalTransactionState.ROLLBACK_MESSAGE; default : return LocalTransactionState.UNKNOW; } } } @Service public class OrderTransactionService { @Autowired private TransactionMQProducer producer; public void sendTransactionMessage (Order order) throws Exception { Message msg = new Message ( "order-topic" , "order-created" , order.toJson().getBytes(RemotingHelper.DEFAULT_CHARSET) ); TransactionSendResult result = producer.sendMessageInTransaction(msg, order); log.info("事务消息发送:msgId={}, state={}" , result.getMsgId(), result.getLocalTransactionState()); } }
六、选型指南 6.1 选型决策树 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 graph TD A[开始选型] --> B{需要高吞吐?} B -->|是 | C{需要顺序消息?} B -->|否 | D{需要复杂路由?} C -->|是 | E[Kafka] C -->|否 | F{需要事务消息?} D -->|是 | G[RabbitMQ] D -->|否 | H{Java 技术栈?} F -->|是 | I[RocketMQ] F -->|否 | E H -->|是 | I H -->|否 | G
6.2 场景推荐
场景
推荐方案
理由
日志收集
Kafka
高吞吐、顺序保证
实时计算
Kafka
流处理生态完善
订单处理
RocketMQ
事务消息、可靠性高
通知系统
RabbitMQ
路由灵活、延迟低
金融交易
RocketMQ
事务消息、精确一次
物联网
Kafka
海量数据、高吞吐
微服务解耦
RabbitMQ
协议标准、易于集成
6.3 混合使用策略 1 2 3 4 5 6 7 8 9 10 11 graph TB A[业务系统] --> B[RabbitMQ] A --> C[Kafka] B --> D[订单处理] B --> E[通知系统] B --> F[任务调度] C --> G[日志收集] C --> H[用户行为] C --> I[实时监控]
七、总结 消息队列选型的关键要点:
明确需求 :吞吐量、延迟、可靠性、顺序性
技术匹配 :团队技术栈、运维能力
场景优先 :不同场景选择不同方案
混合使用 :多种 MQ 配合使用
没有最好的消息队列,只有最适合的。记住:合适的架构才是最好的架构 。
参考资料:
架构师点评:消息队列选型要从业务一致性开始 RabbitMQ、Kafka、RocketMQ 的差异不只是吞吐量和功能列表。真正的选型起点应该是业务对一致性、顺序性、延迟、可追溯和运维复杂度的要求。很多团队把 MQ 当成“削峰填谷工具”,上线后才发现消息重复、顺序错乱、消费堆积、补偿困难、死信无人处理,最后异步链路反而成为稳定性黑洞。
从架构角度看,消息队列引入的是一种系统耦合方式。同步调用的耦合显性,异步消息的耦合隐性。消息 Topic、事件模型、消费语义、重试策略和补偿机制如果没有治理,就会出现“谁都能发、谁都能消费、出了问题没人负责”的局面。
企业落地建议:建立事件驱动治理规范 建议在引入 MQ 时同步建设以下规范:
事件模型标准化 :明确事件命名、Schema、版本、业务主键、traceId 和幂等键。
消费幂等设计 :默认假设消息会重复投递,消费者必须具备幂等能力。
重试与死信策略 :区分瞬时失败和业务失败,避免无限重试拖垮下游。
顺序与事务边界 :只在真正需要时使用顺序消息,不要为局部顺序牺牲整体吞吐和可用性。
可观测性 :监控生产速率、消费延迟、积压量、失败率、死信量和端到端链路延迟。
在多团队环境中,Topic 应像 API 一样被登记和评审。生产者、消费者、消息保留时间、Schema 变更、下线计划,都需要进入资产管理。
AI Agent 与异步任务编排 AI Agent 平台天然会遇到长任务、异步回调、多步骤编排和失败恢复问题。消息队列可以作为 Agent 任务调度和事件驱动的基础设施,但必须控制边界:
Agent 任务消息要包含任务 ID、操作者、权限上下文和幂等键;
长任务状态要可查询,不能只依赖一次性回调;
对外部系统写入要使用事务外盒或补偿机制;
对失败任务提供可重放、可暂停、可人工接管能力;
对 PR 审查官这类工作流,可以用 MQ 解耦 diff 解析、Agent 并行审查、仲裁汇总和评论回写。
这样消息队列就不只是中间件组件,而是多 Agent 协作平台的可靠任务骨架。
AI Agent / AI Coding 实战
想把架构、DevOps 与 AI Agent 结合起来提升研发效能?
面向企业研发团队,提供 AI Coding、DevOps 自动化、架构治理和多 Agent 协作落地咨询。
合作咨询:chartvip@hotmail.com