自学内容网 自学内容网

基于trace_id实现SpringCloudGateway网关的链路追踪

之前写的两篇关于基于 trace_id 的链路追踪的文章:

一、引言

在之前的文章中,我们讨论了基于 trace_id 的链路追踪的常见场景。然而,最近我意识到在微服务架构中,我们还缺少对一个非常常见场景的探讨:在网关中如何处理 trace_id,尤其是在WebFlux Reactor 异步模式下的处理。因此,我决定记录下这些思考和解决方案。

二、具体场景

在Spring Cloud Gateway网关中,我们需要实现请求访问日志的打印功能,以便更好地排查问题。具体的实现方式包括两个全局过滤器:

  • TraceIdGlobalFilter:实现 trace_id 全局拦截(先执行)。
  • AccessLogGlobalFilter:实现请求访问日志的打印(后执行)。

在正常情况下,这两个过滤器可以打印请求的 request 日志和 response 日志,并且日志中都包含相同的 trace_id。然而,在开发调试过程中,我发现了一种异常情况:request 日志中总能打印出 trace_id,而 response 日志中则有时能打印出 trace_id,有时却不能。这导致了 request 日志和 response 日志无法关联的问题。

三、分析

1. 为什么 response 日志没有打印 trace_id
通过分析日志,我发现打印 response 日志的线程与打印 request 日志的线程并不是同一个线程。基于此,我们可以判断,trace_id 没有传递到打印 response 日志的线程中。

2. 为什么 trace_id 没有传递到打印 response 日志的线程中?
我们知道 Spring Cloud Gateway 是基于 WebFlux Reactor 异步模式实现的,因此一个请求的 request response 可能由不同的线程来执行。在 TraceIdGlobalFilter 中,我们使用了 MDC来传递 trace_id。然而,MDC 在普通的多线程环境中有效,但在 Reactor 异步模式下并不起作用。这是因为 Reactor 异步模式需要通过另外一种方式来传递 trace_id

四、解决方案

在 WebFlux Reactor 异步模式下,我们需要使用 reactor.util.context.Context 来传递 trace_id。核心逻辑如下:
透传 trace_id 通过 Mono.contextWrite(context) 往 context 中设置 trace_id
取出 trace_id 通过 Flux.deferContextual(context) 从 context 中获取 trace_id
具体实现代码示例如下:

// 设置 trace_id 
Mono.contextWrite(context -> context.put("trace_id", traceId));

// 获取 trace_id
Flux.deferContextual(context -> {
    String traceId = context.get("trace_id");
    // 可将 traceId 设置到MDC中供当前线程使用
    return Flux.just(traceId);
});

通过这种方式,我们可以确保 trace_id 在整个请求处理链路中都能被正确传递和使用,解决了 request 日志和 response 日志断联的问题。

五、具体代码

TraceIdGlobalFilter

/**
 * trace_id 全局拦截器
 */
@Slf4j
@Component
public class TraceIdGlobalFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest request = exchange.getRequest();

        String traceId = request.getHeaders().getFirst(TraceConsts.TRACE_ID);

        // trace_id
        traceId = MdcUtil.attachTraceId(traceId);

        // 将traceId传递给下游微服务
        String finalTraceId = traceId;
        Consumer<HttpHeaders> headersConsumer = httpHeaders -> {
            httpHeaders.set(TraceConsts.TRACE_ID, finalTraceId);
        };
        ServerHttpRequest requestNew = exchange.getRequest().mutate().headers(headersConsumer).build();

        return chain.filter(exchange.mutate().request(requestNew).build())
                .doFinally(s -> {
                    // 清除MDC
                    MdcUtil.detachTraceId();
                });
    }

    @Override
    public int getOrder() {
        return -100;
    }

}

AccessLogGlobalFilter


/**
 * 请求访问日志 全局拦截器
 */
@Slf4j
@Component
public class AccessLogGlobalFilter implements GlobalFilter, Ordered {

    /**
     * gateway access log 日志开关
     * <p>
     * 特别注意:高并发业务场景下,可以关闭日志来提升性能
     */
    @Value("${com.gateway.access.log.enabled:true}")
    private boolean logEnabled;

    private final HandlerStrategies handlerStrategies = HandlerStrategies.withDefaults();

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        ServerHttpRequest httpRequest = exchange.getRequest();
        // 日志开关,直接进入下一个Filter
        if (!logEnabled) {
            return chain.filter(exchange).then(Mono.fromRunnable(() -> {
                stopWatch.stop();
                // 为了方便排查问题,还是打印一个简单的日志
                if (log.isDebugEnabled()) {
                    log.debug("请求参数 [{}] [{}] query:{}, time: {} ms", httpRequest.getURI().getPath(), httpRequest.getMethod(), httpRequest.getURI().getRawQuery(), stopWatch.getTotalTimeMillis());
                }
            }));
        }

        // Request 处理
        ServerRequest request = ServerRequest.create(exchange, handlerStrategies.messageReaders());

        // header 参数
        HttpHeaders httpHeaders = request.headers().asHttpHeaders();

        // 是否为文件上传,若是文件上传,则不打印body
        boolean isFile = null != httpHeaders.getContentType() && AccessLogUtil.isBinayBodyData(httpHeaders.getContentType().toString());

        // response 包装
        ServerHttpResponseDecorator responseDecorator = responseDecoratorAndRecordLog(exchange, stopWatch);

        if (isFile) {
            // 打印请求日志
            this.reqLog(request, isFile, null);

            // 执行过滤器
            return chain.filter(exchange.mutate().request(request.exchange().getRequest()).response(responseDecorator).build())
                    // 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它
                    // webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id
                    .contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));
        }

        Mono<String> modifiedBody = request.bodyToMono(String.class).defaultIfEmpty(CommonConsts.NULL).flatMap(body -> {
            // 打印请求日志
            this.reqLog(request, isFile, body);
            return Mono.just(body);
        });

        // 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次
        // BodyInserters.fromPublisher 不支持文件上传,所以不能用
        BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
        HttpHeaders headers = new HttpHeaders();
        headers.putAll(exchange.getRequest().getHeaders());
        headers.remove(HttpHeaders.CONTENT_LENGTH);
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);

        return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
            // request 包装
            ServerHttpRequestDecorator requestDecorator = requestDecorator(exchange, headers, outputMessage);

            // 执行过滤器
            return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build())
                    // 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它
                    // webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id
                    .contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));
        }));
    }

    @Override
    public int getOrder() {
        return -90;
    }

    /**
     * 打印 request log
     */
    private void reqLog(ServerRequest request, boolean isFile, String body) {
        // URL query 参数
        String queryString = request.uri().getRawQuery();

        // header 参数
        HttpHeaders headers = request.headers().asHttpHeaders();
        String headersParams = headersToString(headers);

        if (isFile) {
            if (log.isInfoEnabled()) {
                log.info("请求参数 [{}] [{}] query:{}, headers:{}", request.uri().getPath(), request.methodName(), queryString, headersParams);
            }
            return;
        }

        // request body 长度处理,避免太长,打印耗性能
        String requestBody = AccessLogUtil.fixFieldAndReplaceWhite(body, AccessLogUtil.DEF_MAX_LEN);

        if (log.isInfoEnabled()) {
            log.info("请求参数 [{}] [{}] query:{}, headers:{}, body:{}", request.uri().getPath(), request.methodName(), queryString, headersParams, requestBody);
        }

    }

    /**
     * 过滤headers,避免打印过多的日志
     */
    private String headersToString(HttpHeaders headers) {
        Map<String, String> map = new HashMap<String, String>();
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            if (RequestParamUtil.containsHeader(entry.getKey())) {
                map.put(entry.getKey(), entry.getValue().toString());
            }
        }
        return JSON.toJSONString(map);
    }

    /**
     * Request装饰器,重新计算 headers
     */
    private ServerHttpRequestDecorator requestDecorator(ServerWebExchange exchange, HttpHeaders headers,
                                                        CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(super.getHeaders());
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                } else {
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }

    /**
     * Response装饰器,记录响应日志
     * <p>
     * 通过 DataBufferFactory 解决响应体分段传输问题。
     */
    private ServerHttpResponseDecorator responseDecoratorAndRecordLog(ServerWebExchange exchange, StopWatch stopWatch) {
        ServerHttpResponse response = exchange.getResponse();
        DataBufferFactory bufferFactory = response.bufferFactory();
        return new ServerHttpResponseDecorator(response) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                stopWatch.stop();

                if (!(body instanceof Flux)) {
                    return super.writeWith(body);
                }
                // 获取响应类型
                String responseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);

                if (AccessLogUtil.isBinayBodyData(responseContentType)) {
                    if (log.isInfoEnabled()) {
                        log.info("响应参数: time {} ms", stopWatch.getTotalTimeMillis());
                    }
                    return super.writeWith(body);
                }

                // info及以上日志级别才做如下处理
                if (log.isInfoEnabled()) {
                    Flux<? extends DataBuffer> fluxBody = Flux.from(body).flatMap(dataBuffer -> Flux.deferContextual(context -> {
                        // webflux reactor 异步模式下:通过 deferContextual 取出context中的trace_id
                        MdcUtil.putTraceId(context.get(TraceConsts.TRACE_ID));
                        if (log.isDebugEnabled()) {
                            log.debug("spring cloud gateway webflux reactor 异步模式下,透传trace_id: {}", MdcUtil.getTraceId());
                        }
                        return Flux.just(dataBuffer);
                    })).doFinally(signalType -> {
                        // 清理掉trace_id
                        MdcUtil.removeTraceId();
                    });

                    return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                        // 合并多个流集合,解决返回体分段传输
                        DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                        DataBuffer join = dataBufferFactory.join(dataBuffers);
                        byte[] content = new byte[join.readableByteCount()];
                        join.read(content);
                        // 释放掉内存
                        DataBufferUtils.release(join);

                        String responseBody = new String(content, StandardCharsets.UTF_8);

                        // response body 长度处理,避免太长,打印耗性能
                        responseBody = AccessLogUtil.fixFieldAndReplaceWhite(responseBody, AccessLogUtil.DEF_MAX_LEN);
                        log.info("响应参数: {}, time {} ms", responseBody, stopWatch.getTotalTimeMillis());

                        return bufferFactory.wrap(content);
                    }));
                }
                return super.writeWith(body);
            }
        };
    }
}

原文地址:https://blog.csdn.net/icansoicrazy/article/details/140572783

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!