12. 源码| 心跳检测流程源码分析
心跳检测的源码流程并不算长,但是要真的了解这个流程必须对ZK的各种参数及作用有所了解,否则这个流程理解不到核心思想。如果对大致流程以及参数意义解析有兴趣可以看上一篇心跳检测流程及Session时间参数解析
1. Client发送ping请求
1.1 SendThread心跳检测发起者
心跳检测的发送逻辑是在这个线程对象中完成的,会判断每次ping的时间间隔以及具体什么时候需要发送ping请求,下一篇有机会将会详细分析一下具体的参数意义。
class SendThread extends ZooKeeperThread {
// 客户端连接Server端的负责对象,默认采用的是NIO方式连接
private final ClientCnxnSocket clientCnxnSocket;
// 是否为第一次连接,默认是true
private boolean isFirstConnect = true;
// 发送队列,当Client端有请求需要发送时将会封装成Packet包添加到这里面,在
// SendThread线程轮询到有数据时将会取出第一个包数据进行处理发送。使用的也是
// FIFO模式
private final LinkedList<Packet> outgoingQueue =
new LinkedList<Packet>();
@Override
public void run() {
// 更新clientCnxnSocket的发送事件以及关联SendTreahd,这里sessionId
// 没有值,就是0
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
// 上次ping和现在的时间差
int to;
// 时间为10s
final int MAX_SEND_PING_INTERVAL = 10000;
// 如果ZK是存活的就一直轮询
while (state.isAlive()) {
try {
// 未连接的情况忽略
...
if (state.isConnected()) {
// 后面的关于zooKeeperSaslClient处理流程略过
...
// 连接上之后使用的属性变成了readTimeout,getIdleRecv()
// 方法使用的属性为lastHeard,即最后一次监听到服务端响应
// 的时间戳
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
// 未连接时会进入,因此ping流程这里不会使用,可以得出结论
// connectTime属性只会在新建连接时被使用
// 连接上之后失去作用
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
// 如果进入到这里面,说明readTimeout或者connectTimeout
// 要小于上次监听到Server端的时间间隔,意味着时间过期
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
// 获取下次ping的时间,也可以说获取select()最大阻塞时间
// 这个公式分两个情况:
// 1、lastSend距今超过1000ms(1s),则固定减去1000ms
// 具体公式表现为:(readTimeout / 2) - idleSend - 1000
// 2、lastSend距今小于等于1000ms,则不做任何操作
// 具体公式表现为:(readTimeout / 2) - idleSend
int timeToNextPing = readTimeout / 2 -
clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000)
? 1000 : 0);
// 如果timeToNextPing小于等于0或者idleSend间隔超过10s
// 说明是时候该发送ping请求确认连接了
if (timeToNextPing <= 0 ||
clientCnxnSocket.getIdleSend() >
MAX_SEND_PING_INTERVAL) {
// 发送ping请求包
sendPing();
// 更新lastSend属性
clientCnxnSocket.updateLastSend();
} else {
// to在前面设的值
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// 中间是只读连接CONNECTEDREADONLY,略过
...
// 要发送ping请求这个方法可能将会被调用两次,第一次是在
// sendPing()之后调用,如果是OP_WRITE操作则可以立马进行写操作
// 如果不是则会在第一次调用时开启OP_WRITE操作,轮询第二次的时候
// 再调用一次用来发送ping数据包
clientCnxnSocket.doTransport(to, pendingQueue,
outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
// 处理异常的暂不做分析
...
}
}
// 跑到这里说明ZK已经关闭了,后面会做一些善后的工作,如发送关闭事件
// 清除连接的缓存数据等
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
}
private void sendPing() {
lastPingSentNs = System.nanoTime();
// 创建xid为PING_XID的RequestHeader对象
RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
// ping请求只有RequestHeader有值,其它的都是null
queuePacket(h, null, null, null, null, null, null, null, null);
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath,Object ctx,WatchRegistration watchRegistration){
// 方法的大致作用便是将前面传进来的RequestHeader对象封装成Packet对象
// 并最终放入outgoingQueue数组等待下次发送数据包时发送
Packet packet = null;
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response,
watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// 正常流程会进行到这里,前面的流程可以略过
outgoingQueue.add(packet);
}
}
// 调用selector.wakeup()方法来唤醒select()方法,调用这个方法的作用
// 便是防止将ping数据包放到outgoingQueue后再次被select()方法阻塞从而
// 直接调用阻塞方法的后面逻辑
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
}
1.2 ClientCnxnSocket套接字交互类
和Socket进行交互的类,负责向Socket中写入数据和读取数据。在ping流程的第一步中只会执行写操作,因此接下来只需要关注写操作的源码即可。
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
// NIO的多路复用选择器
private final Selector selector = Selector.open();
// 本Socket对应的SelectionKey
private SelectionKey sockKey;
// 是否已经初始化,默认false,到发送ping操作时该值一定为true
protected boolean initialized;
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue,
LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
// 最多休眠waitTimeOut时间获取NIO事件,调用wake()方法、有可读IO事件和
// 有OP_WRITE写事件可触发
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
// 获取IO事件绑定的SelectionKey对象
selected = selector.selectedKeys();
}
// 更新now属性为当前时间戳
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// 先判断SelectionKey事件是否是连接事件
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
// 略过
...
} else if ((k.readyOps() &
(SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 再判断是否是OP_READ或者OP_WRITE事件,在ping流程的第一步中
// 执行到这里则一定是OP_WRITE事件
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
// 如果这个代码块发生了作用并开启了OP_WRITE事件说明在前面调用sendPing()
// 方法后并没有NIO事件发生,导致outgoingQueue只有一个ping数据包,需要
// 在这里手动判断一次开启OP_WRITE。当然也不排除前面有NIO事件,并且通过
// 前面的NIO事件产生了新的数据包导致需要开启OP_WRITE事件
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
// 查看是否有可发送的Packet包数据
if (findSendablePacket(outgoingQueue, cnxn.sendThread
.clientTunneledAuthenticationInProgress())!=null) {
// 打开OP_WRITE操作
enableWrite();
}
}
}
// 清除SelectionKey集合
selected.clear();
}
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// 这里有处理OP_READ类型的判断,即处理ZK的Server端传过来的请求
// 在第一步中不会走到这里面去,因此忽略
if (sockKey.isReadable()) {
...
}
// 处理OP_WRITE类型事件,即处理要发送到ZK的Server端请求包数据
if (sockKey.isWritable()) {
// 保证线程安全
synchronized(outgoingQueue) {
// 获取最新的需要发送的数据包,这里获取的便是前面SendThread
// 放进去的只有ping操作的Packet包对象
Packet p = findSendablePacket(outgoingQueue, cnxn
.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
// 更新最后的发送时间
updateLastSend();
// 如果Packet包的ByteBuffer为空则调用createBB()创建
// 连接时ByteBuffer是一定为空的,因此这里会一定进入
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
// createBB方法的作用便是序列化请求并将byte[]数组
// 添加到ByteBuffer中
p.createBB();
}
// 使用获取的SocketChannel写入含有序列化数据的ByteBuffer
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
// 发送成功并删除第一个Packet包对象
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
// ping的requestHeader一定不为空,但也是被排除了
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
// 如果outgoingQueue为空或者尚未连接成功且本次的Packet包对象
// 已经发送完毕则关闭OP_WRITE操作,ping操作只是一次通知操作
// 因此就算这里被关闭了写操作也无所谓
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null &&
!p.bb.hasRemaining()) {
disableWrite();
} else {
// 为了以防万一打开OP_WRITE操作
enableWrite();
}
}
}
}
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
boolean clientTunneledAuthenticationInProgress) {
synchronized (outgoingQueue) {
// 判断outgoingQueue是否为空
if (outgoingQueue.isEmpty()) {
return null;
}
// 两种条件:
// 如果第一个的ByteBuffer不为空
// 如果传入进来的clientTunneledAuthenticationInProgress为false
// 参数为false说明认证尚未配置或者尚未完成
if (outgoingQueue.getFirst().bb != null
|| !clientTunneledAuthenticationInProgress) {
return outgoingQueue.getFirst();
}
// 跑到这里说明认证已完成,需要遍历outgoingQueue数组,把连接的
// 请求找到并放到队列的第一个,以保证下次读取会读取到连接请求
ListIterator<Packet> iter = outgoingQueue.listIterator();
while (iter.hasNext()) {
Packet p = iter.next();
// 只有连接的requestHeader是空的,因此只需要判断这个条件即可
// 其它类型的包数据header肯定是不为空的
if (p.requestHeader == null) {
// 先删除本包,随后放到第一位
iter.remove();
outgoingQueue.add(0, p);
return p;
}
}
// 执行到这里说明确实没有包需要发送
return null;
}
}
}
2. Server端接收处理响应数据
其实在第一步调用SocketChannel.connect()方法时,第二步就已经接收新建连接的通信并且生成了session信息了,但为了便于理解,我们还是把第二步当成依赖于第一步。后面在源码会详细说明。
2.1 NIOServerCnxnFactory接收NIO请求
NIOServerCnxnFactory负责使用Selector多路复用选择器来从多个Client端获取Socket的新建和发送数据,因此在交互流程中,此类为Server端的起始点,也是通过线程轮询的方式不断地获取其它Socket发送的请求数据。
这块跟新建连接分析一样,具体可参照:新建连接交互流程源码分析(单机Server服务端与Client客户端)
2.2 连接对象NIOServerCnxn
这个代表着Client端在Server端的连接对象,新连接在Server端的表现便是一个NIOServerCnxn对象。并且这个对象会和对应的SelectionKey、Socket进行绑定。这个类里面最重要的便是doIO()方法,在这个方法中会判断读写事件,并根据相应的值进行处理,在新建连接流程中,只会分析读事件。关键源码如下:
这块跟新建连接分析一样,具体可参照:新建连接交互流程源码分析(单机Server服务端与Client客户端)
2.3 单机运行的ZooKeeperServer
前面文章解释过,这个类就是ZK的Server实例,每个ZK服务器上对应着一个ZooKeeperServer实例,这里面有诸多服务器方面的属性配置,但前面分析过,因此本次流程代码便不做过多的介绍了,有兴趣的可以翻看前面的文章。
在Client端有ping心跳检测间隔时间,在Server端有tickTime存活检测时间,这两个属性代表的意思是不一样的,Client端的ping心跳检测间隔时间是轮询隔一段时间后向Server端发送ping请求,而Server端的tickTime间隔时间作用是每隔一段时间就判断在Server端的Client连接对象是否已经死亡,如果已经过期死亡则将连接对象进行清除关闭。所以ping心跳检测的意义是Client端告诉服务器我还活着,tickTime意义是定期清除没有告诉Server端还存活的连接。
这块跟新建连接分析一样,具体可参照:新建连接交互流程源码分析(单机Server服务端与Client客户端)
2.4 SessionTracker校验Session时间
既然前面分析过了更新Session时间,那么这里便不再分析,仅仅看一下更新过后的Session时间是在哪里被使用的。
在Client端有着一套机制来保持Session会话,而Server端肯定也是会有的。这个类的作用便是一直轮询查看哪些Session将要过期,如果过期了就进行相应的处理。
Server端保存着很多Client端的连接,Server端判断这些Client端是否依然存活的方法便是以ZK配置的tickTime属性为间隔,每个间隔时间点上都分布了若干个Client端对应的Session,而这些个间隔点上的Session就代表着这个时间点将会过期的Session,这样SessionTracker便只需要每隔一个tickTime单元时间遍历一次需要删除的Session将其设置过期即可,而无需每次轮询都遍历一次Session集合逐个判断,这种思想不可谓不妙啊。
public class SessionTrackerImpl extends ZooKeeperCriticalThread
implements SessionTracker {
// 保存sessionId和对应的Session对象
HashMap<Long, SessionImpl> sessionsById;
// key为某一个过期时间,value为这一个时间点对应要过期的Session对象
// 比如在1610539095000时间戳有3个Session要过期,key就是这个时间戳
// 而value则保存的是这三个要过期的Session对象
HashMap<Long, SessionSet> sessionSets;
// key为sessionId,value为这个session的过期时间
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
// 下一次新建session时的id
long nextSessionId = 0;
// 下一次session过期的时间戳,计算公式为:
// (某一时间戳 / expirationInterval + 1) * expirationInterval
// 因此就是以tickTime为单位往上加一次tickTime,并且能够为tickTime整除
long nextExpirationTime;
// 每次轮询的间隔时间,值就是tickTime
int expirationInterval;
@Override
synchronized public void run() {
// 这个方法很简单,只需要每隔一个expirationInterval时间便从待删除
// Session集合sessionSets中取出Session进行过期操作就行
// 当然,这里很简单就说明在另外一个地方进行了SessionTimeout更新操作
// 上一篇源码便介绍过本类中的touchSession,因此有兴趣的去翻看上一篇
// 分析SessionTrackerImpl的源码解析,结合起来分析便可以知道ZK的巧妙
try {
while (running) {
// 获取当前时间戳
currentTime = System.currentTimeMillis();
// 判断一下当前时间是否已经到达了下次过期时间点
if (nextExpirationTime > currentTime) {
// 如果未到则直接阻塞剩余等待
this.wait(nextExpirationTime - currentTime);
continue;
}
SessionSet set;
// 将nextExpirationTime时间点将要过期的Session全部取出来
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
// 将这些Session逐个关闭并进行过期操作
setSessionClosing(s.sessionId);
// 这里面的过期操作实际上就是向客户端发送一个
// closeSession类型的响应
expirer.expire(s);
}
}
// 增加至下一个过期时间点
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
...
}
}
}
2.5 RequestProcessor请求处理链
跳过中间的SessionTracker插曲,接下来看看RequestProcessor链对于ping操作做了什么处理。
前面介绍过,在单机运行时RequestProcessor处理链只有三个:PrepRequestProcessor、SyncRequestProcessor和FinalRequestProcessor,其中前两个是线程对象,最后一个是普通的对象,至于原因前面的文章介绍过。接下来的三个RequestProcessor大致作用不做分析,有兴趣可以看下以前的文章。
2.5.1 PrepRequestProcessor
public class PrepRequestProcessor extends ZooKeeperCriticalThread
implements RequestProcessor {
// 本RequestProcessor中用来暂时保存需要处理的Request,轮询获取请求处理
LinkedBlockingQueue<Request> submittedRequests =
new LinkedBlockingQueue<Request>();
// 本RequestProcessor的下一个RequestProcessor对象
RequestProcessor nextProcessor;
ZooKeeperServer zks;
@Override
public void processRequest(Request request) {
// RequestProcessor的实现方法,由于内部使用轮询方式从submittedRequests
// 集合获取数据,因此在这里直接把Request添加到集合中即可
submittedRequests.add(request);
}
@Override
public void run() {
try {
while (true) {
// 轮询从submittedRequests集合中获取Request对象
Request request = submittedRequests.take();
// 如果requestOfDeath代表ZK已经关闭,因此退出循环
if (Request.requestOfDeath == request) {
break;
}
// 开始处理正常的Request
pRequest(request);
}
}...
}
protected void pRequest(Request request)
throws RequestProcessorException {
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
// 与连接无关的case情况忽略
...
case OpCode.ping:
// 判断sessionId对应的Session是否是同一个
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
...
} ...
request.zxid = zks.getZxid();
// 在调用下一个RequestProcessor前先来分析一下ping请求的具体属性
// request.cnxn为连接对象,request.type为ping
// request.request为ping的数据,request.txn为null,request.hdr为null
// 调用下个RequestProcessor来处理Request
nextProcessor.processRequest(request);
}
}
2.5.2 SyncRequestProcessor
public class SyncRequestProcessor extends ZooKeeperCriticalThread
implements RequestProcessor {
// 本RequestProcessor中用来暂时保存需要处理的Request,轮询获取请求处理
private final LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
// 保存的是已经被写入磁盘但是待刷新的事务
private final LinkedList<Request> toFlush = new LinkedList<Request>();
// 本RequestProcessor的下一个RequestProcessor对象
private final RequestProcessor nextProcessor;
// Server端快照的数量
private static int snapCount = ZooKeeperServer.getSnapCount();
// 在回滚前的log数量,随机生成的
private static int randRoll;
public void processRequest(Request request) {
// 类似于PrepRequestProcessor,内部使用轮询方式从submittedRequests
// 集合获取数据,因此在这里直接把Request添加到集合中即可
queuedRequests.add(request);
}
@Override
public void run() {
try {
int logCount = 0;
// 避免服务都在同一时间获取快照snapshot,这里面设置的是randRoll属性
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
// 从queuedRequests获取Request
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
// 如果已经结束则退出循环
if (si == requestOfDeath) {
break;
}
if (si != null) {
// 将Request写入到log中
if (zks.getZKDatabase().append(si)) {
logCount++;
// 如果日志的数量大于某个临界点,则生成一次快照
if (logCount > (snapCount / 2 + randRoll)) {
// 途中会异步生成快照,过程忽略,操作完之后
// logCount 归零
...
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// 如果所有的事务都处理完则使用nextProcessor
// 开始进行下一步处理
if (nextProcessor != null) {
// 进行处理
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
// 如果前面两个条件都不满足,则把Request添加到待刷新的
// 事务集合中
toFlush.add(si);
if (toFlush.size() > 1000) {
// 当待刷事务到达了1000个,则把集合中的所有事务全都
// 刷掉并使用nextProcessor依次进行处理
flush(toFlush);
}
}
}
} ...
}
}
2.5.3 FinalRequestProcessor
public class FinalRequestProcessor implements RequestProcessor {
ZooKeeperServer zks;
public void processRequest(Request request) {
// 直接开始处理Request请求
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
// ping请求outstandingChanges数组一定为空,因此循环略过
...
// ping请求的hdr为空,因此略过
if (request.hdr != null) {
...
}
// ping请求判断为false
if (Request.isQuorum(request.type)) {
...
}
}
// 关闭session的操作略过
...
// 如果执行到这里连接对象还为空则直接退出
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
String lastOp = "NA";
// 执行中的数量减一
zks.decInProcess();
Code err = Code.OK;
Record rsp = null;
boolean closeSession = false;
try {
// 无关紧要的略过
...
// 开始根据Request的操作类型进行相应的处理
switch (request.type) {
// 与连接无关的case忽略
...
case OpCode.ping: {
// 更新ZK服务器的状态
zks.serverStats().updateLatency(request.createTime);
lastOp = "PING";
// 更新Client的连接对象属性
cnxn.updateStatsForResponse(request.cxid, request.zxid,
lastOp, request.createTime,
System.currentTimeMillis());
// 对ping请求进行响应
cnxn.sendResponse(new ReplyHeader(-2,
zks.getZKDatabase()
.getDataTreeLastProcessedZxid(), 0), null,
"response");
return;
}
}
}// 异常忽略
// ping请求不会执行到这里的代码来,因此略过
...
}
}
2.6 NIOServerCnxn发送ping响应
正常的流程走到这里,Server端的处理便是基本上要结束了,最后的步骤也是十分的简单,如果ByteBuffer空间足够则直接发送完成,如果不足够在NIOServerCnxnFactory中再进行一次NIO操作发送一次即可。
public class NIOServerCnxn extends ServerCnxn {
@Override
public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {
int responseSize = 0;
try {
ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
responseSize = bb[0].getInt();
bb[0].rewind();
// 发送ByteBuffer对象数据
sendBuffer(bb);
decrOutstandingAndCheckThrottle(h);
} catch (Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
}
return responseSize;
}
public void sendBuffer(ByteBuffer... buffers) {
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk {} is valid: {}", sk, sk.isValid());
}
synchronized (outgoingBuffers) {
// 添加到outgoingBuffers集合中交给doIO()方法里面的write方法
// 类型处理,该逻辑在前面已经分析过了,可以直接回头看
for (ByteBuffer buffer : buffers) {
outgoingBuffers.add(buffer);
}
outgoingBuffers.add(packetSentinel);
}
requestInterestOpsUpdate();
}
}
3.Client端接收响应
当第二步走完后便进入到了第三步Client接收Server端响应并进行处理的阶段了,但ping请求收到响应并不会做些什么。
3.1 SendThread接收通知
前面已经说了,SendThread负责发送和接收包数据,当Server端发送了新建连接响应后该类就会接收并进行相应的处理。本次分析只会分析经过的逻辑部分,其它的逻辑不做分析。
class SendThread extends ZooKeeperThread {
@Override
public void run() {
...
while (state.isAlive()) {
try {
...
// 还是老地方,调用doTransport()方法处理NIO的事件
clientCnxnSocket.doTransport(to, pendingQueue,
outgoingQueue, ClientCnxn.this);
}
}
...
}
}
3.2 ClientCnxnSocketNIO处理读事件
这次进入到该类处理的便是OP_READ类型的NIO事件。
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue,
LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
// 老逻辑,不再分析
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() &
(SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 针对客户端的响应均会进入到该方法中
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
// 后面略
...
}
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// 开始处理读事件
if (sockKey.isReadable()) {
// 从Socket中读取数据
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException();
}
// incomingBuffer已经读取完毕
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
// 读取ping响应时连接肯定完成了,因此initialized为true
// 此逻辑不会生效,略过
...
} else {
// ping响应以及其它的普通请求将会跑到这里
sendThread.readResponse(incomingBuffer);
// 还原ByteBuffer对象
lenBuffer.clear();
incomingBuffer = lenBuffer;
// 更新lastHeard属性,表示已经处理完Server端的响应
updateLastHeard();
}
}
}
// 后面的处理写事件忽略
}
}
3.3 SendThread处理ping响应
这里只需要分析SendThread的readResponse()方法,这个方法用来处理普通请求响应。
class SendThread extends ZooKeeperThread {
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
// 将从Server端获取的ByteBuffer数据反序列化得到ReplyHeader
replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case PING_XID:
// ping的xid为-2,因此会进入到这里面
// ping操作在这里面不会进行任何操作,而是直接退出,因此
// readResponse()对ping没有任何作用
return;
...
}
}
}
至此,ping的交互流程便已经结束。