在之前的文章中,我们了解到了 Dubbo 服务导出和服务引用的过程。本篇文章一起来看下服务远程调用的过程。
以下内容基于Dubbo 2.7.12版本
服务消费端 在服务引用一篇中,我们了解到服务引用对象是通过动态代理对象生成的,而远程调用也通过该代理对象发起:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class proxy0 implements ClassGenerator .DC ,EchoService ,Destroyable ,HelloService { public static Method[] methods; private InvocationHandler handler; @Override public Object $echo(Object object) { Object[] objectArray = new Object[]{object}; Object object2 = this .handler.invoke(this , methods[0 ], objectArray); return object2; } public String sayHello () { Object[] objectArray = new Object[]{}; Object object = this .handler.invoke(this , methods[1 ], objectArray); return (String)object; } @Override public void $destroy() { Object[] objectArray = new Object[]{}; Object object = this .handler.invoke(this , methods[2 ], objectArray); } public proxy0 () { } public proxy0 (InvocationHandler invocationHandler) { this .handler = invocationHandler; } }
该代理类实现了服务提供接口的方法,并将请求都转发到了 InvocationHandler#invoke 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0 ) { if ("toString" .equals(methodName)) { return invoker.toString(); } else if ("$destroy" .equals(methodName)) { invoker.destroy(); return null ; } else if ("hashCode" .equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals" .equals(methodName)) { return invoker.equals(args[0 ]); } RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args); String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); RpcContext.setRpcContext(invoker.getUrl()); if (consumerModel != null ) { rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel); rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method)); } return invoker.invoke(rpcInvocation).recreate(); }
这里针对一些不需要远程调用的一些方法,使用本地调用。继续来到 MigrationInvoker#invoke :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public Result invoke (Invocation invocation) throws RpcException { if (!checkInvokerAvailable(serviceDiscoveryInvoker)) { if (logger.isDebugEnabled()) { logger.debug("Using interface addresses to handle invocation, interface " + type.getName() + ", total address size " + (invoker.getDirectory().getAllInvokers() == null ? "is null" : invoker.getDirectory().getAllInvokers().size())); } return invoker.invoke(invocation); } if (!checkInvokerAvailable(invoker)) { if (logger.isDebugEnabled()) { logger.debug("Using instance addresses to handle invocation, interface " + type.getName() + ", total address size " + (serviceDiscoveryInvoker.getDirectory().getAllInvokers() == null ? " is null " : serviceDiscoveryInvoker.getDirectory().getAllInvokers().size())); } return serviceDiscoveryInvoker.invoke(invocation); } return currentAvailableInvoker.invoke(invocation); }
这里将优先使用应用级服务发现invoker,应用级服务发现invoker不可用时则使用接口级服务发现invoker。来到 MockClusterInvoker#invoke :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Override public Result invoke (Invocation invocation) throws RpcException { Result result = null ; String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || "false" .equalsIgnoreCase(value)) { result = this .invoker.invoke(invocation); } else if (value.startsWith("force" )) { if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl()); } result = doMockInvoke(invocation, null ); } else { try { result = this .invoker.invoke(invocation); if (result.getException() != null && result.getException() instanceof RpcException){ RpcException rpcException= (RpcException)result.getException(); if (rpcException.isBiz()){ throw rpcException; }else { result = doMockInvoke(invocation, rpcException); } } } catch (RpcException e) { if (e.isBiz()) { throw e; } if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e); } result = doMockInvoke(invocation, e); } } return result; }
这里出现了mock参数的判断及处理,有3各取值情况(false、force、fail),这其实是Dubbo远程调用的mock机制,通过它我们可以实现服务降级,或者用于测试过程中的模拟调用。具体的doMockInvoke实现处理,不在这里具体展开。继续来到 AbstractCluster.InterceptorInvokerNode#invoke :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Override public Result invoke (Invocation invocation) throws RpcException { Result asyncResult; try { interceptor.before(next, invocation); asyncResult = interceptor.intercept(next, invocation); } catch (Exception e) { if (interceptor instanceof ClusterInterceptor.Listener) { ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; listener.onError(e, clusterInvoker, invocation); } throw e; } finally { interceptor.after(next, invocation); } return asyncResult.whenCompleteWithContext((r, t) -> { if (interceptor instanceof ClusterInterceptor.Listener) { ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; if (t == null ) { listener.onMessage(r, clusterInvoker, invocation); } else { listener.onError(t, clusterInvoker, invocation); } } }); }
这里主要是ClusterInterceptor的处理:
调用 ClusterInterceptor 的前置处理;
调用 ClusterInterceptor 的 intercept 方法,该方法将调用 AbstractClusterInvoker 的 invoke 方法;
调用 ClusterInterceptor 的后置处理。
来到 AbstractClusterInvoker#invoke :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public Result invoke (final Invocation invocation) throws RpcException { checkWhetherDestroyed(); Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0 ) { ((RpcInvocation) invocation).addObjectAttachments(contextAttachments); } List<Invoker<T>> invokers = list(invocation); LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
将attachments参数 添加到 invocation,并通过 Directory#list 方法获取 invoker 列表,以及加载并获取负载均衡器,最后调用 FailoverClusterInvoker#doInvoke 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke (Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = calculateInvokeTimes(methodName); RpcException le = null ; List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); Set<String> providers = new HashSet<String>(len); for (int i = 0 ; i < len; i++) { if (i > 0 ) { checkWhetherDestroyed(); copyInvokers = list(invocation); checkInvokers(copyInvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { } return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } }
在该方法中,首先获取重试次数,以在调用 invoker 失败的时候进行重试操作,在选中具体的 invoker 实例时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 protected Invoker<T> select (LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return null ; } String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName(); boolean sticky = invokers.get(0 ).getUrl() .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY); if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null ; } if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected); if (sticky) { stickyInvoker = invoker; } return invoker; }
这里包含了粘滞调用的处理,粘滞调用是指消费端在确保 invoker 可用时,将尽可能使用同一个 invoker 进行调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private Invoker<T> doSelect (LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return null ; } if (invokers.size() == 1 ) { return invokers.get(0 ); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rInvoker != null ) { invoker = rInvoker; } else { int index = invokers.indexOf(invoker); try { invoker = invokers.get((index + 1 ) % invokers.size()); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore." , e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url" , t); } } return invoker; }
这里主要做了两件事:
通过 LoadBalance 选择 invoker 实例;
如果选出来的 invoker 不可用,则使用 reselect 进行重选。
在选中 invoker 实例后,回到 FailoverClusterInvoker#doInvoke ,将进行 invoker 的调用。在经过Dubbo Filter链的调用后,来到AsyncToSyncInvoker :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { //如果是同步调用,则阻塞等待结果 if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { /** * NOTICE! * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop. */ asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { //... } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } return asyncResult; }
这是一个将异步调用转换为同步调用的 Invoker ,值得注意的是,在 Dubbo 中的远程调用都是以异步方式进行调用的。
来到 DubboInvoker :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Override public Result invoke (Invocation inv) throws RpcException { if (destroyed.get()) { } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this ); if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addObjectAttachmentsIfAbsent(attachment); } Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { invocation.addObjectAttachments(contextAttachments); } invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION)); if (serializationId != null ) { invocation.put(SERIALIZATION_ID_KEY, serializationId); } AsyncRpcResult asyncResult; try { asyncResult = (AsyncRpcResult) doInvoke(invocation); } catch (InvocationTargetException e) { } catch (RpcException e) { } catch (Throwable e) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null , e, invocation); } RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture())); return asyncResult; }
这里主要是将 attachment 信息添加到 invocation ,然后来到 DubboInvoker#doInvoke 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Override protected Result doInvoke (final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1 ) { currentClient = clients[0 ]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); invocation.put(TIMEOUT_KEY, timeout); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false ); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { } catch (RemotingException e) { } }
这里首先以轮询的方式获取到客户端,然后发起远程请求,结合 AsyncToSyncInvoker 的处理逻辑可以看出,在非单向调用的情况下,Dubbo 的调用返回结果都是异步结果,对于同步的调用,调用结果由内部 AsyncToSyncInvoker 阻塞并获取调用结果返回,而对于异步调用则返回 AsyncRpcResult ,最终结果的获取时机则交由用户决定。
来到请求的发起,HeaderExchangeChannel#request :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public CompletableFuture<Object> request (Object request, int timeout, ExecutorService executor) throws RemotingException { if (closed) { throw new RemotingException(this .getLocalAddress(), null , "Failed to send request " + request + ", cause: The channel " + this + " is closed!" ); } Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true ); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
首先是构建 Request 请求对象,然后通过 DefaultFuture#newFuture 构造一个 DefaultFuture :
1 2 3 4 5 6 7 8 9 10 11 public static DefaultFuture newFuture (Channel channel, Request request, int timeout, ExecutorService executor) { final DefaultFuture future = new DefaultFuture(channel, request, timeout); future.setExecutor(executor); if (executor instanceof ThreadlessExecutor) { ((ThreadlessExecutor) executor).setWaitingFuture(future); } timeoutCheck(future); return future; }
1 2 3 4 5 6 7 8 9 10 11 private DefaultFuture (Channel channel, Request request, int timeout) { this .channel = channel; this .request = request; this .id = request.getId(); this .timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); FUTURES.put(id, this ); CHANNELS.put(id, channel); }
在 DefaultFuture 的构造函数中,可以看到,在 DefaultFuture 中,维护了 请求id ->DefaultFuture 的映射关系,而这个是 Dubbo 实现异步的关键。
当该次调用接收到结果时,通过这个映射关系可以直接找到 DefaultFuture 对象,然后将调用结果填充回去,整个异步调用的请和结果的关联在此处得以体现,DefaultFuture#received :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void received (Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null ) { Timeout t = future.timeoutCheckTask; if (!timeout) { t.cancel(); } future.doReceived(response); } else { } } finally { CHANNELS.remove(response.getId()); } }
拿到 Request 请求对象后,最后发送请求,Dubbo 默认使用 Netty作为底层通信框架,此时 channel 实例类型为 NettyClient ,即通过 NettyClient#send 方法发送请求,后续就是根据 Dubbo 协议对该请求对象进行编码然后将请求发送出去。
关于 Dubbo消费端请求的发起调用,本篇就简单看到这里,这里附上一个调用的关键路径:
1 2 3 4 5 6 7 8 9 10 11 12 13 InvokerInvocationHandler#invoke MigrationInvoker#invoke MockClusterInvoker#invoke AbstractCluster$InterceptorInvokerNode#invoke (ConsumerContextClusterInterceptor)ClusterInterceptor#intercept AbstractClusterInvoker#invoke -> FailoverClusterInvoker#doInvoke FilterNode#invoke(Filter链执行) AsyncToSyncInvoker#invoke AbstractInvoker#invoke -> DubboInvoker#invoke ReferenceCountExchangeClient#request HeaderExchangeClient#request HeaderExchangeChannel#request NettyClient#send
服务提供端 前面我们提到,Dubbo 使用 Netty 作为底层通信框架,当 Netty 接收到新请求后,首先会根据 Dubbo 协议通过解码器进行 Request 请求对象的解码,在得到 Request 请求对象后,接着便是调用服务了,我们直接看下 ChannelEventRunnable#run :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Override public void run () { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break ; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break ; default : logger.warn("unknown state: " + state + ", message is " + message); } } }
这里针对不同类型的事件,调用 DecodeHandler 的不同方法,对于接收请求,则来到 DecodeHandler#received :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public void received (Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); }
解码后, Request 对象传递到 HeaderExchangeHandler#received :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override public void received (Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0 ) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } }
这里区分了双向通信和单向通信,对于双向通信,则先来到 HeaderExchangeHandler#handleRequest :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 void handleRequest (final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null ) { msg = null ; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return ; } Object msg = req.getData(); try { CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null ) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
来到 DubboProtocol$ExchangeHandlerAdapter#reply :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture<Object> reply (ExchangeChannel channel, Object message) throws RemotingException { if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods" ); boolean hasMethod = false ; if (methodsStr == null || !methodsStr.contains("," )) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split("," ); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true ; break ; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null ; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } }
这里首先通过 DubboProtocol#getInvoker 是获取对应的 invoker 实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { boolean isCallBackServiceInvoke = false ; boolean isStubServiceInvoke = false ; int port = channel.getLocalAddress().getPort(); String path = (String) inv.getObjectAttachments().get(PATH_KEY); isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY); inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } String serviceKey = serviceKey( port, path, (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null ) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } return exporter.getInvoker(); }
然后调用 Invoker#invoke 方法,我们在服务导出中了解到,服务导出生成的 invoker 实例实际是动态代理对象,在经过服务提供端拦截器链的处理后,会调用该代理对象的 invoke 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class JavassistProxyFactory extends AbstractProxyFactory { @Override public <T> Invoker<T> getInvoker (T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$' ) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke (T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
而 invoke 方法又通过 Wrapper 对象,使用反射调用具体的业务实现方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class Wrapper1 extends Wrapper implements ClassGenerator .DC { public Object invokeMethod (Object object, String string, Class[] classArray, Object[] objectArray) throws InvocationTargetException { HelloServiceImpl helloServiceImpl; try { helloServiceImpl = (HelloServiceImpl)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { if ("sayHello" .equals(string) && classArray.length == 0 ) { return helloServiceImpl.sayHello(); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"" ).append(string).append("\" in class laixiaoming.service.HelloServiceImpl." ).toString()); } }
在拿到服务调用返回后,将其封装为 Response 对象,最后通过 HeaderExchangeChannel#send 回写 Response 对象到调用方。
关于服务提供端的处理链路,这里附上一个调用中的关键的路径:
1 2 3 4 5 6 7 8 ChannelEventRunnable#run DecodeHandler#received HeaderExchangeHandler#received HeaderExchangeHandler#handleRequest DubboProtocol$ExchangeHandlerAdapter#reply AbstractProxyInvoker#invoke Wrapper1#invokeMethod HelloServiceImpl#sayHello
调用结果的处理 在服务提供方将 Response 对象返回时,同样有一个编码的过程,在服务消费方接收到响应数据后,也有一个解码的过程,最终得到 Response 对象。过程我们直接看下 HeaderExchangeHandler#received 中的处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void received (Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { } else { handler.received(exchangeChannel, message); } }
HeaderExchangeHandler#handleResponse :
1 2 3 4 5 static void handleResponse (Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
来到 DefaultFuture#received :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static void received (Channel channel, Response response) { received(channel, response, false ); } public static void received (Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null ) { Timeout t = future.timeoutCheckTask; if (!timeout) { t.cancel(); } future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS" ).format(new Date())) + ", response status is " + response.getStatus() + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result." ); } } finally { CHANNELS.remove(response.getId()); } }
通过请求 id 获取到对应的 DefaultFuture 对象后,DefaultFuture#doReceived :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void doReceived (Response res) { if (res == null ) { throw new IllegalStateException("response cannot be null" ); } if (res.getStatus() == Response.OK) { this .complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this .completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this .completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception." )); } } }
最后将响应结果设置到 Future 中,消费端用户线程被唤起,Future#get 就可以获取到调用结果了。至此,一次 dubbo 调用就大致结束了。
小结 本文主要就 Dubbo 调用的整个过程,从源码的角度作了简单分析,包括:服务消费端的请求发起、服务提供端处理请求的处理、服务提供端响应的返回、以及服务消费端响应的处理。