Spring WebFlux 核心原理(2-2)
1、Project Reactor 核心
1.1、新建项目
新建maven项目,将Project Reactor作为依赖项添加到应用程序中:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.blnp.net</groupId>
<artifactId>reactor-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>reactor-demo</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
1.2、响应式类型(Flux 和 Mono)
响应流规范只定义了4个接口,即:
- Publisher<T>
- Subscriber<T>
- Subscription
- Processor<T, R>
Project Reactor提供了 Publisher<T> 接口的实现,即 Flux<T> 和 Mono<T> 。
1.2.1、Flux
上图为将Flux流转换为另一个Flux流的示例。Flux定义了一个通用的响应式流,它可以产生零个、一个或多个元素,乃至无限元素。有如下公式:
onNext x 0..N [onError | onComplete]
如下代码生成一个简单的无限响应流:
Flux.range(1, 5).repeat();
该流重复产生1到5的数字(1,2,3,4,5,1,2。。。)。每个元素都无需完成整个流创建即可被转换和消费。订阅者可以随时取消订阅从而将无限流转换为有限流。收集无限流发出的所有元素会导致 OutOfMemoryException 。如下代码重现该问题:
public static void main(String[] args) {
// range操作符创建从[1到100]的整数序列
Flux.range(1, 100)
// repeat操作符在源流完成之后一次又一次地订阅源响应式流。
// repeat操作符订阅流操作符的结果、接收从1到100的元素以及onComplete信号,
// 然后再次订阅、接收,不断重复该过程
.repeat()
// 使用collectList操作符尝试将所有生成的元素收集到一个集合中。
// 由于是无限流,最终它会消耗所有内存,导致OOM。
.collectList()
// block操作符触发实际订阅并阻塞正在运行的线程,直到最终结果到达
// 当前场景不会发生,因为响应流是无限的。
.block();
}
1.2.2、Mono
上图为将Mono流转换为另一个Mono流的示例。与Flux相比,Mono类型定义了一个最多可以生成一个元素的流,可以通过如下公式表示:
onNext x 0..1 [onError | onComplete]
当应用程序API最多返回一个元素时,可以使用 Mono<T> 。它可以轻松替换 CompletableFuture<T> ,并提供相似的语义,只不过 CompletableFuture 在没有发出值的情况下无法正常完成。CompletableFuture 会立即开始处理,而Mono在订阅者出现之前什么也不做。
Mono类型不仅提供了大量的响应式操作符,还能够整合到更大的响应式工作流程中。当需要对已完成的操作通知客户端时,也可以使用Mono。此时,可以返回 Mono<Void> 类型并在处理完成时发出 onComplete() 信号,或者在发生异常时返回 onError() 。此时,我们不返回任何数据,而是发出通知信号,而该信号可以用作进一步计算的触发器。
Mono和Flux可以容易地相互“转换”。如: Flux<T>.collectList() 返回 Mono<List<T>> ,而 Mono<T>.flux() 返回 Flux<T> 。
1.2.3、RxJava 2 响应式类型
即使 RxJava 2.x 库和 Project Reactor 具有相同的基础,RxJava 2 还是有一组不同的响应式发布者。由于这两个库实现了相同的理念,包括响应式操作符、线程管理和错误处理,都非常相似。因此,或多或少熟悉其中一个库意味着同时熟悉了这两个库。
RxJava 1.x中最初只有 Observable 这一个响应式类型,之后又添加了 Single 和 Completable类型。在版本 2 中,该库具有以下响应式类型:
- Observable
- Flowable
- Single
- Maybe
- Completable
1、Observable
- 与 RxJava 1.x 的Observable语义几乎相同,但是,不接收 null 值。
- Observable 既不支持背压,也不实现 Publisher接口,所以它与响应式流规范不直接兼容。
- Observable 类型的开销小于 Flowable 类型。
- 它具有toFlowable 方法,可以通过应用用户选择的背压策略将流转换为 Flowable。
2、Flowable
- Flowable 类型是 Reactor Flux 类型的直接对应物。
- 实现了响应式流的 Publisher,可以应用在由 Project Reactor 实现的响应式工作流中,因为 API 消费 Publisher 类型的参数,而不是针对特定库的Flux 类型。
3、Single
- Single 类型表示生成且仅生成一个元素的流。
- 不继承 Publisher 接口。
- 具有 toFlowable 方法。
- 不需要背压策略。
- 相较 Reactor 中的 Mono 类型,Single 更好地表示了 CompletableFuture 的语义,但是在订阅发生之前它仍然不会开始处理。
4、Maybe
- 实现了与 Reactor 的 Mono 类型相同的语义,但是不兼容响应式流,因为 Maybe 不实现 Publisher 接口。
- 具有 toFlowable 方法,以兼容响应式流规范。
5、Completable
- 只能触发 onError 或onComplete 信号,但不能产生 onNext 信号。
- 不实现 Publisher 接口,但具有toFlowable 方法。
- 它对应不能生成 onNext 信号的 Mono<Void>类型。
总而言之,要与其他兼容响应式流的代码集成,应将 RxJava 类型转换为 Flowable 类型。
1.3、创建 Flux 序列和 Mono 序列
Flux 和 Mono 提供了许多工厂方法,可以根据已有的数据创建响应流。如,可以使用对象引用或集合创建 Flux,甚至可以简单地用数字范围来创建:
@Test
public void test1() {
Flux < String > just = Flux.just("hello", "Blnp");
just.subscribe(System.out::println);
Flux < String > stringFlux = Flux.fromArray(new String[] {
"hello",
"cat",
"pig",
"dog",
"fish"
});
stringFlux.subscribe(System.out::println);
Flux < Integer > integerFlux = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
integerFlux.subscribe(System.out::println);
// 第一个参数是起点,第二个参数表示序列中元素的数量
Flux < Integer > range = Flux.range(1000, 5);
range.subscribe(System.out::println);
}
Mono 提供类似的工厂方法,但主要针对单个元素。它也经常与 nullable 类型和 Optional类型一起使用:
@Test
public void test2() {
Mono < String > just = Mono.just("yes");
just.subscribe(System.out::println);
Mono < Object > objectMono = Mono.justOrEmpty(null);
objectMono.subscribe(System.out::println);
// 避免空指针异常,返回不包含任何值的Optional对象。
Mono < Object > objectMonoEmp = Mono.justOrEmpty(Optional.empty());
objectMonoEmp.subscribe(System.out::println);
}
Mono 对于包装异步操作(如 HTTP 请求或数据库查询)非常有用。Mono 提供了:
- fromCallable(Callable)
- fromRunnable(Runnable)
- fromSupplier(Supplier)
- fromFuture(CompletableFuture)
- fromCompletionStage(CompletionStage)
public String httpRequest() throws IOException, InterruptedException {
URL url = new URL("https://www.baidu.com");
URLConnection urlConnection = url.openConnection();
urlConnection.connect();
InputStream inputStream = urlConnection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String tmp = null;
StringBuffer stringBuffer = new StringBuffer();
while ((tmp = reader.readLine()) != null) {
stringBuffer.append(tmp).append("\r\n");
}
return stringBuffer.toString();
}
@Test
public void test3() {
Mono.fromCallable(() -> httpRequest())
.subscribe(System.out::println);
}
注意,上述代码不仅异步发出 HTTP 请求(由适当的 Scheduler 提供),还会处理onError信号传播的错误。
@Test
public void test3() {
Mono.fromCallable(() -> httpRequest())
.subscribe(
info -> System.out.println(info),
ex -> System.out.println("请求异常:" + ex.toString()),
() -> System.out.println("请求已完成")
);
}
Flux 和 Mono 都可以使用 from(Publisher<T> p) 工厂方法适配任何其他 Publisher 实例。
@Test
public void test4() {
Flux.from(new Publisher < String > () {
@Override
public void subscribe(Subscriber < ? super String > subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext("第 " + (i + 1) + " 条");
}
subscriber.onComplete();
}
}).subscribe(
System.out::println,
System.out::println,
() -> System.out.println("处理已完成!")
);
}
或者是这种写法:
@Test
public void test5() {
Flux.from((Publisher < String > ) subscriber -> {
for (int i = 0; i < 15; i++) {
subscriber.onNext("第 " + (i + 1) + " 条");
}
subscriber.onComplete();
}).subscribe(
System.out::println,
System.out::println,
() -> System.out.println("处理已完成!")
);
}
两种响应式类型都提供了简便的方法来创建常用的空流以及只包含错误的流:
Flux<String> empty = Flux.empty();
FLux<String> never = Flux.never();
Mono<String> error = Mono.error(new RuntimeException("url不可达"));
- empty()工厂方法,它们分别生成 Flux 或 Mono 的空实例。
- never()方法会创建一个永远不会发出完成、数据或错误等信号的流。
- error(Throwable)工厂方法创建一个序列,该序列在订阅时始终通过每个订阅者的onError(...)方法传播错误。由于错误是在 Flux 或 Mono 声明期间被创建的,因此,每个订阅者都会收到相同的 Throwable 实例。
defer 工厂方法创建一个序列,并在订阅时决定其行为,可以为不同的订阅者生成不同的数据:
public boolean isValidSeed(String seed) {
System.out.println("调用了isValidSeed方法");
return true;
}
public String getData(String seed) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "echo : " + seed;
}
public Mono < String > requestData(String seed) {
return isValidSeed(seed) ?
Mono.fromCallable(() -> getData(seed)) :
Mono.error(new RuntimeException("Invalid seed value"));
}
public Mono < String > requestDeferData(String seed) {
return Mono.defer(
() -> isValidSeed(seed) ?
Mono.fromCallable(() -> getData(seed)) :
Mono.error(new RuntimeException("Invalid seed value"))
);
}
@Test
public void test6() {
requestData("zhangsan");
requestDeferData("zhangsan");
requestDeferData("zhangsan").subscribe();
}
总结:
- Project Reactor 只需使用 just 方法枚举元素就可以创建 Flux 和 Mono 序列。
- 可以使用 justOrEmpty 轻松地将 Optional 包装到 Mono 中,或者使用 fromSupplier 方法将Supplier 包装到 Mono 中。
- 可以使用 fromFuture 方法映射 Future,或使用 fromRunnable工厂方法映射 Runnable。
- 可以使用fromArray 或 fromIterable 方法将数组或 Iterable 集合转换为 Flux 流。
完整测试代码:
package com.blnp.net.reactor.demo;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Optional;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/10/10 17:59
*/
public class MainTest {
@Test
public void test1() {
Flux<String> just = Flux.just("hello", "Blnp");
just.subscribe(System.out::println);
Flux<String> stringFlux = Flux.fromArray(new String[]{"hello", "cat","pig", "dog", "fish"});
stringFlux.subscribe(System.out::println);
Flux<Integer> integerFlux = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
integerFlux.subscribe(System.out::println);
// 第一个参数是起点,第二个参数表示序列中元素的数量
Flux<Integer> range = Flux.range(1000, 5);
range.subscribe(System.out::println);
}
@Test
public void test2() {
Mono<String> just = Mono.just("yes");
just.subscribe(System.out::println);
Mono<Object> objectMono = Mono.justOrEmpty(null);
objectMono.subscribe(System.out::println);
// 避免空指针异常,返回不包含任何值的Optional对象。
Mono<Object> objectMonoEmp = Mono.justOrEmpty(Optional.empty());
objectMonoEmp.subscribe(System.out::println);
}
public String httpRequest() throws IOException, InterruptedException {
URL url = new URL("https://www.baidu.com");
URLConnection urlConnection = url.openConnection();
urlConnection.connect();
InputStream inputStream = urlConnection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String tmp = null;
StringBuffer stringBuffer = new StringBuffer();
while ((tmp = reader.readLine()) != null) {
stringBuffer.append(tmp).append("\r\n");
}
return stringBuffer.toString();
}
@Test
public void test3() {
Mono.fromCallable(() ->httpRequest())
.subscribe(
info -> System.out.println(info),
ex -> System.out.println("请求异常:" + ex.toString()),
() -> System.out.println("请求已完成")
);
}
@Test
public void test4() {
Flux.from(new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext("第 " + (i+1) + " 条");
}
subscriber.onComplete();
}
}).subscribe(
System.out::println,
System.out::println,
() -> System.out.println("处理已完成!")
);
}
@Test
public void test5() {
Flux.from((Publisher<String>)subscriber -> {
for (int i = 0; i < 15; i++) {
subscriber.onNext("第 " + (i+1) + " 条");
}
subscriber.onComplete();
}).subscribe(
System.out::println,
System.out::println,
() -> System.out.println("处理已完成!")
);
}
public boolean isValidSeed(String seed) {
System.out.println("调用了isValidSeed方法");
return true;
}
public String getData(String seed) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "echo : " + seed;
}
public Mono<String> requestData(String seed) {
return isValidSeed(seed) ?
Mono.fromCallable(() -> getData(seed))
: Mono.error(new RuntimeException("Invalid seed value"));
}
public Mono<String> requestDeferData(String seed) {
return Mono.defer(
() -> isValidSeed(seed) ?
Mono.fromCallable(() -> getData(seed))
: Mono.error(new RuntimeException("Invalid seed value"))
);
}
@Test
public void test6() {
requestData("zhangsan");
requestDeferData("zhangsan");
requestDeferData("zhangsan").subscribe();
}
}
1.4、订阅响应式流(重点关注)
Flux 和 Mono 提供了对 subscribe()方法的基于 lambda 的重载,简化了订阅的开发。subscribe 方法的所有重载都返回 Disposable接口的实例,可以用于取消基础的订阅过程。在重载方法1到4中,订阅发出对无界数据(Long.MAX_VALUE)的请求。
注意:简单订阅请求无界数据(Long.MAX_VALUE)的选项有时可能迫使生产者完成大量工作以满足需求。因此,如果生产者更适合处理有界数据请求,建议使用订阅对象或应用请求限制操作符来控制需求。
// 重载方法1
// 订阅流的最简单方法,忽略所有信号。通常用于触发具有副作用的流处理。
subscribe();
// 重载方法2
// 对每个值(onNext 信号)调用 dataConsumer,不处理 onError 和 onComplete信号。
subscribe(Consumer <T> dataConsumer);
// 重载方法3
// 与重载方法2相同,处理 onError 信号,忽略 onComplete 信号。
subscribe(Consumer <T> dataConsumer, Consumer <Throwable> errorConsumer);
// 重载方法4
// 与重载方法3相同,处理 onComplete 信号。
subscribe(Consumer <T> dataConsumer, Consumer <Throwable> errorConsumer,Runnable completeConsumer);
// 重载方法5
// 消费响应式流中的所有元素,包括错误处理和完成信号。重要的是,这种重载方法能通过请求足够数量的数据来控制订阅,当然,请求数量仍然可以是 Long.MAX_VALUE。
subscribe(Consumer < T > dataConsumer, Consumer < Throwable > errorConsumer,
RUnnable completeConsumer, Consumer <Subscription> subscriptionConsumer);
// 重载方法6
// 订阅序列的最通用方式。在这里,可以为Subscriber的实现提供所需的行为。
subscribe(Subscriber < T > subscriber);
重载方法6非常通用,但很少被用到。具体使用方式:
@Test
public void test7() {
Flux.just("你好", "貂蝉", "是我的")
.subscribe(
data -> System.out.println("onNext:" + data),
ex -> System.err.println("异常信息:" + ex.getMessage()),
() -> System.out.println("响应流处理结束")
);
}
添加副作用的栗子:
@Test
public void test8() {
Flux.range(1, 100)
.filter(item -> item % 7 == 0)
.map(num -> "hello girl - " + num)
//添加副作用
.doOnNext(System.out::println)
.subscribe();
}
@Test
public void test9() {
Flux.range(1, 100)
.filter(item -> item % 7 == 0)
.map(num -> "hello girl - " + num)
//添加副作用
.doOnNext((obj) -> {
String num = obj + "(i)";
System.out.println(num);
})
.subscribe();
}
所谓的副作用其实也就是对上游传递的元素进行额外的操作处理,即上游传递一个就处理一个。
处理正常情况:
@Test
public void test9() {
Flux.range(1, 100)
.filter(item -> item % 9 == 0)
.subscribe(System.out::println);
}
添加异常的订阅处理:
@Test
public void test9() {
Flux.from(subscriber -> {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onError(new Exception("异常测试"));
}).subscribe(
item -> System.out.println("onnext:" + item),
ex -> System.out.println("异常情况:" + ex)
);
}
添加完成事件的订阅处理:
@Test
public void test9() {
Flux.from(subscriber -> {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onComplete();
}).subscribe(
item -> System.out.println("onNext:" + item),
ex -> System.out.println("异常情况:" + ex),
() -> System.out.println("处理完成")
);
}
添加订阅成功的处理:
@Test
public void test9() {
Flux.range(1, 10).subscribe(
item -> System.out.println("onNext:" + item),
ex -> System.out.println("异常处理:" + ex),
() -> System.out.println("处理结束"),
// 一订阅成功就取消订阅
subscription -> subscription.cancel()
);
}
手动控制订阅:
Flux.range(1, 100)
.subscribe(
data -> System.out.println("onNext:" + data),
ex -> System.err.println("异常信息:" + ex.getMessage()),
() -> System.out.println("onComplete"),
subscription -> {
// 订阅响应式流5个元素
subscription.request(5);
// 取消订阅
subscription.cancel();
}
);
上述执行没有收到 onComplete 信号,因为订阅者在流完成之前取消了订阅。
特别注意:
- 响应式流可以由生产者完成(使用 onError 或 onComplete 信号)
- 响应式流可以由订阅者通过 Subscription 实例进行取消。
- Disposable 实例也可用于取消。
通常,Disposable实例不是由订阅者使用,而是由更上一级抽象的代码使用。如在主线程通过调用 Disposable 来取消流处理:
@Test
public void test10() throws InterruptedException {
Disposable disposable = Flux.interval(Duration.ofMillis(50))
.subscribe(
data -> System.out.println("onNext:" + data)
);
Thread.sleep(300);
// 主线程取消订阅
disposable.dispose();
}
1.4.1、实现自定义订阅者
如果默认的 subscribe(...)方法不提供所需的多种功能,则可以实现自己的Subscriber,直接从响应式流规范实现 Subscriber 接口,并将其订阅到流,如下所示:
@Test
public void test11() {
Subscriber < String > subscriber = new Subscriber < String > () {
volatile Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
System.out.println("initial request for 1 element");
subscription.request(1);
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
System.out.println("requesting 1 more element");
subscription.request(1);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable t) {
System.out.println("onError: " + t.getMessage());
}
};
Flux < String > stream = Flux.just("Hello", "world", "!");
stream.subscribe(subscriber);
}
但是,上述定义订阅的方法是不对的。它打破了线性代码流,也容易出错。最困难的部分是需要自己管理背压并正确实现订阅者的所有 TCK 要求。在前面的示例中,打破了有关订阅验证和取消这几个 TCK 要求。建议扩展 Project Reactor 提供的 BaseSubscriber 类。在这种情况下,订阅者如下所示:
package com.blnp.net.reactor.demo;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/10/11 17:36
*/
public class MySubscriber<T> extends BaseSubscriber<T> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("订阅成功,开始请求第一个元素");
request(1);
}
@Override
protected void hookOnNext(T value) {
System.out.println("onNext: " + value);
System.out.println("再次请求一个元素");
request(1);
}
}
不仅可以重载 hookOnSubscribe(Subscription)方法,hookOnNext(T)方法,还可以重载hookOnError(Throwable)方法、hookOnCancel()方法、hookOnComplete()方法以及其他方法。
BaseSubscriber 类提供了 request(long)和 requestUnbounded()这些方法来对响应式流需求进行粒度控制。使用 BaseSubscriber 类,实现符合 TCK 的订阅者更为容易。
1.5、用操作符转换响应式流
使用响应式流,除了需要能够创建和使用流,还必须能够完美地转换和操作。Project Reactor 为几乎所有所需的响应式转换提供了工具,通常,可以对库的功能特性做如下分类:
- 转换现有序列;
- 查看序列处理的方法;
- 拆分和聚合 Flux 序列;
- 处理时间;
- 同步返回数据。
1.5.1、映射响应式流元素
转换序列的最自然方式是将每个元素映射到一个新值。Flux 和 Mono 给出了 map 操作符,具有 map(Function<T,R>) 签名的方法可用于逐个处理元素。当操作符将元素的类型从 T 转变为 R 时,整个序列的类型将改变。
Mono 类的 map 操作符具有类似行为。cast(Class c) 操作符将流的元素强制转换为目标类。实现 cast(Class c) 操作符的最简单方法是使用 map() 操作符。如下Flux类源码:
index 操作符可用于枚举序列中的元素。该方法具有以下签名: Flux<Tuple2<Long,T >> index() 。
Flux.range(1, 10)
.map(item -> "hello girl - " + item)
.index()
.subscribe(System.out::println);
timestamp 操作符的行为与 index 操作符类似,但会添加当前时间戳而不是索引。
@Test
public void test12() {
Flux.range(1, 10)
.map(item -> "(A)hello girl - " + item)
.timestamp()
.subscribe(System.out::println);
Flux.range(1, 10)
.map(item -> "(B)hello girl - " + item)
.timestamp()
.subscribe(item -> System.out.println(item.getT1() + " <---> " +
item.getT2()));
}
1.5.2、过滤响应式流
Project Reactor 包含用于过滤元素的各种操作符。
- filter:操作符仅传递满足条件的元素。
- ignoreElements:操作符返回 Mono<T> 并过滤所有元素。结果序列仅在原始序列结束后结束。也就是创建一个Mono序列,忽略作为源的Publisher中的所有元素,只产生消息。
- take(n):操作符限制所获取的元素,该方法忽略除前 n 个元素之外的所有元素。
- takeLast:仅返回流的最后一个元素。
- takeUntil(Predicate):传递一个元素直到满足某个条件。
- elementAt(n):只可用于获取序列的第 n 个元素。
- single:操作符从数据源发出单个数据项,也为空数据源发出 NoSuchElementException错误信号,或者为具有多个元素的数据源发出IndexOutOfBoundsException 信号。它不仅可以基于一定数量来获取或跳过元素,还可以通过带有Duration的 skip(Duration) 或 take(Duration) 操作符。
- takeUntilOther(Publisher) 或 skipUntilOther(Publisher):操作符,可以跳过或获取一个元素,直到某些消息从另一个流到达。
考虑如下工作流程,该工作流中,首先开始一个流的处理,然后从其他流收到特定事件之后,停止该流的处理。代码如下所示:
@Test
public void test1() throws InterruptedException {
System.out.println("开始 " + System.currentTimeMillis());
Mono < String > start = Mono.just("start").delayElement(Duration.ofSeconds(3));
Mono < String > stop = Mono.just("stop").delayElement(Duration.ofSeconds(6));
Flux.interval(Duration.ofMillis(500))
.map(item -> "fluxElement " + item)
.timestamp()
.skipUntilOther(start)
.takeUntilOther(stop)
.subscribe(System.out::println);
Thread.sleep(10000);
}
此时,可以启动然后停止元素处理,但只执行一次。该场景的弹珠图:
ignoreElements 用法:
@Test
public void test2() {
Flux < String > flux = Flux.just("a", "b", "c");
flux.ignoreElements()
.then(Mono.just("done"))
.subscribe(System.out::println);
/**
* 在这个例子中,尽管flux中有三个元素,但由于使用了ignoreElements,这些元素被忽略,
* 只有当Flux结束时,then操作才会执行,打印出"done"。
**/
}
take 的用法:
@Test
public void test3() {
Flux < Integer > flux = Flux.range(1, 10);
flux.take(5).subscribe(System.out::println);
}
takeLast 的用法:
@Test
public void test4() {
Flux < Integer > flux = Flux.range(1, 10);
flux.takeLast(8).subscribe(System.out::println);
}
小结: take 和 takeLast 的用法区别是,前者是往前取 n 个元素;后者是从后往前取 n 个元素。
takeUntil 用法:
@Test
public void test5() {
Flux.range(1, 10)
.takeUntil(n -> n > 6)
.subscribe(System.out::println);
}
这里有一个Flux流,它不断发出整数,当发出的整数大于5时停止接收数据。takeUntil操作符的用途非常广泛,适用于需要在满足特定条件时停止数据处理的场景。 例如,在网络请求中,当接收到特定的响应状态码时,可以停止进一步的请求处理;在数据处理流程中,当达到某个阈值时,可以停止进一步的数据处理。
elementAt 用法:
@Test
public void test6() {
Flux < Integer > flux = Flux.just(1, 2, 3, 4, 5);
Mono < Integer > result = flux.elementAt(5, 8);
result.subscribe(System.out::println);
}
即从指定的元素序列中获取指定索引位置的值,若索引不存在则取得设定的默认值返回。
single 用法:
@Test
public void test7() {
Mono.just(1)
.single()
.subscribe(
value -> System.out.println(value),
error -> System.err.println("Error: " + error)
);
Flux.just(1, 2, 3, 4, 5)
.single(7)
.subscribe(System.out::println);
Mono.empty()
.single()
.subscribe(System.out::println);
}
1.5.3、收集响应式流
收集列表中的所有元素,并使用 Flux.collectList() 和 Flux.collectSortedList() 将结果集合处理为 Mono 流是可能的。 Flux.collectSortedList() 不仅会收集元素,还会对它们进行排序。如下代码:
@Test
public void test1() {
Flux.just(1, 6, 2, 8, 3, 1, 5, 1)
.collectSortedList(Comparator.reverseOrder())
.subscribe(System.out::println);
}
请注意,收集集合中的序列元素可能耗费资源,当序列具有许多元素时这种现象尤为突出。此外,尝试在无限流上收集数据可能消耗所有可用的内存。Project Reactor 不仅可以将 Flux 元素收集到 List,还可以收集以下内容:
- 使用 collectMap 操作符的映射( Map<K,T> );
- 使用 collectMultimap 操作符的多映射( Map<K,Collection<T>> );
- Flux.collect(Collector) 操作符收集到任何实现了 java.util.stream.Collector 的数据结构。
- Flux 和 Mono 都有 repeat() 方法和 repeat(times) 方法,这两种方法可以针对传入序列进行循环操作。
- defaultIfEmpty(T) 是另一个简洁的方法,它能为空的 Flux 或 Mono 提供默认值。
- Flux.distinct() 仅传递之前未在流中遇到过的元素。但是,因为此方法会跟踪所有唯一性元素,所以(尤其是涉及高基数数据流时)请谨慎使用。distinct 方法具有重载方法,可以为重复跟踪提供自定义算法。因此,有时可以手动优化 distinct 操作符的资源使用。Flux.distinctUntilChanged() 操作符没有此限制,可用于无限流以删除出现在不间断行中的重复项。
注:高基数(high-cardinality)是指具有非常罕见元素或唯一性元素的数据。例如,身份编号和用户名就是典型的高基数数据,而枚举值或来自小型固定字典的值就不是。
collectMap 操作符的使用:
@Test
public void test2() {
Flux.just(1, 2, 3, 4, 5, 6)
.collectMap(new Function < Integer, String > () {
@Override
public String apply(Integer integer) {
return String.format("一、(new Function)key:num - %s", integer);
}
})
.subscribe(System.out::println);
System.out.println("============================ ============================\n");
Flux.just(1, 2, 3, 4, 5, 6)
.collectMap(item -> String.format("(Default)key:num - %s", item))
.subscribe(System.out::println);
System.out.println("============================ ============================\n");
Flux.just(1, 2, 3, 4, 5, 6)
.collectMap(new Function < Integer, String > () {
@Override
public String apply(Integer integer) {
return String.format("二、(new Function)key: - %s", integer);
}
}, new Function < Integer, String > () {
@Override
public String apply(Integer integer) {
return String.format("二、(new Function)value: - %s", integer);
}
})
.subscribe(System.out::println);
System.out.println("============================ ============================\n");
Flux.just(1, 2, 3, 4, 5, 6)
.collectMap(item1 -> "key: " + item1, item2 -> "value: " + item2)
.subscribe(System.out::println);
System.out.println("============================ ============================\n");
Flux.just(1, 2, 3, 4, 5, 6)
.collectMap(
val1 -> "key: " + val1,
val2 -> "value: " + val2,
() -> {
Map < String,
String > map = new HashMap < > ();
for (int i = 0; i < 5; i++) {
map.put(i + "-key", i + "-value");
}
return map;
}
)
.subscribe(System.out::println);
}
collectMultimap 的使用:
@Test
public void test3() {
Flux.just(1, 2, 3, 4, 5)
.collectMultimap(
key -> "key: " + key,
val -> {
List < String > values = new ArrayList < > ();
for (int i = 0; i < val; i++) {
values.add("value" + i);
}
return values;
}
)
.subscribe(System.out::println);
System.out.println("\n");
Flux.just(1, 2, 3, 4, 5)
.collectMultimap(
item -> "key:" + item,
item1 -> {
List < String > values = new ArrayList < > ();
for (int i = 0; i < item1; i++) {
values.add("value" + i);
}
return values;
},
() -> {
Map map = new HashMap < String,
List < String >> ();
List < String > list = new ArrayList < > ();
for (int i = 0; i < 3; i++) {
list.clear();
for (int j = 0; j < i; j++) {
list.add("ele:" + j);
}
map.put(i + ":key", list);
}
return map;
}
).subscribe(System.out::println);
}
repeat 操作符的使用:
@Test
public void test4() {
Flux.just(1, 2, 3)
// 实际上是打印四次,1次原始的,3次重复的。
.repeat(3)
.subscribe(System.out::println);
}
defaultIfEmpty 操作符的使用:
@Test
public void test5() {
Flux.empty()
.defaultIfEmpty("hello")
.subscribe(System.out::println);
}
distinct 操作符的使用:
@Test
public void test6() {
Flux.just(1, 2, 3, 3, 3)
.repeat(3)
.distinct()
.subscribe(System.out::println);
}
distinctUntilChanged 操作符的使用:
@Test
public void test7() {
Flux.just(1, 2, 3)
.repeat(3)
.distinctUntilChanged()
.subscribe(System.out::println);
System.out.println("============================ ============================");
Flux.just(1, 1, 2, 2, 3, 3)
.distinctUntilChanged()
.subscribe(System.out::println);
}
1.5.4、裁剪流中的元素
- 统计流中元素的数量;
- 检查所有元素是否具有 Flux.all(Predicate) 所需的属性;
- 使用 Flux.any(Predicate) 操作符检查是否至少有一个元素具有所需属性;
- 使用 hasElements 操作符检查流中是否包含多个元素;
- 使用 hasElement 操作符检查流中是否包含某个所需的元素。短路逻辑,在元素与值匹配时立即返回true。
- any 操作符不仅可以检查元素的相等性,还可以通过提供自定义 Predicate 实例来检查任何其他属性。
检查序列中是否包含偶数:
@Test
public void test8() {
Flux.just(3, 5, 7, 9, 11, 15, 16, 17)
.any(item -> item % 2 == 0)
.subscribe(System.out::println);
}
sort 操作符:
sort 操作符在后台对元素进行排序,然后在原始序列完成后发出已排序的序列。
@Test
public void test9() {
Flux.just(3, 12, 7, 27, 15, 19, 16, 4)
.sort()
.subscribe(System.out::println);
}
reduce 操作符:
Flux 类能使用自定义逻辑来裁剪序列(也称为折叠)。 reduce 操作符通常需要一个初始值和一个函数,而该函数会将前一步的结果与当前步的元素组合在一起。reduce 操作符只生成一个具有最终结果的元素。举例将 1 到 5 之间的整数加起来:
@Test
public void test10() {
Flux.range(1, 5)
// 初始值,折叠操作
.reduce(0, (item1, item2) -> item1 + item2)
.subscribe(System.out::println);
}
scan 操作符:
Flux.scan()操作符在进行聚合时,可以向下游发送中间结果。scan 操作符对 1 到 5 之间的整数求和:
@Test
public void test11() {
Flux.range(1, 5)
//会将每一步计算的结果发送给下游
.scan(0, (num1, num2) -> num1 + num2)
.subscribe(System.out::println);
}
scan 操作符对于许多需要获取处理中事件的相关信息的应用程序有用。例如,我们可以计算流上的移动平均值:
@Test
public void test12() {
int arrLength = 5;
Flux.range(1, 500)
.index()
.scan(new int[arrLength], (arr, entry) -> {
//scan 第一个发射的元素是它的初始值
arr[(int)(entry.getT1() % arrLength)] = entry.getT2();
return arr;
})
// 当窗口数组被灌满之后开始计算平均值,因此跳过没有灌满的情况
.skip(arrLength)
.map(array -> Arrays.stream(array).sum() * 1.0 / arrLength)
.subscribe(System.out::println);
}
then、thenMany 和 thenEmpty
Mono 和 Flux 流有 then、thenMany 和 thenEmpty 操作符,它们在上游流完成时完成。上游流完成处理后,这些操作符可用于触发新流,订阅是对于新流的。
@Test
public void test13() {
Flux.just(1, 2, 3)
.doOnNext(item -> System.out.println("副作用:" + item))
.doOnNext(obj -> System.out.println("============================ 🍔 ============================"))
.thenMany(Flux.just(4, 5, 6))
.subscribe(System.out::println);
}
即使 1、2 和 3 是由流生成和处理的,subscribe 方法中的 lambda 也只接收 4、5 和 6。
1.5.5、组合响应式流
Project Reactor 可以将许多传入流组合成一个传出流。指定的操作符虽然有许多重载方法,但是都会执行以下转换。
- concat 操作符通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后对第二个流执行相同的操作(先后)。
- merge 操作符将来自上游序列的数据合并到一个下游序列中。与 concat 操作符不同,上游数据源是立即(同时)被订阅的。
- zip 操作符订阅所有上游,等待所有数据源发出一个元素,然后将接收到的元素组合到一个输出元素中。
- combineLatest 操作符与 zip 操作符的工作方式类似。但是,只要至少一个上游数据源发出一个值,它就会生成一个新值。
concat 操作符的使用:
@SneakyThrows
@Test
public void test1() {
Flux.concat(
Flux.range(1000, 5).delayElements(Duration.ofMillis(100)),
Flux.range(10, 5).delayElements(Duration.ofMillis(100))
)
.subscribe(System.out::println);
Thread.sleep(10000);
}
merge 操作符的使用:
@SneakyThrows
@Test
public void test2() {
Flux.merge(
Flux.range(1000, 5).delayElements(Duration.ofMillis(100)),
Flux.range(10, 5).delayElements(Duration.ofMillis(100))
)
.subscribe(System.out::println);
Thread.sleep(10000);
}
zip 操作符的使用:
@Test
public void test3() {
Flux.zip(
Flux.range(1000, 5),
Flux.range(10, 5)
)
.subscribe(System.out::println);
}
combineLatest 操作符的使用:
@SneakyThrows
@Test
public void test4() {
Flux.combineLatest(
Flux.range(1000, 5).delayElements(Duration.ofMillis(250)),
Flux.range(10, 5).delayElements(Duration.ofMillis(100)),
(val1, val2) -> val1 + " ---- " + val2
)
.subscribe(System.out::println);
Thread.sleep(10000);
}
1.5.6、流元素批处理
Project Reactor 支持以以下几种方式对流元素( Flux<T> )执行批处理。
- 将元素缓冲(buffering)到容器(如 List)中,结果流的类型为 Flux<List<T>> 。
- 通过开窗(windowing)方式将元素加入诸如 Flux<Flux<T>> 等流中。请注意,现在的流信号不是值,而是可以处理的子流。
- 通过某些键将元素分组(grouping)到具有 Flux<GroupedFlux<K, T>> 类型的流中。每个新键都会触发一个新的 GroupedFlux 实例,并且具有该键的所有元素都将被推送到GroupFlux 类的该实例中。
可以基于以下场景进行缓冲和开窗操作:
- 处理元素的数量,比方说每 10 个元素;
- 一段时间,比方说每 5 分钟一次;
- 基于一些谓语,比方说在每个新的偶数之前切割;
- 基于来自其他 Flux 的一个事件,该事件控制着执行过程。
buffer 操作符:
如,为列表(大小为 5)中的整数元素执行缓冲操作:
@Test
public void test1() {
Flux.range(1, 100)
.buffer(5)
.subscribe(System.out::println);
}
buffer 操作符将许多事件收集到一个事件集合中。该集合本身成为下游操作符的事件。当需要使用元素集合来生成一些请求,而不是使用仅包含一个元素的集合来生成许多小请求时,用缓冲操作符来实现批处理会比较方便。如,可以将数据项缓冲几秒钟然后批量插入,而不是逐个将元素插入数据库。
window 操作符:
如果需要根据数字序列中的元素是否为素数进行开窗拆分,可以使用 window 操作符的变体windowUntil。它使用谓词来确定何时创建新切片。代码如下所示:
@Test
public void test2() {
Flux.range(101, 20)
/**
* 参数1:判断条件
* 参数2:当条件满足时,是否将触发判断条件的元素会分到下一波即下一个窗口
**/
.windowUntil(val -> isPrime(val), true)
.subscribe(window ->
window.collectList()
.subscribe(item -> System.out.println("window = " + item))
);
}
/**
* 用途:判断是否为素数
* @author liaoyibin
* @since 15:25 2024/10/12
* @params [integer]
* @param integer
* @return boolean
**/
public boolean isPrime(Integer integer) {
double sqrt = Math.sqrt(integer);
if (integer < 2) {
return false;
}
if (integer == 2 || integer == 3) {
return true;
}
if (integer % 2 == 0) {
return false;
}
for (int i = 3; i <= sqrt; i++) {
if (integer % i == 0) {
return false;
}
}
return true;
}
请注意第一个窗口为空。这是因为一旦启动原始流,就会生成一个初始窗口。然后,第一个元素会到达(数字 101),它是素数,会触发一个新窗口。因此,已经打开的窗口会在没有任何元素的情况下通过 onComplete 信号关闭。
window操作符和buffer操作符类似,后者仅在缓冲区关闭时才会发出集合,而 window 操作符会在事件到达时立即对其进行传播,以更快地做出响应并实现更复杂的工作流程。
groupBy 操作符:
groupBy 操作符通过某些条件对响应式流中的元素进行分组。通过对每个元素打一个标签(key),按照标签将元素进行分组。如:将整数序列按照奇数和偶数进行分组,并仅跟踪每组中的最后两个元素。代码如下所示:
@Test
public void test3() {
Flux.range(1, 7)
.groupBy(val -> val % 2 == 0 ? "偶数" : "奇数")
.subscribe(groupFlux -> groupFlux.scan(
new ArrayList < Integer > (),
(list, element) -> {
list.add(element);
if (list.size() > 2) {
list.remove(0);
}
return list;
}
)
.filter(list -> !list.isEmpty())
.subscribe(item -> System.out.println(groupFlux.key() + " <---> " + item))
);
}
1.5.7、flatMap、concatMap 和 flatMapSequential 操作符
flatMap 操作符在逻辑上由 map 和 flatten(就 Reactor 而言,flatten 类似于 merge 操作符)这两个操作组成。flatMap 操作符的 map 部分将传入的每个元素转换为响应式流(T -> Flux<R>);flatten 部分将所有生成的响应式流合并为一个新的响应式流,通过该流可以传递 R 类型的元素。
Project Reactor 提供了 flatMap 操作符的一些不同变体。除了重载,该库还提供了flatMapSequential 操作符和 concatMap 操作符。这 3 个操作符在以下几个方面有所不同。
- 操作符是否立即订阅其内部流:
flatMap 操作符和 flatMapSequential 操作符会立即订阅,而 concatMap 操作符则会在生成下一个子流并订阅它之前等待每个内部完成。
- 操作符是否保留生成元素的顺序:
concatMap 天生保留与源元素相同的顺序,flatMapSequential 操作符通过对所接收的元素进行排序来保留顺序,而 flatMap 操作符不一定保留原始排序。
- 操作符是否允许对来自不同子流的元素进行交错:
flatMap 操作符允许交错,而 concatMap和 flatMapSequential 不允许交错。
flatMap 操作符(及其变体)在函数式编程和响应式编程中都非常重要,因为它能使用一行代码实现复杂的工作流。
flatMap 操作符:
@Test
public void test5() throws InterruptedException {
Random random = new Random();
Flux.just(
Arrays.asList(1, 2, 3),
Arrays.asList("a", "b", "c", "d"),
Arrays.asList(7, 8, 9)
)
.doOnNext(System.out::println)
.flatMap(item -> Flux.fromIterable(item)
.delayElements(Duration.ofMillis(random.nextInt(100) + 100))
.doOnSubscribe(subscription -> {
System.out.println("已订阅");
})
)
.subscribe(System.out::println);
Thread.sleep(3000);
}
concatMap 操作符:
@Test
public void test6() throws InterruptedException {
Random random = new Random();
Flux.just(Arrays.asList(1, 2, 3), Arrays.asList("a", "b", "c", "d"),
Arrays.asList(7, 8, 9))
.doOnNext(System.out::println)
.concatMap(item -> Flux.fromIterable(item)
.delayElements(Duration.ofMillis(random.nextInt(100) + 100))
.doOnSubscribe(subscription -> {
System.out.println("已订阅");
}))
.subscribe(System.out::println);
Thread.sleep(3000);
}
concatMap对每个上游的元素,在接收后都立即生成新的流,新流每个元素处理完之后,进行下一个新流的处理。
flatMapSequential 操作符的使用:
@Test
public void test7() throws InterruptedException {
Random random = new Random();
Flux.just(Arrays.asList(1, 2, 3), Arrays.asList("a", "b", "c", "d"),
Arrays.asList(7, 8, 9))
.doOnNext(System.out::println)
.flatMapSequential(item -> Flux.fromIterable(item)
.delayElements(Duration.ofMillis(random.nextInt(100) + 100))
.doOnSubscribe(subscription -> {
System.out.println("已订阅");
}))
.subscribe(System.out::println);
Thread.sleep(3000);
}
1.5.8、元素采样
对于高吞吐量场景而言,通过应用采样技术处理一小部分事件是有意义的。sample 操作符和 sampleTimeout 操作符可以让流周期性地发出与时间窗口内最近看到的值相对应的数据项。我们假设使用以下代码:
sample API:
@Test
public void test8() throws InterruptedException {
// 每隔100ms就从流中取一个元素
Flux.range(1, 100)
.delayElements(Duration.ofMillis(10))
.sample(Duration.ofMillis(100))
.subscribe(System.out::println);
Thread.sleep(10000);
}
这里使我们每10毫秒都顺序生成数据项,订阅者也只会收到所指定的约束条件内的一小部分事件。通过这种方法,我们可以在不需要所有传入事件就能成功操作的场景下使用被动限速。流控。
sampleTimeout API:
@Test
public void test9() throws InterruptedException {
Random random = new Random();
Flux.range(1, 20)
.delayElements(Duration.ofMillis(100))
// 并发计算超时时间,调节速度快慢。
.sampleTimeout(item -> Mono.delay(Duration.ofMillis(random.nextInt(100) + 50)), 20)
.subscribe(System.out::println);
Thread.sleep(Long.MAX_VALUE);
}
1.5.9、响应式流转阻塞结构
Project Reactor 库提供了一个 API,用于将响应式流转换为阻塞结构。有以下选项来阻塞流并同步生成结果:
- toIterable: 方法将响应式 Flux 转换为阻塞 Iterable。
- toStream: 方法将响应式 Flux 转换为阻塞 Stream API。从 Reactor 3.2 开始,在底层使用toIterable 方法。
- blockFirst: 方法阻塞了当前线程,直到上游发出第一个值或完成流为止。
- blockLast: 方法阻塞当前线程,直到上游发出最后一个值或完成流为止。在 onError的情况下,它会在被阻塞的线程中抛出异常。
blockFirst 操作符和 blockLast 操作符具有方法重载,可用于设置线程阻塞的持续时间。这应该可以防止线程被无限阻塞。
toIterable 和 toStream 方法能够使用 Queue 来存储事件,这些事件可能比客户端代码阻塞Iterable 或 Stream 更快到达。微批处理。
@Test
public void test10() {
Flux.just(1, 2, 3).toIterable(3)
.forEach(System.out::println);
Stream < Integer > integerStream = Flux.just(8, 9, 10).toStream(3);
long count = integerStream
.map(obj -> {
System.out.println("obj = " + obj);
return obj;
})
.count();
}
@Test
public void test11() throws InterruptedException {
final Iterable < Integer > integers = Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.toIterable();
System.out.println("A==================");
for (Integer integer: integers) {
System.out.println(integer);
}
System.out.println("A==================");
Stream < Integer > integerStream = Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.toStream();
System.out.println("B==================");
integerStream.forEach(System.out::println);
System.out.println("B==================");
Integer integer = Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.doOnNext(System.out::println)
.blockFirst();
System.out.println("C==============");
System.out.println(integer);
System.out.println("C==============");
Thread.sleep(5000);
// 该方法不会阻塞主线程
Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.doOnEach(System.out::println)
.subscribe();
// 该方法阻塞,直到流处理到最后一个元素
Integer integer2 = Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.doOnEach(System.out::println)
.blockLast();
System.out.println("D================");
System.out.println(integer2);
System.out.println("D================");
Flux < Integer > integerFlux = Flux.just(1, 2,
3).delayElements(Duration.ofSeconds(1));
integerFlux.subscribe(item -> System.out.println("第一个订阅:" + item));
integerFlux.subscribe(item -> System.out.println("第二个订阅:" + item));
integerFlux.subscribe(item -> System.out.println("第三个订阅:" + item));
final Integer integer3 = integerFlux.blockLast();
System.out.println("阻塞等待最后一个元素" + integer3);
System.out.println("E-=-=-=-=-=-=-=-=-");
Thread.sleep(5000);
}
1.5.10、在序列处理时查看元素
有时,我们需要对处理管道中的每个元素或特定信号执行操作。为满足此类要求,Project Reactor提供了以下方法。
- doOnNext(Consumer<T>): 使我们能对 Flux 或 Mono 上的每个元素执行一些操作
- doOnComplete 和 doOnError(Throwable): 可以应用在相应的事件上
- doOnSubscribe(Consumer<Subscription>) 、 doOnRequest(LongConsumer) 和 doOnCancel(Runnable): 使我们能对订阅生命周期事件做出响应
- 无论是什么原因导致的流终止, doOnTerminate(Runnable) 都会在流终止时被调用
此外,Flux 和 Mono 提供了 doOnEach(Consumer<Signal>) 方法,该方法处理表示响应式流领域的所有信号,包括 onError、onSubscribe、onNext、onError 和 onComplete。如下示例:
@Test
public void test12() {
Flux.just(1, 2, 3)
.concatWith(Flux.error(new RuntimeException("手动异常")))
.doOnEach(item -> System.out.println(item))
.subscribe();
}
1.5.11、物话与非物化信号
将流中的元素封装为Signal对象进行处理。有时,采用信号进行流处理比采用数据进行流处理更有用。为了将数据流转换为信号流并再次返回,Flux 和 Mono 提供了 materialize 方法和 dematerialize 方法。示例如下:
@Test
public void test13() throws InterruptedException {
Flux.just(1, 2, 3).delayElements(Duration.ofMillis(1000))
.publishOn(Schedulers.parallel())
.concatWith(Flux.error(new Exception("手动异常")))
.materialize()
.doOnEach(item -> System.out.println(item.isOnComplete()))
.subscribe(System.out::println);
Thread.sleep(10000);
}
这里,在处理信号流时,doOnNext 方法不仅接收带有数据的 onNext 事件,还接收包含在Signal类中的 onComplete 事件。此方法能采用一个类型层次结构来处理 onNext、onError和 onCompete 事件。
如果我们只需要记录信号而不修改它们,那么 Reactor 提供了 log 方法,该方法使用可用的记录器记录所有处理过的信号。
1.6、以编程方式创建流
有时候需要一种更复杂的方法来在流中生成信号,或将对象的生命周期绑定到响应式流的生命周期。
1.6.1、push 和 create 工厂方法
push 工厂方法能通过适配一个单线程生产者来编程创建 Flux 实例。此方法对于适配异步、单线程、多值 API 非常有用,而无须关注背压和取消, push 方法本身包含背压和取消。如下代码示例:
@Test
public void pushTest() throws InterruptedException {
// Java Stream API生成1000个整数元素并通过FluxSink的next方法将元素发送到下游响应式流
Flux.push(new Consumer < FluxSink < Integer >> () {
@Override
public void accept(FluxSink < Integer > fluxSink) {
IntStream.range(2000, 3000)
.forEach(fluxSink::next);
}
})
.delayElements(Duration.ofMillis(1))
.subscribe(event -> System.out.println("onNext:" + event));
Thread.sleep(5000);
//============================
Flux.push(fluxSink -> IntStream.range(1000, 2000).forEach(fluxSink::next))
.delayElements(Duration.ofMillis(100))
.subscribe(System.out::println);
Thread.sleep(10000);
}
push 工厂方法可以很方便地使用默认的背压和取消策略来适配异步 API。
create 方法:
create 工厂方法,与 push 工厂方法类似,起到桥接的作用。该方法能从不同的线程发送事件。如下代码所示:
@Test
public void createTest() throws InterruptedException {
MyEventProcessor myEventProcessor = new MyEventProcessor();
//注册监听事件
Flux < String > bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener < String > () {
@Override
public void onDataChunk(List < String > chunk) {
for (String s: chunk) {
sink.next(s);
}
}
@Override
public void processComplete() {
sink.complete();
}
});
});
//订阅消息处理
bridge.subscribe(
System.out::println,
ex -> System.err.println(ex),
() -> System.out.println("处理完成")
);
//通知完成
myEventProcessor.process();
Thread.sleep(5000);
}
1.6.2、generate 工厂方法
generate 工厂方法旨在基于生成器的内部处理状态创建复杂序列。它需要一个初始值和一个函数,该函数根据前一个内部状态计算下一个状态,并将 onNext 信号发送给下游订阅者。例如,创建一个简单的响应式流来生成斐波那契(Fibonacci)数列(1,1,2,3,5,8,13,…)。
@Test
public void test1() throws InterruptedException {
Flux.generate(
// 通过Callable提供初始状态实例
new Callable < ArrayList < Long >> () {
@Override
public ArrayList < Long > call() throws Exception {
final ArrayList < Long > longs = new ArrayList < > ();
longs.add(0 L);
longs.add(1 L);
return longs;
}
},
// 负责生成斐波拉契数列
// 函数的第一个参数类型,函数第二个参数类型,函数计算结果类型
new BiFunction < ArrayList < Long > , SynchronousSink < Long > , ArrayList < Long >> () {
@Override
public ArrayList < Long > apply(ArrayList < Long > longs,
SynchronousSink < Long > sink) {
final Long aLong = longs.get(longs.size() - 1);
final Long aLong1 = longs.get(longs.size() - 2);
// 向下游发射流元素
sink.next(aLong);
longs.add(aLong + aLong1);
return longs;
}
}
).delayElements(Duration.ofMillis(500))
.take(10)
.subscribe(System.out::println);
Thread.sleep(5000);
}
lambda 形式:
@Test
public void test2() throws InterruptedException {
Flux.generate(
() -> {
final ArrayList < Long > longs = new ArrayList < > ();
longs.add(0 L);
longs.add(1 L);
return longs;
},
(state, sink) -> {
final Long aLong = state.get(state.size() - 1);
final Long bLong = state.get(state.size() - 2);
//发射元素
sink.next(aLong);
state.add(aLong + bLong);
return state;
}
)
.delayElements(Duration.ofMillis(500))
.take(10)
.subscribe(System.out::println);
Thread.sleep(5000);
}
二元组参数示例:
@Test
public void test3() throws InterruptedException {
Flux.generate(
() -> Tuples.of(0 L, 1 L),
(state, sink) -> {
System.out.println("生成的数字:" + state.getT2());
sink.next(state.getT2());
long newValue = state.getT1() + state.getT2();
return Tuples.of(state.getT2(), newValue);
}
).delayElements(Duration.ofMillis(500))
.take(10)
.subscribe(System.out::println);
Thread.sleep(5000);
}
在下一个值生成之前,每个新值都被同步传播给订阅者。当生成不同的复杂响应式流,而该序列需要保持发射之间的中间状态时,该方法非常有用。
1.6.3、using 用法
using 工厂方法能根据一个 disposable 资源创建流,是用于管理资源的函数。它在响应式编程中实现了 try-with-resources 方法。假设我们需要包装一个阻塞 API,而该 API 使用以下有如下表示:
package com.blnp.net.reactor.stream;
import org.junit.Test;
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.Random;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/10/31 16:49
*/
public class DisposableTest {
@Test
public void test() {
try(Connection connection = Connection.newConnection()) {
connection.getData().forEach(data -> System.out.println("接收的数据 = " + data));
}catch (Exception e) {
System.out.println("错误信息 = " + e);
}
}
static class Connection implements AutoCloseable {
private final Random rnd = new Random();
static Connection newConnection() {
System.out.println("创建Connection对象");
return new Connection();
}
public Iterable<String> getData() {
if (rnd.nextInt(10) < 3) {
throw new RuntimeException("通信异常");
}
return Arrays.asList("数据1", "数据2");
}
/**
* 用途:close方法可以释放内部资源,并且应该始终被调用,即使在getData执行期间发生错误也是如此。
* @author liaoyibin
* @since 16:54 2024/10/31
* @params []
* @param
* @return void
**/
@Override
public void close() {
System.out.println("关闭Connection连接");
}
}
@Test
public void test2() {
Flux.using(
Connection::newConnection,
connection -> Flux.fromIterable(connection.getData()),
Connection::close
).subscribe(
data -> System.out.println("onNext接收到的数据:" + data),
ex -> System.err.println("onError接收到的异常信息:" + ex),
() -> System.out.println("处理完毕")
);
}
}
拓展:这里特别说明下关于 try 的不同。这里是将语法从:try{}catch{}finaliy{} 变成了 try(){}catch{},凡是在try的括号中声明的类都必须实现java.io.Closeable接口,这样try就会自动将声明的流在使用完毕后自动关闭。
而使用响应式的编程实现同样的效果,如下代码所示:
@Test
public void test2() {
/**
* Flux.using(
* resourceSupplier, // 生成资源的工厂方法
* resourceConsumer, // 使用资源的方法
* resourceCleaner // 清理资源的方法
* )
**/
Flux.using(
Connection::newConnection,
connection -> Flux.fromIterable(connection.getData()),
Connection::close
).subscribe(
data -> System.out.println("onNext接收到的数据:" + data),
ex -> System.err.println("onError接收到的异常信息:" + ex),
() -> System.out.println("处理完毕")
);
}
1.6.4、usingWhen 用法
与 using 操作符类似,usingWhen 操作符使我们能以响应式方式管理资源。但是,using 操作符会同步获取受托管资源(通过调用 Callable 实例)。同时,usingWhen 操作符响应式地获取受托管资源(通过订阅 Publisher 的实例)。此外,usingWhen 操作符接受不同的处理程序,以便应对主处理流终止的成功和失败。这些处理程序由发布者实现。
可以仅使用 usingWhen 一个操作符实现完全无阻塞的响应式事务。假设我们有一个完全响应式的事务。出于演示目的,代码做了简化处理。响应式事务实现如下所示:
package com.blnp.net.reactor.stream;
import lombok.SneakyThrows;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/10/31 17:16
*/
public class UsingWhenTest {
static class Transaction {
private static final Random random = new Random();
private final int id;
public Transaction(int id) {
this.id = id;
System.out.println("创建事务实例:" + id);
}
/**
* 用途:开启响应式事务
* @author liaoyibin
* @since 16:00 2024/11/1
* @params []
* @param
* @return reactor.core.publisher.Mono<com.blnp.net.reactor.stream.UsingWhenTest.Transaction>
**/
public static Mono<Transaction> beginTransaction() {
return Mono.defer(() -> {
return Mono.just(new Transaction(random.nextInt(1000)));
});
}
/**
* 用途:响应式插入数据
* @author liaoyibin
* @since 16:04 2024/11/1
* @params [rows]
* @param rows
* @return reactor.core.publisher.Flux<java.lang.String>
**/
public Flux<String> insertRows(Publisher<String> rows) {
return Flux.from(rows)
.delayElements(Duration.ofMillis(100))
.flatMap(row -> {
if (random.nextInt(10) < 2) {
return Mono.error(new RuntimeException("出错的条目:" + row));
} else {
return Mono.just(row);
}
});
}
/**
* 用途:响应式提交
* @author liaoyibin
* @since 16:26 2024/11/1
* @params []
* @param
* @return reactor.core.publisher.Mono<java.lang.Void>
**/
public Mono<Void> commit() {
return Mono.defer(() -> {
System.out.println("[开始提交事务:" + id);
if (random.nextBoolean()) {
return Mono.empty();
} else {
return Mono.error(new RuntimeException("事务提交异常"));
}
});
}
/**
* 用途:响应式回滚
* @author liaoyibin
* @since 16:27 2024/11/1
* @params []
* @param
* @return reactor.core.publisher.Mono<java.lang.Void>
**/
public Mono<Void> rollback() {
return Mono.defer(() -> {
System.out.println("开始回滚事务:" + id);
if (random.nextBoolean()) {
return Mono.empty();
} else {
return Mono.error(new RuntimeException("连接异常"));
}
});
}
}
@SneakyThrows
@Test
public void test1() {
Flux.usingWhen(
//提供资源
Transaction.beginTransaction(),
new Function<Transaction, Publisher<?>>() {
@Override
public Publisher<?> apply(Transaction transaction) {
return transaction.insertRows(Flux.just("a", "b", "c"));
}
},
//资源的使用
new Function<Transaction, Publisher<?>>() {
@Override
public Publisher<?> apply(Transaction transaction) {
return transaction.commit();
}
},
//当资源正常使用结束,调用了onComplete,则使用该函数清理资源
new BiFunction<Transaction, Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Transaction transaction, Throwable throwable) {
return transaction.rollback();
}
},
//如果取消资源的使用,则使用该函数清理资源。如果设置为null,则使用资源正常结束时的清理函数
new Function<Transaction, Publisher<?>>() {
@Override
public Publisher<?> apply(Transaction transaction) {
return null;
}
}
)
.subscribe(
event -> System.out.println("onNext:" + event),
ex -> System.err.println("onError:" + ex.getCause()),
() -> System.out.println("处理完成")
);
Thread.sleep(10000);
}
}
Lambda形式:
@SneakyThrows
@Test
public void test2() {
Flux.usingWhen(
Transaction.beginTransaction(),
transaction -> transaction.insertRows(Flux.just("A", "B", "C")),
Transaction::commit,
(transaction, throwable) -> transaction.rollback(),
Transaction::rollback
).subscribe(
event -> System.out.println("onNext:" + event),
ex -> System.err.println("onError:" + ex.getCause()),
() -> System.out.println("处理完成")
);
Thread.sleep(10000);
}
使用 usingWhen 操作符,不仅可以更容易地以完全响应式的方式管理资源生命周期,还可以轻松实现响应式事务。因此,与 using 操作符相比,usingWhen 操作符有巨大改进。
1.7、错误处理
onError 信号是响应式流规范的一个组成部分,一种将异常传播给可以处理它的用户。但是,如果最终订阅者没有为 onError 信号定义处理程序,那么 onError 抛异常。
@Test
public void test() {
Flux.from(new Publisher < String > () {
@Override
public void subscribe(Subscriber < ? super String > subscriber) {
subscriber.onError(new RuntimeException("手动异常"));
}
}).subscribe(System.out::println);
}
此外,响应式流的语义定义了 onError 是一个终止操作,该操作之后响应式流会停止执行。此时,我们可能采取以下策略中的一种做出不同响应:
- 为 subscribe 操作符中的 onError 信号定义处理程序。
- 通过 onErrorReturn 操作符捕获一个错误,并用一个默认静态值或一个从异常中计算出的值替换它。
- 通过 onErrorResume 操作符捕获异常并执行备用工作流。
- 通过 onErrorMap 操作符捕获异常并将其转换为另一个异常来更好地表现当前场景。
- 定义一个在发生错误时重新执行的响应式工作流。如果源响应序列发出错误信号,那么retry 操作符会重新订阅该序列。
假设有如下推荐服务,该服务是不可靠的:
package com.blnp.net.reactor.stream;
import lombok.SneakyThrows;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/1 16:52
*/
public class NormalTest {
private Random random = new Random();
private CountDownLatch latch = new CountDownLatch(1);
@Test
public void test() {
Flux.from(new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onError(new RuntimeException("手动异常"));
}
}).subscribe(System.out::println, System.err::println);
}
public Flux<String> recommendedBooks(String userId) {
return Flux.defer(() -> {
if (random.nextInt(10) < 7) {
return Flux.<String>error(new RuntimeException("Err"))
// 整体向后推移指定时间,元素发射频率不变
.delaySequence(Duration.ofMillis(100));
} else {
return Flux.just("Blue Mars", "The Expanse")
.delayElements(Duration.ofMillis(50));
}
}).doOnSubscribe(
item -> System.out.println("请求:" + userId)
);
}
@SneakyThrows
@Test
public void test1() {
Flux.just("user-0010")
.flatMap(user -> recommendedBooks(user)).retry(3)
.subscribe(
System.out::println,
ex -> {
System.err.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
}
@SneakyThrows
@Test
public void test2() {
Flux.just("user-0010")
.flatMap(user -> recommendedBooks(user)).retry(3)
.onErrorResume(event -> Flux.just("java编程指南.pdf"))
.subscribe(
System.out::println,
ex -> {
System.err.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
}
@SneakyThrows
@Test
public void test3() {
Flux.just("user-0010")
.flatMap(user -> recommendedBooks(user)).retry(3)
.onErrorReturn("程序员生存之道.pdf")
.subscribe(
System.out::println,
ex -> {
System.err.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
}
@SneakyThrows
@Test
public void test4() {
Flux.just("user-0010")
.flatMap(user -> recommendedBooks(user))
.onErrorMap(throwable -> {
if (throwable.getMessage().equals("Err")) {
return new Exception("我的异常替换");
}
return new Exception("未知异常");
})
.subscribe(
System.out::println,
ex -> {
System.err.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
}
}
总而言之,Project Reactor 提供了丰富的工具集,可以帮助处理异常情况,从而提高应用程序的回弹性。
1.8、背压处理(🍕重点)
尽管响应式流规范要求将背压构建到生产者和消费者之间的通信中,但这仍然可能使消费者溢出。一些消费者可能无意识地请求无界需求,然后无法处理生成的负载。
另一些消费者则可能对传入消息的速率有严格的限制。例如,数据库客户端每秒不能插入超过 1000 条记录。在这种情况下,事件批处理技术可能有所帮助。此时可以通过以下方式配置流以处理背压情况:
- onBackPressureBuffer: 操作符会请求无界需求并将返回的元素推送到下游。如果下游消费者无法跟上,那么元素将缓冲在队列中。
- onBackPressureDrop: 操作符也请求无界需求(Integer.MAX_VALUE)并向下游推送数据。如果下游请求数量不足,那么元素会被丢弃。自定义处理程序可以用来处理已丢弃的元素。
- onBackPressureLast :操作符与 onBackPressureDrop 的工作方式类似。只是会记住最近收到的元素,并在需求出现时立即将其推向下游。
- onBackPressureError: 操作符在尝试向下游推送数据时请求无界需求。如果下游消费者无法跟上,则操作符会引发错误。
管理背压的另一种方法是使用速率限制技术。limitRate(n) 操作符将下游需求拆分为不大于 n的较小批次。可以保护脆弱的生产者免受来自下游消费者的不合理数据请求的破坏。 limitRate(n) 操作符会限制来自下游消费者的需求(总请求值)。
如, limitRequest(100) 确保不会向生产者请求超过 100 个元素。发送 100 个事件后,操作符成功关闭流。
onBackpressureBuffer 操作符:
package com.blnp.net.reactor.stream;
import lombok.SneakyThrows;
import org.junit.Test;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/1 17:24
*/
public class BackpressureTest {
@SneakyThrows
@Test
public void test() {
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 1000)
.delayElements(Duration.ofMillis(10))
.onBackpressureBuffer(900)
.delayElements(Duration.ofMillis(100))
.subscribe(
System.out::println,
ex -> {
System.out.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
System.out.println("main结束");
}
}
onBackpressureDrop 操作符:
@SneakyThrows
@Test
public void test1() {
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 1000)
.delayElements(Duration.ofMillis(10))
.onBackpressureDrop()
.delayElements(Duration.ofMillis(100))
.subscribe(
System.out::println,
ex -> {
System.out.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
System.out.println("main结束");
}
onBackpressureLast 操作符:
@SneakyThrows
@Test
public void test2() {
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 1000)
.delayElements(Duration.ofMillis(10))
.onBackpressureLatest()
.delayElements(Duration.ofMillis(100))
.subscribe(
System.out::println,
ex -> {
System.out.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
System.out.println("main结束");
}
onBackpressureError 操作符:
@SneakyThrows
@Test
public void test3() {
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 1000)
.delayElements(Duration.ofMillis(10))
.onBackpressureError()
.delayElements(Duration.ofMillis(100))
.subscribe(
System.out::println,
ex -> {
System.out.println(ex);
latch.countDown();
},
() -> {
System.out.println("处理完成");
latch.countDown();
}
);
latch.await();
System.out.println("main结束");
}
1.9、热数据流和冷数据流
冷发布者行为方式:无论订阅者何时出现,都为该订阅者生成所有序列数据,没有订阅者就不会生成数据。以下代码表示冷发布者的行为:
@Test
public void test() {
Flux < String > coldPublisher = Flux.defer(() -> {
System.out.println("生成新数据");
return Flux.just(UUID.randomUUID().toString());
});
System.out.println("尚未生成新数据");
coldPublisher.subscribe(e -> System.out.println("onNext: " + e));
coldPublisher.subscribe(e -> System.out.println("onNext: " + e));
System.out.println("为两个订阅者生成了两次数据");
}
每当订阅者出现时都会有一个新序列生成,而这些语义可以代表 HTTP 请求。热发布者中的数据生成不依赖于订阅者而存在。因此,热发布者可能在第一个订阅者出现之前开始生成元素。
这种语义代表数据广播场景。例如,一旦股价发生变化,热发布者就可以向其订阅者广播有关当前股价的更新。
但是,当订阅者到达时,它仅接收未来的价格更新,而不接受先前价格历史。Reactor 库中的大多数热发布者扩展了 Processor 接口。但是,just 工厂方法会生成一个热发布者,因为它的值只在构建发布者时计算一次,并且在新订阅者到达时不会重新计算。
可以通过将 just 包装在 defer 中来将其转换为冷发行者。这样,即使 just 在初始化时生成值,这种初始化也只会在新订阅出现时发生。后一种行为由 defer 工厂方法决定。
1.9.1、多播流元素(🍕重点)
通过响应式转换将冷发布者转变为热发布者。如,一旦所有订阅者都准备好生成数据,希望在几个订阅者之间共享冷处理器的结果。同时,我们又不希望为每个订阅者重新生成数据。Project Reactor为此目的提供了 ConnectableFlux 。
ConnectableFlux ,不仅可以生成数据以满足最急迫的需求,还会缓存数据,以便所有其他订阅者可以按照自己的速度处理数据。队列和超时的大小可以通过类的 publish 方法和 replay 方法进行配置。
此外, ConnectableFlux 可以使用 connect 、 autoConnect(n) 、 refCount(n) 和 refCount(int,Duration) 等方法自动跟踪下游订阅者的数量,以便在达到所需阈值时触发执行操作。如下案例:
@Test
public void test1() {
Flux < Integer > source = Flux.range(0, 3)
.doOnSubscribe(
s -> System.out.println("对冷发布者的新订阅票据:" + s)
);
final ConnectableFlux < Integer > conn = source.publish();
conn.subscribe(item -> System.out.println("[Subscriber 1] onNext:" + item));
conn.subscribe(item -> System.out.println("[Subscriber 2] onNext:" + item));
System.out.println("所有订阅者都准备好建立连接了");
conn.connect();
}
可以看到,冷发布者收到了订阅,只生成了一次数据项。但是,两个订阅者都收到了整个事件集合。
1.9.2、缓存流元素
使用 ConnectableFlux 可以轻松实现不同的数据缓存策略。但是,Reactor 已经以 cache 操作符的形式提供了用于事件缓存的 API。
cache 操作符使用 ConnectableFlux ,因此它的主要附加值是它所提供的一个流式而直接的API。可以调整缓存所能容纳的数据量以及每个缓存项的到期时间。示例代码:
@SneakyThrows
@Test
public void test2() {
Flux < Integer > source = Flux.range(0, 5)
.doOnSubscribe(
s -> System.out.println("冷发布者的新订阅票据")
);
final Flux < Integer > cachedSource = source.cache(Duration.ofMillis(1000));
cachedSource.subscribe(item -> System.out.println("[S 1] onNext: " + item));
cachedSource.subscribe(item -> System.out.println("[S 2] onNext: " + item));
Thread.sleep(1200);
cachedSource.subscribe(item -> System.out.println("[S 3] onNext: " + item));
}
前两个订阅者共享第一个订阅的同一份缓存数据。然后,在一定延迟之后,由于第三个订阅者无法获取缓存数据,因此一个针对冷发布者的新订阅被触发了。最后,即使该数据不来自缓存,第三个订阅者也接收到了所需的数据。
1.9.3、共享流元素
我们可以使用 ConnectableFlux 向几个订阅者多播事件。但是需要等待订阅者出现才能开始处理。share 操作符可以将冷发布者转变为热发布者。该操作符会为每个新订阅者传播订阅者尚未错过的事件。
@SneakyThrows
@Test
public void test3() {
Flux < Integer > source = Flux.range(0, 5)
.delayElements(Duration.ofMillis(100))
.doOnSubscribe(s -> System.out.println("冷发布者新的订阅票据"));
Flux < Integer > cachedSource = source.share();
cachedSource.subscribe(item -> System.out.println("[S 1] onNext: " + item));
Thread.sleep(400);
cachedSource.subscribe(item -> System.out.println("[S 2] onNext: " + item));
Thread.sleep(1000);
}
在前面的代码中,共享了一个冷发布流,该流以每 100 毫秒为间隔生成事件。然后,经过一些延迟,一些订阅者订阅了共享发布者。第一个订阅者从第一个事件开始接收,而第二个订阅者错过了在其出现之前所产生的事件(S2 仅接收到事件 3 和事件 4)。
1.10、处理时间
响应式编程是异步的,因此它本身就假定存在时序。基于 Project Reactor,可以使用 interval 操作符生成基于一定持续时间的事件,使用 delayElements 操作符生成延迟元素,并使用 delaySequence 操作符延迟所有信号。
Reactor 的 API 使你能对一些与时间相关的事件做出响应, timestamp 操作符用于输出元素的时间戳, timeout 操作符用于指定消息时间间隔的大小。与 timestamp 类似, elapsed 操作符测量与上一个事件的时间间隔。
interval 操作符:
package com.blnp.net.reactor.stream;
import lombok.SneakyThrows;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/4 13:57
*/
public class HandleTimeTest {
@SneakyThrows
@Test
public void test() {
/**
* 每 100 毫秒执行一次
**/
Flux.interval(Duration.ofMillis(100))
.subscribe(
item -> {
System.out.println(Thread.currentThread().getName() + " -- " + item);
}
);
/**
* 延时3秒后执行,每 100 毫秒执行一次
**/
Flux.interval(Duration.ofSeconds(3), Duration.ofMillis(100))
.subscribe(System.out::println);
Thread.sleep(5000);
}
@SneakyThrows
@Test
public void test1() {
Flux.interval(Duration.ofMillis(100), Schedulers.parallel())
.subscribe(
item -> {
System.out.println(Thread.currentThread().getName() + " -- " + item);
}
);
Thread.sleep(5000);
}
@SneakyThrows
@Test
public void test2() {
Flux.interval(Duration.ofMillis(100), Schedulers.newSingle("count"))
.subscribe(
item -> {
System.out.println(Thread.currentThread().getName() + " -- " + item);
}
);
Thread.sleep(5000);
}
}
delayElements 操作符:
@SneakyThrows
@Test
public void test3() {
Flux.range(1, 1000)
.delayElements(Duration.ofSeconds(1))
.subscribe(item -> {
System.out.println(Thread.currentThread().getName() + " -- " + item);
});
Thread.sleep(5000);
System.out.println("结束");
}
delaySequence 操作符:
@SneakyThrows
@Test
public void test4() {
/**
* 指定订阅时间和第一个元素发布事件的时间间隔
**/
Flux.range(1, 1000)
.delaySequence(Duration.ofSeconds(3))
.subscribe(item -> {
System.out.println(Thread.currentThread().getName() + " -- " + item);
});
Thread.sleep(6000);
System.out.println("结束");
}
timeout 操作符:
@SneakyThrows
@Test
public void test5() {
Random random = new Random();
CountDownLatch latch = new CountDownLatch(1);
// 指定时间间隔大于我们的timeout时间,就抛异常
Flux.interval(Duration.ofMillis(300))
.timeout(Duration.ofMillis(random.nextInt(20) + 290))
.subscribe(
System.out::println,
ex -> {
System.err.println(ex);
latch.countDown();
}
);
latch.await(10, TimeUnit.SECONDS);
}
timestamp 操作符:
@SneakyThrows
@Test
public void test6() {
// 为我们响应式流里面增加一个响应式时间戳,时间戳和元素会以二元组形式在流里面传递
Flux.interval(Duration.ofMillis(300))
.timestamp()
.subscribe(item -> {
Long timeStamp = item.getT1();
Long element = item.getT2();
String result = element + " 的时间戳 " + timeStamp;
System.out.println(result);
});
Thread.sleep(5000);
}
elapsed 操作符:
@SneakyThrows
@Test
public void test7() {
// elapsed
Flux.interval(Duration.ofMillis(300))
.elapsed()
.subscribe(
item -> {
Long interval = item.getT1();
Long element = item.getT2();
String result = element + " 与上一个元素的时间间隔 " + interval + "ms";
System.out.println(result);
}
);
Thread.sleep(5000);
}
从前面的输出中可以明显看出,事件并未恰好在 300 毫秒的时间间隔内到达。发生这种情况是因为 Reactor 使用 Java 的 ScheduledExecutorService 进行调度事件,而这些事件本身并不能保证精确的延迟。因此,应该注意不要在 Reactor 库中要求太精确的时间(实时)间隔。
1.11、组合和转换响应式流
当我们构建复杂的响应式工作流时,通常需要在几个不同的地方使用相同的操作符序列。transform 操作符,可以将这些常见的部分提取到单独的对象中,并在需要时重用它们。transform 操作符,可以增强流结构本身。示例代码:
package com.blnp.net.reactor.stream;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import java.util.function.Function;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/4 14:31
*/
public class TransformTest {
@Test
public void test() {
Function<Flux<String>,Flux<String>> logUserInfo = stream -> stream
.index()
.doOnNext(tp -> System.out.println("[" + tp.getT1() + "] User: " + tp.getT2()))
.map(Tuple2::getT2);
Flux.range(1000,3)
.map(i -> "user-"+i)
.transform(logUserInfo)
.subscribe(e -> System.out.println("onNext: " + e));
}
}
transform 操作符仅在流生命周期的组装阶段更新一次流行为,可以在响应式应用程序中实现代码重用。
1.12、处理器
响应式流规范定义了 Processor 接口。Processor 既是 Publisher 也是 Subscriber。因此,既可以订阅 Processor 实例,也可以手动向它发送信号(onNext、onError 和 onComplete)。Reactor 的作者建议忽略处理器,因为它们很难使用并且容易出错。
在大多数情况下,处理器可以被操作符的组合所取代。另外,生成器工厂方法(push、create 和 generate)可能更适合适配外部 API。Reactor 提出以下几种处理器:
- Direct :处理器只能通过操作处理器的接收器来推送因用户手动操作而产生的数据。
- DirectProcessor 和 UnicastProcessor 是这组处理器的代表。
- DirectProcessor 不处理背压,可用于向多个订阅者发布事件。
- UnicastProcessor 使用内部队列处理背压,最多只能为一个 Subscriber 服务。
-
Synchronous 处理器
-
EmitterProcessor 和 ReplayProcessor 可以同时通过手动方式和订阅上游 Publisher 的方式来推送数据。
-
EmitterProcessor 可以为多个订阅者提供服务并满足它们的需求,但仅能以同步方式消费由单一 Publisher 产生的数据。
-
ReplayProcessor 的行为类似于 EmitterProcessor ,但是它能使用几种策略来缓存传入的数据。
-
-
Asynchronous 处理器
-
WorkQueueProcessor 和 TopicProcessor 可以推送从多个上游发布者处获得的下游数据。
-
为了处理多个上游发布者,这些处理器使用 RingBuffer数据结构。这些处理器具有专用的构建器 API,因为配置选项的数量使它们很难初始化。
-
TopicProcessor 兼容响应式流,并可以为每个下游 Subscriber 关联一个 Thread 来处理交互。它可以服务的下游订阅者数量有限。
-
WorkQueueProcessor 具有与 TopicProcessor 类似的特性。但是,它放宽了一些响应式流要求,这使它在运行时所使用的资源更少。
-
1.13、调试Project Reactor
Reactor 库附带了一个通用的测试框架。 io.projectreactor:reactor-test 库提供了测试Project Reactor 所实现的响应式工作流所需的所有必要工具。
虽然响应式代码不那么容易调试,但是 Project Reactor 提供了能在需要时简化调试过程的技术。与任何基于回调的框架一样,Project Reactor 中的栈跟踪信息量不大。它们没有在代码中给出发生异常情况的确切位置。Reactor 库具有面向调试的组装时检测功能,可以使用以下代码激活:
Hooks.onOperatorDebug();
示例程序:
package com.blnp.net.reactor.stream;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/11/4 14:51
*/
public class ReactorTest {
@Test
public void test() {
//响应式测试功能检测激活
Hooks.onOperatorDebug();
Flux.range(1,10)
.map(item -> "item-" + item)
.concatWith(Flux.error(new RuntimeException("手动异常")))
.subscribe(System.out::println);
}
}
配置Hooks:
没配置:
启用后,此功能开始收集将要组装的所有流的栈跟踪,稍后此信息可以基于组装信息扩展栈跟踪信息,从而帮助我们更快地发现问题。但是,创建栈跟踪的过程成本很高。因此,作为最后的手段,它应该只以受控的方式进行激活。
此外,Project Reactor 的 Flux 和 Mono 类型提供了一个被称为 log 的便捷方法。它能记录使用操作符的所有信号。即使在调试情况下,许多方法的自定义实现也可以提供足够的自由度来跟踪所需的数据。如下代码:
Flux.range(1, 10)
.map(item -> "item-" + item)
.concatWith(Flux.error(new RuntimeException("手动异常")))
.log()
.subscribe(System.out::println);
1.14、Reactor 插件
Project Reactor 是一个通用且功能丰富的库。但是,它无法容纳所有有用的响应式工具。因此,有一些项目在一些领域扩展了 Reactor 的功能。官方的 Reactor 插件项目为 Reactor 项目提供了几个模块。
reactor-adapter 模块为 RxJava 2 响应式类型和调度程序提供桥接。此外,该模块还能与Akka 进行集成。
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-adapter</artifactId>
<version>3.4.0</version>
</dependency>
reactor-logback 模块提供高速异步日志记录功能。它以 Logback 的 AsyncAppender和 LMAXDisruptor 的 RingBuffer 为基础,其中后者通过 Reactor 的 Processor 实现。
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-logback</artifactId>
<version>3.2.6.RELEASE</version>
</dependency>
reactor-extra 模块包含用于高级需求的其他实用程序。例如,该模块包含 TupleUtils类,该类简化了编写 Tuple 类的代码。此外,该模块具有 MathFlux类,可以从数字源中计算最小值和最大值,并对它们求和或取平均。 ForkJoinPoolScheduler 类使 Java 的 ForkJoinPool 适配 Reactor 的Scheduler。可以使用以下导入方式将模块添加到项目中:
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
<version>3.4.0</version>
</dependency>
此外,Project Reactor 生态系统还为流行的异步框架和消息代理服务器提供了响应式驱动程序。Reactor RabbitMQ 模块使用熟悉的 Reactor API 为 RabbitMQ 提供了一个响应式 Java 客户端。该模块不仅提供具有背压支持的异步非阻塞消息传递,还使应用程序能够通过使用 Flux和 Mono 类型将 RabbitMQ 用作消息总线。
<dependency>
<groupId>io.projectreactor.rabbitmq</groupId>
<artifactId>reactor-rabbitmq</artifactId>
<version>1.5.0</version>
</dependency>
Reactor Kafka 模块为 Kafka 消息代理服务器提供了类似的功能。
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.3.1</version>
</dependency>
另一个广受欢迎的 Reactor 扩展被称为 Reactor Netty。它使用 Reactor 的响应式类型来适配Netty的 TCP/HTTP/UDP 客户端和服务器。Spring WebFlux 模块在内部使用 Reactor Netty 来构建非阻塞式Web 应用程序。
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.2</version>
</dependency>
原文地址:https://blog.csdn.net/qq_37165235/article/details/142827741
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!