观察者模式 观察者模式,它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。
我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:
期刊出版方 —— 负责期刊的出版和发行工作。 订阅者 —— 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知。 在观察者模式中也有两个主要角色:Subject(主题)和 Observer (观察者),它们分别对应例子中的期刊出版方和订阅者。
订阅 Observable 在介绍 RxJS Subject 之前,我们先来看个示例:
1 2 3 4 5 6 7 8 9 10 import { interval } from "rxjs" ;import { take } from "rxjs/operators" ;const interval$ = interval(1000 ).pipe(take(3 ));interval$.subscribe(value => console .log("Observer A get value: " + value)); setTimeout(() => { interval$.subscribe(value => console .log("Observer B get value: " + value)); }, 1000 );
以上代码运行后,控制台的输出结果:
1 2 3 4 5 6 Observer A get value: 0 Observer A get value: 1 Observer B get value: 0 Observer A get value: 2 Observer B get value: 1 Observer B get value: 2
通过以上示例,我们可以得出以下结论:
Observable 对象可以被重复订阅。 Observable 对象每次被订阅后,都会重新执行。 上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:
1 2 3 4 5 6 7 8 9 function interval ( ) { setInterval(() => console .log('..' ), 1000 ); } interval(); setTimeout(() => { interval(); }, 1000 );
Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播。
上述的需求要如何实现呢?我们已经知道了观察者模式定义了一对多的关系,我们可以让多个观察者对象同时监听同一个主题,这里就是我们的时间序列流。当数据源发出新值的时,所有的观察者就能接收到新的值。
自定义 Subject Subject 类定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Subject { observers = []; addObserver(observer) { this .observers.push(observer); } next(value) { this .observers.forEach(o => o.next(value)); } error(error) { this .observers.forEach(o => o.error(error)); } complete() { this .observers.forEach(o => o.complete()); } }
使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 const interval$ = interval(1000 ).pipe(take(3 ));const subject = new Subject();let observerA = { next: value => console .log("Observer A get value: " + value), error: error => console .log("Observer A error: " + error), complete: () => console .log("Observer A complete!" ) }; var observerB = { next: value => console .log("Observer B get value: " + value), error: error => console .log("Observer B error: " + error), complete: () => console .log("Observer B complete!" ) }; subject.addObserver(observerA); interval$.subscribe(subject); setTimeout(() => { subject.addObserver(observerB); }, 1000 );
以上代码运行后,控制台的输出结果:
1 2 3 4 5 6 7 Observer A get value: 0 Observer A get value: 1 Observer B get value: 1 Observer A get value: 2 Observer B get value: 2 Observer A complete! Observer B complete!
RxJS Subject 其实 RxJS 也为我们提供了 Subject 类,接下我们来利用 RxJS 的 Suject 重写一下上面的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import { interval, Subject } from "rxjs" ;import { take } from "rxjs/operators" ;const interval$ = interval(1000 ).pipe(take(3 ));const subject = new Subject();const observerA = { next: value => console .log("Observer A get value: " + value), error: error => console .log("Observer A error: " + error), complete: () => console .log("Observer A complete!" ) }; const observerB = { next: value => console .log("Observer B get value: " + value), error: error => console .log("Observer B error: " + error), complete: () => console .log("Observer B complete!" ) }; subject.subscribe(observerA); interval$.subscribe(subject); setTimeout(() => { subject.subscribe(observerB); }, 1000 );
以上代码运行后,控制台的输出结果:
1 2 3 4 5 6 7 Observer A get value: 0 Observer A get value: 1 Observer B get value: 1 Observer A get value: 2 Observer B get value: 2 Observer A complete! Observer B complete!
根据上述的示例代码和控制台的输出结果,我们来总结一下 Subject 的特点:
Subject 既是 Observable 对象,又是 Observer 对象。 当有新消息时,Subject 会通知内部的所有观察者。 RxJS Subject & Observable Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next()
方法,把值一一送出。
Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:
next —— 每当 Subject 对象接收到新值的时候,next 方法会被调用。 error —— 运行中出现异常,error 方法会被调用。 complete —— Subject 订阅的 Observable 对象结束后,complete 方法会被调用。 subscribe —— 添加观察者。 unsubscribe —— 取消订阅(设置终止标识符、清空观察者列表)。 除了 Subject 之外,RxJS 还为我们提供了 Subject 的几种变体,如 BehaviorSubject、ReplaySubject 和 AsyncSubject。下面我们来分别介绍一下它们。
BehaviorSubject 有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。
为了说明上述的情景,我们先来分析一下以下示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import { Subject } from "rxjs" ;const subject = new Subject();const observerA = { next: value => console .log("Observer A get value: " + value), error: error => console .log("Observer A error: " + error), complete: () => console .log("Observer A complete!" ) }; const observerB = { next: value => console .log("Observer B get value: " + value), error: error => console .log("Observer B error: " + error), complete: () => console .log("Observer B complete!" ) }; subject.subscribe(observerA); subject.next(1 ); subject.next(2 ); subject.next(3 ); setTimeout(() => { subject.subscribe(observerB); }, 1000 );
以上代码运行后,控制台的输出结果:
1 2 3 Observer A get value: 1 Observer A get value: 2 Observer A get value: 3
通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next()
方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。
BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。
下面我们来使用 BehaviorSubject 重写上面的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import { BehaviorSubject } from "rxjs" ;const subject = new BehaviorSubject(0 );const observerA = { next: value => console .log("Observer A get value: " + value), error: error => console .log("Observer A error: " + error), complete: () => console .log("Observer A complete!" ) }; const observerB = { next: value => console .log("Observer B get value: " + value), error: error => console .log("Observer B error: " + error), complete: () => console .log("Observer B complete!" ) }; subject.subscribe(observerA); subject.next(1 ); subject.next(2 ); subject.next(3 ); setTimeout(() => { subject.subscribe(observerB); }, 1000 );
以上代码运行后,控制台的输出结果:
1 2 3 4 5 Observer A get value: 0 Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 3
通过以上示例,我们知道 BehaviorSubject 会记住最近一次发送的值,当新的观察者进行订阅时,就会接收到最新的值。然后有些时候,我们新增的订阅者,可以接收到数据源最近发送的几个值,针对这种场景,我们就需要使用 ReplaySubject。
ReplaySubject 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import { ReplaySubject } from "rxjs" ;const subject = new ReplaySubject(2 );const observerA = { next: value => console .log("Observer A get value: " + value), error: error => console .log("Observer A error: " + error), complete: () => console .log("Observer A complete!" ) }; const observerB = { next: value => console .log("Observer B get value: " + value), error: error => console .log("Observer B error: " + error), complete: () => console .log("Observer B complete!" ) }; subject.subscribe(observerA); subject.next(1 ); subject.next(2 ); subject.next(3 ); setTimeout(() => { subject.subscribe(observerB); }, 1000 );
以上代码运行后,控制台的输出结果:
1 2 3 4 5 Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 2 Observer B get value: 3
可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放 。
AsyncSubject AsyncSubject 类似于 last
操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import { AsyncSubject } from "rxjs" ;const subject = new AsyncSubject();const observerA = { next: value => console .log("Observer A get value: " + value), error: error => console .log("Observer A error: " + error), complete: () => console .log("Observer A complete!" ) }; const observerB = { next: value => console .log("Observer B get value: " + value), error: error => console .log("Observer B error: " + error), complete: () => console .log("Observer B complete!" ) }; subject.subscribe(observerA); subject.next(1 ); subject.next(2 ); subject.next(3 ); subject.complete(); setTimeout(() => { subject.subscribe(observerB); }, 1000 );
最后我们来介绍一下在 Angular 项目中,RxJS Subject 的应用。
Angular RxJS Subject 应用 在 Angular 中,我们可以利用 RxJS Subject 来实现组件间通信,具体示例如下:
message.service.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import { Injectable } from '@angular/core' ;import { Observable, Subject } from 'rxjs' ;@Injectable ({ providedIn: 'root' }) export class MessageService { private subject = new Subject<any >(); sendMessage(message: string ) { this .subject.next({ text: message }); } clearMessage() { this .subject.next(); } getMessage(): Observable<any > { return this .subject.asObservable(); } }
home.component.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import { Component } from '@angular/core' ;import { MessageService } from '../message.service' ;@Component ({ selector: 'app-home' , template: ` <div> <h1>Home</h1> <button (click)="sendMessage()">Send Message</button> <button (click)="clearMessage()">Clear Message</button> </div> ` }) export class HomeComponent { constructor (private messageService: MessageService ) { } sendMessage(): void { this .messageService.sendMessage('Message from Home Component to App Component!' ); } clearMessage(): void { this .messageService.clearMessage(); } }
app.component.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import { Component, OnDestroy } from '@angular/core' ;import { Subscription } from 'rxjs' ;import { MessageService } from './message.service' ;@Component ({ selector: 'my-app' , template: ` <div *ngIf="message">{{message.text}}</div> <app-home></app-home> ` }) export class AppComponent implements OnDestroy { message: any ; subscription: Subscription; constructor (private messageService: MessageService ) { this .subscription = this .messageService.getMessage().subscribe(message => { this .message = message; }); } ngOnDestroy() { this .subscription.unsubscribe(); } }
感兴趣的同学可以查看 Stackblitz 完整示例。
参考资源