自学内容网 自学内容网

Flux.from 使用说明书

from

public static <T> Flux<T> from(Publisher<? extends T> source)
Decorate the specified  Publisher with the  Flux API.

用 Flux API 装饰指定的 Publisher。

Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a  Flux.

除非源已经是 Flux,否则会应用 Hooks.onEachOperator(String, Function) 和类似的组装钩子。

Type Parameters:

T - The type of values in both source and output sequences

Parameters:

source - the source to decorate

Returns:

a new  Flux

类型参数:

T - 源和输出序列中值的类型

参数:

source - 要装饰的源

返回:

一个新的 Flux。

public static <T> Flux<T> from(Publisher<? extends T> source) 

是 Reactor 框架中的一个静态方法,用于将一个 Publisher 转换为 Flux。

Flux 是 Reactor 中表示 0 到 N 个元素的反应式流的核心类型。

这个方法通常用于将其他类型的 Publisher(如 Mono、CompletableFuture 或其他 Flux)转化为 Flux。

方法解析:

  • 作用:将一个 Publisher 转换为 Flux,以便可以利用 Flux 提供的丰富的反应式操作符。
  • 泛型参数
  • <T>:表示 Publisher 发布的元素的类型。
  • 参数
  • source:一个实现了 Publisher 接口的实例,可以是任意发布类型的对象。

特性:

  1. 兼容性:允许将其他类型的发布者(如 Mono)转换为 Flux,使得你可以使用 Flux 的操作符来处理数据流。
  2. 元素数量:如果 source 只发出一个元素,转换后 Flux 仍然可以处理这个元素,允许返回一个 0 到 N 个元素的流。
  3. 错误处理:from 方法保留了原始 Publisher 的错误处理机制。当源 Publisher 发出错误时,生成的 Flux 也会发出错误信号。

使用场景:

  • 当你想从其他异步数据源(如 Mono 或其他 Flux)构建 Flux 时,可以使用这个方法。
  • 在组合多个流时,可以方便地将不同类型的 Publisher 转换为 Flux,使得流操作变得更加一致和便捷。

示例代码:

下面的示例展示了如何使用 from 方法将一个 Mono 转换为 Flux:

java

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FromExample {
    public static void main(String[] args) {
        // 创建一个 Mono,表示单个元素Mono<String> monoSource = Mono.just("Hello, World!");

        // 使用 from 方法将 Mono 转换为 FluxFlux<String> fluxFromMono = Flux.from(monoSource);

        // 订阅并打印结果fluxFromMono.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Flux completed!")
        );
    }
}

输出结果:

Received: Hello, World!
Flux completed!

解析:

  • 在这个示例中,首先创建了一个 Mono 对象 monoSource,它发出一个字符串 "Hello, World!"。
  • 使用 Flux.from(monoSource) 方法将 Mono 转换为 Flux,然后订阅该 Flux。
  • 订阅者接收到数据后,打印出 "Received: Hello, World!",并在完成时打印 "Flux completed!"。

总结:

  • Flux.from(Publisher<? extends T> source) 方法是一个非常实用的工具,用于将任何实现了 Publisher 接口的对象转换为 Flux。这使得在反应式编程中更方便地处理和组合不同类型的异步数据流。通过这个方法,可以轻松地将单一元素流(如 Mono)转化为可以处理多个元素的 Flux。

fromArray

public static <T> Flux<T> fromArray(T[] array)
Create a  Flux that emits the items contained in the provided array.

创建一个 Flux,它发出提供的数组中包含的项。

Type Parameters:

T - The type of values in the source array and resulting Flux

Parameters:

array - the array to read data from

Returns:

a new  Flux

类型参数:

T - 源数组和结果 Flux 中值的类型

参数:

array - 要读取数据的数组

返回:

一个新的 Flux。

public static <T> Flux<T> fromArray(T[] array) 

是 Reactor 框架中的一个静态方法,用于将一个数组转换为 Flux。

这个方法允许你将一个包含任意数量元素的数组转化为一个反应式流(Flux),以便进行流式处理。

方法解析:

  • 作用:将一个包含元素的数组转换为 Flux,从而可以利用 Flux 提供的丰富操作符进行处理。
  • 泛型参数
  • <T>:表示数组中元素的类型。
  • 参数
  • array:要转换为 Flux 的数组,包含 0 个或多个元素。

特性:

  1. 元素数量:可以接受一个任意数量的元素的数组(包括 0 个元素),并将这些元素发布为 Flux 流。
  2. 错误处理:fromArray 方法不会直接处理错误,但如果提供的数组为 null,则会抛出 NullPointerException。
  3. 完成信号:当所有元素都被发布后,生成的 Flux 将会完成。

使用场景:

  • 当你需要将一个固定数量的元素(如配置值或常量集合)转换为反应式流时,可以使用此方法。
  • 结合其他 Flux 操作符,便于对数组中的元素进行批量处理。

示例代码:

下面的示例展示了如何使用 fromArray 方法将一个字符串数组转换为 Flux:

java

import reactor.core.publisher.Flux;

public class FromArrayExample {
    public static void main(String[] args) {
        // 创建一个字符串数组String[] stringArray = {"Apple", "Banana", "Cherry", "Date"};

        // 使用 fromArray 方法将数组转换为 FluxFlux<String> fluxFromArray = Flux.fromArray(stringArray);

        // 订阅并打印结果fluxFromArray.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Flux completed!")
        );
    }
}

输出结果:

Received: Apple
Received: Banana
Received: Cherry
Received: Date
Flux completed!

解析:

  • 在这个示例中,首先创建了一个包含四个水果名称的字符串数组 stringArray。
  • 使用 Flux.fromArray(stringArray) 方法将数组转换为 Flux。
  • 通过订阅该 Flux,每个元素都被逐个处理并打印出来,最后输出 "Flux completed!",表示流的完成。

总结:

  • Flux.fromArray(T[] array) 方法是将一个数组转换为 Flux 的简便方法。这使得在处理一组固定数量的元素时,可以方便地使用反应式编程的特性。这个方法可以与其他 Flux 操作符组合使用,以实现复杂的流式处理逻辑。

fromIterable

public static <T> Flux<T> fromIterable(Iterable<? extends T> it)
Create a  Flux that emits the items contained in the provided  Iterable. The  Iterable.iterator() method will be invoked at least once and at most twice for each subscriber.

创建一个 Flux,它发出提供的 Iterable 中包含的项。对于每个订阅者,Iterable.iterator() 方法将被调用至少一次,最多调用两次。

This operator inspects the  Iterable's  Spliterator to assess if the iteration can be guaranteed to be finite (see  Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two  Iterable.iterator() calls. This second invocation is skipped on a  Collection however, a type which is assumed to be always finite.

此操作符检查 Iterable 的 Spliterator,以评估迭代是否可以保证是有限的(请参见 Operators.onDiscardMultiple(Iterator, boolean, Context))。由于默认的 Spliterator 包装了 Iterator,因此我们可以进行两次 Iterable.iterator() 调用。然而,对于 Collection 类型,这第二次调用会被跳过,因为它被认为总是有限的。

Discard Support: 

Upon cancellation, this operator attempts to discard the remainder of the  Iterable if it can safely ensure the iterator is finite. Note that this means the  Iterable.iterator() method could be invoked twice.

Type Parameters:

T - The type of values in the source  Iterable and resulting Flux

Parameters:

it - the  Iterable to read data from

Returns:

a new  Flux

丢弃支持:

在取消时,此操作符会尝试丢弃 Iterable 的其余部分,前提是它能够安全地确保迭代器是有限的。请注意,这意味着 Iterable.iterator() 方法可能会被调用两次。

类型参数:

T - 源 Iterable 和结果 Flux 中的值类型

参数:

it - 要读取数据的 Iterable

返回:

一个新的 Flux

public static <T> Flux<T> fromIterable(Iterable<? extends T> it)

是 Reactor 框架中的一个静态方法,用于将一个可迭代的集合(如 List、Set 等)转换为 Flux。

这使得我们能够将一个集合中的元素以反应式的方式进行处理。

方法解析:

  • 作用:将一个 Iterable 转换为 Flux,以便进行流式处理。
  • 泛型参数
  • <T>:表示集合中元素的类型。
  • 参数
  • it:要转换为 Flux 的 Iterable 集合,包含 0 个或多个元素。

特性:

  1. 元素数量:可以接受一个包含任意数量元素的 Iterable,并将这些元素发布为 Flux 流。
  2. 错误处理:如果传入的 Iterable 为 null,则会抛出 NullPointerException。
  3. 完成信号:当所有元素都被发布后,生成的 Flux 将会完成。

使用场景:

  • 当你需要将一个集合(如 List 或 Set)转换为反应式流时,可以使用此方法。
  • 结合其他 Flux 操作符,便于对集合中的元素进行批量处理。

示例代码:

下面的示例展示了如何使用 fromIterable 方法将一个 List 转换为 Flux:

java

import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.List;

public class FromIterableExample {
    public static void main(String[] args) {
        // 创建一个 List 集合List<String> fruitList = Arrays.asList("Apple", "Banana", "Cherry", "Date");

        // 使用 fromIterable 方法将 List 转换为 FluxFlux<String> fluxFromIterable = Flux.fromIterable(fruitList);

        // 订阅并打印结果fluxFromIterable.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Flux completed!")
        );
    }
}

输出结果:

Received: Apple
Received: Banana
Received: Cherry
Received: Date
Flux completed!

解析:

  • 在这个示例中,首先创建了一个包含四个水果名称的 List 集合 fruitList。
  • 使用 Flux.fromIterable(fruitList) 方法将集合转换为 Flux。
  • 通过订阅该 Flux,每个元素都被逐个处理并打印出来,最后输出 "Flux completed!",表示流的完成。

总结:

  • Flux.fromIterable(Iterable<? extends T> it) 方法是将一个可迭代的集合转换为 Flux 的便捷方法。这使得在处理一组元素时,可以方便地使用反应式编程的特性。这个方法可以与其他 Flux 操作符组合使用,以实现复杂的流式处理逻辑。

fromStream

public static <T> Flux<T> fromStream(Stream<? extends T> s)
Create a  Flux that emits the items contained in the provided  Stream. Keep in mind that a  Stream cannot be re-used, which can be problematic in case of multiple subscriptions or re-subscription (like with  repeat() or  retry()). The  Stream is  closed automatically by the operator on cancellation, error or completion.

创建一个 Flux,该 Flux 会发出提供的 Stream 中包含的项。请注意,Stream 不能被重复使用,这可能会在多次订阅或重新订阅(例如使用 repeat() 或 retry() 时)造成问题。Stream 会在取消、错误或完成时由操作符自动关闭。

Discard Support: 

Upon cancellation, this operator attempts to discard remainder of the  Stream through its open  Spliterator, if it can safely ensure it is finite (see  Operators.onDiscardMultiple(Iterator, boolean, Context)).

Type Parameters:

T - The type of values in the source  Stream and resulting Flux

Parameters:

s - the  Stream to read data from

Returns:

a new  Flux

丢弃支持:

在取消时,如果该操作符能够安全地确保 Stream 是有限的,它会通过其打开的 Spliterator 尝试丢弃 Stream 的剩余部分(请参见 Operators.onDiscardMultiple(Iterator, boolean, Context))。

类型参数:

T - 源 Stream 和生成的 Flux 中值的类型

参数:

s - 要读取数据的 Stream

返回:

一个新的 Flux

public static <T> Flux<T> fromStream(Stream<? extends T> s)

是 Reactor 框架中提供的一个静态方法,用于将 Java 8 的 Stream 转换为 Flux。

这使得我们能够将一个流中的元素以反应式的方式进行处理。

方法解析:

  • 作用:将一个 Stream 转换为 Flux,以便进行流式处理。
  • 泛型参数
  • <T>:表示流中元素的类型。
  • 参数
  • s:要转换为 Flux 的 Stream,可以是任意类型的 Stream。

特性:

  1. 元素数量:可以接受任意数量的元素,包括空流。
  2. 错误处理:如果传入的 Stream 为 null,则会抛出 NullPointerException。
  3. 完成信号:当所有元素都被发布后,生成的 Flux 将会完成。

使用场景:

  • 当你需要将 Java 8 的 Stream 转换为反应式流时,可以使用此方法。
  • 结合其他 Flux 操作符,便于对流中的元素进行批量处理。

示例代码:

下面的示例展示了如何使用 fromStream 方法将一个 Stream 转换为 Flux:

java

import reactor.core.publisher.Flux;

import java.util.stream.Stream;

public class FromStreamExample {
    public static void main(String[] args) {
        // 创建一个 StreamStream<String> fruitStream = Stream.of("Apple", "Banana", "Cherry", "Date");

        // 使用 fromStream 方法将 Stream 转换为 Flux
        Flux<String> fluxFromStream = Flux.fromStream(fruitStream);

        // 订阅并打印结果
        fluxFromStream.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Flux completed!")
        );
    }
}

输出结果:

Received: Apple
Received: Banana
Received: Cherry
Received: Date
Flux completed!

解析:

  • 在这个示例中,首先创建了一个包含四个水果名称的 Stream。
  • 使用 Flux.fromStream(fruitStream) 方法将流转换为 Flux。
  • 通过订阅该 Flux,每个元素都被逐个处理并打印出来,最后输出 "Flux completed!",表示流的完成。

注意事项:

  • Stream 是一次性消费的,一旦流被消费完,不能再重新使用。因此,在调用 fromStream 方法后,流中的元素将被发布到 Flux 中,之后流将不可用。
  • 如果需要多个 Flux 实例,建议在调用 fromStream 之前,先将流的内容收集到一个集合中,然后再从集合创建 Flux。

总结:

  • Flux.fromStream(Stream<? extends T> s) 方法是将 Java 8 的 Stream 转换为 Flux 的便捷方法。这使得在处理一个流中的元素时,可以方便地使用反应式编程的特性,结合其他 Flux 操作符,实现复杂的流式处理逻辑。

fromStream

public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier)
Create a  Flux that emits the items contained in a  Stream created by the provided  Supplier for each subscription. The  Stream is  closed automatically by the operator on cancellation, error or completion.

创建一个 Flux,该 Flux 会为每个订阅发出由提供的 Supplier 创建的 Stream 中的项。Stream 会在取消、错误或完成时自动关闭。

Discard Support:

Upon cancellation, this operator attempts to discard remainder of the  Stream through its open  Spliterator, if it can safely ensure it is finite (see  Operators.onDiscardMultiple(Iterator, boolean, Context)).

Type Parameters:

T - The type of values in the source  Stream and resulting Flux

Parameters:

streamSupplier - the  Supplier that generates the  Stream from which to read data

Returns:

a new  Flux

丢弃支持:

在取消时,该操作符尝试通过其打开的 Spliterator 丢弃 Stream 的剩余部分,如果它可以安全地确保该 Stream 是有限的(请参见 Operators.onDiscardMultiple(Iterator, boolean, Context))。

类型参数:

T - 源 Stream 和结果 Flux 中的值类型

参数:

streamSupplier - 生成 Stream 的 Supplier,以便从中读取数据

返回:

一个新的 Flux

public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier) 

是 Reactor 框架中的一个静态方法,用于将一个 Stream 转换为 Flux。

与之前提到的 fromStream(Stream<? extends T> s) 方法不同,这个方法接受一个 Supplier,这意味着每次调用都会提供一个新的 Stream。

方法解析:

  • 作用:通过提供一个 Supplier,每次调用 Flux 的订阅时,都会创建一个新的 Stream。
  • 泛型参数
  • <T>:表示流中元素的类型。
  • 参数
  • streamSupplier:返回一个要转换为 Flux 的 Stream 的供应商。

特性:

  1. 懒加载:每次订阅时,都会调用 streamSupplier,生成新的 Stream。这对于需要在每次订阅时重新生成数据的情况非常有用。
  2. 资源管理:如果 Stream 中的元素是来自某种资源(如文件、数据库等),那么在流处理完成后,资源可以被释放。
  3. 错误处理:如果返回的 Stream 为 null,则会抛出 NullPointerException。

使用场景:

  • 适合需要动态生成数据流的场景,尤其是当数据来源可能会在每次订阅时改变时。

示例代码:

下面的示例展示了如何使用 fromStream 方法通过 Supplier 创建 Flux:

java

import reactor.core.publisher.Flux;

import java.util.stream.Stream;
import java.util.function.Supplier;

public class FromStreamSupplierExample {
    public static void main(String[] args) {
        // 创建一个 Supplier,用于返回一个 StreamSupplier<Stream<String>> streamSupplier = () -> Stream.of("Apple", "Banana", "Cherry", "Date");

        // 使用 fromStream 方法将 Supplier 转换为 FluxFlux<String> fluxFromStream = Flux.fromStream(streamSupplier);

        // 订阅并打印结果fluxFromStream.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Flux completed!")
        );

        // 重新订阅,演示 Supplier 的懒加载特性System.out.println("Second subscription:");
        fluxFromStream.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Flux completed!")
        );
    }
}

输出结果:

Received: Apple
Received: Banana
Received: Cherry
Received: Date
Flux completed!
Second subscription:
Received: Apple
Received: Banana
Received: Cherry
Received: Date
Flux completed!

解析:

  • 在这个示例中,首先定义了一个 Supplier,该 Supplier 返回一个包含四个水果名称的 Stream。
  • 使用 Flux.fromStream(streamSupplier) 方法将 Supplier 转换为 Flux。
  • 通过首次订阅,元素被逐个处理并打印。接着,进行了第二次订阅,验证了 Supplier 的懒加载特性,再次打印出相同的元素。

注意事项:

  • 由于每次调用 streamSupplier 都会生成新的 Stream,因此你可以在流生成过程中控制流的内容。
  • 该方法适用于需要从外部源(如文件或数据库)动态获取数据的场景。

总结:

Flux.fromStream(Supplier<Stream<? extends T>> streamSupplier) 方法是一个强大的工具,它允许你通过 Supplier 动态生成 Stream,并将其转换为 Flux。这种懒加载特性使得每次订阅都能生成新的数据流,非常适合处理变化的或动态生成的数据。


原文地址:https://blog.csdn.net/heliangb46/article/details/143084210

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!