rxjs 如何优雅的处理轮询任务?

2019-08-23 16:43:25 +08:00
 b1anker

需求:

自己的实现

自己尝试实现了,但是结果不是自己想要的,而且如果多个异步轮询( A 轮询 -> B 轮询 -> C 轮询),可能就会使代码变得嵌套层级多,以下是实现的具体代码,有大佬指教指教么?

轮询代码

import { from, Observable, Subscriber } from 'rxjs';
import { delay, mapTo, repeatWhen } from 'rxjs/operators';

interface RetryOptions<T = any, P = any> {
  try: (tryRequest: P) => Promise<T>;
  tryRequest: P;
  retryUntil: (response: T) => boolean;
  maxTimes?: number;
  tick?: number;
}

export const polling = <T = any, P = any>(options: RetryOptions<T, P>) => {
  options = Object.assign(
    {
      maxTimes: 20,
      tick: 1000
    },
    options
  );
  let result = null;

  const notifier = () => {
    // 计数最大尝试次数
    let count = 0;
    const loop = (producer: Subscriber<any>) => {
      // 超过最大次数强制退出轮询
      if (count >= options.maxTimes) {
        producer.complete();
      } else {
        options
          .try(options.tryRequest)
          .then(res => {
            producer.next(count++);
            // 满足条件则退出轮询
            if (options.retryUntil(res)) {
              producer.complete();
            } else {
            // 不满足条件则继续轮询
              loop(producer);
            }
            // 保存请求结果
            result = res;
          })
          .catch(err => {
            producer.error(err);
          });
      }
    };
    return new Observable(producer => {
      loop(producer);
    });
  };

  return from([0]).pipe(
    delay(options.tick),
    // 当满足条件是,进行一下轮轮询
    repeatWhen(notifier),
    // 转换结果
    mapTo(() => result)
  );
};

测试用例

import { polling } from './polling';

let count = 0;

const mockRequest = (): Promise<string> => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (count < 6) {
        resolve('pending');
      } else {
        resolve('finish');
      }
      count++;
    }, 1000);
  });
};

polling<string, number>({
  try: mockRequest,
  tryRequest: count,
  retryUntil: res => {
    return res === 'finish';
  }
}).subscribe((response) => {
  const result = response();
  console.log(result);
  if (result === 'finish') {
    console.log('轮询结束');
  }
  // 这个轮询结束后应该怎么继续轮询比较好?
  // 继续在这里 polling 下一个轮询吗?容易回调地狱啊
});

结果

null
pending
pending
pending
pending
pending
pending
finish // 上面的都不输出,只输出最后一个结果,因为上面的我并不关注
轮询结束
5614 次点击
所在节点    JavaScript
17 条回复
leemove
2019-08-23 16:50:31 +08:00
是不是有点把问题复杂化了.
b1anker
2019-08-23 16:54:05 +08:00
@leemove 那应该怎么处理呢?
momocraft
2019-08-23 16:54:29 +08:00
concatMap ?
b1anker
2019-08-23 16:57:50 +08:00
发现有个 last 操作符,可以解决最后只输出最终结果
leemove
2019-08-23 17:48:30 +08:00
楼主可以看看我写的这个简单 demo 其实 rxjs 的操作符还是很强大的,而且 rxjs 的重试操作符是很强的.请求被我简化了,最大重试次数的逻辑没加.
地址: https://stackblitz.com/edit/rxjs-playground-test
leemove
2019-08-23 17:50:14 +08:00
@leemove 不好意思上一条地址发错了 正确地址: https://stackblitz.com/edit/rxjs-playground-test-t8quzt
wawaforya
2019-08-23 18:34:49 +08:00
献丑了,有什么错误请轻拍😂

``` typescript
import { Observable, of, race, timer } from 'rxjs';
import { concatMapTo, skipWhile, take, tap } from 'rxjs/operators';

type Result = 'pending' | 'success';
const getResult: () => Observable<Result> = () => of<Result>('success').pipe(tap(() => console.log('Requested.'))); // 向接口请求数据的函数

const limit = 20;
const schedule = timer(0, 1000);
const requestSource = schedule.pipe(concatMapTo(getResult()), skipWhile(result => result === 'pending'));
const upperBound = schedule.pipe(skipWhile(value => value < limit));
race([requestSource, upperBound]).pipe(
take(1)
).subscribe(
result => console.log(result), // 这里需要判断类型,如果是数字,说明 20 次了都还没有 success
error => console.error(error),
() => console.log('Completed')
);
```
wawaforya
2019-08-23 18:44:41 +08:00
#7 好像不会生成一个新的请求,要把 `concatMapTo(getResult())` 改成 `concatMap(() => getResult())` 哈哈
b1anker
2019-08-23 19:00:52 +08:00
@leemove 你这个一直报错呀
b1anker
2019-08-23 19:03:41 +08:00
@wawaforya 你这个 timer,其实跟 interval 差不多,其实我一开始也是这么搞得,但是得考虑一种情况,有可能请求完成超过 1s,这样子就不好控制了
ibufu
2019-08-23 19:46:30 +08:00
所以关键点是解决竞态。你 google 一下 rxjs 竞态,应该能搜到很多。
b1anker
2019-08-24 00:07:13 +08:00
wawaforya
2019-08-24 11:53:00 +08:00
@b1anker 哇,有道理,还要考虑请求时间的问题,学到了👍
leemove
2019-08-27 14:05:12 +08:00
@b1anker 没看到 expand 啊..这版和我那版好像是一样的啊...就是你用 promise 请求,我用的是 of 直接返回 Ob
leemove
2019-08-27 14:07:04 +08:00
@ibufu 轮询不涉及竞态的,就是一个简单的同步循环请求而已.
b1anker
2019-08-27 19:33:08 +08:00
@leemove 你可能看错文件了,你看看 polling.ts
malcolmyu
2019-09-30 17:43:32 +08:00
@b1anker 是否可以把 pending 作为一种异常推出来,然后用 retryWhen 来解决

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/594556

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX