参加 2018 ngChina 开发者大会,特别喜欢 Michael Hladky 奥地利帅哥的 RxJS 分享,现在拿出来好好学习工作坊的内容(工作坊 Demo 地址),结合这个示例,做了一个改进版本,实现更简洁,逻辑更直观。
了解者可跳过次章节
摩斯密码(Morse),是一种时通时断的信号代码,这种信号代码通过不同的排列组合来表达不同的英文字母、数字和标点符号等。 地球人都知道的 SOS 求救信号,就是 Morse,三短( S ) 三长( O ) 三短( S )。
信号对应表如下:
分析关键步骤,很巧,和把大象装进冰箱里同样都只需要三步耶:
第一步,识别点信号,短为 “滴” 长为“嗒”。
第二步,根据 “长间隔” 来切片分组。
第三步,分组数据根据对应表转化出最终结果。
开始前要做好热身活动:
Morse 的最小单元,"." 代表嘀,"-" 代表嗒,点击事件用 Down 代表 mousedown,Up 代表 mouseup。 200ms 间隔用来区别嘀嗒,1s 间隔用来区分一个 Morse 单元组的结束。
// 点信号 = Down - Up = 间隔 < 200ms ?"." : "-";
// Down <200ms Up >1s = "." = E
// Down <200ms Up <1s Down >200ms Up >1s = ".", "-" = A
// 直接使用 fromEvent 操作符,来生成点击操作的流,然后用 map 操作符转化成时间戳,
// takeUntil 用来控制流的结束,避免重复订阅。
const clickBegin$ = fromEvent(this.sendButtonElementRef.nativeElement, 'mousedown')
.pipe(
takeUntil(this.onDestroy$),
map(n => Date.now())
)
const clickEnd$ = fromEvent(this.sendButtonElementRef.nativeElement, 'mouseup')
.pipe(
takeUntil(this.onDestroy$),
map(n => Date.now())
)
前面代码已经拿到点击事件的流,并且用 "map" 操作符,把数据转化为当前的时间戳。
下面开始计算 Down & Up 之间的间隔时间,思考,合并两个流的的操作符有哪些呢?
forkJoin、concat ? 需要两个流 complate 状态后才返回数据,不适应数据持续输出的场景。
merge ? Down & Up 的时间戳不会同时获得,还需要处理存储的问题,不完全适应场景。
combineLatest ? 满足数据持续输出,满足同时获得,哎哟,还不错。 但是这个操作符的特点是,会缓存上一次的值,所以第二次 Down 也会获得到数据,Up - Down 也就会为负值,取绝对值后可以用来判断是否 >1s,来区分一个 Morse 单元组的结束。
zip ? 哎呀哈,这个更合适呢,盘它! 单词选的很到位,这个操作符功能可以理解为像拉链一样,确保获得数据每一次都是一个纯净的 Down & Up。 但是需要注意 zip 会自动缓存数据,例如 zip(A$, B$),A$收到的数据一直比 B$多太多,有内存溢出风险,就像拉错位的拉链,很蓝瘦。
// zip 的实现
zip(clickBegin$, clickEnd$)
.pipe(
// 计算 Down - Up 间隔时间
map(this.toTimeDiff),
// 根据间隔时间,转化为嘀嗒替代字符 "." "-"
map(this.msToMorseCharacter)
)
.subscribe(result => {
// 发送到主信号流
morseSignal$.next(result);
});
分组的操作符有哪些?
partition ? 根据函数拆成两个流。
groupBy ? 根据函数拆成 n 个流。
window ? 根据流拆成 n 个流。以上各位都打扰了,我还要自己处理数据缓存,再见。
buffer ? 哇,初恋般的感觉,用流控制来做切片数据成数组,拿到数组只需要 join 一下就好,就可以去去匹配对应表了,好棒! “长间隔”的切片流,怎么获得呢?拿出法宝 debounceTime(1000) ,当点击的 Down Up 周期完成后,间隔 1s 就认为是一个 Morse 单元组的结束。 然后又遇到了问题,怎么判断一个点击周期呢?不用单纯用 Up,因为下一个 Down Up 周期可能会超出 1s,就会导致切片时机错误。所以模拟了点击持续的流 clickKeeping$,用 switchMap 替换为新的流且不影响原来的流,timer 产生一个小于 1s 间隔的持续流信号,用 takeUntil 在 Up 事件流 clickEnd$ 后把整个流结束。
// 点击持续状态流
const clickKeeping$ = clickBegin$
.pipe(
// 替换为新的流,不影响原来的流
switchMap(() => {
// 定时在持续发送数据,维持点击中状态
return timer(0, morseTimeRanges.lessThenlongBreak).pipe(
// 直到 Up 后结束点击状态
takeUntil(clickEnd$)
);
})
)
// “长间隔”的切片流
const morseBreak$ = clickKeeping$.pipe(
debounceTime(morseTimeRanges.longBreak)
);
// 获得 Morse 单元组
morseSignal$
.pipe(
// 切片分组主信号流
buffer(morseBreak$) // 转化为,例如 ['.', '.', '.']
)
join('') Morse 单元组去匹配对应表,很简单不用说。
错误发生在 switchMap 中,分支流报错,但是主流不会收到影响,然后用 catchError 捕捉错误。
// Morse 单元组去匹配对应表
private translateSymbolToLetter = morseArray => {
const morseCharacters = morseArray.join('');
const find = morseTranslations.find(n => n.symbol === morseCharacters)
// 这里 find 可能为 undefined 导致报错,但是错误会被 catchError 捕捉
return find.char;
}
// 转化+错误处理,最终完成
morseSignal$
.pipe(
buffer(morseBreak$),
switchMap(n => {
return of(n).pipe(
// 只为了 Demo 演示中的展示用
tap(n => this.lastMorseGroupCharacters = n.join(' ')),
// 转化成对应表中字符
map(this.translateSymbolToLetter),
// 捕捉错误
catchError(n => {
return of(morseCharacters.errorString);
})
)
})
)
.subscribe(result => {
// 输出最终转化结果
this.morseLog.push(result);
console.log('结果:', result)
});
整体上,把 “嘀嗒” “短间隔” “长间隔” 都转化成替代符,过滤无用的替代符,然后 filter “长间隔” 替代符的流,来做 buffer 切片数据。其他还有因为使用 combineLatest 操作符导致的不同。
// 识别 “嘀” “嗒”
const morseCharFromEvents$ = observableCombineLatest(this.startEvents$, this.stopEvents$)
.pipe(
// 计算 mousedown mouseup 时间间隔
map(this.toTimeDiff),
// 转化成标识符
map(this.msToMorseChar),
// 过滤 Morse 单元组中的 “短间隔“ 标识符
filter(this.isCharNoShortBreak as any)
);
// 主信号流
this.morseChar$ = observableMerge(morseCharFromEvents$, this.injectMorseChar$)
// 识别 “长间隔“ 标识符,来作为切片流
const longBreaks$ = this.morseChar$
.pipe(filter(this.isCharLongBreak as any));
// 切片成 Morse 单元组
this.morseSymbol$ = this.morseChar$
.pipe(
buffer(longBreaks$),
map(this.charArrayToSymbol),
filter(n => (n !== '') as any)
)
// 错误处理 + 标识符对应表转化
this.morseLetter$ = this.morseSymbol$
.pipe(
switchMap(n => observableOf(n).pipe(this.saveTranslate('ERROR')))
);
// Up 后补 4 个 “长间隔“ 标识符,用来做 Morse 单元组的结束
const breakEmitter$ = observableTimer(this.msLongBreak, this.msLongBreak)
.pipe(
mapTo(this.mC.longBreak),
take(4)
);
this.stopEventsSubject
.pipe(
switchMapTo(
breakEmitter$.pipe(takeUntil(this.startEventsSubject))
)
)
.subscribe(n => this.injectMorseChar(n));
下图是读完《深入浅出 RxJS 》后的学习笔记,标注了一些操作符的快速记忆特点,方便使用的适合查阅。