在 RxJS 中为我们提供了很多创建 Observable 对象的方法,其中 create
是最基本的方法。它是 Observable 类的静态属性 —— static create: Function
,也是创建 Observable 对象的工厂方法。
1 2 3 4 5 6 7 8 9 10 import { Observable } from "rxjs" ;const observable$ = Observable.create(observer => { observer.next('Semlinker' ); observer.next('Lolo' ); }); observable$.subscribe(value => { console .log(value); });
以上代码运行后,控制台会依次输出 ‘Semlinker’ 和 ‘Lolo’ 。
需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为 。具体示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 import { Observable } from "rxjs" ;const observable$ = Observable.create(observer => { observer.next('Semlinker' ); observer.next('Lolo' ); }); console .log('start' );observable$.subscribe(function (value ) { console .log(value); }); console .log('end' );
以上代码运行后,控制台的输出结果:
1 2 3 4 start Semlinker Lolo end
当然我们也可以用它处理异步行为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import { Observable } from "rxjs" ;const observable$ = Observable.create(observer => { observer.next('Semlinker' ); observer.next('Lolo' ); setTimeout(() => { observer.next('RxJS Observable' ); }, 300 ); }); console .log('start' );observable$.subscribe(function (value ) { console .log(value); }); console .log('end' );
以上代码运行后,控制台的输出结果:
1 2 3 4 5 start Semlinker Lolo end RxJS Observable
从以上例子中,我们可以得出一个结论 —— Observable 可以应用于同步和异步的场合。
Observer Observer(观察者) 是一个包含三个方法的对象,每当 Observable 触发事件时,便会自动调用观察者的对应方法。
Observer 接口定义 :
1 2 3 4 5 6 interface Observer<T> { closed?: boolean ; next: (value: T ) => void ; error: (err: any ) => void ; complete: () => void ; }
Observer 中的三个方法的作用:
next —— 每当 Observable 发送新值的时候,next 方法会被调用。 error —— 当 Observable 内发生错误时,error 方法就会被调用。 complete —— 当 Observable 数据终止后,complete 方法会被调用。在调用 complete 方法之后,next 方法就不会再次被调用。 接下来我们来看个具体示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import { Observable } from "rxjs" ;const observable$ = Observable.create(observer => { observer.next('Semlinker' ); observer.next('Lolo' ); observer.complete(); observer.next('not work' ); }); const observer = { next: function (value ) { console .log(value); }, error: function (error ) { console .log(error); }, complete: function ( ) { console .log('complete' ); } } observable$.subscribe(observer);
以上代码运行后,控制台的输出结果:
上面的例子中,我们可以看出,complete 方法执行后,next 就会失效,所以不会输出 not work
。
另外观察者可以不用同时包含 next、complete、error 三种方法,它可以只包含一个 next 方法,具体如下:
1 2 3 4 5 var observer = { next: function (value ) { console .log(value); } };
有时候 Observable 可能是一个无限的序列,比如监听 click 事件,对于这种场景,complete 方法就永远不会被调用。
我们也可以在调用 Observable 对象的 subscribe
方法时,依次传入 next、error、complete 三个函数,来创建观察者:
1 2 3 4 5 observable.subscribe( value => { console .log(value); }, error => { console .log('Error: ' , error); }, () => { console .log('complete' ); } );
Subscription 有些时候对于一些 Observable 对象 (如通过 interval、timer 操作符创建的对象),当我们不需要的时候,要释放相关的资源,以避免资源浪费。针对这种情况,我们可以调用 Subscription
对象的 unsubscribe
方法来释放资源。具体示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import { timer } from "rxjs" ;const source$ = timer(1000 , 1000 );const subscription = source$.subscribe({ next: function (value ) { console .log(value); }, complete: function ( ) { console .log('complete!' ); }, error: function (error ) { console .log('Throw Error: ' + error); } }); setTimeout(() => { subscription.unsubscribe(); }, 5000 );
常见 creation 操作符 除了上面介绍的 create 方法之外,RxJS 还提供了很多操作符,用于创建 Observable 对象,比如:
of from range empty throwError fromEvent interval timer of 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import { of } from "rxjs" ;const source$ = of ('Semlinker' , 'Lolo' );source$.subscribe({ next: function (value ) { console .log(value); }, complete: function ( ) { console .log('complete!' ); }, error: function (error ) { console .log(error); } });
以上代码运行后,控制台的输出结果:
1 2 3 Semlinker Lolo complete!
from 数据源为数组 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import { from } from "rxjs" ;const source$ = from ([1 , 2 , 3 ]); source$.subscribe({ next: function (value ) { console .log(value); }, complete: function ( ) { console .log("complete!" ); }, error: function (error ) { console .log(error); } });
以上代码运行后,控制台的输出结果:
数据源为 Promise 对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import { from } from "rxjs" ;const promiseSource$ = from (new Promise (resolve => resolve("Hello World!" )));promiseSource$.subscribe({ next: function (value ) { console .log(value); }, complete: function ( ) { console .log("complete!" ); }, error: function (error ) { console .log(error); } });
以上代码运行后,控制台的输出结果:
range 1 2 3 4 import { range } from "rxjs" ;const source$ = range(1 , 5 );const example = source$.subscribe(val => console .log(val));
以上代码运行后,控制台的输出结果:
empty empty就是产生一个直接完结的Observable对象,没有参数,不产生任何数据,直接完结。
1 2 3 4 5 6 import { empty } from "rxjs" ;const subscribe = empty().subscribe({ next: () => console .log("Next" ), complete: () => console .log("Complete!" ) });
以上代码运行后,控制台的输出结果:
throwError 1 2 3 4 5 6 7 8 import { throwError } from "rxjs" ;const source$ = throwError("This is an error!" );source$.subscribe({ next: val => console .log(val), complete: () => console .log("Complete!" ), error: val => console .log(`Error: ${val} ` ) });
以上代码运行后,控制台的输出结果:
1 Error: This is an error! # throwError 只做一件事就是抛出异常。
fromEvent 1 2 3 4 5 6 import { fromEvent } from "rxjs" ;import { map } from "rxjs/operators" ;const source$ = fromEvent(document , "click" );const example$ = source$.pipe(map(event => `Event time: ${event.timeStamp} ` ));const subscribe = example$.subscribe(val => console .log(val));
interval 1 2 3 4 import { interval } from "rxjs" ;const source$ = interval(1000 );source$.subscribe(val => console .log(val));
以上代码运行后,控制台的输出结果:
interval 支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。
timer 1 2 3 4 import { timer } from "rxjs" ;const source$ = timer(1000 , 5000 );const subscribe = source$.subscribe(val => console .log(val));
以上代码运行后,控制台的输出结果:
1 2 3 4 0 # 1s后 1 # 5s后 2 # 5s后 ...
timer 支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 也可以只传一个参数,比如:
1 2 3 4 5 6 7 8 import { timer } from "rxjs" ;const source$ = timer(1000 );source$.subscribe( val => console .log(val), () => console .error("error!" ), () => console .log("complete!" ) );
以上代码运行后,控制台的输出结果:
参考资源 欢迎小伙伴们订阅全栈修仙之路,及时阅读 TypeScript、Node/Deno、Angular 技术栈最新文章。