15.1RocketMQ消息队列

分类: 其他Spring Cloud Alibaba组件

RocketMQ 消息队列

RocketMQ 是 Spring Cloud Alibaba 的消息队列组件。本节将学习 RocketMQ 消息队列。

本节将学习:RocketMQ 简介、安装部署、生产者实现、消费者实现,以及消息类型。

RocketMQ 简介

定义

RocketMQ 是阿里巴巴开源的分布式消息中间件,具有高性能、高可靠、高实时、分布式特点。

核心特性

RocketMQ 核心特性:

  • 高性能
  • 高可用
  • 丰富的消息类型
  • 顺序消息
  • 事务消息

在商城项目中集成 RocketMQ

步骤1:本地搭建 RocketMQ

文件路径: mall-microservices/docker/rocketmq/docker-compose.yml

version: '3.8' services: rocketmq-nameserver: image: apache/rocketmq:5.1.4 container_name: rocketmq-nameserver ports: - "9876:9876" environment: - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: sh mqnamesrv networks: - mall-network rocketmq-broker: image: apache/rocketmq:5.1.4 container_name: rocketmq-broker ports: - "10909:10909" - "10911:10911" environment: - NAMESRV_ADDR=rocketmq-nameserver:9876 - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: sh mqbroker -n rocketmq-nameserver:9876 depends_on: - rocketmq-nameserver networks: - mall-network rocketmq-console: image: styletang/rocketmq-console-ng:latest container_name: rocketmq-console ports: - "8080:8080" environment: - JAVA_OPTS=-Drocketmq.namesrv.addr=rocketmq-nameserver:9876 depends_on: - rocketmq-nameserver networks: - mall-network networks: mall-network: driver: bridge

启动 RocketMQ:

cd mall-microservices/docker/rocketmq docker-compose up -d # 访问控制台:http://localhost:8080

安装部署

部署步骤

RocketMQ 部署步骤:

  1. 使用 Docker Compose 启动(推荐):

    cd mall-microservices/docker/rocketmq docker-compose up -d
  2. 验证部署

    # 查看容器状态 docker ps | grep rocketmq # 查看日志 docker logs rocketmq-nameserver docker logs rocketmq-broker

步骤2:在订单服务中实现消息生产者

添加 RocketMQ 依赖

文件路径: mall-microservices/order-service/pom.xml

<!-- RocketMQ --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-rocketmq</artifactId> </dependency>

配置 RocketMQ

文件路径: mall-microservices/order-service/src/main/resources/application.yml

spring: cloud: stream: rocketmq: binder: name-server: localhost:9876 bindings: order-output: producer: group: order-producer-group topic: order-topic

订单创建后发送消息

文件路径: mall-microservices/order-service/src/main/java/com/mall/orderservice/service/impl/OrderServiceImpl.java

package com.mall.orderservice.service.impl; import com.mall.orderservice.entity.Order; import com.mall.orderservice.mapper.OrderMapper; import com.mall.orderservice.service.OrderService; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.HashMap; import java.util.Map; @Service public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService { @Autowired private RocketMQTemplate rocketMQTemplate; private static final String ORDER_TOPIC = "order-topic"; private static final String ORDER_CREATED_TAG = "order-created"; @Override @Transactional(rollbackFor = Exception.class) public Order createOrder(Order order) { // 1. 创建订单 order.setOrderNo(generateOrderNo()); order.setStatus(0); save(order); // 2. 发送订单创建消息 Map<String, Object> message = new HashMap<>(); message.put("orderId", order.getId()); message.put("orderNo", order.getOrderNo()); message.put("userId", order.getUserId()); message.put("totalAmount", order.getTotalAmount()); message.put("timestamp", System.currentTimeMillis()); rocketMQTemplate.convertAndSend( ORDER_TOPIC + ":" + ORDER_CREATED_TAG, message ); return order; } private String generateOrderNo() { return "ORD" + System.currentTimeMillis() + UUID.randomUUID().toString().substring(0, 8).toUpperCase(); } }

消费者实现

消费者代码

@RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-consumer-group" ) @Component public class OrderMessageListener implements RocketMQListener<String> { @Override public void onMessage(String message) { // 处理消息 } }

消息类型

类型说明

消息类型:

  • 普通消息
  • 顺序消息
  • 事务消息
  • 延时消息

官方资源

根据 RocketMQ 官方文档GitHub 仓库,RocketMQ 的核心特性包括:

  1. 多种消息类型:RocketMQ 支持普通消息、顺序消息、事务消息、延时消息、批量消息等多种消息类型。官方文档详细说明了每种消息类型的使用场景和实现原理,帮助开发者根据业务需求选择合适的消息类型。

  2. 高可靠和高性能:RocketMQ 采用分布式架构,支持主从复制、多副本机制,保证消息不丢失。官方文档显示,RocketMQ 单机可以支持百万级消息吞吐,延迟低至毫秒级别,适合高并发、大流量的业务场景。

  3. 丰富的功能特性:RocketMQ 提供了消息过滤、消息轨迹、消息查询、定时消息、消息重试等丰富的功能。官方文档中详细介绍了这些功能的使用方法和最佳实践。

参考资源

本节小结

在本节中,我们学习了:

第一个是 RocketMQ 简介。 RocketMQ 是分布式消息中间件。

第二个是安装部署。 如何部署 RocketMQ。

第三个是生产者实现。 如何实现消息生产者。

第四个是消费者实现。 如何实现消息消费者。

第五个是消息类型。 RocketMQ 支持的消息类型。

这就是 RocketMQ 消息队列。使用 RocketMQ,可以实现异步消息处理。

在下一节,我们将学习 RocketMQ 事务消息。