前言
在 Dubbo 中,默认协议使用单一长连接进行通信,这可以避免每次调用都新建 TCP 连接,从而降低延迟、提高调用的响应速度,但长连接的稳定性受环境、服务端等影响可能会断开,所以如何及时检测连接是否可用并进行重连(保活),对于通信框架而言是非常重要的话题。
以下内容基于Dubbo 2.7.12版本
连接保活
为什么需要保活
在了解 Dubbo 的方案前,我们可以先思考下有哪些因素会影响长连接的的稳定性:
- 在网络通信中,客户端和服务端之间的数据传输通常会经过多个中间设备(如NAT网关、防火墙等)。这些设备为了节省资源或遵循安全策略,可能会在一段时间内没有数据传输时主动删除 TCP 会话信息;
- 网络链路故障,或者进入不稳定或者拥堵的网络环境,可能会导致连接被关闭;
- 服务端应用进程被关闭。
如何保活
那为了确保连接的可用性,以及及时发现异常断线情况并进行重连,我们可以采取哪些措施呢?
提到保活,我们都知道 TCP keepalive机制,但这个其实是操作系统实现的一个功能,在开启这个机制后,当长连接无数据交互一定时间间隔时,连接的一方会向对端发送保活探测包,在连接正常时,对端将对此作出回应。
那既然如此,是不是只要开启 keepalive 就行了呢?
其实不够,因为 keepalive 只是在网络层进行保活,假使网络本身没问题,但是由于服务端进程假死等异常情况时,此时 keepalive 就无法探测到了,在这种情况下,往往需要应用层去实现心跳机制,按照一定的时间间隔,发送心跳包到对端,以此确认连接对端是否可达。而相对 TCP 的 keepalive,实现业务心跳:
- 不仅可以检测连接是否正常,也可以检测对端进程是否正常
- 具有更大的灵活性,可以自己控制检测的间隔,检测的方式等;
- 另外,心跳包还可以附带业务信息,定时在服务端和客户端之间同步。
但在应用层实现心跳也需要考虑资源占用情况,心跳检测过于频繁会增加系统的负担,而间隔太长又可能会导致不可用的连接不能尽早的被发现,这是一个需要权衡的问题。
本文将一起来学习下,在 Dubbo 中,连接是如何保活的。
Dubbo 的心跳机制
服务消费端
ExchangeClient 是 Dubbo 对服务消费端信息交换层的抽象,而 HeaderExchangeClient 是其默认实现,我们直接来到其构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public HeaderExchangeClient(Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null"); this.client = client; this.channel = new HeaderExchangeChannel(client);
if (startTimer) { URL url = client.getUrl(); startReconnectTask(url); startHeartBeatTask(url); } }
|
startReconnectTask 开启了一个重连定时任务:
1 2 3 4 5 6 7 8 9 10 11 12
| private void startReconnectTask(URL url) { if (shouldReconnect(url)) { AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); int idleTimeout = getIdleTimeout(url); long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout); this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout); IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); } }
|
重连定时任务 ReconnectTimerTask#doTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long now = now();
if (!channel.isConnected()) { try { logger.info("Initial connection to " + channel); ((Client) channel).reconnect(); } catch (Exception e) { logger.error("Fail to connect to " + channel, e); } } else if (lastRead != null && now - lastRead > idleTimeout) { logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: " + idleTimeout + "ms"); try { ((Client) channel).reconnect(); } catch (Exception e) { logger.error(channel + "reconnect failed during idle time.", e); } } } catch (Throwable t) { logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t); } }
|
startHeartBeatTask 开启了一个心跳定时任务:
1 2 3 4 5 6 7 8 9 10 11 12
| private void startHeartBeatTask(URL url) { if (!client.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); int heartbeat = getHeartbeat(url); long heartbeatTick = calculateLeastDuration(heartbeat); this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); } }
|
在获取心跳任务的执行时间间隔时,这里跟重连任务一样,同时是将时间除以3得到,这是为什么呢?这其实是 Dubbo 缩短了检测间隔时间,增大了及时发现死链的概率。
心跳定时任务 HeartbeatTimerTask#doTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat) || (lastWrite != null && now() - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(HEARTBEAT_EVENT); channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } }
|
另外,我们还注意到 ,在 startHeartBeatTask 开启心跳任务前,有个前提条件的判断:
查看实现得知,这是 Netty 对空闲连接的检测提供了天然支持,在使用 Netty 时,可以通过 IdleStateHandler 很方便的实现空闲连接检测。在 NettyClientHandler 中,提供了对空闲事件的处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); if (logger.isDebugEnabled()) { logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel); } Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(HEARTBEAT_EVENT); channel.send(req); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } else { super.userEventTriggered(ctx, evt); } }
|
有了 IdleStateHandler 的支持,Dubbo 可以减少一个心跳任务的定时器,从而降低资源消耗。
服务提供端
在服务提供端,HeaderExchangeServer 是 Dubbo 信息交互层的默认实现在其构造函数中:
1 2 3 4 5 6
| public HeaderExchangeServer(RemotingServer server) { Assert.notNull(server, "server == null"); this.server = server; startIdleCheckTask(getUrl()); }
|
会开启一个空闲检测任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private void start客户端是否能够处理空闲事件IdleCheckTask(URL url) { if (!server.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); int idleTimeout = getIdleTimeout(url); long idleTimeoutTick = calculateLeastDuration(idleTimeout); CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); this.closeTimerTask = closeTimerTask;
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); } }
|
关闭连接定时任务 CloseTimerTask#doTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); Long now = now(); if ((lastRead != null && now - lastRead > idleTimeout) || (lastWrite != null && now - lastWrite > idleTimeout)) { logger.warn("Close channel " + channel + ", because idleCheck timeout: " + idleTimeout + "ms"); channel.close(); } } catch (Throwable t) { logger.warn("Exception when close remote channel " + channel.getRemoteAddress(), t); } }
|
可以看到,在服务提供端这边,如果在指定的空闲超时时间内没有发生读写,会直接关闭连接。
而对于 NettyServer 的空闲处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { logger.info("IdleStateEvent triggered, close channel " + channel); channel.close(); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } super.userEventTriggered(ctx, evt); }
|
也是对连接直接关闭。
总结
在 Dubbo 服务消费端,会启动两个定时任务,IdleStateHandler主要用于定时发送心跳请求,而 ReconnectTimerTask 用于3次心跳未回复之后进行重连的逻辑;而在服务提供端,只启动了一个 CloseTimerTask,用于检测空闲超时时间关闭连接。另外对于 Netty 的实现客户端/服务端,则使用了 IdleStateHandler 代替了其中的 IdleStateHandler 及 CloseTimerTask。