15. 消息驱动项目集成
1. 项目准备
我们知道Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream可以帮我们屏蔽底层消息中间件的差异,降低切换维护成本,统一消息的编程模型(声明和绑定频道),解决微服务系统中的一些问题。目前支持RabbitMQ 和 Kafka 。 了解了Stream消息驱动的原理和设计思想后,接下来本案例以RabbitMQ为例,模拟项目中如何使用Stream消息驱动。新建三个模块:
cloud-stream-provider:作为生产者进行发消息模块。 cloud-stream-consumer1:作为消息接收模块1 cloud-stream-consumer2:作为消息接收模块2
2. Stream消息驱动之生产者
2.1 pom 添加依赖
三个工程pom均添加相关依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
完整pom文件:
<parent>
<artifactId>springcloud-feign</artifactId>
<groupId>org.qytest.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>cloud-stream-provider</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<!-- devtools热部署 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependencies>
2.2 配置bootstrap.yml
cloud-stream-provider消息生产者 -》bootstrap.yml -》8001工程:
server:
port: 8001
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置需要绑定的rabbitMq的服务信息
defaultRabbit: ## 表示定义的名称,用于binding的整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
bindings: ## 服务的整合处理
output: # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input
destination: qyExchange # 表示要是用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置为text/plain
## 设置要绑定的消息服务的具体设置
binder: defaultRabbit
eureka:
client:
register-with-eureka: true #向注册中心注册自己
fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
instance-id: provider8001 #主机名称修改
prefer-ip-address: true #访问路径可以显示ip
注意:生产者配置的是output
2.3 启动类
三个工程启动类均正常开启最服务发现注册的支持即可,其他无特别:
@SpringBootApplication
@EnableEurekaClient
public class StreamProviderApplication {
public static void main(String[] args) {
SpringApplication.run(StreamProviderApplication.class, args);
}
}
2.4 生产者业务类
cloud-stream-provider工程业务类处理,模拟发送消息的业务:
1)SendMessageController.java:
@RestController
public class SendMessageController {
@Resource
private IMessageProviderService messageProviderService;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProviderService.send();
}
}
2)IMessageProviderService接口:
public interface IMessageProviderService {
String send() ;
}
3)MessageProviderServiceImpl接口实现类:
@Slf4j
@EnableBinding(Source.class) //定义消息的推送管道,即:源
//将@EnableBinding注释应用于应用程序的配置类之一。@EnableBinding注释本身使用@Configuration进行元注释
/*此处不再需要引入 spring 注解 @Service,这里的业务实现类是与RabbitMQ配合的,使用的 SpringCloud Stream 的注解*/
public class MessageProviderServiceImpl implements IMessageProviderService {
@Resource
//在Spring Cloud Stream 1.0中,唯一支持的可绑定组件是Spring消息传递MessageChannel及其扩展名SubscribableChannel和PollableChannel
private MessageChannel output; //消息发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
log.info("*****serial:" + serial);
return "RabbitMQ 消息发送方:" + serial;
}
}
3. Stream消息驱动之消费者
cloud-stream-consumer1消息接收模块1-》bootstrap.yml -》8002工程:
3.1 pom 添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.2 配置bootstrap.yml
server:
port: 8002
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的 rabbitmq 的服务信息
defaultRabbit: # 表示定义的名称,用于 binding 整合
type: rabbit # 消息组件类型
environment: # 设置 rabbitmq 的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input
destination: qyExchange # 表示要使用的 Exchange 名称定义
content-type: application/json #设置消息类型,本次为json,文本则设置 text/plain
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
register-with-eureka: true #向注册中心注册自己
fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
instance-id: consumer8002 #主机名称修改
prefer-ip-address: true #访问路径可以显示ip
注意:消费者配置的是input
3.3 消费者业务类
建一个ReceiveMessageListenerController.java模拟消费消息即可:
/**
*增加订阅监听器
**/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value(value = "${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)//使用@StreamListener进行自动内容类型处理
//@StreamListener注释提供了一种更简单的处理入站邮件的模型,特别是在处理涉及内容类型管理和类型强制的用例时。
public void input(Message<String> message) {
System.out.println("消费者 1 号,----->接收到的消息:" + message.getPayload() + "\t port:" + serverPort);
}
}
cloud-stream-consumer2消息接收模块2-》bootstrap.yml -》8003工程操作同cloud-stream-consumer1,不再赘述。
4. 测试
浏览器多次调用http://localhost:8001/sendMessage模拟生产者发送消息,观察后台日志及RabbitMQ查看实时速度:
测试结果,模拟生产及消费消息成功,但是通过观察可知,出现了重复消费的现象!
5. 如何解决消息的重复消费?消息丢失?
当集群方式进行消息消费时,就会存在消息的重复消费问题。通过分组
解决,只要是一个组的消费者,就处于竞争关系,一次只能有一个去消费,这就可以解决重复消费的问题了。而且分组(group)还解决了持久化的问题。
修改 8002(group1)、8003(group2) 的 yml 配置文件,添加group分组,再次进行测试,发现消息轮询被两个消费者消费:
未设置消息分组的微服务在服务宕机重启后并不会获取并消费消息,发生消息丢失故障。而设置了分组的,服务重启后会获取并消费新消息。
在实际的生产过程中,一定要配置消息分组(group),以免造成服务宕机造成的消息丢失的问题