RxJS
Think of RxJS as Lodash for events.
Rxjs想象成针对events的lodash,即Rxjs本质是个工具库,处理对象是事件(又称流)
流:异步行为产生的数据,即将来的数据的集合;
RxJs支持对流进行处理,如合并、截断、延迟、抖动等
基础对象
observable
可观察对象(Observable流)
import { Observable } from 'rxjs';
// 流
const observable = new Observable((subscriber) => {
document.addEventListener('click', function(){
subscriber.next(event)
})
});
new Observalbe(fn)
定义一个流,即将来数据的集合,subscriber.next
输出流数据
subscription
观察者(Observer)
即定义如何处理流产生的数据(消费),observable.subscribe
启动一个流
只有启动了的流
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
setTimeout(() => {
subscriber.next(2);
subscriber.complete();
}, 1000);
});
// 启动流
const subscription = observable.subscribe({
// 流的处理逻辑
next(val) {
console.log('got value ' + val);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});
注: 支持多次启动Observable,即开启等多个新流,他们互相独立互不干扰
关闭流
通过 subscription.unsubscribe
关闭/停止流
subscription.unsubscribe(); // 停止一个启动后的流
理解
单身的 | 多种的 | |
---|---|---|
拉 | Function | Interator |
推 | Promise | Observabale |
拉: 主动访问确定数据消费
推: 被动接受将来数据消费
subject
import { Subject } from "rxjs";
// 创建subject
const subject = new Subject();
// 订阅一个observer
subject.subscribe(v => console.log("stream 1", v));
// 再订阅一个observer
subject.subscribe(v => console.log("stream 2", v));
// 延时1s再订阅一个observer
setTimeout(() => {
subject.subscribe(v => console.log("stream 3", v));
}, 1000);
// 产生数据1
subject.next(1);
// 产生数据2
subject.next(2);
// 延时3s产生数据3
setTimeout(() => {
subject.next(3);
}, 3000);
// output
// stream 1 1 //立即输出
// stream 2 1 //立即输出
// stream 1 2 //立即输出
// stream 2 2 //立即输出
// stream 1 3 //3s后输出
// stream 2 3 //3s后输出
// stream 3 3 //3s后输出
Subject
的基础使用与发布订阅模式极其相似
操作符
允许以声明方式轻松组合复杂异步代码
管道运算符
不更改现有的 Observable 实例,且它们返回一个新的 Observable;
本质是纯函数处理,其订阅逻辑基于第一个 Observable;
observable.pipe(operator)
observable.pipe(operatorFactory())
: filter(...)
和 mergeMap(...)
创建运算符
作为独立函数调用来创建新的 Observable
of
立即创建一个 observable,它会依次发出指定的值,然后完成
of(10, 20, 30)
.subscribe({
next: value => console.log('next:', value),
error: err => console.log('error:', err),
complete: () => console.log('the end'),
});
from
将数组、类数组对象、Promise、迭代器、甚至是函数转换为 observable
// const array = [1, 2, 3, 4, 5];
// const observable = from(array);
const promise = new Promise(resolve => {
setTimeout(() => {
resolve('Data from Promise');
}, 1000);
});
const observable = from(promise);
observable.subscribe(
value => console.log(value),
error => console.error(error),
() => console.log('Observable completed')
);
pipe
将多个操作符串联在一起(操作符链),以便在 observable 上进行一系列的操作
observable.pipe(
filter(value => value % 2 === 0),
map(value => value * 2)
).subscribe(
result => console.log(result)
);
map/filter
mergeMap
将源值投射到一个 Observable 中,并对其合并(flat)后输出为 Observable
const observable = of(1, 2, 3);
observable.pipe(
mergeMap(value => of(value * 2, value * 3))
).subscribe(
result => console.log(result)
);
// 2 1*2
// 3 1*3
// 4 2*2
// 6 2*3
// 6 3*2
// 9 3*3
scan
对 Observable 累积操作,类似 Array.reduce
const observable = of(1, 2, 3, 4, 5);
observable.pipe(
scan((acc, value) => acc + value, 0)
).subscribe(
result => console.log(result)
);
debounceTime
在发出值之前等待一段时间,如果在此时间内没有新值发出,则发出该值
import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';
const input = document.getElementById('searchInput');
const observable = fromEvent(input, 'input').pipe(
map(event => event.target.value), // 提取输入框的值
debounceTime(300) // 在发出值之前等待300毫秒
);
observable.subscribe(
searchValue => {
// 在这里执行搜索操作,只有在用户停止输入300毫秒后才会执行
console.log('Search:', searchValue);
}
);
distinctUntilChanged
过滤操作符,确保只有当前值与前一个值不相同时才将值传递给下游的观察者
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
const observable = of(1, 1, 2, 2, 3, 3, 3, 4);
observable.pipe(
distinctUntilChanged()
).subscribe(
value => console.log(value)
);
// 1, 2, 3, 4
combineLatest
组合操作符,用于将多个 observable 流中的最新值组合成一个数组,并在任何一个 observable 中发生变化时发射新值
import { interval } from 'rxjs';
import { combineLatest } from 'rxjs/operatos';
const observable1 = interval(1000);
const observable2 = interval(500);
combineLatest(observable1, observable2).subscribe(
values => console.log(values)
);
// [0, 0]
// [1, 0]
// [1, 1]
// [2, 1]
// [2, 2]
// [3, 2]
zip
合并多个观察值,创建一个观察值,所有观察值完成后按其值按顺序从计算得出数组湖泊自定义计算值
import { of, zip, map , interval} from 'rxjs';
const age$ = of(27, 25, 29);
const name$ = of('Foo', 'Bar', 'Beer');
zip(age$, name$, interval(3000)).pipe(
map(([age, name, s]) => ({ age, name, s }))
)
.subscribe(x => console.log(x));
// 间隔3s
// {age: 27, name: "Foo", isDev: 0}
// {age: 25, name: "Bar", isDev: 1}
// {age: 29, name: "Beer", isDev: 2}