privatevoidrefreshInvoker(List<URL> invokerUrls){ Assert.notNull(invokerUrls, "invokerUrls should not be null"); //invokerUrls长度为1,并且协议为empty,则销毁所有invoker if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { //标记为禁止访问 this.forbidden = true; // Forbid to access //销毁所有invoker实例 this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } //如果invokerUrls为空,并且cachedInvokerUrls不为空,则使用cachedInvokerUrls if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { //缓存invokerUrls this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } this.forbidden = false; // Allow to access //将url转换为invoker实例 Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/** * If the calculation is wrong, it is not processed. * * 1. The protocol configured by the client is inconsistent with the protocol of the server. * eg: consumer protocol = dubbo, provider only has other protocol services(rest). * 2. The registration center is not robust and pushes illegal specification data. * */ if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls .toString())); return; }
//更新服务目录中的invoker列表 List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap;
//销毁无用的invoker // Close the unused Invoker destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
private Map<URL, Invoker<T>> toInvokers(List<URL> urls) { Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(); if (CollectionUtils.isEmpty(urls)) { return newUrlInvokerMap; } Set<URL> keys = new HashSet<>(); //获取消费端支持的协议 String queryProtocols = this.queryMap.get(PROTOCOL_KEY); for (URL providerUrl : urls) { // If protocol is configured at the reference side, only the matching protocol is selected if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } //是否支持消费端协议,不支持则忽略 if (!accept) { continue; } } //忽略empty协议的URL if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } //通过SPI的方式检测消费端是否存在对应的扩展实现 if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } //合并URL,按一定优先级合并配置 URL url = mergeUrl(providerUrl);
if (keys.contains(url)) { // Repeated url continue; } keys.add(url); // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(url); if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; //根据URL参数决定是否创建invoker if (url.hasParameter(DISABLED_KEY)) { enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } //通过Protocol#refer方法创建invoker实例 if (enabled) { invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(url, invoker); } } else { newUrlInvokerMap.put(url, invoker); } } keys.clear(); return newUrlInvokerMap; }
private ExchangeClient[] getClients(URL url) { // 是否共享连接 int connections = url.getParameter(CONNECTIONS_KEY, 0); // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { /* * The xml configuration should have a higher priority than properties. */ String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null); //获取连接数 connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); return getSharedClient(url, connections).toArray(new ExchangeClient[0]); } else { //初始化新的客户端 ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { clients[i] = initClient(url); } return clients; }