From 6802a5b2ecbc3cde65d3fe97bdd84e9ec42dc455 Mon Sep 17 00:00:00 2001 From: 86191 <2160251938@qq.com> Date: Sun, 29 Sep 2024 09:14:04 +0800 Subject: [PATCH] =?UTF-8?q?feat():saas=E7=B3=BB=E7=BB=9F=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../muyu/gateway/filter/AccessLogFilter.java | 226 ++++++++++++++++++ .../com/muyu/gateway/model/AccessLog.java | 120 ++++++++++ .../com/muyu/wechat/domain/AccessToken.java | 21 ++ 3 files changed, 367 insertions(+) create mode 100644 cloud-gateway/src/main/java/com/muyu/gateway/filter/AccessLogFilter.java create mode 100644 cloud-gateway/src/main/java/com/muyu/gateway/model/AccessLog.java create mode 100644 cloud-modules/cloud-modules-wechat/src/main/java/com/muyu/wechat/domain/AccessToken.java diff --git a/cloud-gateway/src/main/java/com/muyu/gateway/filter/AccessLogFilter.java b/cloud-gateway/src/main/java/com/muyu/gateway/filter/AccessLogFilter.java new file mode 100644 index 0000000..9ceec12 --- /dev/null +++ b/cloud-gateway/src/main/java/com/muyu/gateway/filter/AccessLogFilter.java @@ -0,0 +1,226 @@ +package com.muyu.gateway.filter; + +import cn.hutool.core.date.LocalDateTimeUtil; +import com.alibaba.nacos.common.utils.StringUtils; +import com.muyu.common.core.constant.SecurityConstants; +import com.muyu.gateway.model.AccessLog; +import com.muyu.gateway.utils.WebFrameworkUtils; +import lombok.extern.log4j.Log4j2; +import org.reactivestreams.Publisher; +import org.springframework.cloud.gateway.filter.GatewayFilterChain; +import org.springframework.cloud.gateway.filter.GlobalFilter; +import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage; +import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory; +import org.springframework.cloud.gateway.support.BodyInserterContext; +import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; +import org.springframework.core.Ordered; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpOutputMessage; +import org.springframework.http.codec.HttpMessageReader; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponseDecorator; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.BodyInserter; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.server.HandlerStrategies; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.util.List; + + +/** + * 网关的访问日志过滤器 + *

+ *

+ * TODO 如果网关执行异常,不会记录访问日志,后续研究下 https://github.com/Silvmike/webflux-demo/blob/master/tests/src/test/java/ru/hardcoders/demo/webflux/web_handler/filters/logging + */ +@Log4j2 +@Component +public class AccessLogFilter implements GlobalFilter, Ordered { + + private final List> messageReaders = HandlerStrategies.withDefaults().messageReaders(); + + /** + * 打印日志 + * + * @param gatewayLog 网关日志 + */ + private void writeAccessLog(AccessLog gatewayLog) { + log.info("[网关日志:{}]", gatewayLog.toString()); + } + + @Override + public int getOrder() { + return -99; + } + + @Override + public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { + // 将 Request 中可以直接获取到的参数,设置到网关日志 + ServerHttpRequest request = exchange.getRequest(); + // TODO traceId + AccessLog accessLog = AccessLog.builder() + .userId(request.getHeaders().getFirst(SecurityConstants.DETAILS_USER_ID)) + .route(WebFrameworkUtils.getGatewayRoute(exchange)) + .schema(request.getURI().getScheme()) + .requestMethod(request.getMethod().name()) + .requestUrl(request.getURI().getRawPath()) + .queryParams(request.getQueryParams()) + .requestHeaders(request.getHeaders()) + .startTime(LocalDateTime.now()) + .userIp(WebFrameworkUtils.getClientIP(exchange)) + .build(); + + // 继续 filter 过滤 + MediaType mediaType = request.getHeaders().getContentType(); + return MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType) || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType) + ? + filterWithRequestBody(exchange, chain, accessLog) + : + filterWithoutRequestBody(exchange, chain, accessLog); + } + + private Mono filterWithoutRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog accessLog) { + // 包装 Response,用于记录 Response Body + ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog); + return chain.filter(exchange.mutate().response(decoratedResponse).build()) + .then(Mono.fromRunnable(() -> writeAccessLog(accessLog))); // 打印日志 + } + + /** + * 参考 {@link ModifyRequestBodyGatewayFilterFactory} 实现 + *

+ * 差别主要在于使用 modifiedBody 来读取 Request Body 数据 + */ + private Mono filterWithRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog gatewayLog) { + // 设置 Request Body 读取时,设置到网关日志 + ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders); + Mono modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> { + gatewayLog.setRequestBody(body); + return Mono.just(body); + }); + + // 创建 BodyInserter 对象 + BodyInserter, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); + // 创建 CachedBodyOutputMessage 对象 + HttpHeaders headers = new HttpHeaders(); + headers.putAll(exchange.getRequest().getHeaders()); + // the new content type will be computed by bodyInserter + // and then set in the request decorator + headers.remove(HttpHeaders.CONTENT_LENGTH); // 移除 + CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); + // 通过 BodyInserter 将 Request Body 写入到 CachedBodyOutputMessage 中 + return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { + // 包装 Request,用于缓存 Request Body + ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage); + // 包装 Response,用于记录 Response Body + ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog); + // 记录普通的 + return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build()) + .then(Mono.fromRunnable(() -> writeAccessLog(gatewayLog))); // 打印日志 + + })); + } + + /** + * 记录响应日志 + * 通过 DataBufferFactory 解决响应体分段传输问题。 + */ + private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, AccessLog gatewayLog) { + ServerHttpResponse response = exchange.getResponse(); + return new ServerHttpResponseDecorator(response) { + + @Override + public Mono writeWith(Publisher body) { + if (body instanceof Flux) { + DataBufferFactory bufferFactory = response.bufferFactory(); + // 计算执行时间 + gatewayLog.setEndTime(LocalDateTime.now()); + gatewayLog.setDuration((int) (LocalDateTimeUtil.between(gatewayLog.getStartTime(), + gatewayLog.getEndTime()).toMillis())); + // 设置其它字段 +// gatewayLog.setUserId(SecurityFrameworkUtils.getLoginUserId(exchange)); + gatewayLog.setResponseHeaders(response.getHeaders()); + gatewayLog.setHttpStatus(response.getStatusCode()); + + // 获取响应类型,如果是 json 就打印 + String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR); + if (StringUtils.isNotBlank(originalResponseContentType) + && originalResponseContentType.contains("application/json")) { + Flux fluxBody = Flux.from(body); + return super.writeWith(fluxBody.buffer().map(dataBuffers -> { + // 设置 response body 到网关日志 + byte[] content = readContent(dataBuffers); + String responseResult = new String(content, StandardCharsets.UTF_8); + gatewayLog.setResponseBody(responseResult); + + // 响应 + return bufferFactory.wrap(content); + })); + } + } + // if body is not a flux. never got there. + return super.writeWith(body); + } + }; + } + + // ========== 参考 ModifyRequestBodyGatewayFilterFactory 中的方法 ========== + + /** + * 请求装饰器,支持重新计算 headers、body 缓存 + * + * @param exchange 请求 + * @param headers 请求头 + * @param outputMessage body 缓存 + * @return 请求装饰器 + */ + private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) { + return new ServerHttpRequestDecorator(exchange.getRequest()) { + + @Override + public HttpHeaders getHeaders() { + long contentLength = headers.getContentLength(); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.putAll(super.getHeaders()); + if (contentLength > 0) { + httpHeaders.setContentLength(contentLength); + } else { + httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); + } + return httpHeaders; + } + + @Override + public Flux getBody() { + return outputMessage.getBody(); + } + }; + } + + // ========== 参考 ModifyResponseBodyGatewayFilterFactory 中的方法 ========== + + private byte[] readContent(List dataBuffers) { + // 合并多个流集合,解决返回体分段传输 + DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); + DataBuffer join = dataBufferFactory.join(dataBuffers); + byte[] content = new byte[join.readableByteCount()]; + join.read(content); + // 释放掉内存 + DataBufferUtils.release(join); + return content; + } + +} diff --git a/cloud-gateway/src/main/java/com/muyu/gateway/model/AccessLog.java b/cloud-gateway/src/main/java/com/muyu/gateway/model/AccessLog.java new file mode 100644 index 0000000..b52eb3e --- /dev/null +++ b/cloud-gateway/src/main/java/com/muyu/gateway/model/AccessLog.java @@ -0,0 +1,120 @@ +package com.muyu.gateway.model; + +import com.muyu.common.core.utils.StringUtils; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.springframework.cloud.gateway.route.Route; +import org.springframework.http.HttpStatusCode; +import org.springframework.util.MultiValueMap; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** + * 网关的访问日志 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AccessLog { + + /** + * 链路追踪编号 + */ + private String traceId; + /** + * 用户编号 + */ + private String userId; + + /** + * 路由 + * + * 类似 ApiAccessLogCreateReqDTO 的 applicationName + */ + private Route route; + + /** + * 协议 + */ + private String schema; + /** + * 请求方法名 + */ + private String requestMethod; + /** + * 访问地址 + */ + private String requestUrl; + /** + * 查询参数 + */ + private MultiValueMap queryParams; + /** + * 请求体 + */ + private String requestBody; + /** + * 请求头 + */ + private MultiValueMap requestHeaders; + /** + * 用户 IP + */ + private String userIp; + + /** + * 响应体 + * + * 类似 ApiAccessLogCreateReqDTO 的 resultCode + resultMsg + */ + private String responseBody; + /** + * 响应头 + */ + private MultiValueMap responseHeaders; + /** + * 响应结果 + */ + private HttpStatusCode httpStatus; + + /** + * 开始请求时间 + */ + private LocalDateTime startTime; + /** + * 结束请求时间 + */ + private LocalDateTime endTime; + /** + * 执行时长,单位:毫秒 + */ + private Integer duration; + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE) + .append("请求简略信息", + StringUtils.format("[userId:[{}]-userIp:[{}]-traceId:[{}]] ---结果--- {{}-{}}:{} ---> {}",userId, userIp, traceId, schema, requestMethod, requestUrl,httpStatus) + ) + .append("路由", route) + .append("查询参数", queryParams) + .append("请求体", requestBody) + .append("请求头", requestHeaders) + .append("响应体", responseBody) + .append("响应头", responseHeaders) + .append("耗时/时间", + StringUtils.format( + "{}MS-{}-{}", + duration, + startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), + endTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))) + ) + .toString(); + } +} diff --git a/cloud-modules/cloud-modules-wechat/src/main/java/com/muyu/wechat/domain/AccessToken.java b/cloud-modules/cloud-modules-wechat/src/main/java/com/muyu/wechat/domain/AccessToken.java new file mode 100644 index 0000000..de5ac21 --- /dev/null +++ b/cloud-modules/cloud-modules-wechat/src/main/java/com/muyu/wechat/domain/AccessToken.java @@ -0,0 +1,21 @@ +package com.muyu.wechat.domain; + +import lombok.Data; + +/** + * @ClassDescription: + * @JdkVersion: 1.8 + * @Author: YZL + * @Created: 2024/9/19 20:26 + */ +@Data +public class AccessToken { + + private String access_token; + + private Long expires_in; + public void setExpiresTime(Long expiresIn) { + this.expires_in = System.currentTimeMillis() + expiresIn * 1000; + } + +}