RxJS

一个通过使用可观察序列来编写异步和基于事件的程序的库

Posted by page on July 14, 2023

RxJS

官方文档 中文文档

Think of RxJS as Lodash for events.

Rxjs想象成针对events的lodash,即Rxjs本质是个工具库,处理对象是事件(又称流)

流:异步行为产生的数据,即将来的数据的集合;

RxJs支持对流进行处理,如合并、截断、延迟、抖动等

基础对象

observable

可观察对象(Observable流)

import { Observable } from 'rxjs';

// 流
const observable = new Observable((subscriber) => {
  document.addEventListener('click', function(){
    subscriber.next(event)
  })
});

new Observalbe(fn) 定义一个流,即将来数据的集合,subscriber.next 输出流数据

subscription

观察者(Observer)

即定义如何处理流产生的数据(消费),observable.subscribe 启动一个流

只有启动了的流

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  setTimeout(() => {
   subscriber.next(2);
   subscriber.complete();
  }, 1000);
});

// 启动流
const subscription = observable.subscribe({
  // 流的处理逻辑
  next(val) {
    console.log('got value ' + val);
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
    console.log('done');
  },
});

注: 支持多次启动Observable,即开启等多个新流,他们互相独立互不干扰

关闭流

通过 subscription.unsubscribe 关闭/停止流

subscription.unsubscribe();  // 停止一个启动后的流

理解

  单身的 多种的
Function Interator
Promise Observabale

拉: 主动访问确定数据消费

推: 被动接受将来数据消费

subject

import { Subject } from "rxjs";

// 创建subject
const subject = new Subject();

// 订阅一个observer
subject.subscribe(v => console.log("stream 1", v));
// 再订阅一个observer
subject.subscribe(v => console.log("stream 2", v));
// 延时1s再订阅一个observer
setTimeout(() => {
  subject.subscribe(v => console.log("stream 3", v));
}, 1000);
// 产生数据1
subject.next(1);
// 产生数据2
subject.next(2);
// 延时3s产生数据3
setTimeout(() => {
  subject.next(3);
}, 3000);
// output
// stream 1 1 //立即输出
// stream 2 1 //立即输出
// stream 1 2 //立即输出
// stream 2 2 //立即输出
// stream 1 3 //3s后输出
// stream 2 3 //3s后输出
// stream 3 3 //3s后输出

Subject 的基础使用与发布订阅模式极其相似

操作符

允许以声明方式轻松组合复杂异步代码

管道运算符

不更改现有的 Observable 实例,且它们返回一个新的 Observable;

本质是纯函数处理,其订阅逻辑基于第一个 Observable;

observable.pipe(operator)

observable.pipe(operatorFactory()): filter(...) 和 mergeMap(...)

创建运算符

作为独立函数调用来创建新的 Observable

of

立即创建一个 observable,它会依次发出指定的值,然后完成

of(10, 20, 30)
  .subscribe({
    next: value => console.log('next:', value),
    error: err => console.log('error:', err),
    complete: () => console.log('the end'),
  });

from

将数组、类数组对象、Promise、迭代器、甚至是函数转换为 observable

// const array = [1, 2, 3, 4, 5];
// const observable = from(array);

const promise = new Promise(resolve => {
  setTimeout(() => {
    resolve('Data from Promise');
  }, 1000);
});

const observable = from(promise);

observable.subscribe(
  value => console.log(value),
  error => console.error(error),
  () => console.log('Observable completed')
);

pipe

将多个操作符串联在一起(操作符链),以便在 observable 上进行一系列的操作

observable.pipe(
  filter(value => value % 2 === 0),
  map(value => value * 2)
).subscribe(
  result => console.log(result)
);

map/filter

mergeMap

将源值投射到一个 Observable 中,并对其合并(flat)后输出为 Observable

const observable = of(1, 2, 3);

observable.pipe(
  mergeMap(value => of(value * 2, value * 3))
).subscribe(
  result => console.log(result)
);

// 2  1*2
// 3  1*3
// 4  2*2
// 6  2*3
// 6  3*2
// 9  3*3

scan

对 Observable 累积操作,类似 Array.reduce

const observable = of(1, 2, 3, 4, 5);

observable.pipe(
  scan((acc, value) => acc + value, 0)
).subscribe(
  result => console.log(result)
);

debounceTime

在发出值之前等待一段时间,如果在此时间内没有新值发出,则发出该值

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

const input = document.getElementById('searchInput');

const observable = fromEvent(input, 'input').pipe(
  map(event => event.target.value), // 提取输入框的值
  debounceTime(300) // 在发出值之前等待300毫秒
);

observable.subscribe(
  searchValue => {
    // 在这里执行搜索操作,只有在用户停止输入300毫秒后才会执行
    console.log('Search:', searchValue);
  }
);

distinctUntilChanged

过滤操作符,确保只有当前值与前一个值不相同时才将值传递给下游的观察者

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

const observable = of(1, 1, 2, 2, 3, 3, 3, 4);

observable.pipe(
  distinctUntilChanged()
).subscribe(
  value => console.log(value)
);

// 1, 2, 3, 4

combineLatest

组合操作符,用于将多个 observable 流中的最新值组合成一个数组,并在任何一个 observable 中发生变化时发射新值

import { interval } from 'rxjs';
import { combineLatest } from 'rxjs/operatos';

const observable1 = interval(1000);
const observable2 = interval(500);

combineLatest(observable1, observable2).subscribe(
  values => console.log(values)
);
// [0, 0]
// [1, 0]
// [1, 1]
// [2, 1]
// [2, 2]
// [3, 2]

zip

合并多个观察值,创建一个观察值,所有观察值完成后按其值按顺序从计算得出数组湖泊自定义计算值

import { of, zip, map , interval} from 'rxjs';

const age$ = of(27, 25, 29);
const name$ = of('Foo', 'Bar', 'Beer');

zip(age$, name$, interval(3000)).pipe(
  map(([age, name, s]) => ({ age, name, s }))
)
.subscribe(x => console.log(x)); 

// 间隔3s
// {age: 27, name: "Foo", isDev: 0}
// {age: 25, name: "Bar", isDev: 1}
// {age: 29, name: "Beer", isDev: 2}