Kotlin 协程库v1.7.1的核心模块(kotlinx-coroutines-core)-- kotlinx.coroutines.flow篇
Flow 是一种异步数据流的表示,可以用于处理异步数据流的操作。
目录
AbstractFlow
协程库中的一个抽象类,用于创建具有状态的 Flow 实现。它跟踪了用于上下文保留的所有属性,并在任何属性被违反时抛出 IllegalStateException。
fun main() = runBlocking {
val text = StringBuffer()
val values = listOf(1, 2, 3, 4, 5)
val countingListFlow = CountingListFlow(values)
countingListFlow.collect{ value ->
// 处理收集到的每个值
println("Collected value: $value")
text.append("-- $value")
}
println("---- text${text.toString()} ")
println(countingListFlow.toDiagnosticString())
delay(6000)
}
class CountingListFlow(private val values: List<Int>) : AbstractFlow<Int>() {
private val collectedCounter = AtomicInteger(0)
override suspend fun collectSafely(collector: FlowCollector<Int>) {
collectedCounter.incrementAndGet() // 增加收集计数
println("collectSafely${collectedCounter.get()} ")
values.forEach { // 发射所有的值
collector.emit(it)
}
}
fun toDiagnosticString(): String = "Flow with values $values was collected ${collectedCounter.get()} times"
}
collectSafely1
Collected value: 1
Collected value: 2
Collected value: 3
Collected value: 4
Collected value: 5
---- text-- 1-- 2-- 3-- 4-- 5
Flow with values [1, 2, 3, 4, 5] was collected 1 times
asFlow
kotlinx.coroutines.flow 包中的一个扩展函数,用于将不同的数据源转换为 Flow。比如:
- 函数:对应普通函数和挂起函数都是一样的操作
fun main() { val flow1: Flow<String> = singleValueFunction.asFlow() runBlocking { flow1.collect { value -> println(value) // 输出: Hello, World! } } val flow2: Flow<String> = ::delayedValue.asFlow() runBlocking { flow2.collect { value -> println(value) // 输出: Delayed Hello! } } } val singleValueFunction: () -> String = { "Hello, World!" } suspend fun delayedValue(): String { delay(1000) // 模拟异步操作 return "Delayed Hello!" } Hello, World! Delayed Hello!
- Iterable、Iterator、Sequence 或 Array
val list = listOf(1, 2, 3, 4, 5) val flowFromList: Flow<Int> = list.asFlow() val iterator = listOf(1, 2, 3, 4, 5).iterator() val flowFromIterator: Flow<Int> = iterator.asFlow() val sequence = sequenceOf(1, 2, 3, 4, 5) val flowFromSequence: Flow<Int> = sequence.asFlow() val array = arrayOf(1, 2, 3, 4, 5) val flowFromArray: Flow<Int> = array.asFlow()
- Range
val intRange = 1..5 val flowFromIntRange: Flow<Int> = intRange.asFlow() val longRange = 1L..5L val flowFromLongRange: Flow<Long> = longRange.asFlow()
asSharedFlow
将可变的共享流(MutableSharedFlow)转换为只读的共享流(SharedFlow)。在某些情况下将可变的流限制为只读的形式,以便在代码中传递并确保只读取流的内容。
fun main() = runBlocking {
// 创建一个可变的共享流
val mutableSharedFlow = MutableSharedFlow<Int>()
// 将可变的共享流转换为只读的共享流
val sharedFlow: SharedFlow<Int> = mutableSharedFlow.asSharedFlow()
// 启动一个协程,向可变的共享流发送数据
launch {
repeat(5) {
delay(1000)
mutableSharedFlow.emit(it)
}
}
// 启动另一个协程,收集只读的共享流的数据
launch {
sharedFlow.collect {
println("Received: $it")
}
}
delay(6000) // 等待足够的时间以触发所有数据的发送
}
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
asStateFlow
将可变的状态流(MutableStateFlow)转换为只读的状态流(StateFlow)
class SampleViewModel {
private val _mutableStateFlow = MutableStateFlow("Initial State")
// 将可变状态流转换为只读状态流
val stateFlow: StateFlow<String> = _mutableStateFlow.asStateFlow()
fun updateState(newState: String) {
// 可以通过 MutableStateFlow 写入新的状态
_mutableStateFlow.value = newState
}
}
fun main() = runBlocking {
val viewModel = SampleViewModel()
// 通过只读状态流收集数据
launch {
viewModel.stateFlow.collect { state ->
println("Current State: $state")
}
}
delay(1000)
// 模拟更新状态
viewModel.updateState("New State")
// 等待状态更新被收集
delay(1000)
}
Current State: Initial State
Current State: New State
buffer
在 Flow 的操作过程中引入一个缓冲区,以提高并发性能。
fun main() = runBlocking {
flowOf("A", "B", "C")
.onEach { println("1$it") }
.collect { println("2$it") }
//协程Q
//Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
flowOf("A", "B", "C")
.onEach { println("1$it") }
.buffer() // <--------------- buffer between onEach and collect
.collect { println("2$it") }
//它将使用两个协程来执行代码。调用此代码的协程Q将执行collect,buffer之前的代码将在一个单独的新协程P中与Q同时执行:
//P : -->-- [1A] -- [1B] -- [1C] ---------->-- // flowOf(...).onEach { ... }
//
// |
// | channel // buffer()
// V
//
//Q : -->---------- [2A] -- [2B] -- [2C] -->-- // collect
}
1A
2A
1B
2B
1C
2C
1A
1B
1C
2A
2B
2C
callbackFlow
将基于回调的异步 API 转换为协程的 Flow,使得异步操作可以以类似同步的方式被处理。这跟我前篇提到的suspendCancellableCoroutine:(单次回调) 类似却有些不一样他是处理(多次回调的)
// 假设有一个基于回调的 API 接口
interface CallbackBasedApi {
fun register(callback: Callback)
fun unregister(callback: Callback)
}
// 回调接口
interface Callback {
fun onNextValue(value: Int)
fun onApiError(cause: Throwable)
fun onCompleted()
}
// 使用 callbackFlow 将回调 API 转换为 Flow
fun flowFrom(api: CallbackBasedApi): Flow<Int> = callbackFlow {
// 创建回调对象
val callback = object : Callback {
override fun onNextValue(value: Int) {
// 发送值到 Flow
trySendBlocking(value)
.onFailure { throwable ->
// Downstream 已取消或失败,可以在这里进行日志记录
}
}
override fun onApiError(cause: Throwable) {
// 发生 API 错误时取消 Flow
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() {
// 当回调完成时关闭 Flow
channel.close()
}
}
// 注册回调
api.register(callback)
// 等待 Flow 完成或被取消
awaitClose { api.unregister(callback) }
}
var callBack:Callback? = null
// 在协程中使用 Flow
fun main() {
val api = object : CallbackBasedApi {
override fun register(callback: Callback) {
callBack = callback
println("注册回调")
}
override fun unregister(callback: Callback) {
callBack = null
println("取消注册回调")
}
}
// 启动协程收集 Flow 中的数据
// 这里只是简单的打印每个值,实际使用时可能需要更新 UI 或进行其他处理
val job = GlobalScope.launch {
flowFrom(api)
.collect { value ->
println("Received value: $value")
}
}
GlobalScope.launch {
delay(1000)
//模拟一些操作触发回调
callBack?.onNextValue(1)
callBack?.onNextValue(2)
delay(300)
callBack?.onNextValue(3)
// callBack?.onApiError(Throwable("111"))
delay(300)
callBack?.onNextValue(4)
callBack?.onCompleted()
}
// 等待协程完成
runBlocking {
job.join()
}
}
注册回调
Received value: 1
Received value: 2
Received value: 3
Received value: 4
取消注册回调
cancellable
操作符提供了一种快捷方式,相当于使用 .onEach { currentCoroutineContext().ensureActive() } 来检查协程的取消状态。具体而言,ensureActive: 是一个内置函数,用于检查当前协程是否已被取消,如果是,则抛出相应的取消异常。
fun createRandomNumberFlow(): Flow<Int> = flow {
repeat(10) {
delay(1000) // 模拟异步操作
emit(Random.nextInt(1, 100))
}
}
fun main() {
val job = runBlocking {
val cancellableFlow = createRandomNumberFlow().cancellable()
val job = launch {
try {
cancellableFlow.collect { value ->
println("Received: $value")
}
} catch (e: CancellationException) {
println("Flow was cancelled due to: ${e.cause}")
}
}
delay(3000) // 运行3秒后取消 Flow
job.cancel("Timeout exceeded")
job.join()
}
println("Main coroutine has completed")
}
Received: 15
Received: 13
Flow was cancelled due to: null
Main coroutine has completed
catch
一个用于捕获流(Flow)中异常的操作符。它可以用于捕获在流的产生、变换或收集过程中发生的异常,并执行特定的操作来处理这些异常
fun main() {
runBlocking {
// 创建一个流,模拟可能抛出异常的操作
val flowWithException = flow {
emit(1)
emit(2)
throw IOException("Simulated IOException")
emit(3) // 这个不会被执行
}
// 使用 catch 操作符来捕获异常并处理
flowWithException
.catch { e ->
if (e is IOException) {
// 在这里处理 IOException
println("Caught IOException: $e")
} else {
// 重新抛出其他异常
throw e
}
}
.onEach { value ->
// 这里的代码不受异常的影响
println("Received value: $value")
}
.catch { e ->
// 这个 catch 操作符用于处理第一个 catch 未处理的异常
println("Unhandled exception: $e")
}
.collect {
// 这里的代码不受异常的影响
println("Collected value: $it")
}
}
}
Received value: 1
Collected value: 1
Received value: 2
Collected value: 2
Caught IOException: java.io.IOException: Simulated IOException
channelFlow
一个构建 Flow 的函数,它允许您以异步和并发的方式生成 Flow 的元素
fun main() {
runBlocking {
val other = flowOf( 11, 23, 45, 76)
// 使用 channelFlow 创建冷流
val flow: Flow<Int> = channelFlow {
// ProducerScope 提供了 SendChannel 用于发送元素
for (i in 1..3) {
send(i)
}
other.collect {
send(it)
}
}
// 启动协程收集流中的元素
val job = launch {
flow.collect { value -> println("Received: $value") }
}
// 等待协程完成
job.join()
}
}
Received: 1
Received: 2
Received: 3
Received: 11
Received: 23
Received: 45
Received: 76
collect
用于收集 Flow 中所有元素的终端操作符。它的作用是触发 Flow 的收集操作,但它本身不处理任何收集到的元素。collect()是collect {}的简写
fun main() {
runBlocking {
// 创建一个简单的 Flow
val simpleFlow: Flow<Int> = flow {
for (i in 1..5) {
emit(i)
// 模拟异步操作
delay(1000)
}
}
// 使用 onEach 处理每个元素
simpleFlow
.onEach { value -> println("Received: $value") }
.catch { e -> println("Exception: $e") }
.collect() // 触发收集操作
}
}
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
collectIndexed
使用提供的动作收集给定的流,该动作接受元素的索引(从零开始)和元素本身,比collect {}多一个索引
fun main() {
runBlocking {
// 创建一个简单的 Flow
val simpleFlow: Flow<Int> = flow {
for (i in 11..16) {
emit(i)
// 模拟异步操作
delay(1000)
}
}
// 使用 onEach 处理每个元素
simpleFlow
.onEach { value -> println("Received: $value") }
.catch { e -> println("Exception: $e") }
.collectIndexed { index, value ->
println("Received: index--$index value--$value")
}
}
}
Received: 11
Received: index--0 value--11
Received: 12
Received: index--1 value--12
Received: 13
Received: index--2 value--13
Received: 14
Received: index--3 value--14
Received: 15
Received: index--4 value--15
Received: 16
Received: index--5 value--16
collectLatest
提供一个挂起的操作来处理这个流的每个值。与 collect 不同的是,当原始流发出新值时,会取消上一个值的操作块
val flow = flow {
emit(1)
delay(50)
emit(2)
}
flow
.onEach { value -> println("Emitting $value") }
.flowOn(Dispatchers.Default) // 设置流(flow)运行在后台线程
.collectLatest { value ->
println("Collecting $value")
delay(100) // 模拟耗时操作
println("$value collected")
}
Emitting 1
Collecting 1
Emitting 2
Collecting 2
2 collected
combine
用于将多个 Flow 的最新值进行合并,然后通过指定的转换函数生成新的 Flow。这允许您在处理异步数据流时,根据不同的数据源生成新的数据流。
fun main() {
runBlocking {
val flow = flowOf(1, 2).onEach { delay(1000) }
val flow2 = flowOf("a", "b", "c").onEach { delay(1500) }
// 1000触发(flow:1 flow2:null)
// 1500触发(flow:1 flow2:a)
// 2000触发(flow:2 flow2:a)
// 3000触发(flow:2 flow2:b)
// 4500触发(flow:2 flow2:c)
combine(flow, flow2) { i, s -> "$i$s" }.collect {
println(it) // Will print "1a 2a 2b 2c"
}
// 同效果代码
// flow.combine(flow2) { i, s -> "$i$s" }.collect {
// println(it) // Will print "1a 2a 2b 2c"
// }
}
}
combineTransform
带Transform的组合函数其实生命哪个示例就是combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }的简写
fun requestFlow(): Flow<String> = flow {
emit("Requesting data")
delay(1000)
emit("Data received")
}
fun searchEngineFlow(): Flow<String> = flow {
emit("Searching for results")
delay(1500)
emit("Results found")
}
fun download(request: String, searchEngine: String): String {
return "$request - $searchEngine"
}
fun main() = runBlocking {
val flow1 = requestFlow()
val flow2 = searchEngineFlow()
flow1.combineTransform(flow2) { request, searchEngine ->
emit("Downloading in progress")
val result = download(request, searchEngine)
emit(result)
}.collect {
println(it)
}
}
Requesting data - Searching for results
Downloading in progress
Data received - Searching for results
Downloading in progress
Data received - Results found
conflate
通过使用 conflated channel 来合并流的发射,同时在一个单独的协程中运行收集器。其效果是,发射器不会因为慢速的收集器而被挂起,而收集器总是获取到最近发射的值。buffer 操作符的一种快捷方式,等同于使用 buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)
consumeAsFlow
ReceiveChannel 转换为一个热流,并在第一次对此流进行收集时消耗该通道。结果的流只能被收集一次,如果尝试多次收集它,将抛出 IllegalStateException。
fun main() = runBlocking {
// 创建一个通道
val channel = Channel<Int>()
// 向通道发送数据
launch {
for (i in 1..5) {
channel.send(i)
}
channel.close()
}
// 使用 consumeAsFlow 将通道转换为流
val flow = channel.consumeAsFlow()
// 收集流的数据
flow.collect {
println(it)
}
}
1
2
3
4
5
count
计算 Flow 中元素的数量。有两个重载的函数:
- count():返回 Flow 中元素的总数量
- count(predicate: suspend (T) -> Boolean):返回 Flow 中满足给定条件的元素的数量
fun main() = runBlocking {
val numbersFlow: Flow<Int> = listOf(1, 2, 3, 4, 5).asFlow()
// 计算 Flow 中元素的总数量
val totalCount = numbersFlow.count()
println("Total Count: $totalCount")
// 计算 Flow 中满足条件的元素数量
val evenCount = numbersFlow.count { it % 2 == 0 }
println("Even Count: $evenCount")
}
Total Count: 5
Even Count: 2
debounce
过滤掉在给定时间间隔内后面紧跟着的新值,而只发射最新的值。这在处理用户输入或其他频繁变化的数据时非常有用,以避免不必要的处理和减轻系统负担。
fun main() = runBlocking {
//通过固定时间间隔进行 debounce
flow {
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.debounce(1000)
.collect { println(it) }
//通过动态的时间间隔进行 debounce
println("-----------------")
flow {
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.debounce {
if (it == 1) {
0L
} else {
1000L
}
}
.collect { println(it) }
}
3
4
5
-----------------
1
3
4
5
distinctUntilChanged
过滤掉流中连续重复的元素,确保只有当新元素与前一个元素不相同时才会传递。当然你可以自定义过滤条件
fun main() = runBlocking {
val numbers = arrayOf(1,2,2,3,4,5,5,5,6,7,8).asFlow()
// 示例1: 基本版本
numbers
.distinctUntilChanged()
.onEach { println("Received: $it") }
.collect()
// 示例2: 自定义比较函数版本 仅当两个偶数才过滤
val customComparator: (old: Int, new: Int) -> Boolean = { old, new -> (old % 2 == 0) && (new % 2 == 0) }
numbers
.distinctUntilChanged(areEquivalent = customComparator)
.onEach { println("Received (Custom): $it") }
.collect()
}
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received (Custom): 1
Received (Custom): 2
Received (Custom): 3
Received (Custom): 4
Received (Custom): 5
Received (Custom): 5
Received (Custom): 5
Received (Custom): 6
Received (Custom): 7
Received (Custom): 8
distinctUntilChangedBy
过滤掉流中连续重复的元素与distinctUntilChanged()相比,它可以自定义条件字段。
fun main():Unit = runBlocking {
val data = listOf(
Person(1, "Alice"),
Person(2, "Bob"),
Person(1, "Charlie1"),
Person(1, "Charlie2"),
Person(1, "Charlie3"),
Person(3, "David")
).asFlow()
data.distinctUntilChangedBy {
it.id
}.onEach {
println("Person id:${it.id} str:${it.str}")
}.collect()
}
Person id:1 str:Alice
Person id:2 str:Bob
Person id:1 str:Charlie1
Person id:3 str:David
drop
忽略的元素
fun main():Unit = runBlocking {
//drop函数忽略前面的3个元素
flowOf(0,1,2,3,4,5,6,7,8,9).drop(3).collect{
println("Received element:${it}")
}
}
Received element:3
Received element:4
Received element:5
Received element:6
Received element:7
Received element:8
Received element:9
dropWhile
检查元素,一旦找到第一个不满足条件的元素,它就会返回一个新的Flow,其中包含从该元素开始的所有后续元素。
fun main():Unit = runBlocking {
flowOf(0,1,2,3,4,5,6,7,8,9).dropWhile {
it == 0 || it == 1 || it == 2 || it == 3 || it == 5
}.collect{
println("Received element:${it}")
}
}
Received element:4
Received element:5
Received element:6
Received element:7
Received element:8
Received element:9
emitAll
传递元素。它有两个重载版本
- suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>)
从给定的通道中发射所有元素到流收集器(FlowCollector)中
相当于 channel.consumeEach { value -> emit(value) } - suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
从给定的流中收集所有的值并将它们发射到流收集器(FlowCollector)中
相当于 flow.collect { value -> emit(value) }
fun main():Unit {
// 通过流发射数据
val myFlow = flow {
// 在这里生成流数据
for (i in 1..5) {
emit(i)
delay(1000) // 模拟异步操作
}
}
// 通过通道发射数据
val myChannel = callbackFlow {
// 在这里生成通道数据
for (i in 6..11) {
send(i)
delay(1000) // 模拟异步操作
}
awaitClose { /* 可选的清理操作 */ }
}
// 使用 emitAll 将流中的数据传递给另一个流
val combinedFlow = flow {
emitAll(myFlow)
emitAll(myChannel)
}
//使用协程来观察数据
runBlocking {
combinedFlow.collect { value ->
// 处理从流中接收到的值
println("Received: $value")
}
}
}
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Received: 10
Received: 11
emptyFlow
表示一个没有元素的数据流,这在一些场景中可能很有用,例如在初始化时创建一个空的数据流,或者在某些条件不满足时返回一个空的数据流。
filter
筛选出符合给定条件的元素,返回一个新的Flow
fun main():Unit {
runBlocking {
// 创建一个简单的Flow,包含1到10的数字
val originalFlow = (1..10).asFlow()
// 使用filter操作符,过滤出偶数
val filteredFlow = originalFlow.filter { it % 2 == 0 }
// 收集并打印过滤后的结果
filteredFlow.collect { println(it) }
}
}
2
4
6
8
10
filterIsInstance
过滤 Flow 中的元素,仅保留指定类型的元素
fun main() = runBlocking {
val mixedFlow: Flow<Any> = flowOf(1, "Hello", 3.14, "World", 42)
val stringFlow: Flow<String> = mixedFlow.filterIsInstance()
stringFlow.collect { value ->
println(value)
}
}
Hello
World
filterNot
过滤掉满足给定条件的元素,返回一个新的 Flow,包含原始 Flow 中不符合条件的元素
fun main() = runBlocking {
// 创建一个包含数字的 Flow
val numbersFlow: Flow<Int> = (1..10).asFlow()
// 使用 filterNot 过滤掉偶数
val oddNumbersFlow: Flow<Int> = numbersFlow.filterNot { it % 2 == 0 }
// 收集并打印结果
oddNumbersFlow.collect { println(it) }
}
1
3
5
7
9
filterNotNull
过滤掉 null 值,得到一个新的 Flow
class MyViewModel : ViewModel() {
private val _nullableStrings = MutableStateFlow<String?>(null)
// 提供一个只包含非空字符串的 Flow
val nonNullStrings = _nullableStrings.asStateFlow().filterNotNull()
fun updateString(newString: String?) {
viewModelScope.launch {
_nullableStrings.value = newString
}
}
}
first
返回流中的第一个元素或满足条件的第一个元素,并取消流的收集。如果流为空,则抛出 NoSuchElementException 异常
fun main() = runBlocking {
runBlocking {
// 使用first获取第一个元素
val flow1 = flowOf(1, 2, 3, 4, 5)
val firstElement1 = flow1.first()
println("First element in flow1: $firstElement1")
// 使用first获取匹配谓词的第一个元素
val flow2 = flowOf(1, 2, 3, 4, 5)
val firstElement2 = flow2.first { it > 2 }
println("First element in flow2 greater than 2: $firstElement2")
// 处理流为空的情况
val emptyFlow = emptyFlow<Int>()
try {
val result = emptyFlow.first()
println("Result from emptyFlow: $result")
} catch (e: NoSuchElementException) {
println("Error: ${e.message}")
}
}
}
First element in flow1: 1
First element in flow2 greater than 2: 3
Error: Expected at least one element
firstOrNull
同上,区别为空 返回 null
flatMapConcat
将原始的 Flow 中的元素进行转换,转换的过程中产生的每个元素都是一个 Flow,并将这些新的 Flow 连接在一起
fun main() = runBlocking {
// 模拟一个包含整数的 Flow
val originalFlow: Flow<Int> = (1..3).asFlow()
// 定义一个转换函数,将每个整数转换为一个 Flow 包含两倍和三倍
suspend fun transform(value: Int): Flow<Int> = flow {
emit(value * 2)
emit(value * 3)
}
// 使用 flatMapConcat 进行转换
val resultFlow: Flow<Int> = originalFlow
.flatMapConcat { transform(it) }
.flowOn(Dispatchers.Default) // 切换到后台线程以执行转换操作
// 收集结果并打印
resultFlow.collect { println(it) }
}
2
3
4
6
6
9
flatMapLatest
处理Flow中的元素,并通过指定的 transform 函数将每个元素映射为一个新的Flow。这个新的Flow将会被订阅,而当原始Flow发射新的元素时,之前订阅的流将被取消。这个操作符通常在需要处理最新元素的情况下非常有用。
fun main() = runBlocking {
val originalFlow = flow {
emit("a")
delay(100)
emit("b")
}
val transformedFlow = originalFlow.flatMapLatest { value ->
flow {
emit(value)
delay(200)//模拟复杂处理
emit(value + "_last")
}
}
transformedFlow.collect { println(it) }
}
a
b
b_last
flatMapMerge
将每个元素转换为另一个 Flow,并将这些新的 Flow 合并和展平。这个操作符可以限制并发地收集流的数量,以提高性能
fun main() = runBlocking {
// 模拟包含 URL 的流
val urlFlow: Flow<String> = flow {
emit("https://example.com/image1.jpg")
emit("https://example.com/image2.jpg")
emit("https://example.com/image3.jpg")
}
// 使用 flatMapMerge 并发下载图像
urlFlow.flatMapMerge { url ->
flow {
// 模拟下载操作,这里使用 delay 模拟下载耗时
delay(1000)
emit("${System.currentTimeMillis()} Downloaded image from $url")
}
}.collect { result ->
println(result)
}
}
1705657442283 Downloaded image from https://example.com/image1.jpg
1705657442290 Downloaded image from https://example.com/image2.jpg
1705657442290 Downloaded image from https://example.com/image3.jpg
flattenConcat
将一个包含其他 Flow 的 Flow 打平(flatten),使得所有内部 Flow 的元素按顺序依次发射,而不会交叉发射。这意味着它会按照顺序收集和发射内部 Flow,而不会在它们之间进行交错。
fun main() = runBlocking {
val flowOfFlows: Flow<Flow<Int>> = flow {
emit(flow {
emit(1)
delay(100)
emit(2)
})
emit(flow {
delay(50)
emit(3)
})
emit(flow {
delay(200)
emit(4)
})
}
val flattenedFlow: Flow<Int> = flowOfFlows.flattenConcat()
flattenedFlow.collect { result ->
println("${System.currentTimeMillis()} --- $result")
}
}
1705717218466 --- 1
1705717218604 --- 2
1705717218665 --- 3
1705717218867 --- 4
flattenMerge
将包含 Flow 的 Flow 扁平化为一个单一的 Flow,并可以限制并发收集的内部 Flow 的数量。这对于在处理并发数据流时非常有用。
fun main() = runBlocking {
// 创建一个包含 Flow 的 Flow
val outerFlow: Flow<Flow<Int>> = flow {
emit(flowOf(1, 2, 3).onEach { delay(100) })
emit(flowOf(4, 5, 6).onEach { delay(100) })
emit(flowOf(7, 8, 9).onEach { delay(100) })
}
// 使用 flattenMerge 将其扁平化
val flattenedFlow: Flow<Int> = outerFlow.flattenMerge()
// 收集并打印结果
flattenedFlow.collect { value ->
println(" ${System.currentTimeMillis()} value $value")
}
}
1705719006998 value 1
1705719006999 value 4
1705719006999 value 7
1705719007099 value 2
1705719007099 value 5
1705719007099 value 8
1705719007210 value 3
1705719007210 value 6
1705719007210 value 9
flow
支持异步数据流的处理。它允许您以声明性的方式定义、转换和收集数据流
fun main():Unit = runBlocking {
// 收集并打印结果
fibonacci().take(6).collect { value ->
println(" ${System.currentTimeMillis()} value $value")
}
//emit错误示范
flow {
emit(1) // Ok
withContext(Dispatchers.IO) {
emit(2) // emit应该严格在块的调度器中发生,以便保留流上下文。这会报错 IllegalStateException
}
}.collect{
println(" ${System.currentTimeMillis()} value $it")
}
}
1705722094088 value 0
1705722094088 value 1
1705722094089 value 1
1705722094089 value 2
1705722094089 value 3
1705722094091 value 5
1705722094100 value 1
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@3dca89ed, BlockingEventLoop@6b5931f4],
but emission happened in [DispatchedCoroutine{Active}@2aa604a1, Dispatchers.IO].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:85)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:106)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:83)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66)
at com.yang.myapplication.MainActivityKt$main$1$2$1.invokeSuspend(MainActivity.kt:107)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
Process finished with exit code 1
flowOf
创建一个 Flow
val flow: Flow<Int> = flowOf(1, 2, 3)
val flow: Flow<Int> = flowOf(1)
flowOn
改变Flow的执行上下文。这对于在 Android 开发中处理异步任务非常有用,尤其是在 UI 线程和后台线程之间切换的情况下。
fun main():Unit = runBlocking {
fetchData()
.flowOn(Dispatchers.IO) // 切换到 IO 线程执行数据库查询
.collect { data -> // 切回主线程更新 UI
updateUI(data)
}
}
private fun fetchData(): Flow<String> = flow {
// 模拟后台线程的数据库查询
val data = fetchDataFromDatabase()
emit(data)
}
private fun fetchDataFromDatabase(): String {
// 模拟数据库查询
return "Data from database"
}
private fun updateUI(data: String) {
// 在主线程更新 UI
println(data)
}
Data from database
fold
累积Flow的元素。可以参考reduce没有初始累加器的值
fun main():Unit = runBlocking {
val flow = flowOf("一","二", "三", "四", "五")
val result = flow.fold("累加器的初始值") { acc, value ->//当前的累加器值 新的流元素
// 模拟一些异步操作
delay(1000)
println("Accumulating: $acc + $value")
acc + value//返回一个新的累加器值
}
// 打印最终结果
println("Final result: $result")
}
Accumulating: 累加器的初始值 + 一
Accumulating: 累加器的初始值一 + 二
Accumulating: 累加器的初始值一二 + 三
Accumulating: 累加器的初始值一二三 + 四
Accumulating: 累加器的初始值一二三四 + 五
Final result: 累加器的初始值一二三四五
getAndUpdate
MutableStateFlow 扩展函数之一,用于原子性地更新其值并返回先前的值。
fun main():Unit = runBlocking {
val numberState = MutableStateFlow(0)
launch {
numberState.collect { newValue ->
// 在这里处理新的值
println("New value: $newValue")
}
}
// 使用 getAndUpdate 更新 MutableStateFlow 的值
val previousValue = numberState.getAndUpdate { current ->
// 在这里执行逻辑,返回新的值
current + 1
}
// 打印先前的值
println("Previous value: $previousValue")
}
Previous value: 0
New value: 1
last
等待直到Flow中的最后一个可用元素,然后将其返回。如果Flow为空,它会抛出 NoSuchElementException 异常
fun main():Unit = runBlocking {
val flow = flow {
for (i in 1..5) {
delay(1000) // 模拟异步操作
emit(i)
}
}
try {
val lastElement = flow.last()
println("Last element: $lastElement")
} catch (e: NoSuchElementException) {
println("Flow was empty.")
}
}
Last element: 5
lastOrNull
同上,不一样的是如果Flow为空它会返回null
launchIn
用于在给定的协程作用域中启动对Flow的收集。它是 scope.launch { flow.collect() } 的简写。通常,launchIn 与 onEach、onCompletion 和 catch 等操作符一起使用,用于处理Flow中的所有发射值,以及处理上游Flow或处理过程中可能发生的异常。
private val uiScope = CoroutineScope(Dispatchers.Default)
fun main():Unit = runBlocking {
val flow = getSampleFlow()
flow.onEach { value -> updateUi(value) }
.onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
.catch { cause -> handleException(cause) }
.launchIn(uiScope)
delay(5000)
uiScope.cancel()
delay(10000)
}
private fun getSampleFlow(): Flow<String> = flow {
emit("A")
delay(1000)
emit("B")
delay(1000)
emit("C")
throw RuntimeException("Sample exception")
}
private fun updateUi(value: String) {
println("Collected value: $value")
}
private fun handleException(cause: Throwable) {
println("Exception: $cause")
}
Collected value: A
Collected value: B
Collected value: C
Collected value: Failed
Exception: java.lang.RuntimeException: Sample exception
map
对 Flow 中的每个元素进行转换
fun main():Unit = runBlocking {
val originalFlow: Flow<Int> = flow {
for (i in 1..5) {
emit(i)
delay(1000) // 模拟异步操作
}
}
// 使用 map 进行转换
val transformedFlow: Flow<String> = originalFlow.map { value ->
"Transformed: $value"
}
transformedFlow.collect { transformedValue ->
println(transformedValue)
}
}
mapLatest
转换原始流中的元素。该函数的特点是在原始流发出新值时,会取消先前值的转换操作。
fun main():Unit = runBlocking {
val originalFlow: Flow<String> = flow {
emit("a")
delay(100)
emit("b")
}
val transformedFlow: Flow<String> = originalFlow.mapLatest { value ->
println("Started computing $value")
delay(200)
"Computed $value"
}
transformedFlow.collect { value ->
println(value)
}
}
merge
将多个 Flow 合并成一个新的 Flow。合并的 Flow 不保留元素的顺序,而是并发地将元素收集到新的 Flow 中,没有同时收集 Flow 的数量限制
fun main():Unit = runBlocking {
val flow1 = flowOf(1, 2, 3).onEach { delay(150) }
val flow2 = flowOf(4, 5, 6).onEach { delay(100) }
// 使用 Iterable<Flow<T>>.merge()
val mergedFlow1 = listOf(flow1, flow2).merge()
mergedFlow1.collect { println(it) }
println("---->----->---->")
// 使用 vararg merge
val mergedFlow2 = merge(flow1, flow2)
mergedFlow2.collect { println(it) }
}
MutableSharedFlow
SharedFlow的可变版本,参考后面的SharedFlow
MutableStateFlow
继承自 StateFlow 和 MutableSharedFlow。MutableStateFlow 通过提供一个可变的 value 属性来允许修改其当前状态,参考后面的StateFlow
onCompletion
用于在Flow完成或被取消后执行指定的操作。
fun main():Unit = runBlocking {
val myFlow: Flow<Int> = flow {
for (i in 1..3) {
// 发射元素
emit(i)
}
}
myFlow.onCompletion { cause ->
// cause 参数是一个 Throwable,表示取消或失败的原因
if (cause == null) {
println("Flow completed successfully")
} else {
println("Flow completed with an exception: $cause")
}
}
.collect { value ->
// 处理 Flow 发射的元素
println("Received: $value")
}
}
onEach
用于在流的每个元素发射到下游之前执行指定的操作。这个函数通常用于执行一些副作用,而不改变流的元素。
fun main():Unit = runBlocking {
// 创建一个简单的流
val simpleFlow: Flow<Int> = flow {
for (i in 1..5) {
emit(i)
}
}
// 使用 onEach 打印每个元素,并在每个元素发射之前执行一些操作
val modifiedFlow = simpleFlow.onEach { value ->
// 在每个元素发射之前执行的操作
println("Before emission: $value")
}
// 收集修改后的流
modifiedFlow.collect { value ->
// 打印每个元素
println("Collected: $value")
}
}
Before emission: 1
Collected: 1
Before emission: 2
Collected: 2
Before emission: 3
Collected: 3
Before emission: 4
Collected: 4
Before emission: 5
Collected: 5
onEmpty
Flow 完成但未发射任何元素时执行指定的操作
fun main():Unit = runBlocking {
//emptyFlow<Int>() 创建了一个不发射任何元素的空 Flow 执行onEmpty里方法
emptyFlow<Int>().onEmpty {
emit(1)
emit(2)
}.collect { println(it) }
}
Before emission: 1
Collected: 1
Before emission: 2
Collected: 2
Before emission: 3
Collected: 3
Before emission: 4
Collected: 4
Before emission: 5
Collected: 5
onStart
在开始收集流之前执行一个操作。这个操作可以是挂起的,因此您可以在协程中执行异步的初始化工作。
fun main():Unit = runBlocking {
flowOf("a", "b", "c")
.onStart {
emit("Begin")
println("Initialization in progress...")
// 可以进行一些异步的初始化工作
delay(1000)
println("Initialization complete.")
}
.collect { println(it) }
}
Begin
Initialization in progress...
Initialization complete.
a
b
c
onSubscription
在共享流(SharedFlow 后续有提到)开始被收集(订阅注册后)时调用指定的操作。
class SharedFlowExample {
private val sharedFlow: MutableSharedFlow<String> = MutableSharedFlow()
fun startFlow() {
// 启动协程来收集共享流
runBlocking {
launch {
sharedFlow.onSubscription {
// 在订阅开始时执行的操作
println("Subscription started")
// 可以在此处发射额外的元素
emit("Extra element 1")
emit("Extra element 2")
}.collect { value ->
// 收集流中的元素
println("Collected: $value")
}
}
}
}
suspend fun emitValue(value: String) {
// 发射新的值到共享流中
sharedFlow.emit(value)
}
}
fun main():Unit = runBlocking {
val example = SharedFlowExample()
launch {
// 启动共享流的订阅
example.startFlow()
}
launch {
delay(1000)
// 发射新的值到共享流中
example.emitValue("Value 1")
example.emitValue("Value 2")
}
}
Subscription started
Collected: Extra element 1
Collected: Extra element 2
Collected: Value 1
Collected: Value 2
produceIn
在协程中创建生产者协程(produce coroutine),它会启动一个新的协程来收集给定的流,并返回一个 ReceiveChannel,通过该通道可以接收来自流的元素。
private val scope = CoroutineScope(Dispatchers.Default)
fun main():Unit = runBlocking {
// 创建一个简单的流
val simpleFlow = flow {
for (i in 1..5) {
delay(1000) // 模拟异步操作
emit(i)
}
}
// 在生命周期内启动协程来收集流
val channel = simpleFlow.produceIn(scope)
// 启动一个协程来消费通道中的数据
scope.launch {
for (value in channel) {
// 处理接收到的数据
println("Received: $value")
}
}
delay(6000)
}
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
receiveAsFlow
将一个 ReceiveChannel 转换为热流(hot flow)。热流表示每当该流被收集时,都会从通道中以(fan-out)的方式接收元素,即每个收集器(collector)都会收到一个元素。
private val channel = Channel<Int>()
fun main():Unit = runBlocking {
// 使用 receiveAsFlow 将通道转换为热流
val flow = channel.receiveAsFlow()
launch {
// 在流上设置收集器,每当流被收集时,输出元素
flow.collect { value ->
println("Received: $value")
}
}
for (i in 1..5) {
delay(1000) // 模拟一些工作
channel.send(i) // 将数据发送到通道
}
// 关闭通道(通知流收集器)
channel.close()
}
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
reduce
从 Flow 的元素中累积一个值与fold相比没有初始累加器的值。
fun main() {
runBlocking {
val sum: Int = createFlow()
.reduce { accumulator, value ->
accumulator + value //返回
}
println("Sum of elements: $sum")
}
}
fun createFlow(): Flow<Int> = flow {
// Emit some integers
emit(1)
emit(2)
emit(3)
emit(4)
}
Sum of elements: 10
retry
上游 Flow 发生特定异常时进行重试。可填重试次数,默认值为 Long.MAX_VALUE,即无限次重试,谨慎使用
fun main() {
runBlocking {
// 模拟一个可能会失败的流
val flowWithError: Flow<Int> = flow {
emit(1)
emit(2)
throw RuntimeException("Simulated error") // 第三次发射时引发异常
emit(3) // 不会执行到这里
}
// 使用 retry 进行重试
flowWithError.retry(3) { e ->
// 重试条件:发生异常且异常类型为 RuntimeException
(e is RuntimeException).also { if (it) delay(1000) }
}.collect { value ->
// 在重试之后,成功从流中收集到的值
println("Collected: $value")
}
}
}
Collected: 1
Collected: 2
Collected: 1
Collected: 2
Collected: 1
Collected: 2
Collected: 1
Collected: 2
Exception in thread "main" java.lang.RuntimeException: Simulated error
retryWhen
上游流中发生异常时进行重试,通过返回Boolean确定是否重试。
fun main() {
runBlocking {
val retryLimit = 3
// 使用 retryWhen 进行重试,如果是 IOException 则一直重试,否则最多重试 retryLimit 次
val resultFlow = flow {
emit(1)
emit(2)
throw IOException("Simulated IO Exception")
emit(3) // This won't be reached due to the exception
}.retryWhen { cause, attempt ->
if (cause is IOException && attempt < retryLimit) {
println("Retrying... (Attempt $attempt)")
delay(1000) // 延迟 1 秒后重试
true // 返回 true 表示继续重试
} else {
println("Retry limit reached. Not retrying.")
false // 返回 false 表示不再重试
}
}
// 收集并打印结果
resultFlow.collect { value ->
println("Received value: $value")
}
}
}
Received value: 1
Received value: 2
Retrying... (Attempt 0)
Received value: 1
Received value: 2
Retrying... (Attempt 1)
Received value: 1
Received value: 2
Retrying... (Attempt 2)
Received value: 1
Received value: 2
Retry limit reached. Not retrying.
Exception in thread "main" java.io.IOException: Simulated IO Exception
runningFold
在累积的过程中,每次都发射中间结果,包括初始值。参考flod:但是fold 最终只发射最终结果,而不是在每个中间步骤都发射结果
fun main() = runBlocking {
flowOf(1, 2, 3)
.runningFold(emptyList<Int>()) { acc, value -> acc + value }
.collect{
println(it)
}
}
[]
[1]
[1, 2]
[1, 2, 3]
runningReduce
同上 参考Reduce 同样runningReduce每个中间步骤都发射结果
fun main() = runBlocking {
val result = flowOf(1, 2, 3, 4)
.runningReduce { acc, value -> acc + value }
.toList()
println(result) // 输出
}
[1, 3, 6, 10]
sample
在给定的采样周期内,仅发射原始流中最新的值。
private val dataFlow = MutableStateFlow(0)
fun main() = runBlocking {
launch {
// 模拟从网络或数据库获取数据
repeat(10) {
delay(110)
dataFlow.value = it
}
}
dataFlow.sample(200).collect {
// 在这里处理采样后的数据
println(it)
}
}
0
2
4
5
7
9
scan
功能和runningFold一样
fun main() = runBlocking {
val numbersFlow = flowOf(1, 2, 3, 4, 5)
val sumFlow = numbersFlow
.scan(0) { acc, value -> acc + value }
// 当累积完成后,可以得到最终的结果
println("Final sum: ${sumFlow.toList()}")
}
Final sum: [0, 1, 3, 6, 10, 15]
SharedFlow
一种热数据流,可以在所有收集器之间以广播方式共享发射的值。与普通的 Flow 不同,SharedFlow 的活动实例独立于收集器的存在,因此它是“热”的。
data class Event(val message: String)
class EventBus {
private val _events = MutableSharedFlow<Event>(1) // 私有的可变 SharedFlow
val events: SharedFlow<Event> = _events.asSharedFlow() // 公开的只读 SharedFlow
suspend fun produceEvent(event: Event) {
_events.emit(event) // 挂起直到所有订阅者都收到事件
}
}
fun main() = runBlocking {
val eventBus = EventBus()
// 启动第一个订阅者
launch {
eventBus.events.collect { event ->
println("Subscriber 1 received: ${event.message}")
}
}
// 启动第二个订阅者
launch {
eventBus.events.collect { event ->
println("Subscriber 2 received: ${event.message}")
}
}
//延迟一下确保被订阅
//因为MutableSharedFlow<Event>(1)-设置了向新订阅者重播的值数(不能为负数,默认为零)为1 所以不用延迟。
// delay(1000)
// 生产事件
eventBus.produceEvent(Event("Hello from EventBus!"))
}
Subscriber 1 received: Hello from EventBus!
Subscriber 2 received: Hello from EventBus!
shareIn
将冷流(flow)转换为上述热共享流(sharedflow)的操作符
fun main() = runBlocking {
val backendMessages: Flow<String> = flow {
// 模拟从后端获取数据
repeat(5) {
emit("Message $it")
delay(1000)
}
}
// 使用 shareIn 将冷流转换为热共享流
val messages: Flow<String> = backendMessages
.shareIn(this, SharingStarted.Eagerly, 1) // 使用 Eager 策略
}
StateFlow
一个状态容器可观察数据流,可向其收集器发出当前状态更新和新状态更新。
class CounterModel {
private val _counter = MutableStateFlow(0) // 私有可变状态流
val counter: StateFlow<Int> get() = _counter // 公开为只读状态流
fun increment() {
_counter.value += 1 // 增加计数器的值
}
}
fun main():Unit = runBlocking {
val counterModel = CounterModel()
launch {
counterModel.counter.collect { value ->
// 在这里处理状态变化
println("Counter value changed: $value")
}
}
repeat(5){
delay(500)
counterModel.increment()
}
}
Counter value changed: 0
Counter value changed: 1
Counter value changed: 2
Counter value changed: 3
Counter value changed: 4
Counter value changed: 5
stateIn
将冷流(Flow)转换为上述状态流(StateFlow)
take
创建一个新的 Flow,其中包含原始 Flow 中的前 count 个元素。当消耗了 count 个元素后,原始的 Flow 就会被取消。如果 count 不是正数,则会抛出 IllegalArgumentException
fun main():Unit = runBlocking {
val numberFlow = flow {
for (i in 1..10) {
delay(100) // 模拟异步操作
emit(i)
}
}
// 从流中获取前 5 个元素
val result = numberFlow.take(5)
launch {
// 收集并打印结果
result.collect { value ->
println("result $value")
}
}
}
result 1
result 2
result 3
result 4
result 5
takeWhile
返回一个包含满足给定断言的前几个元素的新 Flow 对象。请注意,生成的 Flow 不包含使断言返回 false 的那个元素
fun main():Unit = runBlocking {
val numbersFlow: Flow<Int> = flow {
for (i in intArrayOf(1,2,3,4,5,6,1,2,3,4)) {
delay(1000) // Simulating some asynchronous operation
emit(i)
}
}
// 使用 takeWhile 筛选满足条件的前几个元素
val filteredFlow: Flow<Int> = numbersFlow.takeWhile { it < 5 }
// 收集并打印结果
filteredFlow.collect { println(it) }
}
1
2
3
4
toCollection
将 Flow 中的元素收集到一个目标可变集合(MutableCollection)中。
fun main() = runBlocking {
// 创建一个 Flow,模拟一些异步操作
val myFlow: Flow<Int> = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
// 创建一个可变列表作为目标集合
val resultList = mutableListOf<Int>()
// 使用 toCollection 将 Flow 中的元素收集到列表中
myFlow.toCollection(resultList)
// 打印结果
println(resultList)
}
[1, 2, 3]
toList
将 Flow 中的元素收集到一个 List
fun main() = runBlocking {
// 创建一个 Flow,模拟一些异步操作
val myFlow: Flow<Int> = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
// 创建一个可变列表作为目标集合
val resultList1 = mutableListOf<Int>()
// 使用 toList 将 Flow 中的元素收集到列表中
myFlow.toList(resultList1)
val resultList2 = myFlow.toList()
// 打印结果
println(resultList1)
println(resultList2)
}
[1, 2, 3]
[1, 2, 3]
toSet
将 Flow 中的元素收集到一个 Set 用法同上
transform
对 Flow 中的每个元素进行灵活的转换、过滤或多次发射。
fun main() = runBlocking {
// 创建一个简单的 Flow,包含 1 到 5 的整数
val sourceFlow: Flow<Int> = flowOf(1, 2, 3, 4, 5)
// 使用 transform 操作符进行转换,只发射偶数值,并发射两次
val transformedFlow: Flow<Int> = sourceFlow.transform { value ->
if (value % 2 == 0) {
// 发射偶数值两次
emit(value)
emit(value)
}
// 奇数值将被跳过
}
// 收集并打印转换后的 Flow 中的元素
transformedFlow.collect { value ->
println(value)
}
}
2
2
4
4
transformLatest
transform 函数不同的是,transformLatest 在原始流发射新值时会取消之前的 transform 块
fun main() = runBlocking {
// 创建一个简单的 Flow,包含 1 到 5 的整数
val sourceFlow: Flow<Int> = flow {
repeat(5) {
delay(300)
emit(it)
}
}
val transformedFlow: Flow<String> = sourceFlow.transformLatest { value ->
// 模拟操作
emit("value_$value")
delay(500)
emit("Action result for: $value")
}
// 收集并打印转换后的 Flow 中的元素
transformedFlow.collect { value ->
println(value)
}
}
value_0
value_1
value_2
value_3
value_4
Action result for: 4
transformWhile
在给定流的每个值上应用一个转换函数,只要该函数返回true,就会一直继续应用。这个函数的接收者是FlowCollector<R>,因此它是一个灵活的函数,可以在发射元素时进行转换、跳过或多次发射。
data class DownloadProgress(val percentage: Int, val isDone: Boolean)
fun Flow<DownloadProgress>.completeWhenDone(): Flow<DownloadProgress> =
transformWhile { progress ->
emit(progress) // 总是发射进度
!progress.isDone // 只要下载未完成,就继续
}
suspend fun simulateDownload(): Flow<DownloadProgress> = flow {
for (percentage in 0..100 step 10) {
emit(DownloadProgress(percentage, false))
delay(500) // 模拟下载延迟
}
emit(DownloadProgress(100, true)) // 下载完成
}
fun main() = runBlocking {
val downloadFlow = simulateDownload()
downloadFlow.completeWhenDone().collect { progress ->
// 处理下载进度
println("Download Progress: ${progress.percentage}%")
}
}
Download Progress: 0%
Download Progress: 10%
Download Progress: 20%
Download Progress: 30%
Download Progress: 40%
Download Progress: 50%
Download Progress: 60%
Download Progress: 70%
Download Progress: 80%
Download Progress: 90%
Download Progress: 100%
Download Progress: 100%
update
MutableStateFlow 类的一个扩展函数,用于原子地使用指定的函数更新MutableStateFlow.value。在多线程或并发环境下,确保更新是原子性的。
fun main():Unit = runBlocking {
val userFlow = MutableStateFlow(User("John", 25))
launch {
// 收集并打印更新后的值
userFlow.collect { updatedUser ->
println(updatedUser)
}
}
delay(100)
// 使用 update 函数更新 MutableStateFlow
userFlow.update { currentUser ->
User(currentUser.name, currentUser.age + 1)
}
}
User(name=John, age=25)
User(name=John, age=26)
updateAndGet
原子地更新 MutableStateFlow 的值,并返回新的值。它通常用于在多线程或并发环境中,确保对状态的更新是原子的。跟updateAndGet 相比它返回新的值,更适用于 Kotlin 协程库中的 MutableStateFlow 类型,用于管理可变的状态。
fun main():Unit = runBlocking {
val mutableStateFlow = MutableStateFlow(0)
// 更新并获取新的值
val newValue = mutableStateFlow.updateAndGet { currentValue ->
currentValue + 1
}
println("New value: $newValue")
println("Current value: ${mutableStateFlow.value}")
}
New value: 1
Current value: 1
withIndex
将每个元素包装成 IndexedValue 对象,该对象包含值和它的索引(从零开始)。这对于需要知道元素在流中的位置的场景非常有用
fun main() = runBlocking {
// 创建一个包含整数的 Flow
val flow: Flow<Int> = (1..5).asFlow()
// 使用 withIndex 处理 Flow 中的元素及其索引
val indexedFlow: Flow<IndexedValue<Int>> = flow.withIndex()
// 收集并打印每个元素及其索引
indexedFlow.collect { indexedValue ->
println("Index: ${indexedValue.index}, Value: ${indexedValue.value}")
}
}
Index: 0, Value: 1
Index: 1, Value: 2
Index: 2, Value: 3
Index: 3, Value: 4
Index: 4, Value: 5
zip
将两个 Flow 进行合并,然后使用提供的 transform 函数应用于每一对值。合并后的 Flow 将在两个输入的任何一个完成时完成,并且在剩余的 Flow 上调用 cancel。
fun main() = runBlocking {
val flow = flowOf(1, 2, 3).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
flow.zip(flow2) { i, s -> i.toString() + s }.collect {
println(it) // 将打印 "1a 2b 3c"
}
}
1a
2b
3c
原文地址:https://blog.csdn.net/qq_50675668/article/details/135617304
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!