13. 源码| 会话管理源码分析(分桶策略)
1. 客户端连接源码分析
ZKClient 客户端,Curator 客户端,详情见:Zookeeper安装和客户端使用
先下结论:
Client 要创建一个连接,其首先会在
本地创建一个 ZooKeeper 对象
,用于表示其所连接上的 Server。连接成功后,该连接的各种临时性数据会被初始化到 zk 对象中
。连接关闭后,这个代表 Server 的 zk 对象会被删除。
我们知道常用的ZK客户端技术有ZKClient 客户端,Curator 客户端,而客户端在连接ZK Server的时候,会配置集群信息,而连接集群中具体哪一台服务器,采用轮询的方式,先将集群配置信息打散,打散以后再轮询(默认情况,当然可以指定重连策略)
1.1 ZKClient源码分析:
下面是ZKClient使用的DEMO:
public class ZKClientTest {
// 指定 zk 集群
private static final String CLUSTER = "zkOS:2181";
// 指定节点名称
private static final String PATH = "/mylog";
public static void main(String[] args) {
// ---------------- 创建会话 -----------
// 创建 zkClient
ZkClient zkClient = new ZkClient(CLUSTER);
// 为 zkClient 指定序列化器
zkClient.setZkSerializer(new SerializableSerializer());
// ---------------- 创建节点 -----------
// 指定创建持久节点
CreateMode mode = CreateMode.PERSISTENT;
// 指定节点数据内容
String data = "first log";
// 创建节点
String nodeName = zkClient.create(PATH, data, mode);
...
追踪ZKClient源码,看下是如何连接的,从ZkClient 构造开始:
public class ZkClient implements Watcher {
...
public ZkClient(String serverstring) {
this(serverstring, Integer.MAX_VALUE);
}
public ZkClient(String zkServers, int connectionTimeout) {
//关键点 看到创建了ZkConnection对象
this(new ZkConnection(zkServers), connectionTimeout);
}
...
//构造一直走,会走到下面该方法
public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout) {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
//将创建的ZkConnection,赋值到_connection 成员变量上
_connection = zkConnection;
_zkSerializer = zkSerializer;
_operationRetryTimeoutInMillis = operationRetryTimeout;
_isZkSaslEnabled = isZkSaslEnabled();
connect(connectionTimeout, this);
}
public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
boolean started = false;
acquireEventLock();
try {
setShutdownTrigger(false);
_eventThread = new ZkEventThread(_connection.getServers());
_eventThread.start();
//调用ZkConnection.connect进行连接
_connection.connect(watcher);
LOG.debug("Awaiting connection to Zookeeper server");
boolean waitSuccessful = waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
if (!waitSuccessful) {
throw new ZkTimeoutException("Unable to connect to zookeeper server '" + _connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
}
started = true;
} finally {
getEventLock().unlock();
// we should close the zookeeper instance, otherwise it would keep
// on trying to connect
if (!started) {
close();
}
}
}
}
通过上面源码追踪,看到ZKClient连接实际上是通过ZkConnection.connect方法进行连接的,我们继续追踪ZkConnection
public class ZkConnection implements IZkConnection {
...
//关键对象ZooKeeper
private ZooKeeper _zk = null;
...
public ZkConnection(String zkServers, int sessionTimeOut) {
_servers = zkServers;
_sessionTimeOut = sessionTimeOut;
}
@Override
public void connect(Watcher watcher) {
_zookeeperLock.lock();
try {
if (_zk != null) {
throw new IllegalStateException("zk client has already been started");
}
try {
LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
//!!!可以看到实际上ZKCLient与服务端连接,靠的就是ZooKeeper对象
_zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
} catch (IOException e) {
throw new ZkException("Unable to connect to " + _servers, e);
}
} finally {
_zookeeperLock.unlock();
}
}
}
1.2 Curator 源码分析:
下面是Curator使用的DEMO:
public class FluentTest {
public static void main(String[] args) throws Exception {
// ---------------- 创建会话 -----------
// 创建重试策略对象:重试间隔时间是1秒,最多重试 3 次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 创建客户端
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("zkOS:2181")
.sessionTimeoutMs(15000)
.connectionTimeoutMs(13000)
.retryPolicy(retryPolicy)
//namespace:根路径,所有操作都是基于该路径之上
.namespace("logs")
.build();
// 开启客户端
client.start();
...
追踪Curator源码,看下是如何连接的,从client.start()开始:
public class CuratorFrameworkImpl implements CuratorFramework{
...
@Override
public void start(){
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ){
throw new IllegalStateException("Cannot be started more than once");
}
try{
...
this.getConnectionStateListenable().addListener(listener);
client.start();
...
}catch ( Exception e ){
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
}
关注client.start();这个方法:
public class CuratorZookeeperClient implements Closeable{
...
public void start() throws Exception
{
log.debug("Starting");
if ( !started.compareAndSet(false, true) )
{
throw new IllegalStateException("Already started");
}
state.start();
}
...
}
继续追踪state.start();
class ConnectionState implements Watcher, Closeable{
...
void start() throws Exception{
log.debug("Starting");
ensembleProvider.start();
reset();
}
synchronized void reset() throws Exception{
log.debug("reset");
instanceIndex.incrementAndGet();
isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
handleHolder.closeAndReset();
handleHolder.getZooKeeper(); // initiate connection
}
...
}
关键点看handleHolder.getZooKeeper()方法:
class HandleHolder{
...
ZooKeeper getZooKeeper() throws Exception{
return (helper != null) ? helper.getZooKeeper() : null;
}
...
}
class Helper{
private final Data data;
...
ZooKeeper getZooKeeper() throws Exception{
return data.zooKeeperHandle;
}
...
}
直接从data里面取了,Hepler是什么时候创建的呢?回到org.apache.curator.ConnectionState#reset
,看handleHolder.closeAndReset()
方法:
class HandleHolder{
...
void closeAndReset() throws Exception{
internalClose(0);
Helper.Data data = new Helper.Data();
helper = new Helper(data){
@Override
ZooKeeper getZooKeeper() throws Exception{
synchronized(this){
if ( data.zooKeeperHandle == null ){
resetConnectionString(ensembleProvider.getConnectionString());
data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
}
helper = new Helper(data);
return super.getZooKeeper();
}
}
};
}
...
}
我们看下data.zooKeeperHandle到底是怎么创建的:
public class NonAdminZookeeperFactory implements ZookeeperFactory{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
可以看到,无论哪种客户端技术,最终都会在本地创建一个ZooKeeper对象
,接下来我们分析ZK源码中的ZooKeeper对象
1.3 ZK源码中客户端对象ZooKeeper
我们找到ZK源码中的ZooKeeper对象代码(下面是构造代码):
// 跟构造
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException
{
this(connectString, sessionTimeout, watcher, false);
}
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly) throws IOException {
// createDefaultHostProvider() 解析给出的server的址址,并对解析结果进行第一次shuffle
this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString));
}
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, null);
}
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider hostProvider,
ZKClientConfig clientConfig
) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
this.hostProvider = hostProvider;
// 创建一个zk集群字符串解析器,将解析出的ip与port构建为一个地址实例,放入到缓存集合
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
// 创建一个对server的连接
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this.clientConfig,
watcher,
getClientCnxnSocket(),
canBeReadOnly);
// 开始连接
cnxn.start();
}
ConnectStringParser connectStringParser = new ConnectStringParser(connectString)
创建一个zk集群字符串解析器,将解析出的ip与port构建为一个地址实例,放入到缓存集合
public ConnectStringParser(String connectString) {
// parse out chroot, if any
// 解析chroot,如果有的话(其他的一种写法,不用关注)
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
//逗号分割
String hostsList[] = connectString.split(",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
// otherwise : is at the end of the string, ignore
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
//解析出主机和端口后生成地址并添加到serverAddresses
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
createDefaultHostProvider(connectString)
创建主机提供者,把将缓存集合中的地址打散
// default hostprovider
private static HostProvider createDefaultHostProvider(String connectString) {
// ConnectStringParser() 用于解析指定的server地址列表字符串
// getServerAddresses() 获取到所有解析出来的地址集合
return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
}
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
// init() 的第三个参数是创建了一个地址处理器
// init()中进行了第一次地址的shuffle
init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() {
// 根据指定的主机名,获取到所有其对应的ip地址
@Override
public InetAddress[] getAllByName(String name) throws UnknownHostException {
return InetAddress.getAllByName(name);
}
});
}
private void init(Collection<InetSocketAddress> serverAddresses, long randomnessSeed, Resolver resolver) {
this.sourceOfRandomness = new Random(randomnessSeed);
this.resolver = resolver;
if (serverAddresses.isEmpty()) {
throw new IllegalArgumentException("A HostProvider may not be empty!");
}
// 对地址的第一次打散(shuffle)
this.serverAddresses = shuffle(serverAddresses);
currentIndex = -1;
lastIndex = -1;
}
打散的目的在于负载均衡,不然每个客户端轮询都会连上第一个
cnxn = new ClientCnxn(…);
创建一个连接实例 cnxn.start();
启动连接
public void start() {
// 启动连接线程
sendThread.start();
// 启动事件线程
eventThread.start();
}
查看启动连接线程sendThread的run方法
@Override
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// 只要连接没有关闭,也没有验证失败,就一直循环
while (state.isAlive()) {
try {
// 处理没有连接上的情况
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// 获取要连接的server的址
serverAddress = hostProvider.next(1000);
}
onConnecting(serverAddress);
// 开始连接
startConnect(serverAddress);
// 更新交互(连接请求/读写请求)时间戳
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
// 获取已经有多久没有收到交互响应了
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
// 获取已经有多久没有收到连接请求的响应了
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) { // 处理会话超时的情况
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
// 抛出会话超时异常
throw new SessionTimeoutException(warnInfo);
}
...
}
}
while (state.isAlive())
判断当前连接对象是否处于激活状态
// 一共有这么多状态
public enum States {
CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
CLOSED, AUTH_FAILED, NOT_CONNECTED;
public boolean isAlive() {
return this != CLOSED && this != AUTH_FAILED;
}
/**
* Returns whether we are connected to a server (which
* could possibly be read-only, if this client is allowed
* to go to read-only mode)
* */
public boolean isConnected() {
return this == CONNECTED || this == CONNECTEDREADONLY;
}
}
serverAddress = hostProvider.next(1000);
获取要连接的zkServer的地址
// 轮询获取要连接的server的地址
// 如果所有主机都尝试过一次,则等待spinDelay毫秒。
public InetSocketAddress next(long spinDelay) {
boolean needToSleep = false;
InetSocketAddress addr;
synchronized (this) {
// 动态扩缩容reconfigMode
if (reconfigMode) {
// 获取一个server地址
addr = nextHostInReconfigMode();
if (addr != null) {
currentIndex = serverAddresses.indexOf(addr);
// 对获取到的地址进行再次处理:
// 获取到hostname对应的所有ip,进行第二次shuffle,
// 并返回shuflle过后,第一个ip构成的地址
return resolve(addr);
}
//tried all servers and couldn't connect
reconfigMode = false;
needToSleep = (spinDelay > 0);
}
// reconfigMode为false的情况,即没有扩容,或扩容已经完成,新的配置已经成功加载,
// 即变为了一个普通的zk集合了
// 轮询
++currentIndex;
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
addr = serverAddresses.get(currentIndex);
// currentIndex == lastIndex 的情况只有一种:zk是单机而不是集群
needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
}
// 是否需要等待
if (needToSleep) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
return resolve(addr);
}
private InetSocketAddress nextHostInReconfigMode() {
boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew);
// take one of the new servers if it is possible (there are still such
// servers we didn't try),
// and either the probability tells us to connect to one of the new
// servers or if we already
// tried all the old servers
// oldServers是存放原来server的集合,
// newServers是存放扩容server的集合。
// 尝试过程与原理:
// 1)若当前已经将所有老集合中的server全部尝试了一遍,都没有连接成功。此时若添加进了扩容server的新集合,
// 那么就从新集合中进行逐个尝试。
// 2)若当前老集合中的server还没有全部尝试完毕,而此时又扩容了新集合,也会先继续从老集合中进行尝试。
// 3)若新集合中的也都尝试了一遍,还没有连接成功,则会再从老集合中进行逐个尝试
if (((currentIndexNew + 1) < newServers.size()) && (takeNew || (currentIndexOld + 1) >= oldServers.size())) {
++currentIndexNew;
return newServers.get(currentIndexNew);
}
// start taking old servers
if ((currentIndexOld + 1) < oldServers.size()) {
++currentIndexOld;
return oldServers.get(currentIndexOld);
}
return null;
}
// 对地址解析
private InetSocketAddress resolve(InetSocketAddress address) {
try {
String curHostString = address.getHostString();
// 根据hostname,获取其对应的所有ip地址
List<InetAddress> resolvedAddresses =
new ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
if (resolvedAddresses.isEmpty()) {
return address;
}
// 对所有ip地址进行shuffle,这是第二次shuffle
Collections.shuffle(resolvedAddresses);
// 获取shuffle过后的第一个ip地址
return new InetSocketAddress(resolvedAddresses.get(0), address.getPort());
} catch (UnknownHostException e) {
LOG.error("Unable to resolve address: {}", address.toString(), e);
return address;
}
}
startConnect(serverAddress);
开启连接尝试(有可能连接不上,连接不上会循环获取下一个地址继续尝试连接):
private void startConnect(InetSocketAddress addr) throws IOException {
...
// 修改server状态
changeZkState(States.CONNECTING);
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
// 设置连接名称
setName(getName().replaceAll("\(.*\)", "(" + hostPort + ")"));
// 判断是否开启了SASL的客户端验证机制(C/S模式的验证机制)
if (clientConfig.isSaslClientEnabled()) {
...
}
logStartConnect(addr);
// 连接
clientCnxnSocket.connect(addr);
}
@Override
void connect(InetSocketAddress addr) throws IOException {
// 创建一个NIO的channel
SocketChannel sock = createSock();
try {
// 将channel注册到selector
registerAndConnect(sock, addr);
} catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
LOG.error("Unable to open socket to {}", addr);
sock.close();
throw e;
}
initialized = false;
/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
// 将channel注册到selector,并指定其关注事件为客户端连接发起成功
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
// 连接指定地址
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
/**
* Connects this channel's socket.
*
* <p> If this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation. If the connection
* is established immediately, as can happen with a local connection, then
* this method returns <tt>true</tt>. Otherwise this method returns
* <tt>false</tt> and the connection operation must later be completed by
* invoking the {@link #finishConnect finishConnect} method.
*
*/
// 如果该通道处于非阻塞模式,则调用方法启动非阻塞连接操作。如果连接是本地连接,立即建立此方法返回true。否则,该方法返回false,并且连接操作必须稍后由调用finishConnect方法来完成连接。
public abstract boolean connect(SocketAddress remote) throws IOException;
1.4 Zk对象创建流程图
2. 服务端连接源码分析
2.1 ZooKeeper会话理论知识
会话是 zk 中最重要的概念之一,客户端与服务端之间的任何交互操作都与会话相关。
ZooKeeper 客户端启动时,首先会与 zk 服务器建立一个 TCP 长连接。连接一旦建立,客户端会话的生命周期也就开始了。
2.1.1 会话状态
常见的会话状态有三种:
CONNECTING
:连接中。Client 要创建一个连接,其首先会在本地创建一个 zk 对象,用于表示其所连接上的 Server。CONNECTED
:已连接。连接成功后,该连接的各种临时性数据会被初始化到 zk 对象中。CLOSED
:已关闭。连接关闭后,这个代表 Server 的 zk 对象会被删除。
2.1.2 会话连接超时管理—客户端维护
我们这里的会话连接超时管理指的是,客户端所发起的服务端连接时间记录,是从客户端当前会话第一次发起服务端连接的时间开始计时。
ZK是CP架构的,服务端在进行数据同步的时候是不对外提供服务的,但是这个过程是非常快的,对于客户端来说,在连接超时时间内,会一直尝试连接,直到成功,所以服务端不对外提供服务的过程,客户端是感知不到的。
2.1.3 会话连接事件
客户端与服务端的长连接失效后,客户端将进行重连。在重连过程中客户端会产生三种会话连接事件:
CONNECTION_LOSS
:连接丢失SESSION_MOVED
:会话转移。若在客户端连接超时时限范围内又连接上了 Server,且连接的 Server 与之前的不是同一个(集群中的其他机器),则会发生会话转移。SESSION_EXPIRED
:会话失效。若在客户端连接超时时限范围外连接上了 Server,而该Server 中存放的该会话的 sessionId 又被 Server 给干掉了,则该会话失效。
2.1.4 会话空闲超时管理—服务端维护
会话连接超时针对客户端来说的,会话空闲超时,是针对服务端的
服务器为每一个客户端的会话都记录着上一次交互后空闲的时长
,及从上一次交互结束开始会话空闲超时的时间点
。一旦空闲时长超时,服务端就会将该会话的 SessionId 从服务端清除。这也就是为什么客户端在空闲时需要定时向服务端发送心跳,就是为了维护这个会话长连接的。服务器是通过空闲超时管理来判断会话是否发生中断的。
服务端对于会话空闲超时管理,采用了一种特殊的方式——分桶策略
。
分桶策略
分桶策略是指,将空闲超时时间相近的会话
放到同一个桶中来进行管理
,以减少管理的复杂度
。在检查超时时,只需要检查桶中剩下的会话即可,因为没有超时的会话已经被移出了桶,而桶中存在的会话就是超时的会话。
zk 对于会话空闲的超时管理并非是精确的管理,即并非是一超时马上就执行相关的超时操作。
分桶依据
分桶的计算依据为:
CurrentTime
:当前时间(这是时间轴上的时间)SessionTimeout
:会话超时时间(这是一个时间范围)ExpirationTime
:当前会话下一次超时的时间点(这是时间轴上的时间)ExpirationInterval
:桶的大小(这是一个时间范围)BucketTime
:代表的是当前会话下次超时的时间点所在的桶
从以上公式可知,一个桶的大小为 ExpirationInterval 时间
。只要 ExpirationTime 落入到同一个桶中,系统就会对其中的会话超时进行统一管理。
2.2 服务端连接源码分析
找到ZooKeeperServer.startup
方法,一但Server启动就会触发该方法
// zk启动时会在服务端创建一个单例的zooKeeperServer实例,
// 并调用该方法
public synchronized void startup() {
startupWithServerState(State.RUNNING);
}
private void startupWithServerState(State state) {
// 若sessionTracker为null,则创建一个
if (sessionTracker == null) {
createSessionTracker();
}
// 开启sessionTracker线程,即调用该线程的run()
startSessionTracker();
setupRequestProcessors();
// 启动请求流控器线程,即调用该线程的run()
startRequestThrottler();
...
}
createSessionTracker();
创建一个sessionTracker(Session跟踪器)线程:
protected void createSessionTracker() {
// 从磁盘中获取具有timeout的所有session会话
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1, getZooKeeperServerListener());
}
//SessionTrackerImpl调用的构造
public SessionTrackerImpl(SessionExpirer expirer, ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime, long serverId, ZooKeeperServerListener listener) {
super("SessionTracker", listener);
this.expirer = expirer;
// 创建并初始化会话过期队列
this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
this.sessionsWithTimeout = sessionsWithTimeout;
this.nextSessionId.set(initializeNextSessionId(serverId));
//从内存中取出之前保存的会话数据,重新加载到sessionTracker(应该是选举后重启之类的场景)
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
// sessionsWithTimeout是一个map,key为sessionId,value为该会话对应的timeout
trackSession(e.getKey(), e.getValue());
}
EphemeralType.validateServerId(serverId);
}
public ExpiryQueue(int expirationInterval) {
// 会话桶大小
this.expirationInterval = expirationInterval;
// roundToNextInterval(Time.currentElapsedTime()) 计算当前时间所在的会话桶,
// 每个会话桶都有一个标识id,即其所包含的时间范围的最大边界时间点
// nextExpirationTime 用于记录下次要进行过期会话清理的时间点
nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
}
// 计算指定时间所在的会话桶
private long roundToNextInterval(long time) {
// 利用整型除整型结果仍为整型来计算会话桶的
return (time / expirationInterval + 1) * expirationInterval;
}
@Override
public synchronized boolean trackSession(long id, int sessionTimeout) {
boolean added = false;
// 从缓存map中获取当前指定id的session,若为null,则创建一个
SessionImpl session = sessionsById.get(id);
if (session == null) {
session = new SessionImpl(id, sessionTimeout);
}
...
// 更新session的过期时间(放入相应的会话桶)
updateSessionExpiry(session, sessionTimeout);
return added;
}
private void updateSessionExpiry(SessionImpl s, int timeout) {
logTraceTouchSession(s.sessionId, timeout, "");
// 更新过期时间
sessionExpiryQueue.update(s, timeout);
}
// 当client与server有交互时(连接请求/读写操作/心跳),该方法就会被调用
// 当zk server启动时会将磁盘中的session恢复到内存,也会调用该方法
// 该方法在做的是会话换桶操作
public Long update(E elem, int timeout) {
// elemMap集合的key为session,value为该session的过期时间,
// 即该session当前所在的会话桶id
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
// 计算本次交互应该将会话放入到哪个会话桶
Long newExpiryTime = roundToNextInterval(now + timeout);
// 若之前所在会话桶id与本次交互计算的会话桶id相同,
// 则无需换桶,即什么也不用做
if (newExpiryTime.equals(prevExpiryTime)) {
// No change, so nothing to update
return null;
}
// ---------- 代码能走到这里,说明需要换桶了。 --------------
// 换桶由两步操作完成:将会话放入到新桶;将会话从老桶中清除
// First add the elem to the new expiry time bucket in expiryMap.
// expiryMap是会话桶集合,其value为会话桶(Set集合),key为该会话桶id(其表示的时间范围的最大边界值)
// 从会话桶集合中获取当前的会话桶,若为null,则创建一个新的会话桶
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
// 创建会话桶set
set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
// 将新建的会话桶放入到会话桶集合
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
// 将会话放入到会话桶
set.add(elem);
// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
// 将会话与会话桶id的对应关系放入到elemMap,并获取到该会话之前所在的会话桶id
prevExpiryTime = elemMap.put(elem, newExpiryTime);
// 若当前会话桶id与之前会话桶id不相同,说明需要换桶。
// 而前面已经将会话放到了新的会话桶,所以这里要将会话从老桶中清除
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
// 获取到之前的会话桶
Set<E> prevSet = expiryMap.get(prevExpiryTime);
// 将会话从老会话桶中清除
if (prevSet != null) {
prevSet.remove(elem);
}
}
// 返回当前交互引发的会话所在的会话桶id,
// 即当前会话的真正过期时间点
return newExpiryTime;
}
上面updateSessionExpiry方法介绍的是在创建sessionTracker线程时调用的,其实还有很多场景都会调用该方法,比如:
我们先来看一下touchSession方法:底层也是调用的updateSessionExpiry()
方法,所以调用touchSession方法也会更新会话桶
。
public synchronized boolean touchSession(long sessionId, int timeout) {
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {
logTraceTouchInvalidSession(sessionId, timeout);
return false;
}
if (s.isClosing()) {
logTraceTouchClosingSession(sessionId, timeout);
return false;
}
// 更新过期时间
updateSessionExpiry(s, timeout);
return true;
}
我们来看一下有哪些场景也在调用touchSession方法:
- 会话与当前Server交互时
private void startupWithServerState(State state) {
// 若sessionTracker为null,则创建一个
if (sessionTracker == null) {
createSessionTracker();
}
// 开启sessionTracker线程,即调用该线程的run()
startSessionTracker();
setupRequestProcessors();
// 启动请求流控器线程,即调用该线程的run()
startRequestThrottler();
...
}
protected void startRequestThrottler() {
// 为当前server创建一个请求流控器
requestThrottler = new RequestThrottler(this);
// 启动请求流控器线程,即执行其run()
requestThrottler.start();
}
// RequestThrottler.run()方法
@Override
public void run() {
try {
while (true) {
if (killed) {
break;
}
// 从队列中获取一个请求
Request request = submittedRequests.take();
if (Request.requestOfDeath == request) {
break;
}
...
// A dropped stale request will be null
if (request != null) {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
}
final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
if (shouldThrottleOp(request, elapsedTime)) {
request.setIsThrottled(true);
ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
}
// 将请求提交到server进行处理
zks.submitRequestNow(request);
}
}
}
...
}
public void submitRequestNow(Request si) {
...
try {
// 处理请求连接
touch(si.cnxn);
...
}
...
}
void touch(ServerCnxn cnxn) throws MissingSessionException {
if (cnxn == null) {
return;
}
// 从连接中获取到sessionId与timeout
long id = cnxn.getSessionId();
int to = cnxn.getSessionTimeout();
// 若指定id的session有效,则将会话进行换桶,否则抛出异常
if (!sessionTracker.touchSession(id, to)) {
throw new MissingSessionException("No session with sessionid 0x"
+ Long.toHexString(id)
+ " exists, probably expired and removed");
}
}
- 发生会话丢失后,客户端重新发起连接请求时
// 处理client的连接请求
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
throws IOException, ClientCnxnLimitException {
// 从请求中获取到sessionId
long sessionId = connReq.getSessionId();
...
// 若sessionId为0,则说明本次连接是第一次连接,那么需要创建一个session
if (sessionId == 0) {
// 创建session,并将session放入到相应的会话桶
long id = createSession(cnxn, passwd, sessionTimeout);
...
} else { // 若sessionId不为0,则说明本次连接属于client会话丢失后的重连
validateSession(cnxn, sessionId);
...
cnxn.setSessionId(sessionId);
// 重新打开会话,若连接成功,则需要判断本次成功的连接属于“连接转移”还是“会话失效”
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}
}
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
// 若当前连接密码正确,则验证会话,否则关闭会话
if (checkPasswd(sessionId, passwd)) {
// 验证会话
revalidateSession(cnxn, sessionId, sessionTimeout);
} else {
// 关闭会话
finishSessionInit(cnxn, false);
}
}
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
// 判断当前sessionId会话是否还有效。若有效,则将会话放入到相应的会话桶
// rc为true表示会话有效,为false表示会话不存在或已关闭(无效)
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
// 根据会话的状态来决定是否关闭会话
finishSessionInit(cnxn, rc);
}
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
// register with JMX
try {
if (valid) { // 若会话有效,则重新注册原来的连接,即原来的连接仍是有效的
if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
serverCnxnFactory.registerConnection(cnxn);
} else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
secureServerCnxnFactory.registerConnection(cnxn);
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
try {
...
if (valid) { // 处理会话有效的情况
// 启用接收到的连接,client会发生会话转移事件
cnxn.enableRecv();
} else { // 处理会话无效的情况
// 向client发送关闭连接响应,client会发生会话失效事件
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
}
}
...
}
上面只说了桶的创建和换桶,过期的处理没说,过期处理的操作在startSessionTracker()
中。
startSessionTracker();
开启sessionTracker线程,我们看下SessionTrackerImpl的run方法:
// 该run()方法就是用于对过期会话及会话桶进行清理的,
// 而这个清理工作是定时进行的。但这个定时功能不是通过
// 定时器完成的,而是通过等待完成的。
@Override
public void run() {
try {
// 只要当前SessionTrancker没有关闭,该while()永远不会停止,
// 其就没有break
while (running) {
// 获取等待时间,即当前时间距离清理时间点的差值
long waitTime = sessionExpiryQueue.getWaitTime();
// 若waitTime > 0,说明清理时间还没有到,需要等待
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
// sessionExpiryQueue.poll() 用于清理当前过期的会话桶
// for()用于遍历这个会话桶中的所有会话
for (SessionImpl s : sessionExpiryQueue.poll()) {
// 统计
ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
// 修改当前会话的关闭状态为true
setSessionClosing(s.sessionId);
// 关闭当前会话
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
public long getWaitTime() {
long now = Time.currentElapsedTime();
// 获取本次要进行过期清理的时间点 nextExpirationTime是此时还在使用的那个桶的边界时间
// 刚启动的时候会为nextExpirationTime赋值
long expirationTime = nextExpirationTime.get();
// 计算当前时间距离清理时间点还有多久
return now < expirationTime ? (expirationTime - now) : 0L;
}
// sessionExpiryQueue.poll()
public Set<E> poll() {
long now = Time.currentElapsedTime();
long expirationTime = nextExpirationTime.get();
// 若当前时间小于清理时间点,说明还没有到清理时间,直接返回空集合,无需清理
if (now < expirationTime) {
return Collections.emptySet();
}
Set<E> set = null;
// 计算下次清理时间点
long newExpirationTime = expirationTime + expirationInterval;
// 通过CAS更新清理时间点
if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
// 将当前清理时间点作为会话桶id的会话桶从会话桶集合中remove掉
set = expiryMap.remove(expirationTime);
}
if (set == null) {
return Collections.emptySet();
}
return set;
}