自学内容网 自学内容网

RxJS基本介绍以及与Promise的区别

基本概念

RxJS (Reactive Extensions for JavaScript) 是一个用于处理异步事件的库,它提供了一种基于观察者模式的方式来组合异步操作。RxJS 通过“可观察对象(Observables)”来表示异步数据流,支持操作符(如 map、filter、merge 等)来转换和操作这些数据流。

核心API

Observable

Observable 是 RxJS 的核心,表示一个可以异步发出多个值的数据流。你可以通过 new Observable() 或者 RxJS 提供的创建函数来创建它。

import { Observable } from 'rxjs';

//创建 Observable 通过构造函数创建 Observable
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

//订阅 Observable
observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Completed')
});

Observer

Observer 是用于接收 Observable 发送的通知的对象,通常包含三个回调方法:

next(value) — 接收值
error(err) — 接收错误
complete() — 表示完成

const observer = {
  next: (value) => console.log(value),
  error: (err) => console.error(err),
  complete: () => console.log('Completed')
};
observable.subscribe(observer);

Operators

操作符用于变换、过滤、组合和执行各种操作,最常用的操作符有:

map: 转换每个元素
filter: 过滤元素
mergeMap/concatMap: 扁平化嵌套的 Observables
switchMap: 取消前一个 Observable,转换到新的 Observable
take: 限制订阅次数
debounceTime: 防抖操作

import { map, filter, debounceTime, switchMap } from 'rxjs/operators';

observable.pipe(
  map(value => value * 2),
  filter(value => value > 2),
  debounceTime(300)
).subscribe(value => console.log(value));

Subject

Subject 是一个特殊类型的 Observable,它既是一个 Observer 也可以被订阅。它可以用来广播同一个值给多个订阅者。

import { Subject } from 'rxjs';

const subject = new Subject();

// 订阅 Subject
subject.subscribe(value => console.log('Observer 1:', value));
subject.subscribe(value => console.log('Observer 2:', value));

// 发送数据
subject.next(1);
subject.next(2);

BehaviorSubject

BehaviorSubject 是 Subject 的一个变种,它总是保留最新的值,并将该值提供给所有新的订阅者。

import { BehaviorSubject } from 'rxjs';

const behaviorSubject = new BehaviorSubject(0);

// 订阅时会收到当前值
behaviorSubject.subscribe(value => console.log('Observer 1:', value));

// 发出新值
behaviorSubject.next(1);

Subscription

Subscription 对象用于管理 Observable 的订阅,允许你取消订阅。

import { interval } from 'rxjs';

const subscription = interval(1000).subscribe(value => console.log(value));

// 取消订阅
setTimeout(() => subscription.unsubscribe(), 5000);

与Promise的区别

RxJS 和 Promise 都用于处理异步操作,但它们有很多本质的区别,适用于不同的场景。以下是它们的主要差异:

数据流的类型

Promise:一次性处理单个异步操作,代表一个未来的值或错误。Promise 只能 resolve 或 reject 一次。
RxJS (Observable):处理多个异步事件或值。Observable 是一个可观察的流,可以发出多个值、错误或完成通知。
示例:

// Promise 只能返回一个结果
const promise = new Promise((resolve, reject) => {
  resolve('Hello');
});

promise.then((result) => console.log(result));  // 输出: Hello

// Observable 可以发出多个值
import { Observable } from 'rxjs';

const observable = new Observable((observer) => {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});
observable.subscribe((value) => console.log(value));  // 输出: Hello, World

惰性与即时执行

Promise:一旦创建,Promise 会立即执行,不管有没有订阅它。它会立刻开始处理异步操作。
RxJS (Observable):Observable 是惰性求值的,只有在被订阅时才会执行。如果没有订阅,Observable 就不会执行任何操作。
示例:

// Promise 立即执行
const promise = new Promise((resolve) => {
  console.log('Promise executed');
  resolve('Done');
});

// Observable 只有在订阅时才执行
import { Observable } from 'rxjs';

const observable = new Observable((observer) => {
  console.log('Observable executed');
  observer.next('Done');
});

observable.subscribe();

处理错误的方式

Promise:Promise 只会处理单个错误,错误发生后会进入 catch 或 then 的 reject 回调中。
RxJS (Observable):Observable 可以发出多个错误事件,一个流可以通过多个 catchError 来进行处理。此外,Observable 也支持重试等复杂的错误处理逻辑。
示例:

// Promise 错误处理
const promise = new Promise((resolve, reject) => {
  reject('Error');
});

promise.catch((error) => console.error(error));  // 输出: Error

// Observable 错误处理
import { Observable } from 'rxjs';

const observable = new Observable((observer) => {
  observer.next(1);
  observer.error('Error');
});

observable.subscribe(
  (value) => console.log(value),
  (error) => console.error(error)  // 输出: Error
);

取消操作

Promise:Promise 一旦发起,无法取消执行。即使你不再需要结果,Promise 会继续执行。
RxJS (Observable):Observable 提供了取消操作的机制。通过 unsubscribe 可以取消对流的订阅,从而停止接收值或错误。

// Promise 无法取消
const promise = new Promise((resolve, reject) => {
  setTimeout(() => resolve('Done'), 5000);
});
promise.then((result) => console.log(result));  // 5秒后输出 'Done'

// Observable 可以取消订阅
import { Observable } from 'rxjs';

const observable = new Observable((observer) => {
  const timer = setInterval(() => observer.next('Tick'), 1000);
  return () => clearInterval(timer);  // 清理操作
});

const subscription = observable.subscribe((value) => console.log(value));

setTimeout(() => subscription.unsubscribe(), 3000);  // 3秒后取消订阅,停止打印
  1. 组合和管道
    Promise:Promise 可以通过 .then() 和 .catch() 链式调用,但它的组合能力相对较弱。
    RxJS (Observable):Observable 提供了强大的 pipe() 方法和丰富的操作符(如 map、filter、merge 等),允许对数据流进行复杂的组合和变换。
    示例:
// Promise 链式调用
const promise = new Promise((resolve) => resolve(1));
promise
  .then((value) => value * 2)
  .then((value) => value + 3)
  .then((value) => console.log(value));  // 输出: 5

// Observable 管道操作
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4).pipe(
  filter((value) => value % 2 === 0),
  map((value) => value * 2)
).subscribe((value) => console.log(value));  // 输出: 4, 8

多任务处理

Promise:Promise 适用于单一的异步任务,它的主要特点是链式处理单个异步操作。
RxJS (Observable):Observable 适用于复杂的异步流,它支持多个并发任务、流的合并、组合等操作,适合管理复杂的事件和数据流。
示例:

// Promise 用于单一任务
const promise1 = new Promise((resolve) => resolve('Task 1'));
const promise2 = new Promise((resolve) => resolve('Task 2'));

Promise.all([promise1, promise2]).then((results) => {
  console.log(results);  // 输出: ['Task 1', 'Task 2']
});

// Observable 用于多个流的并发处理
import { of, combineLatest } from 'rxjs';

const observable1 = of('Task 1');
const observable2 = of('Task 2');

combineLatest([observable1, observable2]).subscribe((results) => {
  console.log(results);  // 输出: ['Task 1', 'Task 2']
});

总结

Promise 适合处理单一的异步操作,具有更简单的 API,但缺乏灵活性和对多值的支持。
RxJS (Observable) 适合处理复杂的异步流和多事件流,具有更多的操作符、错误处理机制和强大的组合能力,适用于事件驱动、流式数据处理和复杂的异步任务。


原文地址:https://blog.csdn.net/m0_54944506/article/details/143512322

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!