Spring WebFlux 高级实战(3-1)
1、响应式基础
Spring 框架的整个基础设施都是围绕Servlet API 构建的,它们之间紧密耦合。因此在开始深入响应式Web 之前,先回顾一下Web 模块的设计:
底层 Servlet 容器负责处理容器内的所有映射Servlet。DispatchServlet 作为一个集成点,用于集成灵活且高度可配置的Spring Web基础设施和繁重且复杂的Servlet API。HandlerMapping将业务逻辑与Servlet API 分离。Spring MVC的缺点:
- 不允许在整个请求声明周期中出现非阻塞操作。没有开箱即用的非阻塞HTTP客户端。
- WebMVC 抽象不能支持非阻塞 Servlet 3.1 的所有功能。
- 对于非 Servlet 服务器,重用 Spring Web 功能或变成模块不灵活。
因此,Spring团队在过去几年中的核心挑战,就是如何构建一个新的解决方案,以在使用基于注解的编程模型的同时,提供异步非阻塞服务的所有优势。
1.1、响应式 Web 内核
响应式Web内核首先需要使用模拟接口和对请求进行处理的方法替换 javax.servlet.Servlet.service 方法。更改相关的类和接口,增强和定制 Servlet API 对客户端请求和服务器响应的交互方式。
/**
* 请求的封装。
* 获取请求报文体的类型是Flux,表示具备响应式能力。
* DataBuffer是针对字节缓冲区的抽象,便于对特定服务器实现数据交换。
* 除了请求报文体,还有消息头、请求路径、cookie、查询参数等信息,可以在该接口或子接口中提供。
*/
interface ServerHttpRequest {
// ...
Flux<DataBuffer> getBody();
// ...
}
/**
* 响应的封装。
* writeWith方法接收的参数是Publisher,提供了响应式,并与特定响应式库解耦。
* 返回值是Mono<Void>,表示向网络发送数据是一个异步的过程。
* 即,只有当订阅Mono时才会执行发送数据的过程。
* 接收服务器可以根据传输协议的流控支持背压。
*/
interface ServerHttpResponse {
// ...
Mono<Void> writeWith(Publisher<? extends DataBuffer> body);
// ...
}
/**
* HTTP请求-响应的容器。
* 这是高层接口,除了HTTP交互,还可以保存框架相关信息。
* 如请求的已恢复的WebSession信息等。
*
*/
interface ServerWebExchange {
// ...
ServerHttpRequest getRequest();
ServerHttpResponse getResponse();
// ...
Mono<WebSession> getSession();
// ...
}
上述三个接口类似于Servlet API 中的接口。响应式接口旨在从交互模型的角度提供几乎相同的方法,同时提供开箱即用的响应式。请求的处理程序和过滤器 API,如下所示:
/**
* 对应于WebMVC中的DispatcherServlet
* 查找请求的处理程序,使用视图解析器渲染视图,因此handle方法不需要返回任何结果。
*
* 返回值Mono<Void>提供了异步处理。
* 如果在指定的时间内没有信号出现,可以取消执行。
*/
interface WebHandler {
Mono<Void> handle(ServerWebExchange exchange);
}
/**
* 过滤器链
*/
interface WebFilterChain {
Mono<Void> filter(ServerWebExchange exchange);
}
/**
* 过滤器
*/
interface WebFilter {
Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain);
}
以上是响应式Web应该具备的基础API。还需要为这些接口适配不同的服务器。即与ServerHttpRequest和ServerHttpResponse进行直接交互的组件。同时负责ServerWebExchange的构建,特定的会话存储、本地化解析器等信息的保存。
public interface HttpHandler {
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}
通过该适当的抽象,隐藏了服务器引擎的细节,具体服务器的工作方式对 Spring WebFlux 用户不重要。
1.2、响应式 Web 和 MVC 框架
Spring Web MVC 模块的关键特性 基于注解。因此,需要为响应式Web 栈提供相同的概念。重用 WebMVC 的基础设施,用 Flux 、 Mono 和 Publisher 等响应式类型替换同步通信。保留与 Spring Web MVC 相同的 HandlerMapping 和 HandlerAdapter 链,使用基于 Reactor 的响应式交互替换实时命令:
interface HandlerMapping {
/*
HandlerExecutionChain getHandler(HttpServletRequest request)
*/
Mono<Object> getHandler(ServerWebExchange exchange);
}
interface HandlerAdapter {
boolean supports(Object handler);
/*
ModelAndView handle(HttpServletRequest request, HttpServletResponse
response, Object handler);
*/
Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
}
响应式 HandlerMapping 中,两个方法整体上类似,不同之处在于响应式返回Mono 类型支持响应式。
响应式HandlerAdapter 接口中,由于 ServerWebExchange 类同时组合了请求和响应,因此 handle 方法的响应式版本更简洁。该方法返回 HandlerResult 的 Mono 而不是 ModelAndView 。
遵循这些步骤,我们将得到一个响应式交互模型,而不会破坏整个执行层次结构,从而可以保留现有设计并能以最小的更改重用现有代码。最后设计:
- 传入请求,由底层服务器引擎处理。服务器引擎列表不限于基于ServletAPI 的服务器。每个服务器引擎都有自己的响应式适配器,将 HTTP 请求和 HTTP 响应的内部表示映射到 ServerHttpRequest 和 ServerHttpResponse 。
- HttpHandler 阶段,该阶段将给定的 ServerHttpRequest 、 ServerHttpResponse 、用户 Session 和相关信息组合到 ServerWebExchage 实例中。
- WebFilterChain 阶段,它将定义的 WebFilter 组合到链中。然后, WebFilterChain 会负责执行此链中每个 WebFilter 实例的 WebFilter#filter 方法,以过滤传入的 ServerWebExchange 。
- 如果满足所有过滤条件, WebFilterChain 将调用 WebHandler 实例。
- 查找 HandlerMapping 实例并调用第一个合适的实例。可以是RouterFunctionMapping、也可以是RequestMappingHandlerMapping 和HandlerMapping 资源。RouterFunctionMapping,引入到WebFlux 之中,超越了纯粹的功能请求处理。
- RequestMappingHandlerAdapter 阶段,与以前功能相同,使用响应式流来构建响应式流。
在WebFlux 模块中,默认服务器引擎是Netty。Netty 服务器很适合作为默认服务器,因为它广泛用于响应式领域。该服务器引擎还同时提供客户端和服务器异步非阻塞交互。同时,可以灵活地选择服务器引擎。WebFlux 模块对应了Spring Web MVC 模块的体系结构,很容易理解。
1.3、基于 WebFlux 的纯函数式 Web
完整示例源码https://blnp.lanzouj.com/i091m2etmyif
1.3.1、项目搭建配置
纯函数式Web主要是函数式路由映射。通过函数式映射,可以生成轻量级应用。示例如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.blnp.net</groupId>
<artifactId>reactor-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>reactor-demo</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-crypto</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<!--数据库支持-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
1.3.2、实体类创建
package com.blnp.net.reactor.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/7 11:21
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private String id;
}
1.3.3、数据库接口服务
package com.blnp.net.reactor.repository;
import com.blnp.net.reactor.entity.Order;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/7 11:20
*/
@Repository
public interface OrderRepository {
/**
* 用途:查询
* @author liaoyibin
* @since 11:23 2024/11/7
* @params [id]
* @param id
* @return reactor.core.publisher.Mono<com.blnp.net.reactor.entity.Order>
**/
Mono<Order> findById(String id);
/**
* 用途:新增
* @author liaoyibin
* @since 11:23 2024/11/7
* @params [order]
* @param order
* @return reactor.core.publisher.Mono<com.blnp.net.reactor.entity.Order>
**/
Mono<Order> save(Order order);
/**
* 用途:删除
* @author liaoyibin
* @since 11:24 2024/11/7
* @params [id]
* @param id
* @return reactor.core.publisher.Mono<java.lang.Void>
**/
Mono<Void> deleteById(String id);
}
注意:当前我使用的工程里并没有写与数据库表操作的相关功能,所以实际调用执行的时候会抛出异常:Invalid bound statement (not found):
1.3.4、请求处理器
package com.blnp.net.reactor.handler;
import com.blnp.net.reactor.entity.Order;
import com.blnp.net.reactor.repository.OrderRepository;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import java.net.URI;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/7 11:19
*/
@Service
public class OrderHandler {
final OrderRepository orderRepository;
public OrderHandler(OrderRepository repository) {
orderRepository = repository;
}
public Mono<ServerResponse> create(ServerRequest request) {
return request
.bodyToMono(Order.class)
.flatMap(orderRepository::save)
.flatMap(o ->
ServerResponse.created(URI.create("/orders/" + o.getId()))
.build()
);
}
public Mono<ServerResponse> get(ServerRequest request) {
return orderRepository
.findById(request.pathVariable("id"))
.flatMap(order ->
ServerResponse
.ok()
.syncBody(order)
)
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> list(ServerRequest request) {
return null;
}
}
1.3.5、路由配置
package com.blnp.net.reactor;
import com.blnp.net.reactor.handler.OrderHandler;
import com.blnp.net.reactor.handler.ServerRedirectHandler;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@SpringBootApplication(scanBasePackages = "com.blnp.net.reactor")
@MapperScan("com.blnp.net.reactor.repository")
public class ReactorDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ReactorDemoApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes(OrderHandler handler) {
//包含两个参数:1,测试条件是否通过,如果通过,则路由到第二个参数指定的路由函数
return nest(
//判断请求路径是否匹配指定的模式,(请求路径前缀)
path("/orders"),
//如果匹配通过,则路由到该路由函数
nest(
//判断请求的报文头字段accept是否匹配APPLICATION_JSON媒体类型
accept(APPLICATION_JSON),
//如果匹配,则路由到下面的路由函数:将/orders/{id}路由给handler的get方法
route(GET("/{id}"),handler::get)
//如果是get请求/orders,则路由到handler的list
.andRoute(method(HttpMethod.GET),handler::list)
//如果是POST请求/orders,则路由到handler的create
.andNest(contentType(APPLICATION_JSON),route(POST("/"), handler::create))
)
);
}
public RouterFunction<ServerResponse> cusRoutes(ServerRedirectHandler serverRedirectHandler) {
return nest((serverRequest) -> serverRequest.cookies()
.containsKey("Redirect-Traffic"), route(all(), serverRedirectHandler)
);
}
}
create 方法接收ServerRequest(函数式路由请求类型)。ServerRequest可以将请求体手动映射到Mono 或Flux。该API 还可以指定请求体应映射的类。最后,WebFlux 中的函数式附加功能提供了一个API,使用ServerResponse 类的流式API 构建响应。
我们可以看到,除函数式路由声明的API 之外,我们还有一个用于请求和响应处理的函数式API。同时,函数式Web 框架允许在不启动整个Spring 基础设施的情况下构建Web 应用程序。如下案例:
实体类:
package com.blnp.net.reactor.entity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/11 20:03
*/
@Data
public class PasswordDto {
private String raw;
private String secured;
@JsonCreator
public PasswordDto(@JsonProperty("raw") String raw,
@JsonProperty("secured") String secured) {
this.raw = raw;
this.secured = secured;
}
}
服务配置:
package com.blnp.net.reactor;
import com.blnp.net.reactor.entity.PasswordDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/11 20:01
*/
public class StandaloneApplication {
static Logger LOGGER = LoggerFactory.getLogger(StandaloneApplication.class);
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 调用routes 方法,然后将RouterFunction 转换为HttpHandler。
HttpHandler httpHandler = RouterFunctions.toHttpHandler(
// BCrypt算法进行18 轮散列,这可能需要几秒钟来编码/匹配
routes(new BCryptPasswordEncoder(18)
));
// 内置HttpHandler适配器
ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
// 创建HttpServer实例,它是ReactorNetty API的一部分。
DisposableServer server = HttpServer.create()
//配置host
.host("localhost")
//配置端口
.port(8087)
//指定handler
.handle(reactorHttpHandler)
//调用bindNow启动服务器
.bindNow();
LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms");
// 为了使应用程序保持活动状态,阻塞主Thread 并监听服务器的处理事件
server.onDispose().block();
}
/**
* 用途:路由配置
* @author liaoyibin
* @since 20:03 2024/11/11
* @params [passwordEncoder]
* @param passwordEncoder
* @return org.springframework.web.reactive.function.server.RouterFunction<org.springframework.web.reactive.function.server.ServerResponse>
**/
public static RouterFunction<ServerResponse> routes(PasswordEncoder passwordEncoder) {
return
// 使用/ check 路径处理任何POST 方法的请求
route(POST("/password"),
request -> request
.bodyToMono(PasswordDto.class)
// 使用PasswordEncoder检查已加密密码的原始密码
.map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
// 如果密码与存储的密码匹配
// 则ServerResponse 将返回OK状态(200)
// 否则,返回EXPECTATION_FAILED(417)
.flatMap(isMatched -> isMatched
? ServerResponse.ok()
.build()
: ServerResponse.status(HttpStatus.EXPECTATION_FAILED)
.build()
)
);
}
}
这种Web 应用程序的好处是它的启动时间要短得多。
非 Standalone服务:
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
package com.blnp.net.reactor.handler;
import com.blnp.net.reactor.entity.PasswordDto;
import org.springframework.http.HttpStatus;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/11 20:10
*/
@Component
public class PasswordHandlerFunction implements HandlerFunction {
private BCryptPasswordEncoder passwordEncoder = new BCryptPasswordEncoder(18);
@Override
public Mono<Void> handle(ServerRequest request) {
return request.bodyToMono(PasswordDto.class)
.map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
.flatMap(isMatched -> isMatched
? ServerResponse.ok()
.build()
: ServerResponse.status(HttpStatus.EXPECTATION_FAILED)
.build()).then(Mono.empty());
}
}
package com.blnp.net.reactor;
import com.blnp.net.reactor.handler.OrderHandler;
import com.blnp.net.reactor.handler.PasswordHandlerFunction;
import com.blnp.net.reactor.handler.ServerRedirectHandler;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@SpringBootApplication(scanBasePackages = "com.blnp.net.reactor")
@MapperScan("com.blnp.net.reactor.repository")
public class ReactorDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ReactorDemoApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes(OrderHandler handler) {
//包含两个参数:1,测试条件是否通过,如果通过,则路由到第二个参数指定的路由函数
return nest(
//判断请求路径是否匹配指定的模式,(请求路径前缀)
path("/orders"),
//如果匹配通过,则路由到该路由函数
nest(
//判断请求的报文头字段accept是否匹配APPLICATION_JSON媒体类型
accept(APPLICATION_JSON),
//如果匹配,则路由到下面的路由函数:将/orders/{id}路由给handler的get方法
route(GET("/{id}"),handler::get)
//如果是get请求/orders,则路由到handler的list
.andRoute(method(HttpMethod.GET),handler::list)
//如果是POST请求/orders,则路由到handler的create
.andNest(contentType(APPLICATION_JSON),route(POST("/"), handler::create))
)
);
}
@Bean
public static RouterFunction<ServerResponse>
routes(PasswordHandlerFunction handlerFunction) {
return route(POST("/password"), handlerFunction);
}
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity serverHttpSecurity) {
return serverHttpSecurity.csrf()
.disable()
.build();
}
public RouterFunction<ServerResponse> cusRoutes(ServerRedirectHandler serverRedirectHandler) {
return nest((serverRequest) -> serverRequest.cookies()
.containsKey("Redirect-Traffic"), route(all(), serverRedirectHandler)
);
}
}
通过切换到函数式路由声明:
- 可以在一个位置维护所有路由配置,并使用响应式方法对传入请求进行处理。
- 在访问传入的请求参数、路径变量和请求的其他重要组件方面,函数式路由的灵活性与基于注解的常规方法几乎相同。
- 函数式路由不但能避免运行整个Spring 框架基础设施,并且在路由设置方面同样灵活,让应用程序的启动更快。
1.4、基于WebClient的非阻塞跨服务通信
从本质上讲, WebClient 是旧 RestTemplate 的响应式替代品。WebClient 中有一个函数式API,并提供内置的到 Project Reactor 类型(如 Flux 或 Mono )的映射。以下示例:
// 创建的时候指定基础URI
WebClient.create("http://localhost/api")
// 指定请求方法:GET
.get()
// 指定相对URI,并对URI变量进行扩展
// 还可以指定消息头、cookie 和请求主体。
.uri("/users/{id}", userId)
// 指定结果的处理方式
.retrieve()
// 将响应体进行反序列化
.bodyToMono(User.class)
// 进行其他操作
.map(...)
// 订阅触发异步执行,发起远程调用。这里只使用了订阅的副作用。
.subscribe();
最常见的响应处理是处理消息主体,在某些情况下需要处理响应状态、消息头或cookie。构建一个对密码检查服务的调用,并使用WebClient API以自定义方式处理响应状态:
import org.junit.jupiter.api.Test;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.test.StepVerifier;
import java.time.Duration;
public class StandaloneTest {
@Test
public void checkApplicationRunning() {
BCryptPasswordEncoder encoder = new BCryptPasswordEncoder(18);
DefaultPasswordVerificationService service =
new DefaultPasswordVerificationService(WebClient.builder());
StepVerifier.create(service.check("test", encoder.encode("test")))
.expectSubscription()
.expectComplete()
.verify(Duration.ofSeconds(30));
}
}
import reactor.core.publisher.Mono;
public interface PasswordVerificationService {
Mono<Void> check(String raw, String encoded);
}
import org.springframework.http.ResponseEntity;
import org.springframework.security.authentication.BadCredentialsException;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import static org.springframework.http.HttpStatus.EXPECTATION_FAILED;
public class DefaultPasswordVerificationService
implements PasswordVerificationService {
final WebClient webClient;
public DefaultPasswordVerificationService(WebClient.Builder webClientBuilder) {
webClient = webClientBuilder
.baseUrl("http://localhost:8080").build();
}
@Override
public Mono < Void > check(String raw, String encoded) {
return WebClient.create("http://localhost:8080").post()
.uri("/password")
.body(BodyInserters.fromPublisher(
Mono.just(new PasswordDTO(raw, encoded)),
PasswordDTO.class
))
.retrieve()
.toEntityFlux(ResponseEntity.class)
.flatMap(response -> {
if (response.getStatusCode().is2xxSuccessful()) {
return Mono.empty();
} else if (response.getStatusCode() ==
EXPECTATION_FAILED) {
return Mono.error(new BadCredentialsException("Invalid credentials"));
}
return Mono.error(new IllegalStateException());
});
}
}
在需要处理公共HTTP 响应的状态码、消息头、cookie 和其他内部数据的情况下,最合适的是exchange 方法,该方法返回ClientResponse。如前文所述,DefaultWebClient 使用ReactorNetty HttpClient 来提供与远程服务器的异步和非阻塞交互。
但是,DefaultWebClient 旨在轻松更改底层HTTP 客户端。为此,出现了一个名为org.springframework.http.client.reactive.ClientHttpConnector 的针对HTTP 连接的低级别响应式抽象。默认情况下,DefaultWebClient 预先配置为使用ReactorClientHttpConnector,而这是ClientHttpConnector 接口的实现。
从SpringWebFlux 5.1 开始,JettyClientHttpConnector 实现出现,它使用Jetty 中的响应式 HttpClient。为了更改底层HTTP 客户端引擎,我们可以使用WebClient.Builder#clientConnector 方法并传递所需的实例,该实例既可以是自定义实现,也可以是现有实例。
WebClient.builder().clientConnector(new JettyClientHttpConnector())
.build()
.get()
.uri("http://localhost:8080/password/{id}", "ABOP8UOFDSA")
.retrieve()
.toEntityFlux(ClientResponse.class)
.subscribe();
除了有用的抽象层,ClientHttpConnector 还可以以原始格式的方式使用。例如,它可以用于下载大文件、即时处理或简单的字节扫描。
1.5、响应式模板引擎
Spring 5.x 和 WebFlux 模块已经放弃支持包括Apache Velocity 在内的许多技术。Spring WebFlux 与 Web MVC 拥有相同的视图渲染技术。以下示例展示了一种指定渲染视图的常用方法:
@RequestMapping("/")
public String index() {
return "index";
}
模板渲染过程中如何支持响应式方法?考虑一个涉及渲染大型音乐播放列表的案例:
@RequestMapping("/play-list-view-ftl")
public Mono < String > getPlaylist(final Model model) {
List < Song > songs = new ArrayList < > ();
Song song = null;
for (int i = 0; i < 5; i++) {
song = new Song("曲目" + i, "张三" + i, "1001" + i, "专辑1" + (i %
3));
songs.add(song);
}
final Flux < Song > playlistStream = Flux.fromIterable(songs);
return playlistStream
.collectList()
.doOnNext(list -> model.addAttribute("playList", list))
.then(Mono.just("/freemarker/play-list-view"));
}
正如上述示例中所示,使用了一个响应式类型 Mono<String> ,以便异步返回视图名称。另外,我们的模板有一个占位符dataSource,它应该由给定Song 的列表填充。提供特定于上下文数据的常用方法是定义Model,并在其中放置所需的属性。
FreeMarker 不支持数据的响应式呈现和非阻塞呈现,必须将所有歌曲收集到列表中并将收集的数据全部放入Model 中。
src/main/resources/templates/freemarker/play-list-view.ftl:
<!DOCTYPE html>
<html>
<body>
<head>
<meta charset="UTF-8" />
<title>曲目列表-freemarker</title>
</head>
<table border="1">
<thead>
</thead>
<tbody>
<#list playList as e>
<tr>
<td>${e.id}</td>
<td>${e.name}</td>
<td>${e.artist}</td>
<td>${e.album}</td>
</tr>
</#list>
</tbody>
</table>
</body>
</html>
src/main/java/com/lagou/webflux/demo/config/WebConfig.java:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.ViewResolverRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.result.view.freemarker.FreeMarkerConfigurer;
@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {
@Override
public void configureViewResolvers(ViewResolverRegistry registry) {
// 使用".ftl"后缀注册一个FreeMarkerViewResolver
registry.freeMarker();
}
@Bean
public FreeMarkerConfigurer freeMarkerConfigurer() {
FreeMarkerConfigurer configurer = new FreeMarkerConfigurer();
// 设置模板路径
configurer.setTemplateLoaderPath("classpath:/templates");
return configurer;
}
}
渲染这些模板是一项CPU 密集型操作。如果我们有一个庞大的数据集,执行该操作可能需要一些时间和内存。
- Thymeleaf 支持响应式WebFlux,并为异步和流模板渲染提供更多可能性。
- Thymeleaf 提供与FreeMarker 类似的功能,并允许编写相同的代码来呈现UI。
- Thymeleaf 能够将响应式类型用作模板内的数据源,并在流中的新元素可用时呈现模板的一部分。
以下示例展示了在处理请求期间如何将响应式流与Thymeleaf 一起使用:
@RequestMapping("/play-list-view-thy")
public String view(final Model model) {
List < Song > songs = new ArrayList < > ();
Song song = null;
for (int i = 0; i < 10; i++) {
song = new Song("曲目" + i, "张三" + i, "1001" + i, "专辑1" + (i % 3));
songs.add(song);
}
final Flux < Song > playlistStream = Flux.fromIterable(songs);
model.addAttribute(
"playList",
new ReactiveDataDriverContextVariable(playlistStream, 1, 1)
);
return "thymeleaf/play-list-view";
}
ReactiveDataDriverContextVariable 用于创建懒加载上下文变量,封装异步响应式数据流,并指定缓冲区大小(单位是消息个数),第一个SSE事件ID。
接收的响应式类型,如Publisher、Flux、Mono、Observable 和由ReactiveAdapterRegistry类支持的其他响应式类型。响应式支持需要流使用额外的类包装器,模板端不需要任何更改。以下示例展示了如何使用与处理普通集合类似的方式处理响应式流:
src/main/resources/templates/thymeleaf/play-list-view.html:
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<body>
<head>
<meta charset="UTF-8" />
<title>曲目列表</title>
</head>
<table border="1">
<thead>
</thead>
<tbody>
<tr th:each="e : ${playList}">
<td th:text="${e.id}">...</td>
<td th:text="${e.name}">...</td>
<td th:text="${e.artist}">...</td>
<td th:text="${e.album}">...</td>
</tr>
</tbody>
</table>
</body>
</html>
生成一个表,带有一些表头和一个正文。该表是由 Song 条目的 playList 和它们的信息构成的行所填充的。
Thymeleaf 的渲染引擎开始将数据流传输到客户端,而不必等待最后一个元素被发射。它支持渲染无限的元素流。这可以通过添加对 Transfer-Encoding:chunked 的支持来实现。Thymeleaf 不会渲染内存中的整个模板,而会首先渲染可用的部分,然后在新元素可用时以块的形式异步发送模板的其余部分。
1.6、响应式 Web 安全
1.6.1、准备
Spring Web的SpringSecurity 模块通过在任何控制器和Web 处理程序调用之前提供Filter 来设置安全的Web应用程序。为了支持响应式和非阻塞交互,Spring Security的响应式栈,使用WebFilter并依赖Project Reactor上下文。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
SecurityProfileController.java
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/v1")
public class SecurityProfileController {
private final ProfileService profileService;
public SecurityProfileController(ProfileService profileService) {
this.profileService = profileService;
}
@GetMapping("/profiles")
public Mono < Profile > getProfile() {
return ReactiveSecurityContextHolder
.getContext() // 访问当前的SecurityContext
.map(SecurityContext::getAuthentication) // 从SecurityContext获取认证信息
.flatMap(auth -> profileService.getByUser(auth.getName()));
// 访问用户个人信息
}
}
Profile.java
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Getter
public class Profile {
private String name;
private String desc;
}
ProfileService.java
import reactor.core.publisher.Mono;
public interface ProfileService {
Mono < Profile > getByUser(String name);
}
DefaultProfileService.java
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class DefaultProfileService implements ProfileService {
@Override
public Mono < Profile > getByUser(String name) {
return Mono.just(new Profile(name, "这是【" + name + "】的描述信息。"));
}
}
SecurityConfiguration.java
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.web.server.SecurityWebFilterChain;
@SpringBootConfiguration
// 该注解导入所需的配置,以启用特定的带注解MethodInterceptor
@EnableReactiveMethodSecurity
public class SecurityConfiguration {
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity httpSecurity) {
return httpSecurity
.formLogin() // 配置基于表单的认证
.and() // 接着向下配置
.authorizeExchange() // 配置授权
.anyExchange() // 禁用授权
.authenticated() // 需要有认证的用户
.and() // 接着向下配置
.build(); // 创建web安全过滤器链对象
}
@Bean
public ReactiveUserDetailsService userDetailsService() {
User.UserBuilder userBuilder = User.withDefaultPasswordEncoder();
UserDetails user = userBuilder.username("zhangsan")
.password("zs123")
.roles("emp")
.build();
UserDetails user1 = userBuilder.username("lisi")
.password("ls123")
.roles("mgr")
.build();
return new MapReactiveUserDetailsService(user, user1);
}
}
1.6.2、对SecurityContext 的响应式访问
使用Spring Security中的 ReactiveSecurityContextHolder 访问响应式SecurityContext。ReactiveSecurityContextHolder 的 getContext 方法返回 Mono<SecurityContext> 。
@GetMapping("/profiles")
@PreAuthorize("hasRole('emp')")
public Mono < Profile > getProfile() {
return ReactiveSecurityContextHolder
.getContext() // 访问当前的SecurityContext
.map(SecurityContext::getAuthentication) // 从SecurityContext获取认证信息
.flatMap(auth -> profileService.getByUser(auth.getName())); // 访问用户个人信息
}
@PreAuthorize 注解用于访问控制,检查Authentication 是否具有所需的角色。如果是响应式返回类型,则方法调用推迟,直到所需的 Authentication 解析完并存在所需的权限。
新的响应式API 类似于同步API。基于新一代的Spring Security,可以使用相同的注解来检查所需的权限。在内部, ReactiveSecurityContextHolder 依赖于Reactor Context API。有关登录用户的当前信息保存在Context 接口的实例中。如下源码:
可以使用subscriberContext访问内部Reactor Context。执行过程与获取存储(如数据库)的SecurityContext 有关,该过程仅在某人订阅给定的Mono 时执行。
尽管ReactiveSecurityContextHolder 的API 看起来很熟悉,但它隐藏了许多陷阱。例如,可能错误地遵循使用SecurityContextHolder 时的惯用法。这样一来,可能盲目地实现以下示例代码中描述的常见交互:
ReactiveSecurityContextHolder
.getContext()
.map(SecurityContext::getAuthentication)
.block();
就像以前从ThreadLocal 中获取SecurityContext 一样,可能尝试使用ReactiveSecurityContextHolder 执行相同的操作。但是,当调用 getContext 并使用 block 方法订阅流时,我们在流中配置的是一个空的上下文。
因此,一旦 ReactiveSecurityContextHodler 类尝试访问内部Context,就不会在那里找到可用的 SecurityContext。当我们正确连接流时,如何设置Context 并使其可访问?可以使用 Spring Security中的 ReactorContextWebFilter 。
在调用期间, ReactorContextWebFilter 使用subscriberContext 方法提供一个Reactor Context。此外, SecurityContext 的解析是通过 ServerSecurityContextRepository 来执行的。ServerSecurityContextRepository 有两个方法,分别是save 和load:
package org.springframework.security.web.server.context;
public interface ServerSecurityContextRepository {
Mono < Void > save(ServerWebExchange exchange, SecurityContext context);
Mono < SecurityContext > load(ServerWebExchange exchange);
}
如上代码,save 方法将 SecurityContext 与特定的 ServerWebExchange 关联,然后使用来自用户请求的 load 方法将其还原。
响应式访问意味着实际的 SecurityContext 可以存储在数据库中,因此解析存储的 SecurityContext 不需要阻塞操作。因为上下文解析的策略是惰性的,所以只有在订阅 ReactiveSecurityContextHolder.getContext() 时才会执行对底层存储的实际调用。
1.6.3、响应式安全启用
基于WebFlux 的安全性配置只需要声明少量bean。以下是我们如何执行此操作的参考示例:
@SpringBootConfiguration
// 该注解导入所需的配置,以启用特定的带注解MethodInterceptor
@EnableReactiveMethodSecurity
public class SecurityConfiguration {
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity httpSecurity) {
return httpSecurity
.formLogin() // 配置基于表单的认证
.and() // 接着向下配置
.authorizeExchange() // 配置授权
.anyExchange() // 禁用授权
.authenticated() // 需要有认证的用户
.and() // 接着向下配置
.build(); // 创建web安全过滤器链对象
}
@Bean
public ReactiveUserDetailsService userDetailsService() {
User.UserBuilder userBuilder = User.withDefaultPasswordEncoder();
UserDetails user = userBuilder.username("zhangsan")
.password("zs123")
.roles("emp")
.build();
UserDetails user1 = userBuilder.username("lisi")
.password("ls123")
.roles("mgr")
.build();
return new MapReactiveUserDetailsService(user, user1);
}
}
为了启用特定的带注解的 MethodInterceptor ,必须添加 @EnableReactiveMethodSecurity 注解,导入所需的配置。Spring Security为我们提供了ServerHttpSecurity,它是一个带有流式API 的构建器。为了在默认的Spring Security 设置中对用户进行身份验证,必须提供 ReactiveUserDetailsService 的实现。
出于演示目的,提供了接口的内存实现,并配置一个测试用户以登录系统。如上述代码所示,Spring Security 的整体配置与之前的类似。
1.7、与其它响应式库的交互
WebFlux 使用Project Reactor 3 作为核心构建块,同时,WebFlux 也允许使用其他响应式库。为了实现跨库互操作性,WebFlux 中的大多数操作基于响应式流规范中的接口。通过这种方式,我们可以轻松地用 RxJava 2 或 Akka Streams 替换Reactor 3 所编写的代码。
curl -H "Content-Type: application/json" -d '张三' --request POST
http://localhost:8080/songs
核心代码如下:核心代码如下:
@PostMapping("/songs")
public Observable < Song > findAlbumByArtist(@RequestBody Mono < String >
nameMono) {
// 将Mono转换为Observable类型
Observable < String > observable = (Observable < String > )
adapterRegistry.getAdapter(Observable.class).fromPublisher(nameMono);
return observable.flatMap(new Function < String, ObservableSource < Song >> () {
@Override
public ObservableSource < Song > apply(String s) throws Exception {
return Observable.just(new Song("五环之歌", s));
}
});
}
首先,导入 RxJava 2 的 Observable 。findAlbumByArtists 接受一个 Mono<String> 类型的 Publisher 并返回 Observable<Song> 。声明将 artistsFlux 映射到 Observable<String> ,执行业务逻辑,并将结果返回给调用者。
响应式类型转换是 Spring Core 模块的一部分,并受 org.springframework.core.ReactiveAdapterRegistry 和 org.springframework.core.ReactiveAdapter 的支持。因此,通过使用该支持库,可以使用几乎任何响应式库,与Project Reactor 解耦。具体案例:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.20</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
AlbumsController.java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Function;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class AlbumsController {
final ReactiveAdapterRegistry adapterRegistry;
public AlbumsController(ReactiveAdapterRegistry adapterRegistry) {
this.adapterRegistry = adapterRegistry;
}
@PostMapping("/songs")
public Observable < Song > findAlbumByArtist(@RequestBody Mono < String > nameMono) {
// 将Mono转换为Observable类型
Observable < String > observable = (Observable < String > )
adapterRegistry.getAdapter(Observable.class).fromPublisher(nameMono);
return observable.flatMap(new Function < String,ObservableSource < Song >> () {
@Override
public ObservableSource < Song > apply(String s) throws Exception {
return Observable.just(new Song("五环之歌", s));
}
});
}
}
Song.java
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public class Song {
private String name;
private String artistName;
}
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>
Title
</title>
<script type="text/javascript" src="jquery-1.9.1.js">
</script>
<script type="text/javascript">
$(function() {
$("#btn").click(function() {
if ($("#artistName").val() == '') {
alert("请填写歌手名称") $("#artistName").focus() return
}
$.ajax({
url: "/songs",
data: $("#artistName").val(),
contentType: "application/json;charset=utf-8",
cache: false,
method: "post",
success: function(data) {
alert(JSON.stringify(data))
},
error: function() {
alert("通信异常")
}
})
})
})
</script>
</head>
<body>
<input type="text" id="artistName" placeholder="请输入歌手名称">
<input id="btn" type="button" value="获取数据">
</body>
</html>
1.8、对比 SpringMVC
Spring MVC和Spring WebFlux并不是分立的。它们都扩展了开发的可用选项。两者设计的目标就是彼此的连续性和一致性,可以一起使用,发挥各自优势。下图展示了两者的联系和区别:
- 如果现存的项目是基于Spring MVC的并且没有问题,就别更改了。命令式编程开发、阅读、debug都是最简单的。同时可选择的库也很多,只不过大多数都是阻塞式的。
- 如果项目的技术栈是非阻塞的,则使用WebFlux可以使用与环境相同的模型来执行,WebFlux 也提供了服务器的选项(Netty、Tomcat、Jetty、Undertow以及Servlet 3.1及以上的容器),提供了编程模型的选项(基于注解的控制器和函数式web端点),以及响应式库的选项(Reactor、RxJava以及其他的)。
- 如果希望发挥java8 lambda或Kotlin的优势,使用轻量级、函数式的web框架,则可以使用Spring WebFlux函数式web端点的编程模型。Spring WebFlux非常适合小型的应用或没有复杂需求的微服务。
- 在微服务架构中,可以同时使用Spring WebFlux和Spring MVC,或者将Spring WebFlux作为函数式端点来使用。由于它们基于相同的注解编程模型,可以很方便的做到在正确的场合使用正确的工具。
- 一个简单的评估应用的方式是检查应用的依赖。如果使用的是阻塞式的持久化API(JPA,JDBC)或者阻塞式的网络API,Spring MVC基本上是最好的选择。技术上Reactor和RxJava也可以使用分立的线程支持阻塞式的操作,但无法发挥非阻塞web技术栈的全部优势。
- 如果Spring MVC应用需要调用远程服务,可以使用响应式的 WebClient 。可以让Spring MVC 控制器的方法直接返回响应式类型(Reactor、RxJava或其他的)数据。每个远程调用的延迟越大,各个远程调用之间的依赖越大,响应式模型的优势发挥的越明显。当然,Spring MVC 的控制器也可以调用其他响应式组件。
- 如果开发团队很大,就要考虑到转向非阻塞、函数式、声明式编程模型的陡峭的学习曲线。最佳实践是先使用响应式 WebClient 做部分转向。然后在小的模块中使用并评估响应式模型带来的优势。一般对于整个项目,没必要全部转向响应式模型。如果不确定响应式编程带来的优势,可以先学习一下非阻塞I/O的工作流程(例如单线程Node.js的并发)以及效果。
原文地址:https://blog.csdn.net/qq_37165235/article/details/143585592
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!