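Dubbo3 Triple Protocol: Startup and Invocation Flow
I'm not great at writing source-code analysis articles QAQ,
so this post is really just a record of reading the source.
Setting up the debugging environment
Download the Dubbo source code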
git clone https://github.com/apache/dubbo
Build the code (on an ARM machine you need to pass os.detected.classifier=osx-x86_64; the value shown targets macOS)
mvn clean install -DskipTests -Dos.detected.classifier=osx-x86_64
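After the build finishes, the Dubbo Compiler has generated code from the proto files. It is located in the build directory of the corresponding module.
[Figure: generated sources under the module's build directory]
Right-click the java directory and mark the generated code as Generated Sources Root.
Then start ZooKeeper on port 2181.
This post uses org.apache.dubbo.demo.provider.ApiProvider and org.apache.dubbo.demo.consumer.ApiConsumer from dubbo-demo-triple as entry points to trace the call flow of the Triple protocol.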
Provider startup flow
The main method of org.apache.dubbo.demo.provider.ApiProvider calls DubboBootstrap#start, which starts the provider.
The bootstrap's start method mostly delegates to DefaultApplicationDeployer#start, so that is the method to read next (see the comments).
/**
 * Start the bootstrap
 *
 * @return
 */
@Override
public Future start() {
    synchronized (startLock) {
        if (isStopping() || isStopped() || isFailed()) {
            throw new IllegalStateException(getIdentifier() + " is stopping or stopped, can not start again");
        }
        try {
            // check the start flags
            ...
            // already started: return immediately
            if (isStarted() && !hasPendingModule) {
                return CompletableFuture.completedFuture(false);
            }
            // pending -> starting : first start app
            // started -> starting : re-start app
            // mark as starting
            onStarting();
            // initialize
            initialize();
            // * the actual start logic
            doStart();
        } catch (Throwable e) {
            onFailed(getIdentifier() + " start failure", e);
            throw e;
        }
        return startFuture;
    }
}
The most important part here is the doStart method.
doStart -> startModules -> moduleModel.getDeployer().start();
// DefaultModuleDeployer#start
@Override
public synchronized Future start() throws IllegalStateException {
    if (isStopping() || isStopped() || isFailed()) {
        throw new IllegalStateException(getIdentifier() + " is stopping or stopped, can not start again");
    }
    try {
        if (isStarting() || isStarted()) {
            return startFuture;
        }
        // mark the module as starting
        onModuleStarting();
        // initialize
        applicationDeployer.initialize();
        initialize();
        // 1. export services (not relevant for a consumer)
        exportServices();
        // prepare application instance
        // exclude internal module to avoid wait itself
        // start the internal module first
        if (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {
            applicationDeployer.prepareInternalModule();
        }
        // refer services (not relevant for a provider)
        referServices();
        // mark the module as started
        ...
    } catch (Throwable e) {
        onModuleFailed(getIdentifier() + " start failed: " + e, e);
        throw e;
    }
    return startFuture;
}
- The code that exports a service is shown below.
private void exportServiceInternal(ServiceConfigBase sc) {
    ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
    if (!serviceConfig.isRefreshed()) {
        // refresh the configuration
        serviceConfig.refresh();
    }
    if (sc.isExported()) {
        return;
    }
    if (exportAsync || sc.shouldExportAsync()) {
        // export asynchronously
        ExecutorService executor = executorRepository.getServiceExportExecutor();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                if (!sc.isExported()) {
                    sc.export();
                    exportedServices.add(sc);
                }
            } catch (Throwable t) {
                logger.error(getIdentifier() + " export async catch error : " + t.getMessage(), t);
            }
        }, executor);
        asyncExportingFutures.add(future);
    } else {
        // export synchronously
        if (!sc.isExported()) {
            // * this is where the service information gets written to the registry
            sc.export();
            exportedServices.add(sc);
        }
    }
}
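The line marked with * above is fairly involved. The core call chain is:
ServiceConfig#export -> ServiceConfig#doExport -> ServiceConfig#doExportUrls -> ServiceConfig#doExportUrlsFor1Protocol -> ServiceConfig#exportUrl -> ServiceConfig#exportRemote -> ServiceConfig#doExportUrl. In my view, doExportUrl is the single most important method in provider-side export.
A small rant: this call chain is really convoluted QAQ… I got lost countless times.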
private void doExportUrl(URL url, boolean withMetaData) {
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    if (withMetaData) {
        invoker = new DelegateProviderMetaDataInvoker(invoker, this);
    }
    Exporter<?> exporter = protocolSPI.export(invoker);
    exporters.add(exporter);
}
doExportUrl first uses proxyFactory to obtain an invoker object, wraps it in a DelegateProviderMetaDataInvoker (not important here), and then exports the invoker through the export method of the Protocol object (this is the key step).
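To make the "wrap the service in an invoker, then export the invoker" idea concrete, here is a minimal JDK-only sketch. MiniInvoker, GreeterImpl, EXPORTED, and dispatch are hypothetical names invented for this illustration, not Dubbo classes, and plain reflection is used only for brevity; Dubbo's real proxy factories and protocols do considerably more.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, JDK-only model of "get an invoker for ref, then export it".
// None of these types exist in Dubbo.
class MiniExport {

    /** A uniform call interface: every service method is reached through invoke(). */
    interface MiniInvoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args);
    }

    /** Sample service implementation used only for the demonstration. */
    public static class GreeterImpl {
        public String sayHello(String name) {
            return "hello, " + name;
        }
    }

    /** Plays the role of proxyFactory.getInvoker(ref, ...): adapts any object via reflection. */
    static MiniInvoker getInvoker(Object ref) {
        return (methodName, parameterTypes, args) -> {
            try {
                Method m = ref.getClass().getMethod(methodName, parameterTypes);
                return m.invoke(ref, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("invoke failed: " + methodName, e);
            }
        };
    }

    /** Plays the role of protocol.export(invoker): makes the invoker reachable by service key. */
    static final Map<String, MiniInvoker> EXPORTED = new ConcurrentHashMap<>();

    static void export(String serviceKey, MiniInvoker invoker) {
        EXPORTED.put(serviceKey, invoker);
    }

    /** What a server could do once an incoming request has been decoded. */
    static Object dispatch(String serviceKey, String method, Class<?>[] types, Object[] args) {
        MiniInvoker invoker = EXPORTED.get(serviceKey);
        if (invoker == null) {
            throw new IllegalArgumentException("service not exported: " + serviceKey);
        }
        return invoker.invoke(method, types, args);
    }

    public static void main(String[] args) {
        export("demo.Greeter", getInvoker(new GreeterImpl()));
        System.out.println(dispatch("demo.Greeter", "sayHello",
                new Class<?>[] {String.class}, new Object[] {"triple"}));
    }
}
```

The point of the indirection is that the transport layer only ever needs to know one method, invoke, regardless of which service interface sits behind it.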
- TripleProtocol#export -> PortUnificationExchanger#bind -> PortUnificationServer#bind -> PortUnificationServer#doOpen
protected void doOpen() {
    bootstrap = new ServerBootstrap();
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
        EVENT_LOOP_WORKER_POOL_NAME);
    final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
    final SslContext sslContext;
    if (enableSsl) {
        sslContext = SslContexts.buildServerSslContext(url);
    } else {
        sslContext = null;
    }
    bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Do not add idle state handler here, because it should be added in the protocol handler.
                final ChannelPipeline p = ch.pipeline();
                final PortUnificationServerHandler puHandler;
                puHandler = new PortUnificationServerHandler(url, sslContext, true, protocols, channels);
                p.addLast("negotiation-protocol", puHandler);
            }
        });
    // bind
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = ANYHOST_VALUE;
    }
    InetSocketAddress bindAddress = new InetSocketAddress(bindIp, bindPort);
    ChannelFuture channelFuture = bootstrap.bind(bindAddress);
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
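This uses Netty's API to start listening on a TCP port. On a Netty server the handlers are what matter most, and here the handler is PortUnificationServerHandler.
- For this handler, the method to look at is decode, which asks each protocol's detector to inspect the incoming bytes.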
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // Will use the first five bytes to detect a protocol.
    if (in.readableBytes() < 5) {
        return;
    }
    if (isSsl(in)) {
        enableSsl(ctx);
    } else {
        for (final WireProtocol protocol : protocols) {
            in.markReaderIndex();
            final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
            in.resetReaderIndex();
            switch (result) {
                case UNRECOGNIZED:
                    continue;
                case RECOGNIZED:
                    protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
                    ctx.pipeline().remove(this);
                case NEED_MORE_DATA:
                    return;
                default:
                    return;
            }
        }
        byte[] preface = new byte[in.readableBytes()];
        in.readBytes(preface);
        Set<String> supported = url.getApplicationModel()
            .getExtensionLoader(WireProtocol.class)
            .getSupportedExtensions();
        LOGGER.error(String.format("Can not recognize protocol from downstream=%s . "
                + "preface=%s protocols=%s", ctx.channel().remoteAddress(),
            Bytes.bytes2hex(preface), supported));
        // Unknown protocol; discard everything and close the connection.
        in.clear();
        ctx.close();
    }
}
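The three-valued result used in decode (RECOGNIZED, NEED_MORE_DATA, UNRECOGNIZED) can be illustrated with a small JDK-only sketch that matches received bytes against the HTTP/2 client connection preface. PrefaceDetector is a hypothetical class written for this post; whether Dubbo's own Triple detector compares the whole preface in exactly this way is an assumption.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of prefix-based protocol detection; not Dubbo's detector code.
class PrefaceDetector {

    enum Result { RECOGNIZED, NEED_MORE_DATA, UNRECOGNIZED }

    // The HTTP/2 client connection preface (24 bytes) as defined by the HTTP/2 RFC.
    static final byte[] HTTP2_PREFACE =
            "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(StandardCharsets.US_ASCII);

    /** Compares the bytes received so far against the preface without consuming them. */
    static Result detect(byte[] received) {
        int n = Math.min(received.length, HTTP2_PREFACE.length);
        for (int i = 0; i < n; i++) {
            if (received[i] != HTTP2_PREFACE[i]) {
                return Result.UNRECOGNIZED; // diverged: some other protocol
            }
        }
        // Everything seen so far matches; decide whether the whole preface has arrived.
        return received.length >= HTTP2_PREFACE.length
                ? Result.RECOGNIZED
                : Result.NEED_MORE_DATA;
    }

    public static void main(String[] args) {
        System.out.println(detect("PRI * HT".getBytes(StandardCharsets.US_ASCII)));
        System.out.println(detect(HTTP2_PREFACE));
        System.out.println(detect("GET / HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII)));
    }
}
```

Because detection never consumes bytes, the same buffer can be offered to several detectors in turn, which is what the markReaderIndex / resetReaderIndex pair achieves in the real handler.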
After the SSL and length checks, and once a protocol has been recognized, execution enters
protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
public void configServerPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) {
    final List<HeaderFilter> headFilters;
    if (filtersLoader != null) {
        headFilters = filtersLoader.getActivateExtension(url, HEADER_FILTER_KEY);
    } else {
        headFilters = Collections.emptyList();
    }
    final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
        .gracefulShutdownTimeoutMillis(10000)
        .initialSettings(new Http2Settings().headerTableSize(
                config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE))
            .maxConcurrentStreams(
                config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
            .initialWindowSize(
                config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE))
            .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, DEFAULT_MAX_FRAME_SIZE))
            .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY,
                DEFAULT_MAX_HEADER_LIST_SIZE)))
        .frameLogger(SERVER_LOGGER)
        .build();
    final Http2MultiplexHandler handler = new Http2MultiplexHandler(
        new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                final ChannelPipeline p = ch.pipeline();
                p.addLast(new TripleCommandOutBoundHandler());
                p.addLast(new TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
                    headFilters));
            }
        });
    pipeline.addLast(codec, new TripleServerConnectionHandler(), handler,
        new TripleTailHandler());
}
The server channel class is EpollServerSocketChannel or NioServerSocketChannel. The handlers added to the pipeline here are Http2FrameCodec, TripleServerConnectionHandler, Http2MultiplexHandler (which installs TripleCommandOutBoundHandler and TripleHttp2FrameServerHandler), and TripleTailHandler.
- Http2FrameCodec (shipped with Netty)
Its main job is to map HTTP/2 frames to and from Http2Frame objects. Http2Frame is Netty's wrapper for every kind of HTTP/2 frame, so later handlers can work purely with Http2Frame objects instead of dealing with HTTP/2 protocol details, which reduces the work for users. [1]
The various Frame types seen in the later handlers are presumably produced by this handler.
- TripleServerConnectionHandler
Nothing special about this handler: it deals with special frames such as Http2PingFrame and Http2GoAwayFrame.
- Http2MultiplexHandler
This handler installs TripleCommandOutBoundHandler and TripleHttp2FrameServerHandler.
TripleHttp2FrameServerHandler extends ChannelDuplexHandler, which combines ChannelInboundHandler and ChannelOutboundHandler. Its channelRead method is shown below.
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof Http2HeadersFrame) {
        onHeadersRead(ctx, (Http2HeadersFrame) msg);
    } else if (msg instanceof Http2DataFrame) {
        onDataRead(ctx, (Http2DataFrame) msg);
    } else if (msg instanceof ReferenceCounted) {
        // ignored
        ReferenceCountUtil.release(msg);
    }
}
HTTP/2 header frames and data frames are handled separately.
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
    TripleServerStream tripleServerStream = new TripleServerStream(ctx.channel(),
        frameworkModel, executor, pathResolver, acceptEncoding, filters);
    ctx.channel().attr(SERVER_STREAM_KEY).set(tripleServerStream);
    tripleServerStream.transportObserver.onHeader(msg.headers(), msg.isEndStream());
}
public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception {
    final TripleServerStream tripleServerStream = ctx.channel().attr(SERVER_STREAM_KEY)
        .get();
    tripleServerStream.transportObserver.onData(msg.content(), msg.isEndStream());
}
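onHeadersRead creates a TripleServerStream object and stores it as a channel attribute. onDataRead adds the given data to the deframer and tries to deliver it to the listener. The listener here is fairly complicated and is covered later; for now, the point is that the message has been taken in. (The server-side receive path is revisited near the end of this post.)
- TripleTailHandler
Handles messages that nothing else handled, to avoid memory leaks and Netty's unhandled-exception warnings.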
Consumer service reference
The core of consumer startup is ReferenceConfig#get().
@Override
public T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// ensure start module, compatible with old api usage
getScopeModel().getDeployer().start();
synchronized (this) {
if (ref == null) {
init();
}
}
}
return ref;
}
This mainly calls the init method to initialize.
Main path: init -> createProxy -> ReferenceConfig#createInvokerForRemote -> TripleProtocol#refer
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
ExecutorService streamExecutor = getOrCreateStreamExecutor(
url.getOrDefaultApplicationModel());
TripleInvoker<T> invoker = new TripleInvoker<>(type, url, acceptEncodings,
connectionManager, invokers, streamExecutor);
invokers.add(invoker);
return invoker;
}
Here the remote service is abstracted as an invoker object.
The path continues with: TripleInvoker constructor -> connectionManager.connect(url)
@Override
public Connection connect(URL url) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
return connections.compute(url.getAddress(), (address, conn) -> {
if (conn == null) {
final Connection created = new Connection(url);
created.getClosePromise().addListener(future -> connections.remove(address, created));
return created;
} else {
conn.retain();
return conn;
}
});
}
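The connect method relies on ConcurrentHashMap#compute to create a connection once per address and hand the same instance to later callers while bumping a reference count. A minimal JDK-only sketch of that pattern follows. SharedConnections and Conn are hypothetical stand-ins, and the release method is an assumption added so the sketch is complete; in the real code removal is driven by the connection's close promise.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "one shared, reference-counted connection per address".
class SharedConnections {

    /** Stand-in for a real connection; it starts with one reference held by its creator. */
    static class Conn {
        final String address;
        final AtomicInteger refCnt = new AtomicInteger(1);

        Conn(String address) {
            this.address = address;
        }

        void retain() {
            refCnt.incrementAndGet();
        }
    }

    private final ConcurrentHashMap<String, Conn> connections = new ConcurrentHashMap<>();

    /** compute() runs atomically per key, so two racing callers cannot both create a Conn. */
    Conn connect(String address) {
        return connections.compute(address, (addr, conn) -> {
            if (conn == null) {
                return new Conn(addr);
            }
            conn.retain();
            return conn;
        });
    }

    /** Assumed counterpart: drop one reference and forget the entry when none remain. */
    void release(String address) {
        connections.computeIfPresent(address, (addr, conn) ->
                conn.refCnt.decrementAndGet() == 0 ? null : conn);
    }

    int size() {
        return connections.size();
    }

    public static void main(String[] args) {
        SharedConnections pool = new SharedConnections();
        Conn a = pool.connect("127.0.0.1:50051");
        Conn b = pool.connect("127.0.0.1:50051");
        System.out.println(a == b);          // same instance is shared
        System.out.println(a.refCnt.get());  // two holders
    }
}
```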
The line new Connection(url) invokes the Connection constructor.
public Connection(URL url) {
url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
this.url = url;
this.protocol = ExtensionLoader.getExtensionLoader(WireProtocol.class)
.getExtension(url.getProtocol());
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY,
Constants.DEFAULT_CONNECT_TIMEOUT);
this.remote = getConnectAddress();
this.bootstrap = create();
}
The create method looks like this.
private Bootstrap create() {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP.get())
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.remoteAddress(remote)
.channel(socketChannelClass());
final ConnectionHandler connectionHandler = new ConnectionHandler(this);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
final ChannelPipeline pipeline = ch.pipeline();
SslContext sslContext = null;
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
pipeline.addLast("negotiation", new SslClientTlsHandler(url));
}
//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
// TODO support IDLE
// int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
pipeline.addLast(connectionHandler);
protocol.configClientPipeline(url, pipeline, sslContext);
// TODO support Socks5
}
});
return bootstrap;
}
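This configures the Netty client.
The channel class is EpollSocketChannel or NioSocketChannel. The handlers added here are SslClientTlsHandler (only when SSL is enabled), ConnectionHandler, Http2FrameCodec, Http2MultiplexHandler, and TripleTailHandler. The last three are added by protocol.configClientPipeline(url, pipeline, sslContext).
- SslClientTlsHandler
This handler checks whether the TLS handshake failed and reacts accordingly.
- ConnectionHandler
Mainly deals with connection events, for example trying to reconnect after a disconnect.
- Http2FrameCodec
Same as on the server side: it maps HTTP/2 frames to and from Http2Frame objects.
In effect, the bootstrap is wrapped layer by layer and ends up inside the invoker.
Back to ReferenceConfig#init -> ReferenceConfig#createProxy.
After refer completes, a proxy object is created from the invoker.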
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
Stepping through with the debugger shows that this enters AbstractProxyFactory#getProxy.
[Figure: debugger view of the invoker at this point]
@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
// when compiling with native image, ensure that the order of the interfaces remains unchanged
LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
ClassLoader classLoader = getClassLoader(invoker);
String config = invoker.getUrl().getParameter(INTERFACES);
if (StringUtils.isNotEmpty(config)) {
String[] types = COMMA_SPLIT_PATTERN.split(config);
for (String type : types) {
try {
interfaces.add(ReflectUtils.forName(classLoader, type));
} catch (Throwable e) {
// ignore
}
}
}
Class<?> realInterfaceClass = null;
if (generic) {
try {
// find the real interface from url
String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
realInterfaceClass = ReflectUtils.forName(classLoader, realInterface);
interfaces.add(realInterfaceClass);
} catch (Throwable e) {
// ignore
}
if (GenericService.class.equals(invoker.getInterface()) || !GenericService.class.isAssignableFrom(invoker.getInterface())) {
interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
}
}
interfaces.add(invoker.getInterface());
interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
try {
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
} catch (Throwable t) {
if (generic) {
if (realInterfaceClass != null) {
interfaces.remove(realInterfaceClass);
}
interfaces.remove(invoker.getInterface());
logger.error("Error occur when creating proxy. Invoker is in generic mode. Trying to create proxy without real interface class.", t);
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
} else {
throw t;
}
}
}
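[Figure: the interfaces collected here, captured at the first and at the second breakpoint hit]
The second and third entries are the ones added by interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES)).
Finally getProxy is called, which has two implementations: javassist and JDK.
I'm not yet sure exactly when each one is chosen. The default here is javassist, and if javassist fails the JDK proxy is tried.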
JavassistProxyFactory#getProxy -> Proxy#getProxy
public static Proxy getProxy(Class<?>... ics) {
if (ics.length > MAX_PROXY_COUNT) {
throw new IllegalArgumentException("interface limit exceeded");
}
// ClassLoader from App Interface should support load some class from Dubbo
ClassLoader cl = ics[0].getClassLoader();
ProtectionDomain domain = ics[0].getProtectionDomain();
// use interface class name list as key.
String key = buildInterfacesKey(cl, ics);
// get cache by class loader.
final Map<String, Proxy> cache;
synchronized (PROXY_CACHE_MAP) {
cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new ConcurrentHashMap<>());
}
Proxy proxy = cache.get(key);
if (proxy == null) {
synchronized (ics[0]) {
proxy = cache.get(key);
if (proxy == null) {
// create Proxy class.
proxy = new Proxy(buildProxyClass(cl, ics, domain));
cache.put(key, proxy);
}
}
}
return proxy;
}
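The buildProxyClass method uses a lot of javassist APIs for bytecode generation, building the proxy class by hand (orz, impressive).
Once the proxy class has been built, an instance is created through reflection and returned.
Client invocation flow
Now the question: we call a local instance on the client, so why does a remote service get invoked?
The proxy class must have been tampered with! So what exactly was done to it?
To be honest, I don't fully understand the javassist-based proxy, but it appears to work much like the JDK proxy: both use an InvokerInvocationHandler that wraps the invoker, and InvokerInvocationHandler implements the JDK's InvocationHandler interface.
Whenever a method is called on the proxy, the handler's invoke method runs.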
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// methods declared on Object are invoked directly on the invoker
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
// anything other than these common methods is a remote call:
// build an RpcInvocation object from the call information
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
// run the remote invocation logic
return InvocationUtil.invoke(invoker, rpcInvocation);
}
InvocationUtil.invoke(invoker, rpcInvocation) also collects some metrics.
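The mechanism can be reproduced with the JDK's own dynamic proxy in a few lines. In this sketch Greeter and MiniInvoker are hypothetical types made up for the example; the lambda standing in for the invoker simply formats a string where a real invoker would send the call over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical JDK-proxy sketch: every interface call is funneled into one invoke().
class ProxyDemo {

    /** The service interface the caller programs against. */
    interface Greeter {
        String sayHello(String name);
    }

    /** Stand-in for the invoker that would perform the remote call. */
    interface MiniInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Builds a Greeter whose methods are all routed to the given invoker. */
    static Greeter refer(MiniInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object (toString, hashCode, equals) are answered locally.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Everything else is treated as a remote call.
            return invoker.invoke(method.getName(), args);
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = refer((method, a) -> "remote result of " + method + "(" + a[0] + ")");
        System.out.println(greeter.sayHello("triple"));
    }
}
```

The caller only ever sees a Greeter, which is why a plain method call can end up on another machine.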
InvocationUtil#invoke -> MigrationInvoker#invoke -> MockClusterInvoker#invoke -> AbstractCluster$ClusterFilterInvoker#invoke -> … -> TripleInvoker#doInvoke
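The call passes through a very long chain in between: before TripleInvoker is reached, it goes through the filter chain and then a series of Invokers. (I found this chain bottom-up: guess that the call will end up here, set a breakpoint in TripleInvoker#doInvoke, and read the whole chain off the stack. This is much easier than going top-down.)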
@Override
protected Result doInvoke(final Invocation invocation) {
if (!connection.isAvailable()) {
CompletableFuture<AppResponse> future = new CompletableFuture<>();
RpcException exception = TriRpcStatus.UNAVAILABLE.withDescription(
String.format("upstream %s is unavailable", getUrl().getAddress()))
.asException();
future.completeExceptionally(exception);
return new AsyncRpcResult(future, invocation);
}
ConsumerModel consumerModel = (ConsumerModel) (invocation.getServiceModel() != null
? invocation.getServiceModel() : getUrl().getServiceModel());
ServiceDescriptor serviceDescriptor = consumerModel.getServiceModel();
final MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(
invocation.getMethodName(),
invocation.getParameterTypes());
ClientCall call = new TripleClientCall(connection, streamExecutor,
getUrl().getOrDefaultFrameworkModel());
AsyncRpcResult result;
try {
switch (methodDescriptor.getRpcType()) {
case UNARY:
result = invokeUnary(methodDescriptor, invocation, call);
break;
case SERVER_STREAM:
result = invokeServerStream(methodDescriptor, invocation, call);
break;
case CLIENT_STREAM:
case BI_STREAM:
result = invokeBiOrClientStream(methodDescriptor, invocation, call);
break;
default:
throw new IllegalStateException("Can not reach here");
}
return result;
} catch (Throwable t) {
final TriRpcStatus status = TriRpcStatus.INTERNAL.withCause(t)
.withDescription("Call aborted cause client exception");
RpcException e = status.asException();
try {
call.cancelByLocal(e);
} catch (Throwable t1) {
LOGGER.error("Cancel triple request failed", t1);
}
CompletableFuture<AppResponse> future = new CompletableFuture<>();
future.completeExceptionally(e);
return new AsyncRpcResult(future, invocation);
}
}
AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation invocation,
ClientCall call) {
ExecutorService callbackExecutor = getCallbackExecutor(getUrl(), invocation);
int timeout = calculateTimeout(invocation, invocation.getMethodName());
invocation.setAttachment(TIMEOUT_KEY, timeout);
final AsyncRpcResult result;
DeadlineFuture future = DeadlineFuture.newFuture(getUrl().getPath(),
methodDescriptor.getMethodName(), getUrl().getAddress(), timeout, callbackExecutor);
RequestMetadata request = createRequest(methodDescriptor, invocation, timeout);
final Object pureArgument;
if (methodDescriptor.getParameterClasses().length == 2
&& methodDescriptor.getParameterClasses()[1].isAssignableFrom(
StreamObserver.class)) {
StreamObserver<Object> observer = (StreamObserver<Object>) invocation.getArguments()[1];
future.whenComplete((r, t) -> {
if (t != null) {
observer.onError(t);
return;
}
if (r.hasException()) {
observer.onError(r.getException());
return;
}
observer.onNext(r.getValue());
observer.onCompleted();
});
pureArgument = invocation.getArguments()[0];
result = new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()),
invocation);
} else {
if (methodDescriptor instanceof StubMethodDescriptor) {
pureArgument = invocation.getArguments()[0];
} else {
pureArgument = invocation.getArguments();
}
result = new AsyncRpcResult(future, invocation);
result.setExecutor(callbackExecutor);
FutureContext.getContext().setCompatibleFuture(future);
}
ClientCall.Listener callListener = new UnaryClientCallListener(future);
final StreamObserver<Object> requestObserver = call.start(request, callListener);
requestObserver.onNext(pureArgument);
requestObserver.onCompleted();
return result;
}
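Even though this is a unary call, StreamObserver is still involved: the data is sent through requestObserver.onNext, and the result comes back asynchronously.
For a synchronous call, AbstractInvoker#waitForResultIfSync blocks waiting for the result (with a timeout).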
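The shape "unary call = a one-message stream whose response completes a future" can be sketched with the JDK alone. MiniObserver, UnaryListener, and the in-process start method below are hypothetical stand-ins written for this post; the fake transport just runs a function on another thread instead of talking to a server.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch: a unary call expressed through observers and a future.
class UnaryOverStream {

    /** Minimal observer with the same three callbacks as a stream observer. */
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Response-side listener: remembers the single response and completes the future. */
    static class UnaryListener<T> implements MiniObserver<T> {
        private final CompletableFuture<T> future;
        private T value;

        UnaryListener(CompletableFuture<T> future) {
            this.future = future;
        }

        @Override public void onNext(T v) { value = v; }
        @Override public void onError(Throwable t) { future.completeExceptionally(t); }
        @Override public void onCompleted() { future.complete(value); }
    }

    /** Fake transport: returns the request-side observer and replies asynchronously. */
    static <Req, Resp> MiniObserver<Req> start(Function<Req, Resp> server,
                                               MiniObserver<Resp> responseListener) {
        return new MiniObserver<Req>() {
            private Req request;

            @Override public void onNext(Req v) { request = v; }
            @Override public void onError(Throwable t) { responseListener.onError(t); }
            @Override public void onCompleted() {
                CompletableFuture.supplyAsync(() -> server.apply(request))
                        .whenComplete((resp, err) -> {
                            if (err != null) {
                                responseListener.onError(err);
                                return;
                            }
                            responseListener.onNext(resp);
                            responseListener.onCompleted();
                        });
            }
        };
    }

    /** Asynchronous unary call: send one message, half-close, return the future. */
    static <Req, Resp> CompletableFuture<Resp> invokeUnary(Function<Req, Resp> server, Req arg) {
        CompletableFuture<Resp> future = new CompletableFuture<>();
        MiniObserver<Req> requestObserver = start(server, new UnaryListener<>(future));
        requestObserver.onNext(arg);
        requestObserver.onCompleted();
        return future;
    }

    /** Synchronous call = asynchronous call plus a bounded wait. */
    static <Req, Resp> Resp invokeSync(Function<Req, Resp> server, Req arg, long timeoutMillis) {
        try {
            return invokeUnary(server, arg).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeSync((String s) -> "hello, " + s, "triple", 1000));
    }
}
```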
Next, let's look at what StreamObserver actually is.
Exploring StreamObserver
ClientCall.Listener callListener = new UnaryClientCallListener(future);
final StreamObserver<Object> requestObserver = call.start(request, callListener);
requestObserver.onNext(pureArgument);
requestObserver.onCompleted();
return result;
This is the last part of a (unary) remote call in the Triple protocol.
call#start returns a StreamObserver object, which is what we focus on next.
@Override
public StreamObserver<Object> start(RequestMetadata metadata,
ClientCall.Listener responseListener) {
this.requestMetadata = metadata;
this.listener = responseListener;
this.stream = new TripleClientStream(frameworkModel, executor, connection.getChannel(),
this);
return new ClientCallToObserverAdapter<>(this);
}
This returns a ClientCallToObserverAdapter.
[Figure: class hierarchy of ClientCallToObserverAdapter]
So the later call to onNext is the onNext implemented by ClientCallToObserverAdapter.
@Override
public void onNext(Object data) {
if (terminated) {
throw new IllegalStateException(
"Stream observer has been terminated, no more data is allowed");
}
call.sendMessage(data);
}
Next, look at TripleClientCall#sendMessage.
@Override
public void sendMessage(Object message) {
if (canceled) {
throw new IllegalStateException("Call already canceled");
}
if (!headerSent) {
headerSent = true;
stream.sendHeader(requestMetadata.toHeaders());
}
final byte[] data;
try {
data = requestMetadata.packableMethod.packRequest(message);
int compressed =
Identity.MESSAGE_ENCODING.equals(requestMetadata.compressor.getMessageEncoding())
? 0 : 1;
final byte[] compress = requestMetadata.compressor.compress(data);
stream.sendMessage(compress, compressed, false)
.addListener(f -> {
if (!f.isSuccess()) {
cancelByLocal(f.cause());
}
});
} catch (Throwable t) {
LOGGER.error(String.format("Serialize triple request failed, service=%s method=%s",
requestMetadata.service,
requestMetadata.method), t);
cancelByLocal(t);
listener.onClose(TriRpcStatus.INTERNAL.withDescription("Serialize request failed")
.withCause(t), null);
}
}
// stream listener end
First, if the header has not been sent yet, it is sent.
Then the message is serialized into a byte array and compressed.
Finally, the sendMessage method of a TripleClientStream object is called to send the data.
@Override
public ChannelFuture sendMessage(byte[] message, int compressFlag, boolean eos) {
final DataQueueCommand cmd = DataQueueCommand.createGrpcCommand(message, false,
compressFlag);
return this.writeQueue.enqueue(cmd)
.addListener(future -> {
if (!future.isSuccess()) {
cancelByLocal(
TriRpcStatus.INTERNAL.withDescription("Client write message failed")
.withCause(future.cause())
);
transportException(future.cause());
}
}
);
}
The data is first wrapped in a DataQueueCommand object and put into the writeQueue.
The commands in this queue run asynchronously: at the end of enqueue, scheduleFlush is called to trigger the flush.
public void scheduleFlush() {
if (scheduled.compareAndSet(false, true)) {
channel.eventLoop().execute(this::flush);
}
}
private void flush() {
try {
QueuedCommand cmd;
int i = 0;
boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) {
// see QueuedCommand#run below
cmd.run(channel);
i++;
if (i == DEQUE_CHUNK_SIZE) {
i = 0;
channel.flush();
flushedOnce = true;
}
}
if (i != 0 || !flushedOnce) {
channel.flush();
}
} finally {
scheduled.set(false);
if (!queue.isEmpty()) {
scheduleFlush();
}
}
}
// QueuedCommand#run
public void run(Channel channel) {
if (channel.isActive()) {
channel.write(this, promise);
} else {
promise.trySuccess();
}
}
After the data has been written to the channel, the channel is flushed.
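The scheduling trick in scheduleFlush and flush (a compare-and-set flag so that at most one flush task is pending, plus draining in chunks) can be tried out without Netty. In the sketch below BatchingWriteQueue is a hypothetical class: commands are plain strings, the event loop is any Executor, and "flushing" merely increments a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a write queue that batches writes and schedules one flush at a time.
class BatchingWriteQueue {
    static final int CHUNK = 128; // assumed chunk size for this sketch

    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor eventLoop;

    // Only touched from the event loop in this sketch.
    final List<String> written = new ArrayList<>();
    int flushes;

    BatchingWriteQueue(Executor eventLoop) {
        this.eventLoop = eventLoop;
    }

    void enqueue(String cmd) {
        queue.add(cmd);
        scheduleFlush();
    }

    private void scheduleFlush() {
        // Only the caller that flips false -> true submits a flush task.
        if (scheduled.compareAndSet(false, true)) {
            eventLoop.execute(this::flush);
        }
    }

    private void flush() {
        try {
            String cmd;
            int i = 0;
            while ((cmd = queue.poll()) != null) {
                written.add(cmd);          // stands in for channel.write(...)
                if (++i == CHUNK) {
                    i = 0;
                    flushes++;             // stands in for channel.flush()
                }
            }
            if (i != 0) {
                flushes++;
            }
        } finally {
            scheduled.set(false);
            // A producer may have enqueued after the last poll; make sure it is not stranded.
            if (!queue.isEmpty()) {
                scheduleFlush();
            }
        }
    }

    public static void main(String[] args) {
        List<Runnable> tasks = new ArrayList<>();
        BatchingWriteQueue q = new BatchingWriteQueue(tasks::add); // deferred "event loop"
        q.enqueue("header");
        q.enqueue("data");
        q.enqueue("end");
        System.out.println("pending flush tasks: " + tasks.size());
        tasks.get(0).run();
        System.out.println("written=" + q.written + " flushes=" + q.flushes);
    }
}
```

Three enqueued commands result in a single scheduled task and a single flush, which is the batching effect the real queue is after.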
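How streaming calls work (including backpressure)
Dubbo3's Triple protocol supports unary, server_stream (server streaming), and bi_stream (bidirectional streaming) calls. As TripleInvoker#doInvoke above shows, CLIENT_STREAM is handled on the same path as BI_STREAM. With the groundwork above, the streaming RPC part is fairly easy to follow.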
AsyncRpcResult invokeServerStream(MethodDescriptor methodDescriptor, Invocation invocation,
ClientCall call) {
RequestMetadata request = createRequest(methodDescriptor, invocation, null);
// the extra piece: a responseObserver
StreamObserver<Object> responseObserver = (StreamObserver<Object>) invocation.getArguments()[1];
final StreamObserver<Object> requestObserver = streamCall(call, request, responseObserver);
requestObserver.onNext(invocation.getArguments()[0]);
requestObserver.onCompleted();
return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation);
}
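The difference from the unary path above is that a responseObserver is taken from the second argument. So on the client you implement a StreamObserver and pass it as the second argument to consume the server stream. For example, the demo from the official site: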
delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
@Override
public void onNext(String data) {
System.out.println(data);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
The provider can then push messages.
@Override
public void greetServerStream(String request, StreamObserver<String> response) {
for (int i = 0; i < 10; i++) {
response.onNext("hello," + request);
}
response.onCompleted();
}
So the difference between a streaming call and an ordinary call lies in the use of streamCall.
StreamObserver<Object> streamCall(ClientCall call,
RequestMetadata metadata,
StreamObserver<Object> responseObserver) {
if (responseObserver instanceof CancelableStreamObserver) {
final CancellationContext context = new CancellationContext();
((CancelableStreamObserver<Object>) responseObserver).setCancellationContext(context);
context.addListener(context1 -> call.cancelByLocal(new IllegalStateException("Canceled by app")));
}
ObserverToClientCallListenerAdapter listener = new ObserverToClientCallListenerAdapter(
responseObserver);
return call.start(metadata, listener);
}
In streamCall, the user-defined responseObserver (a StreamObserver) on the consumer side is wrapped in an ObserverToClientCallListenerAdapter (this is where it differs from unary), and then TripleClientCall's start method is called.
@Override
public StreamObserver<Object> start(RequestMetadata metadata,
ClientCall.Listener responseListener) {
this.requestMetadata = metadata;
this.listener = responseListener;
this.stream = new TripleClientStream(frameworkModel, executor, connection.getChannel(),
this);
return new ClientCallToObserverAdapter<>(this);
}
Next, look at the TripleClientStream constructor.
public TripleClientStream(FrameworkModel frameworkModel,
Executor executor,
Channel parent,
ClientStream.Listener listener) {
super(executor, frameworkModel);
this.parent = parent;
this.listener = listener;
this.writeQueue = createWriteQueue(parent);
}
One very important method here is createWriteQueue.
private WriteQueue createWriteQueue(Channel parent) {
final Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
final Future<Http2StreamChannel> future = bootstrap.open().syncUninterruptibly();
if (!future.isSuccess()) {
throw new IllegalStateException("Create remote stream failed. channel:" + parent);
}
final Http2StreamChannel channel = future.getNow();
channel.pipeline()
.addLast(new TripleCommandOutBoundHandler())
.addLast(new TripleHttp2ClientResponseHandler(createTransportListener()));
channel.closeFuture()
.addListener(f -> transportException(f.cause()));
return new WriteQueue(channel);
}
This is where TripleHttp2ClientResponseHandler is added to the channel.
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2StreamFrame msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
final Http2HeadersFrame headers = (Http2HeadersFrame) msg;
transportListener.onHeader(headers.headers(), headers.isEndStream());
} else if (msg instanceof Http2DataFrame) {
final Http2DataFrame data = (Http2DataFrame) msg;
transportListener.onData(data.content(), data.isEndStream());
} else {
super.channelRead(ctx, msg);
}
}
This is TripleHttp2ClientResponseHandler#channelRead0, which handles Http2HeadersFrame and Http2DataFrame differently.
- Header
@Override
public void onHeader(Http2Headers headers, boolean endStream) {
    executor.execute(() -> {
        if (endStream) {
            if (!halfClosed) {
                writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL), true);
            }
            onTrailersReceived(headers);
        } else {
            onHeaderReceived(headers);
        }
    });
}
void onHeaderReceived(Http2Headers headers) {
    if (transportError != null) {
        transportError.appendDescription("headers:" + headers);
        return;
    }
    if (headerReceived) {
        transportError = TriRpcStatus.INTERNAL.withDescription("Received headers twice");
        return;
    }
    Integer httpStatus = headers.status() == null ? null
        : Integer.parseInt(headers.status().toString());
    if (httpStatus != null && Integer.parseInt(httpStatus.toString()) > 100 && httpStatus < 200) {
        // ignored
        return;
    }
    headerReceived = true;
    transportError = validateHeaderStatus(headers);
    // todo support full payload compressor
    CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
    if (null != messageEncoding) {
        String compressorStr = messageEncoding.toString();
        if (!Identity.IDENTITY.getMessageEncoding().equals(compressorStr)) {
            DeCompressor compressor = DeCompressor.getCompressor(frameworkModel, compressorStr);
            if (null == compressor) {
                throw TriRpcStatus.UNIMPLEMENTED.withDescription(String.format(
                    "Grpc-encoding '%s' is not supported", compressorStr)).asException();
            } else {
                decompressor = compressor;
            }
        }
    }
    TriDecoder.Listener listener = new TriDecoder.Listener() {
        @Override
        public void onRawMessage(byte[] data) {
            TripleClientStream.this.listener.onMessage(data);
        }
        public void close() {
            finishProcess(statusFromTrailers(trailers), trailers);
        }
    };
    deframer = new TriDecoder(decompressor, listener);
    TripleClientStream.this.listener.onStart();
}
In short, this does a series of initialization steps and then defines a TriDecoder.Listener. When the listener's onRawMessage method is called, it calls TripleClientStream.this.listener.onMessage(data).
At the end it calls TripleClientStream.this.listener.onStart().
This TripleClientStream.this.listener is where streaming RPC differs from unary.
For unary the listener passed in is a UnaryClientCallListener. For the two streaming RPC paths it is an ObserverToClientCallListenerAdapter (a wrapper around the StreamObserver defined by the consumer).
// ObserverToClientCallListenerAdapter#onStart
@Override
public void onStart(ClientCall call) {
    this.call = call;
    if (call.isAutoRequest()) {
        call.request(1);
    }
}
// UnaryClientCallListener#onStart
@Override
public void onStart(ClientCall call) {
    future.addTimeoutListener(
        () -> call.cancelByLocal(new IllegalStateException("client timeout")));
    call.request(2);
}
Here call#request asks the decoder to deliver up to n more messages out of the bytes accumulated in a Netty CompositeByteBuf (used to avoid copying). The actual code is in TriDecoder#deliver.
private void deliver() {
    // We can have reentrancy here when using a direct executor, triggered by calls to
    // request more messages. This is safe as we simply loop until pendingDelivers = 0
    if (inDelivery) {
        return;
    }
    inDelivery = true;
    try {
        // Process the uncompressed bytes.
        while (pendingDeliveries > 0 && hasEnoughBytes()) {
            switch (state) {
                case HEADER:
                    processHeader();
                    break;
                case PAYLOAD:
                    // Read the body and deliver the message.
                    processBody();
                    // Since we've delivered a message, decrement the number of pending
                    // deliveries remaining.
                    pendingDeliveries--;
                    break;
                default:
                    throw new AssertionError("Invalid state: " + state);
            }
        }
        if (closing) {
            if (!closed) {
                closed = true;
                accumulate.clear();
                accumulate.release();
                listener.close();
            }
        }
    } finally {
        inDelivery = false;
    }
}
- Data
When a message arrives, onData is entered.
@Override
public void onData(ByteBuf data, boolean endStream) {
    executor.execute(() -> {
        if (transportError != null) {
            transportError.appendDescription(
                "Data:" + data.toString(StandardCharsets.UTF_8));
            ReferenceCountUtil.release(data);
            if (transportError.description.length() > 512 || endStream) {
                handleH2TransportError(transportError);
            }
            return;
        }
        if (!headerReceived) {
            handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
                "headers not received before payload"));
            return;
        }
        deframer.deframe(data);
    });
}
This calls deframer.deframe(data). The accumulate field there is a CompositeByteBuf, used to avoid copying, and its addComponent method appends another ByteBuf dynamically.
@Override
public void deframe(ByteBuf data) {
    if (closing || closed) {
        // ignored
        return;
    }
    accumulate.addComponent(true, data);
    deliver();
}
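[Figure: summary diagram of the receive path (possibly a bit messy)]
In streaming mode the client keeps pulling data: pendingDeliveries keeps bouncing between 1 and 2 (it never reaches 0), and when the stream finally ends TriDecoder#close is called to finish the call.
This looks a lot like backpressure. (Is this in fact how Dubbo implements backpressure?)
Unary works in much the same way, except that pendingDeliveries is not increased when data arrives.
bi-stream is similar too. Unlike serverStream, the call returns a StreamObserver, and the user can send data to the provider directly through onNext.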
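The demand counter is easier to reason about in isolation. The sketch below is a hypothetical, JDK-only deframer: it assumes the gRPC-style message layout (1 flag byte, a 4-byte big-endian length, then the payload), buffers incoming bytes, and hands a message to the listener only while pendingDeliveries is positive. It models only this counter and says nothing about HTTP/2 flow control; MiniDeframer and frame are names invented here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of demand-driven delivery: bytes are buffered until requested.
class MiniDeframer {
    private static final int HEADER_LENGTH = 5; // 1 flag byte + 4 length bytes (assumed layout)

    private final Consumer<byte[]> listener;
    private byte[] accumulate = new byte[0];
    private int pendingDeliveries;
    private boolean inDelivery;

    MiniDeframer(Consumer<byte[]> listener) {
        this.listener = listener;
    }

    /** The consumer signals it can take n more messages. */
    void request(int n) {
        pendingDeliveries += n;
        deliver();
    }

    /** The transport hands over newly arrived bytes (possibly a partial message). */
    void deframe(byte[] data) {
        byte[] merged = Arrays.copyOf(accumulate, accumulate.length + data.length);
        System.arraycopy(data, 0, merged, accumulate.length, data.length);
        accumulate = merged;
        deliver();
    }

    private void deliver() {
        if (inDelivery) {
            return; // re-entered from the listener; the outer loop picks up the new demand
        }
        inDelivery = true;
        try {
            while (pendingDeliveries > 0 && accumulate.length >= HEADER_LENGTH) {
                int length = ByteBuffer.wrap(accumulate, 1, 4).getInt();
                int end = HEADER_LENGTH + length;
                if (accumulate.length < end) {
                    return; // body not complete yet
                }
                byte[] message = Arrays.copyOfRange(accumulate, HEADER_LENGTH, end);
                accumulate = Arrays.copyOfRange(accumulate, end, accumulate.length);
                pendingDeliveries--;
                listener.accept(message);
            }
        } finally {
            inDelivery = false;
        }
    }

    /** Helper that builds one uncompressed, length-prefixed message. */
    static byte[] frame(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
                .put((byte) 0).putInt(payload.length).put(payload).array();
    }

    public static void main(String[] args) {
        List<String> got = new ArrayList<>();
        MiniDeframer d = new MiniDeframer(b -> got.add(new String(b, StandardCharsets.UTF_8)));
        d.deframe(frame("a"));
        d.deframe(frame("b"));
        System.out.println("before request: " + got);
        d.request(1);
        System.out.println("after request(1): " + got);
        d.request(1);
        System.out.println("after second request(1): " + got);
    }
}
```

A listener that calls request(1) from inside its callback keeps the counter topped up, which matches the "keeps pulling" behaviour described above for streams.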
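That is how the client sends, but how does the server receive?
This question was left open earlier. The main logic is in the provider's TripleHttp2FrameServerHandler.
As usual, start with the channelRead method.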
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
onHeadersRead(ctx, (Http2HeadersFrame) msg);
} else if (msg instanceof Http2DataFrame) {
onDataRead(ctx, (Http2DataFrame) msg);
} else if (msg instanceof ReferenceCounted) {
// ignored
ReferenceCountUtil.release(msg);
}
}
public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception {
final TripleServerStream tripleServerStream = ctx.channel().attr(SERVER_STREAM_KEY)
.get();
tripleServerStream.transportObserver.onData(msg.content(), msg.isEndStream());
}
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
TripleServerStream tripleServerStream = new TripleServerStream(ctx.channel(),
frameworkModel, executor,
pathResolver, acceptEncoding, filters);
ctx.channel().attr(SERVER_STREAM_KEY).set(tripleServerStream);
tripleServerStream.transportObserver.onHeader(msg.headers(), msg.isEndStream());
}
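It turns out that receiving on the server is much the same as receiving on the client: both rely on TriDecoder to take in messages.
Also, when calling through a stub, the methods of the generated DubboXXXXTriple class are invoked, which then go through StubInvocationUtil -> InvocationUtil to perform the remote call.
Summary
Because the Triple protocol is built on top of HTTP/2, it gets streaming and full-duplex communication for free.
With Triple's streaming RPC, multiple user-level long-lived connections, called Streams, are established between consumer and provider. Many Streams can coexist on a single TCP connection, each Stream is identified by a StreamId, and the packets on one Stream are read and written in order.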
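As a toy illustration of that last point, the sketch below interleaves frames from two streams on one "wire" and regroups them by stream id, keeping the order within each stream. Frame and demux are hypothetical names for this example; real HTTP/2 framing carries far more than an id and a payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: several logical streams sharing one ordered sequence of frames.
class StreamDemux {

    /** A frame tagged with the id of the stream it belongs to. */
    static final class Frame {
        final int streamId;
        final String payload;

        Frame(int streamId, String payload) {
            this.streamId = streamId;
            this.payload = payload;
        }
    }

    /** Regroups interleaved frames by stream id; arrival order is kept within each stream. */
    static Map<Integer, List<String>> demux(List<Frame> wire) {
        Map<Integer, List<String>> perStream = new LinkedHashMap<>();
        for (Frame f : wire) {
            perStream.computeIfAbsent(f.streamId, id -> new ArrayList<>()).add(f.payload);
        }
        return perStream;
    }

    public static void main(String[] args) {
        // Client-initiated HTTP/2 streams use odd ids, hence 1 and 3.
        List<Frame> wire = Arrays.asList(
                new Frame(1, "a1"), new Frame(3, "b1"),
                new Frame(1, "a2"), new Frame(3, "b2"));
        System.out.println(demux(wire));
    }
}
```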
References
[1] https://blog.csdn.net/superfjj/article/details/121507952