目录

Apache Shenyu源码阅读计划(二)HTTP请求梳理

目录

就利用上一篇文章的例子,打上断点调试。

发一个请求,结果出现如下日志:

2021-06-12 16:15:16 [shenyu-netty-kqueue-3] INFO  org.apache.shenyu.plugin.base.AbstractShenyuPlugin - context_path selector success match , selector name :/context-path/http
2021-06-12 16:15:16 [shenyu-netty-kqueue-3] INFO  org.apache.shenyu.plugin.base.AbstractShenyuPlugin - context_path rule success match , rule name :/context-path/http
2021-06-12 16:15:16 [shenyu-netty-kqueue-3] INFO  org.apache.shenyu.plugin.base.AbstractShenyuPlugin - divide selector success match , selector name :/http
2021-06-12 16:15:16 [shenyu-netty-kqueue-3] INFO  org.apache.shenyu.plugin.base.AbstractShenyuPlugin - divide rule success match , rule name :/http/order/save
2021-06-12 16:15:16 [shenyu-netty-kqueue-3] INFO  org.apache.shenyu.plugin.httpclient.WebClientPlugin - The request urlPath is http://192.168.1.108:8189/order/save, retryTimes is 0

发现了比较重要的AbstractShenyuPlugin,于是进入AbstractShenyuPlugin,发现了一个execute()方法,注释说这个方法是用来处理web请求的。

于是给execute打断点,再发一个请求。

public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
    // 获取插件名字
  	String pluginName = named();
  	// 根据插件名获取PluginData
    PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
    // 判断当前插件是否可用,不可用则跳过
  	if (pluginData != null && pluginData.getEnabled()) {
      	// 获取SelectorData集合
        final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
      	// 若没有Selector,则直接跳过这个插件
        if (CollectionUtils.isEmpty(selectors)) {
            return handleSelectorIfNull(pluginName, exchange, chain);
        }
      	// 根据exchange从selectors中选出SelectorData,exchange内容示例见下方截图
        SelectorData selectorData = matchSelector(exchange, selectors);
        if (Objects.isNull(selectorData)) {
            return handleSelectorIfNull(pluginName, exchange, chain);
        }
      	// 打日志
        selectorLog(selectorData, pluginName);
      	// 获取规则列表
        List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        if (CollectionUtils.isEmpty(rules)) {
            return handleRuleIfNull(pluginName, exchange, chain);
        }
        RuleData rule;
      	// TOLOOK
        if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
            //get last
            rule = rules.get(rules.size() - 1);
        } else {
            rule = matchRule(exchange, rules);
        }
        if (Objects.isNull(rule)) {
            return handleRuleIfNull(pluginName, exchange, chain);
        }
      	// 打日志
        ruleLog(rule, pluginName);
      	// 传入exhcange、selector信息、rule信息,具体处理请求。
        return doExecute(exchange, chain, selectorData, rule);
    }
    return chain.execute(exchange);
}

exchange内容如下图所示,目前还不太清楚是干啥的,暂时放一放。

https://img.jooks.cn/img/20210612171226.png

接下来到了ContextPathPlugin的doExchange()方法,具体来看一看。

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
  	// 从exchange中取出ShenyuContext,TOLOOK
    ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
    assert shenyuContext != null;
  	// 获取ContextMappingHandle(Context mapping thread handle)
    ContextMappingHandle contextMappingHandle = ContextPathRuleHandleCache.getInstance().obtainHandle(CacheKeyUtils.INST.getKey(rule));
    if (Objects.isNull(contextMappingHandle) || StringUtils.isBlank(contextMappingHandle.getContextPath())) {
        log.error("context path rule configuration is null :{}", rule);
        return chain.execute(exchange);
    }
    // 构建context路径和真实url
    buildContextPath(shenyuContext, contextMappingHandle);
    return chain.execute(exchange);
}

buildContextPath()方法具体内容如下:

private void buildContextPath(final ShenyuContext context, final ContextMappingHandle handle) {
    String realURI = "";
    if (StringUtils.isNoneBlank(handle.getContextPath())) {
        context.setContextPath(handle.getContextPath());
        context.setModule(handle.getContextPath());
      	// 原来的contextPath是'/http/order/save',经过下面这一步就变成了'/order/save',变成了realURI
        realURI = context.getPath().substring(handle.getContextPath().length());
    } else {
        if (StringUtils.isNoneBlank(handle.getAddPrefix())) {
            realURI = handle.getAddPrefix() + context.getPath();
        }
    }
    context.setRealUrl(realURI);
    if (StringUtils.isNoneBlank(handle.getRealUrl())) {
        log.info("context path replaced old :{} , real:{}", context.getRealUrl(), handle.getRealUrl());
        context.setRealUrl(handle.getRealUrl());
    }
}

这个方法执行完之后,继续回到插件调用链。。。(这里的context保存在了exchange的attributes中,猜测后面会统一执行插件?)

然后跟上面类似,到了DividePlugin的doExcute()方法。

后面再仔细看DividePlugin插件的doExcute()

然后接着就不知道看哪里了。于是在ShenYuWebHandler.execute()这里打一个断点,因为这里是目前所知的最早执行的程序。通过函数调用栈发现前一个执行的是ShenYuWebHandler.handle()

public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
    Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
    if (scheduled) {
        return execute.subscribeOn(scheduler);
    }
    return execute;
}

可以发现这里新建了一个DefaultShenyuPluginChain,且把插件集合给放进构造方法。

以下都是spring的内容。

然后在通过函数调用栈往前看,发现是DefaultWebFilterChain.filter()方法。

public Mono<Void> filter(ServerWebExchange exchange) {
   return Mono.defer(() ->
         this.currentFilter != null && this.chain != null ?
               invokeFilter(this.currentFilter, this.chain, exchange) :
               this.handler.handle(exchange));
}

再往上是FilteringWebHandler.handle()方法。

再往上是WebHandlerDecorator.handle()

再往上是ExceptionHandlingWebHandler.handle()

再往上是HttpWebHandlerAdapter.handle()

public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
   if (this.forwardedHeaderTransformer != null) {
      request = this.forwardedHeaderTransformer.apply(request);
   }
   // 这里应该是根据request和response生成exchange
   ServerWebExchange exchange = createExchange(request, response);

   LogFormatUtils.traceDebug(logger, traceOn ->
         exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
               (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

   return getDelegate().handle(exchange)
         .doOnSuccess(aVoid -> logResponse(exchange))
         .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
         .then(Mono.defer(response::setComplete));
}

再往上是ReactiveWebServerApplicationContext.handle()

public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
   return this.handler.handle(request, response);
}

再往上是ReactorHttpHandlerAdapter.apply()

再往上是HttpServerHandle.onStateChange()

再往上是TcpServerBind.ChildObserver.onStateChange()

再往上是HttpServerOperations.onInboundNext(),这里开始进入Netty的内容(WebFlux是基于Netty实现的)

protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
   if (msg instanceof HttpRequest) {
      try {
         listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
      }
      catch (Exception e) {
         onInboundError(e);
         ReferenceCountUtil.release(msg);
         return;
      }
      if (msg instanceof FullHttpRequest) {
         super.onInboundNext(ctx, msg);
         if (isHttp2()) {
            onInboundComplete();
         }
      }
      return;
   }
   if (msg instanceof HttpContent) {
      if (msg != LastHttpContent.EMPTY_LAST_CONTENT) {
         super.onInboundNext(ctx, msg);
      }
      if (msg instanceof LastHttpContent) {
         onInboundComplete();
      }
   }
   else {
      super.onInboundNext(ctx, msg);
   }
}

算了,还是看不懂(爬。。。)

再往上走全是netty的东西,over。

梳理一下就是(搬运):

  • HttpServerOperations : 明显的netty的请求接收的地方,请求入口

  • TcpServerBind

  • HttpServerHandle

  • ReactorHttpHandlerAdapter :生成response和request

  • ReactiveWebServerApplicationContext

  • HttpWebHandlerAdapter :exchange 的生成

  • ExceptionHandlingWebHandler

  • WebHandlerDecorator

  • FilteringWebHandler

  • DefaultWebFilterChain

  • SoulWebHandler :plugins调用链

  • DividePlugin :plugin具体处理

下面再来看一下之前没仔细看的DividePlugin.doExcute()

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
  	// 取得shenyuContext
    ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
    assert shenyuContext != null;
  	// 根据rule生成ruleHandle
    DivideRuleHandle ruleHandle = UpstreamCacheManager.getInstance().obtainHandle(CacheKeyUtils.INST.getKey(rule));
    long headerSize = 0;
  	// 下面计算header的字节总长
    for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {
        for (String value : multiHeader) {
            headerSize += value.getBytes(StandardCharsets.UTF_8).length;
        }
    }
    if (headerSize > ruleHandle.getHeaderMaxSize()) {
        log.error("request header is too large");
        Object error = ShenyuResultWrap.error(ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE.getCode(), ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {
        log.error("request entity is too large");
        Object error = ShenyuResultWrap.error(ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE.getCode(), ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
  	// 获取上游api列表
    List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
    if (CollectionUtils.isEmpty(upstreamList)) {
        log.error("divide upstream configuration error: {}", rule.toString());
        Object error = ShenyuResultWrap.error(ShenyuResultEnum.CANNOT_FIND_URL.getCode(), ShenyuResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
    if (Objects.isNull(divideUpstream)) {
        log.error("divide has no upstream");
        Object error = ShenyuResultWrap.error(ShenyuResultEnum.CANNOT_FIND_URL.getCode(), ShenyuResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    // set the http url
    String domain = buildDomain(divideUpstream);
    String realURL = buildRealURL(domain, shenyuContext, exchange);
    exchange.getAttributes().put(Constants.HTTP_URL, realURL);
    // set the http timeout
    exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
    exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
    return chain.execute(exchange);
}

然后我想知道到底是哪里发了请求QAQ,继续往下看。

哦对,之前日志打印的最后一行是:INFO org.apache.shenyu.plugin.httpclient.WebClientPlugin - The request urlPath is http://192.168.1.108:8189/order/save, retryTimes is 0

所以直接去看WebClientPlugin

public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
    final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
    assert shenyuContext != null;
    String urlPath = exchange.getAttribute(Constants.HTTP_URL);
    if (StringUtils.isEmpty(urlPath)) {
        Object error = ShenyuResultWrap.error(ShenyuResultEnum.CANNOT_FIND_URL.getCode(), ShenyuResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
    int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
    log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
    // 判断是哪种http请求,比如GET/POST/...
  	HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
    WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
    // 重点来了
  	return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}

继续跟进handleRequestBody()方法。

private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                     final ServerWebExchange exchange,
                                     final long timeout,
                                     final int retryTimes,
                                     final ShenyuPluginChain chain) {
    return requestBodySpec.headers(httpHeaders -> {
        httpHeaders.addAll(exchange.getRequest().getHeaders());
        httpHeaders.remove(HttpHeaders.HOST);
    })
            .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
            .exchange()
            .doOnError(e -> log.error(e.getMessage(), e))
            .timeout(Duration.ofMillis(timeout))
            .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
                .retryMax(retryTimes)
                .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
            .flatMap(e -> doNext(e, exchange, chain));

}

这里就是向真实的uri发起了请求。

除此之外,shenyu-plugin-httpclient模块下还有一个WebClientResponsePlugin这个类

https://img.jooks.cn/img/20210612223935.png

这个类据说是(TO CONFIRM)用来向客户端返回response的,下面看看它的execute方法。

public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
    return chain.execute(exchange).then(Mono.defer(() -> {
        ServerHttpResponse response = exchange.getResponse();
        ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
        if (Objects.isNull(clientResponse)
                || response.getStatusCode() == HttpStatus.BAD_GATEWAY
                || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
            Object error = ShenyuResultWrap.error(ShenyuResultEnum.SERVICE_RESULT_ERROR.getCode(), ShenyuResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
            Object error = ShenyuResultWrap.error(ShenyuResultEnum.SERVICE_TIMEOUT.getCode(), ShenyuResultEnum.SERVICE_TIMEOUT.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        response.setStatusCode(clientResponse.statusCode());
        response.getCookies().putAll(clientResponse.cookies());
        response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
        return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers())).doOnCancel(() -> clean(exchange));
    }));
}

总结

一路下来,其实有很多细节没有深入探讨。

但是梳理了一下HTTP从打到shenyu网关,再到shenyu去请求上游API,再把结果返回回去的整个流程,发现ShenYu比较核心的部分其实是shenyu-plugin模块。(感觉)

另外发现了官网上一篇对divide插件的解读,里面提到了主机探活机制,这是我没发现的地方:https://dromara.org/zh/blog/soul_source_learning_16_divide_sxj/