3. zookeeper的安装和客户端使用
1. 安装和集群搭建
1.1安装单机 ZK
对于系统安全性、实时性要求不是很高的系统,为了节约成本,使用单机 Zookeeper 作 为协调服务器也是可以的。
1.1.1 准备工作
(1) 下载 Zookeeper 安装包 在 zookeeper.apache.org 官网下载。 解压:
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
rm apache-zookeeper-3.6.2-bin.tar.gz
1.1.2 安装配置 zk
(1) 复制配置文件 进入到 zk 解压目录的 conf 目录,复制 zoo_sample.cfg 文件,并命名为 zoo.cfg。 (2) 修改配置文件 修改 zoo.cfg 配置文件中 zk 快照的存放目录。不过,由于系统不会自动创建,所以需要 手工创建这些目录。
# The number of milliseconds of each tick
// 心跳时间,为了确保client-server连接存在,以毫秒为单位,最小超时时间为2个心跳时间
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
// 多少个tickTime内,允许其他server连接并初始化数据,如果zookeeper管理的数据较大,则相应增大这个值
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
// 多少个tickTime内,允许follower同步,如果follower落后太多,则会被丢弃
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
// 用户存放内存数据库快照的文件夹,同时用于集群myid文件也存在这个文件夹里
dataDir=/Users/hsfxuebao/zookeeper3.7.1/data
# the port at which the clients will connect
// 客户端监听端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
1.1.3 操作 zk
- 启动zk:
./zkServer.sh start
- 查看状态
./zkServer.sh status
- 重启 zk
./zkServer.sh restart
- 停止 zk
./zkServer.sh stop
1.1.4 zk 四字命令
从 ZooKeeper3.3.0 开始支持某些特定的四字命令字母与其的交互。它们大多是查询命令, 用来获取 ZooKeeper 服务的当前状态及相关信息。 用户在客户端一般是通过 nc 命令向 zk 提交相应的命令。
(1) 安装 nc 命令
nc, netcat,是一个简单、可靠的网络工具。由于其短小精悍、功能实用,被誉为“网 络界的瑞士军刀”。不过,该命令在 CentOS7 系统中是需要安装的。
(2) 开启四字命令功能
zk 的四字命令功能默认是没有开启的,需要在 zoo.cfg 中通过配置开启。
4lw.commands.whitelist=*
(3) 常用的四字命令
命令 | 用法示例 | 描述 |
---|---|---|
conf | echo conf | nc localhost 2181 | (New in 3.3.0)输出相关服务配置的详细信息。比如端口、 zk 数据及日志配置路径、最大连接数,session 超时时间、 serverId 等 |
cons | echo cons | nc localhost 2181 | (New in 3.3.0)列出所有连接到这台服务器的客户端连接/会话的详细信息。包括“接受/发送”的包数量、 session id 、操作延迟、最后的操作执行等信息。 |
crst | echo crst | nc localhost 2181 | (New in 3.3.0)重置当前这台服务器所有连接/会话的统计信息 |
dump | echo dump | nc localhost 2181 | 列出未经处理的会话和临时节点(只在 leader 上有效)。 |
envi | echo envi | nc localhost 2181 | 输出关于服务器的环境详细信息(不同于 conf 命令),比如host.name、 java.version、java.home、user.dir=/data/zookeeper-3.4.6/bin之类信息 |
ruok | echo ruok | nc localhost 2181 | 测试服务是否处于正确运行状态。如果正常返回"imok",否则返回空。 |
srst | echo srst | nc localhost 2181 | 重置服务器的统计信息 |
srvr | echo srvr | nc localhost 2181 | (New in 3.3.0)输出服务器的详细信息。 zk 版本、接收/发送包数量、连接数、模式(leader/follower)、节点总数。 |
stat | echo stat | nc localhost 2181 | 输出服务器的详细信息:接收/发送包数量、连接数、模式(leader/follower)、节点总数、延迟。 所有客户端的列表。 |
wchs | echo wchs | nc localhost 2181 | (New in 3.3.0)列出服务器 watches的简洁信息:连接总数、 watching节点总数和 watches 总数 |
wchc | echo wchc | nc localhost 2181 | (New in 3.3.0)通过 session 分组,列出 watch 的所有节点,它的输出是一个与 watch 相关的会话的节点列表。如果 watches 数量很大的话,将会产生很大的开销,会影响性能,小心使用。 |
wchp | echo wchp | nc localhost 2181 | (New in 3.3.0)通过路径分组,列出所有的 watch 的 session id 信息。它输出一个与 session 相关的路径。如果 watches 数量很大的话,将会产生很大的开销,会影响性能,小心使用。 |
mntr | echo mntr | nc localhost 2181 | (New in 3.4.0)列出集群的健康状态。包括“接受/发送”的包数量、操作延迟、当前服务模式(leader/follower)、节点总数、watch 总数、临时节点总数。 |
1.2搭建会动态扩缩容的 ZK 集群
对于单机 Zookeeper,存在单点故障问题,系统的安全性降低。 Zookeeper 在生产场景 中一般是以集群的形式出现的。
1.2.1 集群说明
下面要搭建一个由四台 zk 构成的 zk 集群,其中一台为 Leader,两台 Follower,一台 Observer。 从 Zookeeper3.5.0 版本开始, zk 集群可以实现在无需重启集群的情况下动态的扩容与缩 容。这是新版 zk 与之前相比最大的区别与优势。我们要搭建的集群就可以实现该功能。
1.2.2 克隆并配置第一台主机
(1) 克隆并配置主机 克隆前面单机 Zookeeper 主机,并修改主机名与 ip。
(2) 删除无效数据 克隆前面的 zookeeperOS 主机,并命名为 zookeeperOS1。 进入 zookeeperOS1 的 /usr/data/zookeeper 目录中,将其中的所有内容全部删除,因为这些内容全部都是原母版 zk的,与集群主机没有关系。
(3) 创建 myid 文件 在/usr/data/zookeeper 目录中创建表示当前主机编号的 myid 文件,该编号为当前主机 在集群中的唯一标识。
(4) 定义 zoo.cfg.dynamic 在 conf 目录中定义一个配置文件,名称可以随意,不过,一般为 zoo.cfg.dynamic。这是 配置内容的动态部分,是一个动态配置文件。其内容就是指定集群中各个节点信息。
(5) 修改 zoo.cfg 在 zoo.cfg 文件中添加 zk 集群节点列表。 dynamicConfigFile 中指定的文件路径,最好使用绝对路径。 备课时使用相对路径没有问 题,但直播时使用相对路径出现了问题。
1.2.3 克隆并配置另两台主机
克隆并配置另外两台主机的方式是相同的。 (1) 克隆主机 克隆三台前面 zookeeperOS1 主机,并命名为 zookeeperOS2、 zookeeperOS3 与 zookeeperOS4,并修改主机名与 ip。
(2) 修改 myid 修改 myid 的值与 zoo.cfg 中指定的主机编号相同。
1.2.4 集群动态扩容
这里要在原集群不重启的情况下动态增加一台新的 Server, zookeeperOS5。
(1) 克隆并配置主机 克隆 zookeeperOS1 主机,并命名为 zookeeperOS5,并修改主机名与 ip。
(2) 修改 myid 修改 myid 为 5。
(3) 修改 zoo.cfg.dynamic 修改动态配置文件 zoo.cfg.dynamic:增加上当前 server 的信息。
(4) 修改 zoo.cfg
(5) 运行 reconfig 命令 在原来集群中任意 server 中运行 zkCli.sh 命令,打开客户端。然后运行 reconfig –add 命令,将新的配置信息添加到原集群中的所有动态配置文件中。
此时可以看到配置中新增加了 5 号 server。
(6) 启动 5 号主机
1.2.5 集群动态缩容
动态缩容与动态扩容过程类似。不同的是,运行的reconfig命令为(动态删除5号server): reconfig –remove 5
然后直接将 5 号机停掉即可。这时的集群 server 满员数量即为 4 台,而不是由 5 台满员 宕机一台的情况。
3.3老版 zk 的在线扩容
在线扩容即不影响 zk 集群可用性的前提下进行的扩容,但这种扩容不能称为动态扩容。 因为这种扩容方式需要将原集群中的所有 server 全部逐个重新配置并重启
。所以, 这种在线扩容也称为“滚动重启式”扩容
。
假设原集群中包含三台 zk,它们的 zoo.cfg 中配置了这三台的 server 地址:
server.1=...
server.2=...
server.3=...
现在要扩容一台主机。配置该 server 的 zoo.cfg 中的 server 地址为:
server.1=...
server.2=...
server.3=...
server.4=...
配置好第四台后,直接启动第四台。通过 mntr 命令可以看到集群已经包含四台 server 了。
修改原集群中的某台 server 的配置文件,将扩容的第四台 server 地址添加到其中。然后 restart 该 server。再将原集群中的其它 server 按照上述方式逐个重新配置、重启。
不过,在配置并重启过程中,最好将原 Leader 放到最后一台配置重启。
2. 客户端
2.1 客户端命令
2.1.1 启动客户端
- 连接本机 zk 服务器
zkCli.sh
- 连接其它 zk 服务器
zkCli.sh -server ip:port
2.1.2 查看子节点-ls
查看根节点及/brokers 节点下所包含的所有子节点列表。
2.1.3 创建节点-create
创建持久节点:创建一个名称为 china 的 znode,其值为 999。
创建持久顺序节点:在/china 节点下创建了顺序子节点 beijing、 shanghai、 guangzhou,它们的数据内容分别为 bj、 sh、 gz。
创建临时节点
创建临时顺序节点
2.1.4 获取节点信息-get
- 获取持久节点数据
- 获取顺序节点信息
- 获取临时节点信息
2.1.5 更新节点数据内容-set
更新前:
更新:
2.1.6 删除节点-delete
2.1.7 ACL 理论
(1) ACL 简介 ACL 全称为 Access Control List(访问控制列表),是一种细粒度的权限管理策略
,可以针对任意用户
与组进行细粒度的权限控制。 zk 利用 ACL 控制 znode 节点的访问权限,如节点数据读写、节点创建、节点删除、读取子节点列表、设置节点权限等。
(2) ACL 与 UGO 对比 Linux 系统采用的权限控制机制是 UGO(User、 Group、 Other)。 UGO 是一种粗粒度的文件系统权限控制模式,其只能对这三类用户进行权限设置。 例如,若要对李四用户进行权限设置,李四与当前的创建者不是同组,但也不是 Other(Other 中包含王五、赵六等,但不 包含李四)。那么使用 UGO 是无法进行设置的,但使用 ACL 是可以的。
(3) zk 的 ACL 维度 目前大多数 Unix 已经支持了 ACL, Linux 也从 2.6 版本开始支持了 ACL。 Unix/Linux 系统的 ACL 分为两个维度:组与权限,且目录的子目录或文件能够继承父目录的 ACL 的。而Zookeeper 的 ACL 分为三个维度:授权策略 scheme、授权对象 id、用户权限 permission, 子znode 不会继承父 znode 的权限
。
- 授权策略 scheme :授权策略用于确定权限验证过程中使用的检验策略(简单来说就是,通过什么来验证权限,如何验证其身份),在 zk 中最常用的有四种策略。
- IP:根据 IP 地址进行权限验证。
- digest:根据用户名与密码进行验证。
- world:对所有用户不做任何验证。 但有些系统节点是不能操作的。
- super:超级用户可以对任意节点进行任意操作。
- 授权对象 id : 授权对象指的是权限赋予的用户。不同的授权策略具有不同类型的授权对象。下面是各个授权模式对应的授权对象 id。
- ip:授权对象是 IP 地址。
- digest:授权对象是“用户名 + 密码”。
- world:其授权对象只有一个,即 anyone。
- Super:与 digest 相同,授权对象为“用户名 + 密码”。
- 权限 Permission : 权限指的是通过验证的用户可以对 znode 执行的操作。共有五种权限,不过 zk 支持自定义权限。
- c: Create,允许授权对象在当前节点下创建子节点。
- d: Delete,允许授权对象删除当前节点。
- r: Read,允许授权对象读取当前节点的数据内容,及子节点列表。
- w: Write,允许授权对象修改当前节点的数据内容,及子节点列表。
- a: Acl,允许授权对象对当前节点进行 ACL 相关的设置。
2.1.8 ACL 操作
- 查看权限-getAcl
- 设置权限 下面的命令是,首先增加了一个认证用户 zs,密码为 123,然后为/china 节点指定只有zs 用户才可访问该节点,而访问权限为所有权限。
2.2可视化客户端
zk 常见的可视化客户端有两个: ZooView 与 ZooInspector
。
2.2.1 ZooView
- 下载地址 : https://github.com/HelloKittyNII/ZooViewer
- 用法 : 解压后直接双击运行 startup.bat 即可。
2.2.2 ZooInspector
- 下载地址: https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
2.4 ZKClient 客户端
2.4.1 简介
ZkClient 是一个开源客户端,在 Zookeeper 原生 API 接口的基础上进行了包装,更便于 开发人员使用。内部实现了 Session 超时重连, Watcher 反复注册等功能。像 dubbo 等框架对其也进行了集成使用。
2.4.2 API 介绍
以下 API 方法均是 ZkClient 类中的方法。
- (1)创建会话 ZkClient 中提供了九个构造器用于创建会话。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 | 意义 |
---|---|
zkServers | 指定 zk 服务器列表,由英文状态逗号分开的 host:port 字符串组成 |
connectionTimeout | 设置连接创建超时时间,单位毫秒。在此时间内无法创建与 zk 的连接,则直接放弃连接,并抛出异常 |
sessionTimeout | 设置会话超时时间,单位毫秒 |
zkSerializer | 为会话指定序列化器。 zk 节点内容仅支持字节数组(byte[])类型,且 zk 不负责序列化。在创建 zkClient 时需要指定所要使用的序列化器,例如 Hessian或 Kryo。默认使用 Java 自带的序列化方式进行对象的序列化。当为会话指定了序列化器后,客户端在进行读写操作时就会自动进行序列化与反序列化。 |
connection | IZkConnection 接口对象,是对 zk 原生 API 的最直接包装,是和 zk最直接的交互层,包含了增删改查等一系列方法。该接口最常用的实现类是 zkClient 默认的实现类 ZkConnection,其可以完成绝大部分的业务需求。 |
operationRetryTimeout | 设置重试超时时间,单位毫秒 |
- (2)创建节点
ZkClient 中提供了 15 个方法用于创建节点。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 | 意义 |
---|---|
path | 要创建的节点完整路径 |
data | 节点的初始数据内容,可以传入 Object 类型及 null。 zk 原生 API中只允许向节点传入 byte[]数据作为数据内容,但 zkClient 中具有自定义序列化器,所以可以传入各种类型对象。 |
mode | 节点类型, CreateMode 枚举常量,常用的有四种类型。PERSISTENT :持久型;PERSISTENT_SEQUENTIAL :持久顺序型;EPHEMERAL :临时型;EPHEMERAL_SEQUENTIAL :临时顺序型 |
acl | 节点的 ACL 策略 |
callback | 回调接口 |
context | 执行回调时可以使用的上下文对象 |
createParents | 是否级递归创建节点。 zk 原生 API 中要创建的节点路径必须存在,即要创建子节点,父节点必须存在。但 zkClient 解决了这个问题,可以做递归节点创建。没父节点,可以先自动创建了父节点,然后再在其下创建子节点。 |
- (3) 删除节点
ZkClient 中提供了 3 个方法用于创建节点。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 | 意义 |
---|---|
path | 要删除的节点的完整路径 |
version | 要删除的节点中包含的数据版本 |
- (4) 更新数据
ZkClient 中提供了 3 个方法用于修改节点数据内容。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为: |参数名 |意义| |path| 要更新的节点的完整路径| |data| 要采用的新的数据值| |expectedVersion| 数据更新后要采用的数据版本号|
- (5) 检测节点是否存在
ZkClient 中提供了 2 个方法用于判断指定节点的存在性,但 public 方法就一个:只有一 个参数的 exists()方法。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 | 意义 |
---|---|
path | 要判断存在性节点的完整路径 |
watch | 要判断存在性节点及其子孙节点是否具有 watcher 监听 |
- (6) 获取节点数据内容
ZkClient 中提供了 4 个方法用于获取节点数据内容,但 public 方法就三个。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 | 意义 |
---|---|
path | 要读取数据内容的节点的完整路径 |
watch | 指定节点及其子孙节点是否具有 watcher 监听 |
returnNullIfPathNotExists | 这是个 boolean 值。默认情况下若指定的节点不存在,则会抛出 KeeperException$NoNodeException 异常。设置该值为 true,若指定节点不存在,则直接返回 null 而不再抛出异常。 |
stat | 指定当前节点的状态信息。不过,执行过后该 stat 值会被最新获取到的 stat 值给替换。 |
- (7) 获取子节点列表 ZkClient 中提供了 2 个方法用于获取节点的子节点列表,但 public 方法就一个:只有一 个参数的 getChildren()方法。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 | 意义 |
---|---|
path | 要获取子节点列表的节点的完整路径 |
watch | 要获取子节点列表的节点及其子孙节点是否具有 watcher 监听 |
- (8) watcher 注册 ZkClient
采用 Listener 来实现 Watcher 监听
。客户端可以通过注册相关监听器来实现对zk 服务端事件的订阅。
可以通过 subscribeXxx()方法实现 watcher 注册
,即相关事件订阅;通过 unsubscribeXxx()方法取消相关事件的订阅。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 | 意义 |
---|---|
path | 要操作节点的完整路径 |
IZkChildListener | 子节点数量变化监听器 |
IZkDataListener | 数据内容变化监听器 |
IZkStateListener | 客户端与zk的会话连接状态变化监听器,可以监听新会话的创建、会话创建出错、连接状态改变。连接状态是系统定义好的枚举类型 Event.KeeperState 的常量 |
2.4.3 代码演示
(1) 创建工程 创建一个 Maven 的 Java 工程,并导入以下依赖。
<!--zkClient 依赖-->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
(2) 代码
public class ZKClientTest {
private static final String CLUSTER = "zkOS:2181";
private static final String PATH = "/xxx";
public static void main(String[] args) {
// ---------------- 创建会话 -----------
ZkClient zkClient = new ZkClient(CLUSTER);
// ---------------- 创建节点 -----------
CreateMode mode = CreateMode.PERSISTENT;
String nodeName = zkClient.create(PATH, "first log", mode);
System.out.println("新创建的节点名称为: " + nodeName);
// ---------------- 获取数据内容 -----------
Object readData = zkClient.readData(PATH);
System.out.println("节点的数据内容为: " + readData);
// ---------------- 注册数据变更 watcher -----------
zkClient.subscribeDataChanges(PATH, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exceptiopn {
System.out.println("节点" + dataPath + "的数据已经更新为了" + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath + "的数据内容被删除");
}
});
// ---------------- 更新数据内容 -----------
zkClient.writeData(PATH, "second log");
String updatedData = zkClient.readData(PATH);
System.out.println("更新过的数据内容为: " + updatedData);
// ---------------- 删除节点 -----------
zkClient.delete(PATH);
// ---------------- 判断节点存在性 -----------
boolean isExists = zkClient.exists(PATH);
System.out.println(PATH + "节点仍存在吗? " + isExists);
}
}
2.5 Curator 客户端
2.5.1 简介
Curator 是 Netflix 公司开源的一套 zk 客户端框架,与 ZkClient 一样,其也封装了 zk 原生API。其目前已经成为 Apache 的顶级项目。同时, Curator 还提供了一套易用性、可读性更强的 Fluent 风格的客户端 API 框架。
2.5.2 API 介绍
这里主要以 Fluent 风格客户端 API 为主进行介绍。
(1) 创建会话
普通 API 创建 newClient() 在 CuratorFrameworkFactory 类中提供了两个静态方法用于完成会话的创建。
查看这些方法的源码可以看到具体的参数名称,这些参数的意义为:
参数名 意义 connectString 指定 zk 服务器列表,由英文状态逗号分开的 host:port 字符串组成 sessionTimeoutMs 设置会话超时时间,单位毫秒,默认 60 秒 connectionTimeoutMs 设置连接超时时间,单位毫秒,默认 15 秒 retryPolicy 重试策略, 内置有四种策略,分别由以下四个类的实例指定:ExponentialBackoffRetry、 RetryNTimes、 RetryOneTime、RetryUntilElapsed Fluent 风格创建
(2) 创建节点 create()
下面以满足各种需求的举例方式分别讲解节点创建的方法。
说明:下面所使用的 client 为前面所创建的 Curator 客户端实例。
- 创建一个节点,初始内容为空
- 语句: client.create().
forPath(path)
; - 说明:默认创建的是持久节点,数据内容为空。
- 语句: client.create().
- 创建一个节点,附带初始内容
- 语句: client.create().forPath(path,
“mydata”.getBytes()
); - 说明: Curator 在指定数据内容时,只能使用 byte[]作为方法参数。
- 语句: client.create().forPath(path,
- 创建一个临时节点,初始内容为空
- 语句: client.create().
withMode(CreateMode.EPHEMERAL)
.forPath(path); - 说明: CreateMode 为枚举类型。
- 语句: client.create().
- 创建一个临时节点,并自动递归创建父节点
- 语句: client.create().
createingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL).forPath(path); - 说明:若指定的节点多级父节点均不存在,则会自动创建。
- 语句: client.create().
(3) 删除节点 delete()
- 删除一个节点
- 语句: client.
delete()
.forPath(path); - 说明:只能将叶子节点删除,其父节点不会被删除。
- 语句: client.
- 删除一个节点,并递归删除其所有子节点
- 语句: client.delete().
deletingChildrenIfNeeded()
.forPath(path); - 说明:该方法在使用时需谨慎。
- 语句: client.delete().
(4) 更新数据 setData()
- 设置一个节点的数据内容
- 语句: client.
setData()
.forPath(path,newData
); - 说明:该方法具有返回值,返回值为 Stat 状态对象。
- 语句: client.
(5) 检测节点是否存在 checkExits()
- 设置一个节点的数据内容
- 语句: Stat stat = client.
checkExists()
.forPath(path); - 说明:该方法具有返回值,返回值为 Stat 状态对象。若 stat 为 null,说明该节点不存在,否则说明节点是存在的。
- 语句: Stat stat = client.
(6) 获取节点数据内容 getData()
- 读取一个节点的数据内容
- 语句: byte[] data = client.
getDate()
.forPath(path); - 说明:其返回值为 byte[]数组。
- 语句: byte[] data = client.
(7) 获取子节点列表 getChildren()
- 读取一个节点的所有子节点列表
- 语句: List childrenNames = client.
getChildren()
.forPath(path); - 说明:其返回值为 byte[]数组。
- 语句: List childrenNames = client.
(8) watcher 注册 usingWatcher()
curator 中绑定 watcher 的操作有三个: checkExists()、 getData()、getChildren()。这三个方法的共性是,它们都是用于获取的。这三个操作用于 watcher 注册的方法是相同的,都是usingWatcher()方法。
这两个方法中的参数 CuratorWatcher 与 Watcher 都为接口
。这两个接口中均包含一个 process()方法,它们的区别是, CuratorWatcher 中的 process()方法能够抛出异常,这样的话,该异常就可以被记录到日志中。
- 监听节点的存在性变化
Stat stat = client.checkExists().usingWatcher((CuratorWatcher) event -> {
System.out.println("节点存在性发生变化");
}).forPath(path);
- 监听节点的内容变化
byte[] data = client.getData().usingWatcher((CuratorWatcher) event -> {
System.out.println("节点数据内容发生变化");
}).forPath(path);
- 监听节点子节点列表变化
List<String> sons = client.getChildren().usingWatcher((CuratorWatcher) event -> {
System.out.println("节点的子节点列表发生变化");
}).forPath(path);
2.5.3 代码演示
(1) 创建工程
创建一个 Maven 的 Java 工程,并导入以下依赖。
<!--curator 依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
(2) 代码
public class FluentTest {
private static final String CLUSTER = "zookeeperOS:2181";
private static final String ROOT_PATH = "mytest";
public static void main(String[] args) throws Exception {
// ---------------- 创建会话 -----------
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString(CLUSTER)
.sessionTimeoutMs(15000)
.connectionTimeoutMs(13000)
.retryPolicy(retryPolicy)
.namespace(ROOT_PATH)
.build();
client.start();
String nodePath = "/host";
// ---------------- 创建节点 -----------
String nodeName = client.create().forPath(nodePath);
System.out.println("新创建的节点名称为: " + nodeName);
// ---------------- 获取数据内容并注册 watcher -----------
byte[] data = client.getData().usingWatcher((CuratorWatcher) event -> {
System.out.println(event.getPath() + "数据内容发生变化");
}).forPath(nodePath);
System.out.println("节点的数据内容为: " + new String(data));
// ---------------- 获取子节点列表并注册 watcher -----------
List<String> children = client.getChildren().usingWatcher(
(CuratorWatcher) event -> {
System.out.println(event.getPath() + "子节点列表发生变化");
}).forPath(nodePath);
System.out.println("当前节点的子节点列表为: " + children);
// ---------------- 更新数据内容 -----------
client.setData().forPath(nodePath, "newhost".getBytes());
byte[] newData = client.getData().forPath(nodePath);
System.out.println("更新过的数据内容为: " + new String(newData));
// ---------------- 删除节点 -----------
if(client.checkExists().forPath(nodePath) != null) {
client.delete().forPath(nodePath);
}
}
}