13. Bus消息总线
1. SpringCloud Bus概述
SpringCloud Bus官网:SpringCloud Bus官网 有读者说我看不懂英文啊,没关系中文学习手册,小编也给准备好了:SpringCloud Bus中文手册
官网简介: Spring Cloud Bus将分布式系统的节点与轻量级消息代理链接。这可以用于广播状态更改(例如配置更改)或其他管理指令。一个关键的想法是,总线就像一个分布式执行器,用于扩展的Spring Boot应用程序,但也可以用作应用程序之间的通信通道。目前唯一的实现是使用AMQP代理作为传输,但是相同的基本功能集(还有一些取决于传输)在其他传输的路线图上。
重点理解:
Spring Cloud Bus
是用来将分布式系统的节点与轻量级消息系统链接起来的框架。- 它整合了
Java的事件处理机制
和消息中间件
的功能。 - 能
管理和传播分布式系统间的消息
,就像一个分布式执行器,可用于广播状态更改
、事件推送
等,也可以当做微服务间的通信通道。 实现配置的动态刷新
:在学习SpringCloud Config时,我们讲述了加入依赖 actuator,通过POST请求进行配置手工刷新。而SpringCloud Bus 配合 SpringCloud Config 使用可以真正实现配置的动态刷新。Bus只支持两种消息代理
:RabbitMQ和Kafaka。
那么SpringCloud Bus+RabbitMQ+Config如何实现分布式配置中心消息广播呢?我们一起演示来看下。
2. 关于消息总线
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共同的消息主题。并让系统中所有的微服务实例都连接上来。由于该主题中产生的消息会被所有的微服务实例监听和消费,所以称它为消息总线
。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。
基本原理
(读到这里,默认MQ知识你已经是ok的):ConfigClient 实例都监听 MQ 中同一个 topic(默认是springCloudBus)。当服务刷新数据的时候,它会把这个消息放到主题中,这样其他监听同一个主题的服务就能得到通知,进而更新自身的配置。
Tip:关于MQ的基础知识、主题等还有需要补课的,可参看:RabbitMQ、RocketMQ 、Kafka区别
3. 项目搭建集成
3.1 RabbitMQ环境准备
以mac环境为例,小编默认你的mac已经安装了HomeBrew(软件包管理系统),没有安装的看过来:mac安装Homebrew
终端执行brew info rabbitmq
查看是否有MQ环境,如果显示Not installed没有安装过; 未安装,则执行brew install rabbitmq
安装一下; 安装完启动MQ: brew services start rabbitmq
; 启用图形化管理界面插件http://localhost:15672:rabbitmq-plugins enable rabbitmq_management
(默认用户名/密码:guest/guest)
顺便回顾下rabbitmq相关命令: brew启动brew services start rabbitmq; 重启brew services restart rabbitmq 前台运行rabbitmq-server 后台运行rabbitmq-server -detached 新建用户rabbitmqctl add_user 账号 密码 给用户分配操作权限rabbitmqctl set_user_tags 账号 administrator 修改密码rabbitmqctl change_password Username Newpassword 修改密码 删除用户rabbitmqctl delete_user Username 删除用户 查看所有用户rabbitmqctl list_users 查看用户清单 为用户设置访问权限rabbitmqctl set_permissions -p / 用户名 “.“ “.” “.“ rabbitmqctl set_permissions -p / root “.” “.“ “.”
3.2 动态刷新刷新全局广播
设计思想
:利用消息总线触发一个服务端ConfigServer的 /bus/refresh 端点,而刷新所有客户端的配置(全局广播)。
1)修改pom文件,添加依赖 首先配置中心服务端模块、客户端模块相关工程pom均添加消息总线支持,以我们的rabbitmq环境为例添加依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<!--actuator-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
如果是kafka则添加依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
<!--actuator-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
2)修改 yml 配置文件,添加 RabbitMQ 配置,并暴露监控断点
config客户端bootstrap.yml添加:
## rabbitmq的相关配置 rabbitmq的相关配置 15672是 Web 管理界面的端口;5672是MQ访问的端口
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
config服务端application.yml添加:
## rabbitmq的相关配置 15672是 Web 管理界面的端口;5672是MQ访问的端口
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
## rabbitmq相关配置,暴露bus刷新配置的端点
management:
endpoints:
web:
exposure:
include: "bus-refresh"
3)测试
首先启动注册中心、Config服务端、Config客户端1、Config客户端2,然后修改远程仓库github内容进行更新测试:
刷新config服务端(http://localhost:1234/main/config-dev.yml)可以正常获取最新的配置信息, 刷新config客户端(http://localhost:1235/configInfo、http://localhost:1236/configInfo)没有获取到更新后的信息:
访问http://localhost:3344/actuator返回结果进行分析,最新链接为busrefresh
{
"_links":{
"self":{
"href":"http://localhost:1234/actuator",
"templated":false
},
"busrefresh-destinations":{
"href":"http://localhost:1234/actuator/busrefresh/{*destinations}",
"templated":true
},
"busrefresh":{
"href":"http://localhost:1234/actuator/busrefresh",
"templated":false
}
}
}
所以接下来通过idea终端执行指令curl -X POST “http://localhost:1234/actuator/busrefresh“
发送POST请求刷新服务端,再次去访问两个客户端,结果能够正常同步到最新的配置信息。
预期结果:一次发送,处处生效。实现了配置的全局广播。
注:动态刷新配置中发送post接口更新的是config的服务端接口; 之前提到过的手工刷新配置是发送post接口更新的是config的每一个客户端接口(多个客户端时一个个更新明显不友好),注意区分。
3.3 动态刷新定点通知
设计思想
: 配置发生变更时,如果不想全部通知,只想定点通知某一个实例生效而不是全部。
1) 从官网可知,定点通知处理实例的寻址规则:http://配置中心服务端地址:配置中心服务端的端口号/actuator/bus-refresh/{destination}
通过{destination}制定具体需要更新的客户端。规则:spring-application-name:端口
2)定点通知的话,比如我们只通知客户端2变更,可执行 :curl -X POST “http://localhost:1234/actuator/bus-refresh/cloud-config-client-two:1236“
3)测试访问