前言 在微服务架构中,能快速更新和迭代是其最大的特点之一,而在重启或更新的过程中,为了确保现有请求的完整性以及最小化对业务的影响,“优雅关闭”就显得十分重要了。
本文将结合源码一起来学习下在 Dubbo 中,“优雅关闭”是如何实现的。
以下内容基于Dubbo 2.7.12版本
Shutdown Hook 在 Linux 中,终止一个线程通常使用 kill [参数] [PID]
kill pid
,等同于 kill -15 pid
,向指定进程发送 SIGTERM 信号;
kill -9 pid
,向指定进程发送 SIGKILL 立即终止信号;
kill -2 pid
,向指定进程发送 SIGINT 中断信号,等同于 CTRL + C
而在 Java 中,JVM 提供了 Shutdown Hook 钩子,在 JVM 接收到 SIGTERM 信号通知时,将会触发已注册的 Shutdown Hook 关闭钩子的执行,而注册关闭钩子函数则可以通过 Java.Runtime.addShutdownHook
完成,该方法接收一个 java.lang.Thread
Dubbo 的优雅关闭 Dubbo 的优雅关闭同样了依赖于 Shutdown Hook,这也表明了只有在对 Java 进程发送 SIGTERM (kill pid
DubboShutdownHook 在 Dubbo 启动时,将在 DubboBootstrap 中的构造方法中注册了 DubboShutdownHook :
1 2 3 4 5 6 7 8 private DubboBootstrap () { DubboShutdownHook.getDubboShutdownHook().register(); ShutdownHookCallbacks.INSTANCE.addCallback(DubboBootstrap.this ::destroy); }
在收到进程关闭信号时,将启动对应的钩子线程,执行 run 方法:
1 2 3 4 5 6 7 8 public void run () { if (logger.isInfoEnabled()) { logger.info("Run shutdown hook now." ); } callback(); doDestroy(); }
在 callback 方法中,会执行所有注册的回调方法,包含 DubboBootstrap#destroy :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void destroy () { if (destroyLock.tryLock()) { try { if (started.compareAndSet(true , false ) && destroyed.compareAndSet(false , true )) { unregisterServiceInstance(); unexportMetadataService(); unexportServices(); unreferServices(); destroyRegistries(); destroyServiceDiscoveries(); destroyExecutorRepository(); clear(); shutdown(); release(); ExtensionLoader<DubboBootstrapStartStopListener> exts = getExtensionLoader(DubboBootstrapStartStopListener.class); exts.getSupportedExtensionInstances().forEach(ext -> ext.onStop(this )); } DubboShutdownHook.destroyAll(); } finally { destroyLock.unlock(); } } }
在这里,我们主要学习下取消服务导出、销毁注册中心、以及关闭 Protocol 这几个流程。
取消服务导出 取消服务导出的流程包含在 unexportServices 方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void unexportServices () { exportedServices.forEach((serviceName, sc) -> { configManager.removeConfig(sc); sc.unexport(); }); asyncExportingFutures.forEach(future -> { if (!future.isDone()) { future.cancel(true ); } }); asyncExportingFutures.clear(); exportedServices.clear(); }
它会针对每一个已导出的服务,调用其ServiceConfig#unexport 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void unexport() { if (!exported) { return; } if (unexported) { return; } if (!exporters.isEmpty()) { // 调用 Exporter#unexport 方法 for (Exporter<?> exporter : exporters) { try { exporter.unexport(); } catch (Throwable t) { logger.warn("Unexpected error occurred when unexport " + exporter, t); } } exporters.clear(); } unexported = true; // dispatch a ServiceConfigUnExportedEvent since 2.7.4 dispatch(new ServiceConfigUnexportedEvent(this)); }
最终来到 RegistryProtocol.ExporterChangeableWrapper#unexport:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public void unexport () { if (!unexported.compareAndSet(false ,true )) { return ; } String key = getCacheKey(this .originInvoker); bounds.remove(key); Registry registry = RegistryProtocol.this .getRegistry(originInvoker); try { registry.unregister(registerUrl); } catch (Throwable t) { LOGGER.warn(t.getMessage(), t); } try { NotifyListener listener = RegistryProtocol.this .overrideListeners.remove(subscribeUrl); registry.unsubscribe(subscribeUrl, listener); ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension() .removeListener(subscribeUrl.getServiceKey() + CONFIGURATORS_SUFFIX, serviceConfigurationListeners.get(subscribeUrl.getServiceKey())); } catch (Throwable t) { LOGGER.warn(t.getMessage(), t); } executor.submit(() -> { try { int timeout = ConfigurationUtils.getServerShutdownTimeout(); if (timeout > 0 ) { LOGGER.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. " + "Usually, this is called when you use dubbo API" ); Thread.sleep(timeout); } exporter.unexport(); } catch (Throwable t) { LOGGER.warn(t.getMessage(), t); } }); }
首先向注册中心注销当前服务,以 Nacos 注册中心为例,会下线当前服务,同时停掉与注册的心跳任务,然后取消订阅;
在等待 10 s 后(等待时间可以通过 dubbo.service.shutdown.wait 配置),销毁对应的 exporter 实例(异步执行)。
销毁注册中心 在 destroyRegistries 方法中,包含了注销注册中心的逻辑,AbstractRegistryFactory#destroyAll:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static void destroyAll () { if (!destroyed.compareAndSet(false , true )) { return ; } if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } LOCK.lock(); try { for (Registry registry : getRegistries()) { try { registry.destroy(); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } REGISTRIES.clear(); } finally { LOCK.unlock(); } }
以 nacos 注册中心为例,会来到 org.apache.dubbo.registry.support.AbstractRegistry#destroy:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public void destroy () { if (logger.isInfoEnabled()) { logger.info("Destroy registry:" + getUrl()); } Set<URL> destroyRegistered = new HashSet<>(getRegistered()); if (!destroyRegistered.isEmpty()) { for (URL url : new HashSet<>(destroyRegistered)) { if (url.getParameter(DYNAMIC_KEY, true )) { try { unregister(url); if (logger.isInfoEnabled()) { logger.info("Destroy unregister url " + url); } } catch (Throwable t) { logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed()); if (!destroySubscribed.isEmpty()) { for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { try { unsubscribe(url, listener); if (logger.isInfoEnabled()) { logger.info("Destroy unsubscribe url " + url); } } catch (Throwable t) { logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } AbstractRegistryFactory.removeDestroyedRegistry(this ); }
关闭 Protocol 在 DubboShutdownHook.destroyAll 方法中,包含了销毁注册中心和关闭 Protocol 两个事情,其中销毁注册中心在前面执行过,不会重复执行,我们直接来看下 Protocol 的关闭:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void destroyProtocols () { ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class); for (String protocolName : loader.getLoadedExtensions()) { try { Protocol protocol = loader.getLoadedExtension(protocolName); if (protocol != null ) { protocol.destroy(); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }
在运行时,loader.getLoadedExtension(protocolName) 会返回两种 Protocol:InjvmProtocol 和 DubboProtocol,而 DubboProtocol 用于提供网络间的调用,我们主要来看下 DubboProtocol#destroy:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void destroy () { for (String key : new ArrayList<>(serverMap.keySet())) { ProtocolServer protocolServer = serverMap.remove(key); if (protocolServer == null ) { continue ; } RemotingServer server = protocolServer.getRemotingServer(); try { if (logger.isInfoEnabled()) { logger.info("Close dubbo server: " + server.getLocalAddress()); } server.close(ConfigurationUtils.getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } for (String key : new ArrayList<>(referenceClientMap.keySet())) { Object clients = referenceClientMap.remove(key); if (clients instanceof List) { List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients; if (CollectionUtils.isEmpty(typedClients)) { continue ; } for (ReferenceCountExchangeClient client : typedClients) { closeReferenceCountExchangeClient(client); } } } super .destroy(); }
包含了两部分逻辑:server 和 client,server 端对应了当前应用的服务提供者,而 client 对应当前应用的服务消费者。
关闭 server 端 server 端的关闭会来到 HeaderExchangeServer#close(int):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void close (final int timeout) { startClose(); if (timeout > 0 ) { final long max = timeout; final long start = System.currentTimeMillis(); if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true )) { sendChannelReadOnlyEvent(); } while (isRunning() && System.currentTimeMillis() - start < max) { try { Thread.sleep(10 ); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } doClose(); server.close(timeout); }
首先会向服务消费端发送 READ_ONLY 事件,服务消费端在接收到此事件后不会再调用该服务提供者。前面在销毁注册中心时,实际已经向注册中心下线服务了,这里为什么又通知一次呢?这其实还是延迟的考虑,这样可以进一步降低注册中心延迟带来的影响。
来到 AbstractServer#close(int):
1 2 3 4 5 6 public void close (int timeout) { ExecutorUtil.gracefulShutdown(executor, timeout); close(); }
最后是关闭底通信服务 NettyServer。
关闭 client 端 client 端的关闭逻辑会首先来到 HeaderExchangeClient#close(int):
1 2 3 4 5 6 7 8 public void close (int timeout) { startClose(); doClose(); channel.close(timeout); }
来到 HeaderExchangeChannel#close(int):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void close (int timeout) { if (closed) { return ; } closed = true ; if (timeout > 0 ) { long start = System.currentTimeMillis(); while (DefaultFuture.hasFuture(channel) && System.currentTimeMillis() - start < timeout) { try { Thread.sleep(10 ); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } close(); }
关闭 client 时,如果存在未收到响应的请求,并且等待的时长小于超时时长,将进行等待。这里主要是尽可能的保证了发送出去但未收到响应的请求能在关闭前处理完。
在 Spring 容器下的兼容 在 Spring 环境下,由于 Spring 也有着自己的 ShutdownHook,而 DubboShutdownHook 的执行与 Spring 的不在同一个线程内,因此执行的顺序无人保证。假设 Spring 先销毁了,其容器内的 bean 也将被销毁,如果 Dubbo 依赖了 Spring 中的 bean(如数据源等),即使此时 Dubbo 的 DubboShutdownHook 未执行,此时也无法正常使用,从而对业务也会产生中断。针对这种情况,Dubbo 也做了兼容的处理。
如果当前是 Spring 环境,SpringExtensionFactory 会被加载,并通过 SpringExtensionFactory#addApplicationContext:
1 2 3 4 5 6 7 8 9 10 public static void addApplicationContext (ApplicationContext context) { CONTEXTS.add(context); if (context instanceof ConfigurableApplicationContext) { ((ConfigurableApplicationContext) context).registerShutdownHook(); DubboShutdownHook.getDubboShutdownHook().unregister(); } }
将 Dubbo 的 ShutdownHook 移除,同时新增了 DubboBootstrapApplicationListener ,增加了对 Spring 容器关闭事件的处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void onApplicationContextEvent (ApplicationContextEvent event) { if (DubboBootstrapStartStopListenerSpringAdapter.applicationContext == null ) { DubboBootstrapStartStopListenerSpringAdapter.applicationContext = event.getApplicationContext(); } if (event instanceof ContextRefreshedEvent) { onContextRefreshedEvent((ContextRefreshedEvent) event); } else if (event instanceof ContextClosedEvent) { onContextClosedEvent((ContextClosedEvent) event); } } private void onContextClosedEvent (ContextClosedEvent event) { DubboShutdownHook.getDubboShutdownHook().run(); }
在 ContextClosedEvent 事件处理逻辑中,调用了 DubboShutdownHook 的处理方法。
总结 总的来说,Dubbo 的优雅关闭主要分为几个部分:取消服务导出、销毁注册中心、以及关闭 Protocol 。主要确保了: