10. 源码| 交互流程源码分析
1. 前言
经过上一篇文章的流程图,对于ZK新建连接的大致流程应该了解的差不多了,接下来开始进行详细的代码分析,同样是三步走,在进行阅读时可以根据前面的流程图一步一步跟着源码走,这样阅读起来会更加的清晰方便。
需要注意的是,ZK的很多代码构成都是通过内部类完成的,因此等下分析源码时可能方法名不会按源码的方式组排,只是简单的展示源码的大致流程和作用。
上篇文章分析了client端发起连接的源码分析,本篇继续看后面的两个步骤。
2. Server端接收处理响应数据
其实在第一步调用SocketChannel.connect()方法时,第二步就已经接收新建连接的通信并且生成了session信息了,但为了便于理解,我们还是把第二步当成依赖于第一步。后面在源码会详细说明。
2.1 NIOServerCnxnFactory接收NIO请求
NIOServerCnxnFactory负责使用Selector多路复用选择器来从多个Client端获取Socket的新建和发送数据,因此在交互流程中,此类为Server端的起始点,也是通过线程轮询的方式不断地获取其它Socket发送的请求数据。
这里面有几个内部类如下:
private abstract class AbstractSelectThread extends ZooKeeperThread;
// 功能:该线程主要是接收来自客户端的连接请求,并完成三次握手,建立tcp连接
private class AcceptThread extends AbstractSelectThread;
/**
* 该线程接管连接完成的socket,接收来自该socket的命令处理命令,把处理结果返回给客户端。
* 在主流程中,会调用select()函数来监控socket是否有读和写事件,若有读和写事件会调用handleIO(key)函数对事件进行处理。
*/
public class SelectorThread extends AbstractSelectThread;
/**
* IOWorkRequest是一个小的包装类,允许执行doIO()调用
* 使用WorkerService在连接上运行。
*/
private class IOWorkRequest extends WorkerService.WorkRequest
/**
* 此线程负责关闭过时的连接,以便未建立会话的连接已正确过期。
*/
private class ConnectionExpirerThread extends ZooKeeperThread
AcceptThread 线程负责接收来自客户端的连接,并将SocketChannel放入到SelectorThread的acceptedQueue队列中。
SelectorThread 线程负责将读写事件交给workerPool.schedule(workRequest);
处理,然后IOWorkRequest.doWork()方法处理,交给NIOServerCnxn.doIO()
处理。详细代码如下:
public class NIOServerCnxnFactory extends ServerCnxnFactory {
// NIO的Server端SocketChannel,可被多个SocketChannel连接并发送数据
ServerSocketChannel ss;
// NIO的多路复用选择器
final Selector selector = Selector.open();
// 保存某一IP和其IP下的所有NIO连接对象
final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
new HashMap<InetAddress, Set<NIOServerCnxn>>( );
// 同一个IP下默认的最大客户端连接数
int maxClientCnxns = 60;
private abstract class AbstractSelectThread extends ZooKeeperThread {
protected final Selector selector;
public AbstractSelectThread(String name) throws IOException {
super(name);
// Allows the JVM to shutdown even if this thread is still running.
setDaemon(true);
this.selector = Selector.open();
}
public void wakeupSelector() {
selector.wakeup();
}
...
}
// 功能:该线程主要是接收来自客户端的连接请求,并完成三次握手,建立tcp连接
private class AcceptThread extends AbstractSelectThread {
private final ServerSocketChannel acceptSocket;
private final SelectionKey acceptKey;
private final RateLogger acceptErrorLogger = new RateLogger(LOG);
private final Collection<SelectorThread> selectorThreads;
private Iterator<SelectorThread> selectorIterator;
private volatile boolean reconfiguring = false;
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
super("NIOServerCxnFactory.AcceptThread:" + addr);
this.acceptSocket = ss;
this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
selectorIterator = this.selectorThreads.iterator();
}
// 在run()函数中实现线程的主要逻辑。在run()函数中主要调用select()函数。
public void run() {
try {
while (!stopped && !acceptSocket.socket().isClosed()) {
try {
//调用select,将连接加入队列中
select();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
} finally {
closeSelector();
// This will wake up the selector threads, and tell the
// worker thread pool to begin shutdown.
// 这将唤醒选择器线程,并告诉工作线程池将开始关闭.
if (!reconfiguring) {
NIOServerCnxnFactory.this.stop();
}
LOG.info("accept thread exitted run method");
}
}
public void setReconfiguring() {
reconfiguring = true;
}
// 在select()函数中,会调用java的nio库中的函数:
// selector.select()对多个socket进行监控,看是否有读、写事件发生。若没有读、写事件发生,该函数会一直阻塞。
private void select() {
try {
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 未获取key即无读写事件发生,阻塞
if (!key.isValid()) {
continue;
}
// 获取到key,即有读写事件发生
if (key.isAcceptable()) {
// todo
if (!doAccept()) {
// If unable to pull a new connection off the accept
// queue, pause accepting to give us time to free
// up file descriptors and so the accept thread
// doesn't spin in a tight loop.
// 如果无法从服务器上拔出新连接,请接受
// 排队,暂停接受,给我们自由时间
// 启动文件描述符,因此接受线程
// 不会在一个紧密的循环中旋转。
pauseAccept(10);
}
} else {
LOG.warn("Unexpected ops in accept select {}", key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}
/**
* 若有能够accepted事件发生,则调用doAccept()函数进行处理。在函数doAccept中,会调用socket的accept函数,来完成和客户端的三次握手,建立起tcp
* 连接。然后把已经完成连接的socket,设置成非阻塞:sc.configureBlocking(false);
* 接下来选择一个selector线程,并把连接好的socket添加到该selector线程的acceptedQueue队列中。
* 可见,accepted队列是一个阻塞队列,添加到该队列后,就需要selector线程来接管已连接socket的后续的消息,所以需要唤醒selector队列。在addAcceptedConnection
* 把已连接socket添加到阻塞队列中后,调用wakeupSelector();唤醒对应的selector线程。
*/
private boolean doAccept() {
// 阻塞
boolean accepted = false;
SocketChannel sc = null;
try {
//完成和客户端的三次握手,建立起tcp连接
sc = acceptSocket.accept();
//非阻塞
accepted = true;
if (limitTotalNumberOfCnxns()) {
throw new IOException("Too many connections max allowed is " + maxCnxns);
}
InetAddress ia = sc.socket().getInetAddress();
// 从ipMap中获取IP对应的连接对象,并判断是否超过了
// 当前IP最大连接数量
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
// 如果超过则抛异常提示已超过并关闭Socket连接
throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
}
LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
// 设置成非阻塞
sc.configureBlocking(false);
// Round-robin assign this connection to a selector thread
// 循环将此连接分配给选择器线程
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
//唤醒对应的selector线程
if (!selectorThread.addAcceptedConnection(sc)) {
throw new IOException("Unable to add connection to selector queue"
+ (stopped ? " (shutdown in progress)" : ""));
}
acceptErrorLogger.flush();
} catch (IOException e) {
// accept, maxClientCnxns, configureBlocking
// 接受,maxClientCnxns,配置阻止
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
fastCloseSock(sc);
}
return accepted;
}
}
/**
* 该线程接管连接完成的socket,接收来自该socket的命令处理命令,把处理结果返回给客户端。
* 在主流程中,会调用select()函数来监控socket是否有读和写事件,若有读和写事件会调用handleIO(key)函数对事件进行处理。
*/
public class SelectorThread extends AbstractSelectThread {
private final int id;
// 接收队列,接收来自客户端的连接请求
private final Queue<SocketChannel> acceptedQueue;
private final Queue<SelectionKey> updateQueue;
public SelectorThread(int id) throws IOException {
super("NIOServerCxnFactory.SelectorThread-" + id);
this.id = id;
acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
updateQueue = new LinkedBlockingQueue<SelectionKey>();
}
public boolean addAcceptedConnection(SocketChannel accepted) {
if (stopped || !acceptedQueue.offer(accepted)) {
return false;
}
wakeupSelector();
return true;
}
public void run() {
try {
while (!stopped) {
try {
// todo
select();
processAcceptedConnections();
processInterestOpsUpdateRequests();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
// Close connections still pending on the selector. Any others
// with in-flight work, let drain out of the work queue.
for (SelectionKey key : selector.keys()) {
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
if (cnxn.isSelectable()) {
cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
}
cleanupSelectionKey(key);
}
SocketChannel accepted;
while ((accepted = acceptedQueue.poll()) != null) {
fastCloseSock(accepted);
}
updateQueue.clear();
} finally {
closeSelector();
// This will wake up the accept thread and the other selector
// threads, and tell the worker thread pool to begin shutdown.
NIOServerCnxnFactory.this.stop();
LOG.info("selector thread exitted run method");
}
}
private void select() {
try {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
// 随机打乱已经获取到的selectedList集合,至于为什么要打乱
// 估计是为了一定程度上保证各个Client端的请求都能被随机处理
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
// 获取选择key
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selected.remove(key);
//如果key无效
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
//拥有key且有可读或者可写事件
if (key.isReadable() || key.isWritable()) {
// todo
handleIO(key);
} else {
LOG.warn("Unexpected ops in select {}", key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}
/**
* 在handleIO中,会启动woker线程池中的一个worker来处理这个事件,
* 处理事件的主类是ScheduledWorkRequest,最终会调用run函数中的workRequest.doWork();来处理请求。
*
* 计划与关联的连接上处理的I/O
* 给定的SelectionKey。如果未使用工作线程池,
* I/O直接由该线程运行
*/
private void handleIO(SelectionKey key) {
IOWorkRequest workRequest = new IOWorkRequest(this, key);
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
// Stop selecting this key while processing on its
// connection
//在处理其连接时停止选择此键
cnxn.disableSelectable();
key.interestOps(0);
touchCnxn(cnxn);
workerPool.schedule(workRequest);
}
private void processAcceptedConnections() {
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
try {
// 将Socket 注册到Selector中生成SelectionKey
key = accepted.register(selector, SelectionKey.OP_READ);
// 生成对应的NIO连接对象
NIOServerCnxn cnxn = createConnection(accepted, key, this);
// 将连接对象和SelectionKey进行绑定
key.attach(cnxn);
// 这里面会保存IP和连接对象集合,一个IP对应着系列
// 的连接对象,因为一台机器可能有多个连接对象
addCnxn(cnxn);
} catch (IOException e) {
// register, createConnection
cleanupSelectionKey(key);
fastCloseSock(accepted);
}
}
}
}
/**
* IOWorkRequest是一个小的包装类,允许执行doIO()调用
* 使用WorkerService在连接上运行。
*/
private class IOWorkRequest extends WorkerService.WorkRequest {
private final SelectorThread selectorThread;
private final SelectionKey key;
private final NIOServerCnxn cnxn;
IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
this.selectorThread = selectorThread;
this.key = key;
this.cnxn = (NIOServerCnxn) key.attachment();
}
// 在IOWorkRequest.doWork()中会判断key的合法性,
// 然后调用NIOServerCnxn.doIO(key)来处理事件
public void doWork() throws InterruptedException {
//判断key的合法性
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
if (key.isReadable() || key.isWritable()) {
// todo
cnxn.doIO(key);
// Check if we shutdown or doIO() closed this connection
// 检查是否关闭或doIO()是否关闭了此连接
if (stopped) {
cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
return;
}
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
touchCnxn(cnxn);
}
// Mark this connection as once again ready for selection
//将此连接再次标记为可供选择
cnxn.enableSelectable();
// Push an update request on the queue to resume selecting
// on the current set of interest ops, which may have changed
// as a result of the I/O operations we just performed.
// 在队列上推送更新请求以继续选择
// 在当前感兴趣的操作集上,可能已更改
// 作为我们刚才执行的I/O操作的结果。
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
}
}
}
}
2.2 连接对象NIOServerCnxn
这个代表着Client端在Server端的连接对象,新连接在Server端的表现便是一个NIOServerCnxn对象。并且这个对象会和对应的SelectionKey、Socket进行绑定。这个类里面最重要的便是doIO()方法,在这个方法中会判断读写事件,并根据相应的值进行处理,在新建连接流程中,只会分析读事件。关键源码如下:
public class NIOServerCnxn extends ServerCnxn {
// 这三个对象便不用做过多介绍了
NIOServerCnxnFactory factory;
final SocketChannel sock;
private final SelectionKey sk;
// 用来读取请求长度的buffer对象
ByteBuffer lenBuffer = ByteBuffer.allocate(4);
// 实际接受请求长度的buffer对象
ByteBuffer incomingBuffer = lenBuffer;
// 是否已经初始化,默认值为false
boolean initialized;
private final ZooKeeperServer zkServer;
// 本连接对应的sessionId,刚开始sessionId不会有,只有当ZK的Server端处理了
// ConnectRequest之后才会被赋值
long sessionId;
// 写操作使用的ByteBuffer集合
LinkedBlockingQueue<ByteBuffer> outgoingBuffers;
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
...
// 前面的赋值可以忽略,当创建本对象时将会默认开启读事件
sk.interestOps(SelectionKey.OP_READ);
}
void doIO(SelectionKey k) throws InterruptedException {
try {
// 进行操作前需要判断Socket是否被关闭
if (isSocketOpen() == false) {
return;
}
// 判断读事件
if (k.isReadable()) {
// 从Socket中先读取数据,注意的是incomingBuffer容量只有4字节
int rc = sock.read(incomingBuffer);
// 读取长度异常
if (rc < 0) {
throw new EndOfStreamException();
}
// 读取完毕开始进行处理
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
// 当这两个完全相等说明已经是下一次连接了,新建时无需分析
if (incomingBuffer == lenBuffer) {
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
isPayload = true;
}
if (isPayload) {
// 读取具体连接的地方
readPayload();
}
else {
return;
}
}
}
// 写事件类型
if (k.isWritable()) {
// 如果ByteBuffer集合不为空才进入,新建连接时如果响应没有一次性
// 发送完剩余的会被放在outgoingBuffers集合中依次发送出去
if (outgoingBuffers.size() > 0) {
// 给发送的ByteBuffer对象分配空间,大小为64 * 1024字节
ByteBuffer directBuffer = factory.directBuffer;
directBuffer.clear();
for (ByteBuffer b : outgoingBuffers) {
// 这里执行的操作是把已经发送过的数据剔除掉
// 留下未发送的数据截取下来重新发送
if (directBuffer.remaining() < b.remaining()) {
b = (ByteBuffer) b.slice().limit(
directBuffer.remaining());
}
int p = b.position();
// 将未发送的数据放入directBuffer中
directBuffer.put(b);
// 更新outgoingBuffers中的ByteBuffer对象属性,以便
// 后续使用
b.position(p);
// 如果directBuffer的空间都被占用光了,则直接停止从
// outgoingBuffers集合中获取
if (directBuffer.remaining() == 0) {
break;
}
}
directBuffer.flip();
// 发送directBuffer中的数据
int sent = sock.write(directBuffer);
ByteBuffer bb;
// 这部分的循环便是再次判断前面使用过的对象
// 看这些对象是否已经发送完,根据position信息判断如果发送完
// 则从outgoingBuffers集合中移除
while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException();
}
// 获取ByteBuffer的剩余数据
int left = bb.remaining() - sent;
// 如果到此大于0,说明前面的数据已经填充满
// 直接退出循环
if (left > 0) {
bb.position(bb.position() + sent);
break;
}
// 执行到这里说明ByteBuffer对象已经发送完毕,可以更新
// 发送状态并从将其从outgoingBuffers中移除
packetSent();
sent -= bb.remaining();
outgoingBuffers.remove();
}
}
synchronized(this.factory){
if (outgoingBuffers.size() == 0) {
// 如果outgoingBuffers已经全部被消化完了便把
// OP_WRITE操作关闭
if (!initialized && (sk.interestOps()
& SelectionKey.OP_READ) == 0) {
throw new CloseRequestException();
}
sk.interestOps(sk.interestOps()
& (~SelectionKey.OP_WRITE));
} else {
// 如果还剩余一些没有发送完,则继续打开OP_WRITE操作
// 接着下次轮询发送
sk.interestOps(sk.interestOps()
| SelectionKey.OP_WRITE);
}
}
}
}
// 异常处理忽略
...
}
private void readPayload() throws IOException, InterruptedException {
// 前面已经判断过,这里一定不会成立
if (incomingBuffer.remaining() != 0) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException();
}
}
if (incomingBuffer.remaining() == 0) {
// 进行接收报文数量+1和更新Server端接收报文数量+1的操作
packetReceived();
incomingBuffer.flip();
// 第一次进来肯定是false
if (!initialized) {
// 因此这里肯定会进入调用处理ConnectRequest的方法中
readConnectRequest();
} else {
// 这里是处理其它Request的方法,此次暂不分析,后续分析ping和
// 其它操作时再来分析此方法中的流程
readRequest();
}
lenBuffer.clear();
// 处理完这次请求后再将incomingBuffer复原
incomingBuffer = lenBuffer;
}
}
private void readConnectRequest()
throws IOException, InterruptedException {
if (zkServer == null) {
throw new IOException("ZooKeeperServer not running");
}
// 调用ZooKeeperServer的方法处理连接请求
zkServer.processConnectRequest(this, incomingBuffer);
// 当前面执行完毕后说明已经初始化完成了
initialized = true;
}
}
2.3 单机运行的ZooKeeperServer
前面文章解释过,这个类就是ZK的Server实例,每个ZK服务器上对应着一个ZooKeeperServer实例,这里面有诸多服务器方面的属性配置,但前面分析过,因此本次流程代码便不做过多的介绍了,有兴趣的可以翻看前面的文章。
在Client端有ping心跳检测间隔时间,在Server端有tickTime存活检测时间,这两个属性代表的意思是不一样的,Client端的ping心跳检测间隔时间是轮询隔一段时间后向Server端发送ping请求,而Server端的tickTime间隔时间作用是每隔一段时间就判断在Server端的Client连接对象是否已经死亡,如果已经过期死亡则将连接对象进行清除关闭。所以ping心跳检测的意义是Client端告诉服务器我还活着,tickTime意义是定期清除没有告诉Server端还存活的连接。
public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
// 默认3S检测一次客户端存活情况
public static final int DEFAULT_TICK_TIME = 3000;
// 实际设置的检测存活时间间隔
protected int tickTime = DEFAULT_TICK_TIME;
// Server端可接受的最小Client端sessionTimeout,如果未设置则值为tickTime*2
protected int minSessionTimeout = -1;
// Server端可接受的最大Client端sessionTimeout,如果未设置则值为tickTime*20
protected int maxSessionTimeout = -1;
// 处理客户端请求RequestProcessor的第一个实现类对象
protected RequestProcessor firstProcessor;
public void processConnectRequest(ServerCnxn cnxn,
ByteBuffer incomingBuffer) throws IOException {
BinaryInputArchive bia = BinaryInputArchive
.getArchive(new ByteBufferInputStream(incomingBuffer));
// 反序列化ByteBuffer对象为ConnectRequest对象
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
boolean readOnly = false;
try {
// 是否只可读
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
...
}
// 只有ReadOnlyZooKeeperServer类型的Server只接收readOnly为true的
if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
...
throw new CloseRequestException(msg);
}
// 获取的zxid需要小于Server端最大的zxid
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
...
throw new CloseRequestException(msg);
}
// 这段代码便是Server和Client端协商具体的sessionTimeout值
// 1、获取客户端传来的sessionTimeout
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
// 2、先判断sessionTimeout是否小于Server端可接受的最小值
// 如果小于Server端可接受最小值则设置成Server端的最小sessionTimeout
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
// 3、再判断sessionTimeout是否大于Server端可接受的最大值
// 如果大于Server端可接受最大值则设置成Server端的最大sessionTimeout
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
// 最后把满足协商范围的sessionTimeout设置到Client连接对象中
cnxn.setSessionTimeout(sessionTimeout);
// 设置该连接对象不再从Client端接收数据
cnxn.disableRecv();
long sessionId = connReq.getSessionId();
// 第一次连接不手动设置sessionId都是0
if (sessionId != 0) {
// 如果不是0则需要关闭原来的session并且重新打开sessionId
// 这种情况不常见,只需要知道处理的代码逻辑在这里便行,暂不详细分析
long clientSessionId = connReq.getSessionId();
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
// 开始创建新的session信息
createSession(cnxn, passwd, sessionTimeout);
}
}
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
// 根据失效时间创建一个新的session信息并返回唯一ID
long sessionId = sessionTracker.createSession(timeout);
// 设置失效时间和sessionId
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
// 调用该方法使用刚刚获取到的属性去生成Request请求
submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
return sessionId;
}
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
// 根据参数生成Request对象,并调用submitRequest()方法开始使用
// RequestProcessor链对Request进行处理
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
public void submitRequest(Request si) {
// 这个方法功能很简单:
// 1、判断Server端是否初始化完成,如果未完成则一直持续等待
// 2、在调用RequestProcessor链前先更新session在Server端的过期时间
// 3、调用firstProcessor对象的processRequest方法开始处理请求
if (firstProcessor == null) {
synchronized (this) {
try {
// 一直轮询直到Server端的各种组件初始化完成
while (state == State.INITIAL) {
wait(1000);
}
} ...
// 如果未初始化成功则抛出异常
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
// todo 将请求放到requestThrottler的submittedRequests队列中
requestThrottler.submitRequest(si);
}
public void submitRequest(Request request) {
if (stopping) {
LOG.debug("Shutdown in progress. Request cannot be processed");
dropRequest(request);
} else {
request.requestThrottleQueueTime = Time.currentElapsedTime();
// todo
submittedRequests.add(request);
}
}
将请求放到requestThrottler的submittedRequests队列中,然后在requestThrottler 的run()方法中调用zks.submitRequestNow(request);
public void submitRequestNow(Request si) {
..
try {
// 更新session的过期时间
touch(si.cnxn);
// 校验请求类型是否有效
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
// todo 开始调用firstProcessor对象的processRequest()方法处理请求
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type {}", si.type);
// Update request accounting/throttling limits
requestFinished(si);
// 如果处理类型校验不通过则发送无法处理请求并关闭连接
new UnimplementedRequestProcessor().processRequest(si);
}
}
...
}
void touch(ServerCnxn cnxn) throws MissingSessionException {
if (cnxn == null) {
return;
}
long id = cnxn.getSessionId();
int to = cnxn.getSessionTimeout();
// 获取sessionId和sessionTimeout属性调用sessionTracker去更新session
// 在Server端的过期时间
if (!sessionTracker.touchSession(id, to)) {
throw new MissingSessionException();
}
}
}
2.4 session追踪类SessionTracker
取名为SessionTracker,实际上这个类的功能就是维护session生命周期
,主要进行session过期判断和更新session状态的操作,判断session过期还是放到后面分析ping流程再看吧,新建连接时就看其如何更新session状态。
public class SessionTrackerImpl extends ZooKeeperCriticalThread
implements SessionTracker {
// 保存sessionId和对应的Session对象
HashMap<Long, SessionImpl> sessionsById;
HashMap<Long, SessionSet> sessionSets;
// key为sessionId,value为这个session的过期时间
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
// 下一次新建session时的id
long nextSessionId = 0;
public long createSession(int sessionTimeout) {
long sessionId = nextSessionId.getAndIncrement();
// 在使用RequestProcessor处理请求前会调用该方法为客户端创建一个session
trackSession(sessionId, sessionTimeout);
return sessionId;
}
@Override
public synchronized boolean trackSession(long id, int sessionTimeout) {
boolean added = false;
// 如果没有保存对应的Session对象则创建一个并添加
SessionImpl session = sessionsById.get(id);
if (session == null) {
session = new SessionImpl(id, sessionTimeout);
}
// findbugs2.0.3 complains about get after put.
// long term strategy would be use computeIfAbsent after JDK 1.8
SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
if (existedSession != null) {
session = existedSession;
} else {
added = true;
LOG.debug("Adding session 0x{}", Long.toHexString(id));
}
if (LOG.isTraceEnabled()) {
String actionStr = added ? "Adding" : "Existing";
ZooTrace.logTraceMessage(
LOG,
ZooTrace.SESSION_TRACE_MASK,
"SessionTrackerImpl --- " + actionStr
+ " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
}
// 添加完session后更新session的过期时间
updateSessionExpiry(session, sessionTimeout);
return added;
private void updateSessionExpiry(SessionImpl s, int timeout) {
logTraceTouchSession(s.sessionId, timeout, "");
sessionExpiryQueue.update(s, timeout);
}
// 当client与server有交互时(连接请求/读写操作/心跳),该方法就会被调用
// 当zk server启动时会将磁盘中的session恢复到内存,也会调用该方法
// 该方法在做的是会话换桶操作
public Long update(E elem, int timeout) {
// elemMap集合的key为session,value为该session的过期时间,
// 即该session当前所在的会话桶id
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
// 计算本次交互应该将会话放入到哪个会话桶
Long newExpiryTime = roundToNextInterval(now + timeout);
// 若之前所在会话桶id与本次交互计算的会话桶id相同,
// 则无需换桶,即什么也不用做
if (newExpiryTime.equals(prevExpiryTime)) {
// No change, so nothing to update
return null;
}
// ---------- 代码能走到这里,说明需要换桶了。 --------------
// 换桶由两步操作完成:将会话放入到新桶;将会话从老桶中清除
// First add the elem to the new expiry time bucket in expiryMap.
// 从会话桶集合中获取当前的会话桶,若为null,则创建一个新的会话桶
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
// 创建会话桶set
set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
// 将新建的会话桶放入到会话桶集合
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
// 将会话放入到会话桶
set.add(elem);
// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
// 将会话与会话桶id的对应关系放入到elemMap,并获取到该会话之前所在的会话桶id
prevExpiryTime = elemMap.put(elem, newExpiryTime);
// 若当前会话桶id与之前会话桶id不相同,说明需要换桶。
// 而前面已经将会话放到了新的会话桶,所以这里要将会话从老桶中清除
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
// 获取到之前的会话桶
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
// 将会话从老会话桶中清除
prevSet.remove(elem);
}
}
// 返回当前交互引发的会话所在的会话桶id,
// 即当前会话的真正过期时间点
return newExpiryTime;
}
}
2.5 RequestProcessor请求处理链
前面介绍过,在单机运行时RequestProcessor处理链只有三个:PrepRequestProcessor、SyncRequestProcessor和FinalRequestProcessor,其中前两个是线程对象,最后一个是普通的对象,至于原因前面的文章介绍过。接下来的三个RequestProcessor大致作用不做分析,有兴趣可以看下以前的文章。
2.5.1 PrepRequestProcessor
public class PrepRequestProcessor extends ZooKeeperCriticalThread
implements RequestProcessor {
// 本RequestProcessor中用来暂时保存需要处理的Request,轮询获取请求处理
LinkedBlockingQueue<Request> submittedRequests =
new LinkedBlockingQueue<Request>();
// 本RequestProcessor的下一个RequestProcessor对象
RequestProcessor nextProcessor;
ZooKeeperServer zks;
@Override
public void processRequest(Request request) {
// RequestProcessor的实现方法,由于内部使用轮询方式从submittedRequests
// 集合获取数据,因此在这里直接把Request添加到集合中即可
submittedRequests.add(request);
}
@Override
public void run() {
try {
while (true) {
// 轮询从submittedRequests集合中获取Request对象
Request request = submittedRequests.take();
// 如果requestOfDeath代表ZK已经关闭,因此退出循环
if (Request.requestOfDeath == request) {
break;
}
// 开始处理正常的Request
pRequest(request);
}
}...
}
protected void pRequest(Request request)
throws RequestProcessorException {
request.setHdr(null);
request.setTxn(null);
if (!request.isThrottled()) {
// todo
pRequestHelper(request);
}
request.zxid = zks.getZxid();
// 调用下个RequestProcessor来处理Request
nextProcessor.processRequest(request);
}
private void pRequestHelper(Request request) throws RequestProcessorException {
try {
switch (request.type) {
...
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
// 直接处理事务
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break;
...
}
}
}
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
if (request.getHdr() == null) {
// 为请求创建事务头TxnHeader对象
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));
}
switch (type) {
...
// 创建session
case OpCode.createSession:
request.request.rewind();
// 此时的to实际上就是sessionTimeout
int to = request.request.getInt();
// 使用sessionTimeout创建CreateSessionTxn对象
request.setTxn(new CreateSessionTxn(to));
request.request.rewind();
// only add the global session tracker but not to ZKDb
// 根据sessionid和sessionTimeout再次新增session信息
zks.sessionTracker.trackSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
break;
...
}
}
protected void pRequest2Txn(int type, long zxid, Request request,
Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException{
// 为请求创建事务头TxnHeader对象
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
zks.getTime(), type);
switch (type) {
// 无关的case情况忽略
...
case OpCode.createSession:
request.request.rewind();
// 此时的to实际上就是sessionTimeout
int to = request.request.getInt();
// 使用sessionTimeout创建CreateSessionTxn对象
request.txn = new CreateSessionTxn(to);
request.request.rewind();
// 根据sessionid和sessionTimeout再次新增session信息
zks.sessionTracker.addSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
break;
...
}
}
2.5.2 SyncRequestProcessor
public class SyncRequestProcessor extends ZooKeeperCriticalThread
implements RequestProcessor {
// 本RequestProcessor中用来暂时保存需要处理的Request,轮询获取请求处理
private final LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
// 保存的是已经被写入磁盘但是待刷新的事务
private final LinkedList<Request> toFlush = new LinkedList<Request>();
// 本RequestProcessor的下一个RequestProcessor对象
private final RequestProcessor nextProcessor;
// Server端快照的数量
private static int snapCount = ZooKeeperServer.getSnapCount();
// 在回滚前的log数量,随机生成的
private static int randRoll;
public void processRequest(Request request) {
// 类似于PrepRequestProcessor,内部使用轮询方式从submittedRequests
// 集合获取数据,因此在这里直接把Request添加到集合中即可
queuedRequests.add(request);
}
@Override
public void run() {
try {
int logCount = 0;
// 避免服务都在同一时间获取快照snapshot,这里面设置的是randRoll属性
setRandRoll(r.nextInt(snapCount/2));
while (true) {
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
// 从queuedRequests获取Request
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
// 如果已经结束则退出循环
if (si == requestOfDeath) {
break;
}
if (si != null) {
// 将Request写入到log中
if (zks.getZKDatabase().append(si)) {
logCount++;
// 如果日志的数量大于某个临界点,则生成一次快照
if (logCount > (snapCount / 2 + randRoll)) {
// 途中会异步生成快照,过程忽略,操作完之后
// logCount 归零
...
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// 如果所有的事务都处理完则使用nextProcessor
// 开始进行下一步处理
if (nextProcessor != null) {
// 进行处理
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
// 如果前面两个条件都不满足,则把Request添加到待刷新的
// 事务集合中
toFlush.add(si);
if (toFlush.size() > 1000) {
// 当待刷事务到达了1000个,则把集合中的所有事务全都
// 刷掉并使用nextProcessor依次进行处理
flush(toFlush);
}
}
}
} ...
}
}
2.5.2 FinalRequestProcessor
public class FinalRequestProcessor implements RequestProcessor {
ZooKeeperServer zks;
public void processRequest(Request request) {
// 直接开始处理Request请求
ProcessTxnResult rc = null;
if (!request.isThrottled()) {
rc = applyRequest(request);
}
// 如果执行到这里连接对象还为空则直接退出
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
String lastOp = "NA";
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits
// 执行中的数量减一
zks.decInProcess();
zks.requestFinished(request);
Code err = Code.OK;
Record rsp = null;
String path = null;
int responseSize = 0;
try {
// 如果发生了异常则直接抛出
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
AuditHelper.addAuditLog(request, rc, true);
// 如果是单个的操作发生了异常抛出
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
}
...
// 开始根据Request的操作类型进行相应的处理
switch (request.type) {
...
case OpCode.createSession: {
// 最后的操作类型
lastOp = "SESS";
// 更新状态
updateStats(request, lastOp, lastZxid);
// 最后调用这个方法来完成session的初始化以及响应
zks.finishSessionInit(request.cnxn, true);
// 直接退出方法
return;
}
}
}
}
}
2.6 ZooKeeperServer新建连接生成响应对象
又再次回到了ZooKeeperServer类中,这里面执行了Server端针对新建连接的最后响应,其实我也搞不懂为什么要把新建连接单独的抽出来放到ZooKeeperServer类中来,或许唯一能解释的便是方便处理已存在session重新创建这个流程。
public class ZooKeeperServer implements
SessionExpirer, ServerStats.Provider {
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
// 使用JMX监控注册连接对象cnxn
try {
// valid指的是是否成功创建session信息
if (valid) {
serverCnxnFactory.registerConnection(cnxn);
}
}...
try {
// 如果valid为true,则使用cnxn连接对象的sessionTimemout,否则为0
// 如果valid为true,则使用cnxn连接对象的ssessionId,否则为0
// 如果valid为true,则使用cnxn连接对象的ssessionId生成密码,否则空
ConnectResponse rsp = new ConnectResponse(0,
valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0,
valid ? generatePasswd(cnxn.getSessionId())
: new byte[16]);
// 生成响应的字节对象
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(
this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
baos.close();
// 根据刚刚生成的字节数组申城ByteBuffer
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
// 发送ByteBuffer对象内容
cnxn.sendBuffer(bb);
// 如果valid失效则关掉连接
if (!valid) {
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
} else {
// 如果成功则确保能读取到Client端发送过来的数据
cnxn.enableRecv();
}
} catch (Exception e) {
cnxn.close();
}
}
}
2.7 NIOServerCnxn发送新建连接响应
执行到这一步已经到了新建连接的尾声了,这一步只有发送ByteBuffer对象的数据,其它的操作相对而言并不是很重要。
public class NIOServerCnxn extends ServerCnxn {
public void sendBuffer(ByteBuffer bb) {
try {
// 只有非关闭连接的操作才能使用Socket发送数据
if (bb != ServerCnxnFactory.closeConn) {
// 确保SelectionKey的OP_WRITE没有被开启,以确保等下wake唤醒
// Selector可以进行重试
if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
try {
// 发送缓存数据
sock.write(bb);
} catch (IOException e) {
}
}
if (bb.remaining() == 0) {
// 如果缓存数据发送完毕则更新ZK的Server状态
packetSent();
return;
}
}
// 如果跑到这里说明ByteBuffer并未全部发送,因此需要唤醒Selector
// 把剩余的ByteBuffer数据发送出去
synchronized(this.factory){
sk.selector().wakeup();
// 添加到outgoingBuffers集合中交给doIO()方法里面的write方法
// 类型处理,该逻辑在前面已经分析过了,可以直接回头看
outgoingBuffers.add(bb);
if (sk.isValid()) {
// 将OP_WRITE打开
sk.interestOps(
sk.interestOps() | SelectionKey.OP_WRITE);
}
}
}
}
// hdr和txn都是和连接相关的对象,里面的方法执行的操作为添加
// session信息,到这里已经是新建连接的第三次调用新增session信息
// 当然这里面还会调用DataTree.processTxn()方法,只是不会执行
// 很重要的逻辑代码
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
//
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
// 新建连接流程outstandingChanges是空的,因此这里的循环逻辑暂不分析
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
}
3. Client端接收响应
当第二步走完后便进入到了第三步Client接收Server端响应并调用监听器的步骤了。
3.1 SendThread接收通知
前面已经说了,SendThread负责发送和接收包数据,当Server端发送了新建连接响应后该类就会接收并进行相应的处理。本次分析只会分析经过的逻辑部分,其它的逻辑不做分析。
class SendThread extends ZooKeeperThread {
@Override
public void run() {
...
while (state.isAlive()) {
try {
...
// 还是老地方,调用doTransport()方法处理NIO的事件
clientCnxnSocket.doTransport(to, pendingQueue,
outgoingQueue, ClientCnxn.this);
}
}
...
}
}
3.2 ClientCnxnSocketNIO处理读事件
这次进入到该类处理的便是OP_READ类型的NIO事件。
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue,
LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
// 老逻辑,不再分析
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() &
(SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 针对客户端的响应均会进入到该方法中
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
// 后面略
...
}
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// 开始处理读事件
if (sockKey.isReadable()) {
// 从Socket中读取数据
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException();
}
// incomingBuffer已经读取完毕
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
// 新建连接将会跑到这里来,因为此时Client端的initialized
// 还是为false,尚未初始化完成
// 开始读取连接响应结果
readConnectResult();
// 开启Socket的OP_READ操作
enableRead();
// 查看outgoingQueue队列是否有可读包数据
if (findSendablePacket(outgoingQueue, cnxn.sendThread
.clientTunneledAuthenticationInProgress())!=null){
// 如果有的话则开启OP_WRITE操作,准备下次轮询时处理
// 写事件
enableWrite();
}
// 设置initialized属性初始化完成并更新lastHeard属性
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// 这里是当新建连接成功后普通的操作响应处理逻辑
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// 后面的处理写事件忽略
}
void readConnectResult() throws IOException {
// 使用读取到的ByteBuffer对象反序列化得到ConnectResponse响应
ByteBufferInputStream bbis =
new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
boolean isRO = false;
try {
// 读取readOnly属性
isRO = bbia.readBool("readOnly");
}...
// 开始进行连接成功的操作
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
}
3.3 ClientCnxn处理连接成功
执行到这里基本上就已经算成功了,接下来的事情便是触发ZK的监听器。
public class ClientCnxn {
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
// _negotiatedSessionTimeout便是Client端和Server端互相协商获得的
// sessionTimeout过期时间
negotiatedSessionTimeout = _negotiatedSessionTimeout;
// 时间小于等于0说明连接失败了
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
// 发送ZK过期事件
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
// 并且发送停止服务事件
eventThread.queueEventOfDeath();
throw new SessionExpiredException(warnInfo);
}
// 接下来便是设值了,具体的值在这里都可以看到
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
// 根据Server端传来的属性设值状态
state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
// 确定等下要发送的事件类型
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
// 使用EventThread线程对象发布监听事件
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
}
3.4 EventThread监听事件
前面说过SendThread负责和ZK的Server端进行交互,完成发送数据包和接收响应的任务
,而EventThread则是根据SendThread接收到响应类型产生的事件类型进行轮询处理
。也就是说SendThread负责和Server端对接,EventThread则是负责和SendThread对接,处理Client自己产生的ZK事件。
class EventThread extends ZooKeeperThread {
// 将要处理的ZK事件集合
private final LinkedBlockingQueue<Object> waitingEvents;
// 客户端的Watcher管理类
private final ClientWatchManager watcher;
public void queueEvent(WatchedEvent event) {
// SendThread就是调用这个方法将对应的ZK事件传入进来开始ZK事件的生命周期
// 如果session状态和当前一样且事件类型没有则直接退出,无需处理
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// 使用传入的ZK事件和ClientWatchManager生成事件和监听器的绑定对象
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// 将事件和监听器的绑定对象添加到waitingEvents集合中,这个集合类型只
// 会是WatcherSetEventPair或者Packet
waitingEvents.add(pair);
}
@Override
public void run() {
try {
isRunning = true;
while (true) {
// 轮询waitingEvents集合,取出其中的事件对象
Object event = waitingEvents.take();
// eventOfDeath为关闭事件
if (event == eventOfDeath) {
wasKilled = true;
} else {
// 不是关闭事件则开始处理事件
processEvent(event);
}
if (wasKilled) {
synchronized (waitingEvents) {
// 如果是关闭事件则会等waitingEvents全部处理之后再把
// EventThread设置为停止运行且退出循环
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
}// 异常处理忽略
...
}
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// 如果是正常的WatcherSetEventPair类型则直接取出里面所有的
// 监听器传入绑定的事件依次执行,这个步骤便是对应我们自己开发
// 的Watcher回调
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
}...
}
}// 后面是针对Packet事件类型进行的处理,回调类型是异步回调
...
}// 异常处理忽略
...
}
}
执行到这里新建连接的流程已经执行完毕了,接下来看下ClientWatchManager是如何将ZK的事件和Watcher进行绑定的。
3.5 ClientWatchManager监听器管理类
这个类会管理四种逻辑类型的监听器,至于具体的类型可以看以前的文章。接下来简单的看下其materialize方法的实现。
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher;
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath) {
// 将要返回的监听器集合
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
case None:
// 新建连接相关的事件类型都是None,不管是连接成功还是连接失败超时
// 将默认监听器defaultWatcher添加到result中,这也就是为什么在
// 新建ZooKeeper连接时传入Watcher新建连接相关的事件这个都会收到
result.add(defaultWatcher);
// 判断是否使用完之后删除,需要开关打开且ZK状态不是SyncConnected
boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
state != Watcher.Event.KeeperState.SyncConnected;
// 将dataWatches中的监听器添加到result集合中
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
if (clear) {
// 如果需要删除则把缓存的全删了
dataWatches.clear();
}
}
// 后面的其它两种都是同样的操作,略过
return result;
case NodeDataChanged:
case NodeCreated:
// 节点变更类型事件,只有dataWatches和existWatches会参与
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
case NodeChildrenChanged:
// 子节点变更事件,只有childWatches参与
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
case NodeDeleted:
// 节点被删除三种类型都会受到影响,操作方式和前面类似直接略过
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
...
break;
default:
throw new RuntimeException(msg);
}
return result;
}
}
不得不说这是一个庞大的工程量,阅读完ZK的源码后对平时使用以及某些配置都有更加深刻的理解了,只是对于ZK的ByteBuffer空间大小的4字节分配还有些犯迷糊。后续再补回来。
能耐心看到这里的想必也是决定了把ZK琢磨透的秀儿吧。