JS 中,串行异步任务的取消是否有更好的处理方式

159 天前
 dvsilch

最近遇到的业务场景:

  1. 用户输入时进行初始化,初始化过程中包含了异步任务的串行
  2. 初始化过程中,不阻塞用户输入

如果用户输入时上一次的初始化还没结束,就会出现多次初始化的并行。我目前的处理方式是初始化开始时记下当前状态,每结束一个串行的异步任务,都进行一次状态比对。简化后的代码如下:

/**
 * @type {number | undefined}
 */
let state = undefined

/**
 * @param {number} ms
 * @returns {Promise<void>}
 * @description sleep for ms milliseconds to simulate async task
 */
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

/**
 * @returns {Promise<void>}
 * @description init async task
 */
async function init() {
  const current = Date.now()
  state = current

  await sleep(500)
  if (state !== current) {
    console.warn(`state ${current} init: canceled`)
    return
  }

  await sleep(500)
  if (state !== current) {
    console.warn(`state ${current} init: canceled`)
    return
  }

  console.log(`state ${current} init: done`)
  state = undefined
}

但我感觉这种做法不太合理,虽然上面用当前时间戳模拟了状态,但实际上业务场景中可能会出现某几次用户输入一致,所以状态一致,进而导致这几次输入执行的初始化过程无法正常中断。像 C#中有 CancellationToken 可以直接调用 Cancel 取消异步任务。不知道 JS 是否拥有类似的设计,或者对于这类业务场景有更好的处理方式?

3057 次点击
所在节点    JavaScript
33 条回复
chenliangngng
159 天前
为什么要用时间戳,直接存储状态不就可以了
不论是 promise 和 ajax 都可以取消
Pencillll
159 天前
我一般把整个流程写成一个 task ,并提供 cancel 函数,需要取消时就主动调用 task.cancel()
pursuer
159 天前
js 现在还没有自动取消异步任务的方法。可以用 AbortSignal ,但要每次 await 前 throwIfAbort 。或实现一个类似效果的东西。
rabbbit
159 天前
const sleep = (time) => {
return new Promise((resolve) => {
setTimeout(() => resolve(), time);
});
};

const task = (id) => {
let _reject;
const p = new Promise(async (resolve, reject) => {
_reject = reject;
const time = Math.random() * 10;
await sleep(time);
resolve({ id, time });
}).then(({ id, time }) => console.log(id, time));

return () => _reject();
};

const main = () => {
let cancelFn;
for (let i = 0; i < 5; i++) {
cancelFn?.();
cancelFn = task(i);
}
};
main();
DOLLOR
159 天前
不要用时间戳,可以用 symbol 来标记状态。Symbol()创建的每个 symbol 都是唯一的。
dvsilch
159 天前
@rabbbit 这个方式我之前实现过,但是存在麻烦的地方是:业务场景需要外部和内部都拥有 cancel 的能力,结果就是需要暴露一个额外的方法同时内部持有这个 reject ,担心会出现因为持有 reject 导致 promise 无法从内存中释放的情况
dvsilch
159 天前
@chenliangngng
@DOLLOR
@pursuer
不好意思主贴里没说清楚,代码里的状态只是随便写了一个东西,实际业务并不是这么实现的。而且,与其说会因为状态出现 bug ,更不如说是我感觉这种「每次异步任务后需要主动判断状态」的做法太傻逼了...
doommm
159 天前
我在业务中也遇到了这种问题,有一连串的异步调用链需要取消(可能执行到任意一步,不需要撤销,只要停止执行后续就行),没啥头绪。
有一些比较简单的场景,像异步拉取一些列表数据之类的,我尝试过用 Rxjs 来处理。switchMap 还是挺好用的
rabbbit
159 天前
脑洞,不推荐这么写
const sleep = (time) => {
return new Promise((resolve) => {
setTimeout(() => resolve(), time);
});
};

async function* task(id) {
yield sleep(Math.random() * 100);
console.log('id: ', id, 'num: ', 1);
yield sleep(Math.random() * 100);
console.log('id: ', id, 'num: ', 2);
yield sleep(Math.random() * 100);
console.log('id: ', id, 'num: ', 3);
}

const main = async () => {
for (let i = 0; i < 5; i++) {
const start = Date.now();
for await (const t of task(i)) {
const diff = Date.now() - start;
if (diff > 100) {
break;
} else {
console.log("time: ", diff);
}
}
console.log('end ---')
}
};
main();
rabbbit
159 天前
排版不好,放 github 了,另外还有一些例子
https://gist.github.com/Aaron-Bird/42359bf78fe0e868946cb5897f6ca7cd
chnwillliu
159 天前
rxjs switchMap ?
Projection
159 天前
用 RxJS 可以方便地实现,跟 AI 交流了一会后给出了如下代码:

import { Subject, from } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// 模拟异步任务 A 、B 、C
const taskA = (input) => {
console.log(`Starting task A with input: ${input}`);
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Finishing task A with input: ${input}`);
resolve(input + 'A');
}, 1000); // 假设任务 A 需要 1 秒钟
});
};

const taskB = (input) => {
console.log(`Starting task B with input: ${input}`);
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Finishing task B with input: ${input}`);
resolve(input + 'B');
}, 1000); // 假设任务 B 需要 1 秒钟
});
};

const taskC = (input) => {
console.log(`Starting task C with input: ${input}`);
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Finishing task C with input: ${input}`);
resolve(input + 'C');
}, 1000); // 假设任务 C 需要 1 秒钟
});
};

const subject = new Subject();

subject
.pipe(
switchMap((value) =>
from(taskA(value)).pipe(
switchMap((resultA) => from(taskB(resultA))),
switchMap((resultB) => from(taskC(resultB))),
),
),
)
.subscribe((result) => {
console.log('Final result:', result);
});

// 发出一些值
subject.next('1'); // 发出第一个值
setTimeout(() => subject.next('2'), 500); // 在 0.5 秒后发出第二个值,中止第一个任务并开始新的任务
setTimeout(() => subject.next('3'), 2000); // 在 2 秒后发出第三个值

输出结果:

Starting task A with input: 1
Starting task A with input: 2
Finishing task A with input: 1
Finishing task A with input: 2
Starting task B with input: 2A
Starting task A with input: 3
Finishing task B with input: 2A
Finishing task A with input: 3
Starting task B with input: 3A
Finishing task B with input: 3A
Starting task C with input: 3AB
Finishing task C with input: 3AB
Final result: 3ABC

实际上用全套 RxJS API 更简单,无奈 API 忘了好多
Projection
159 天前
给输出结果加个时间线:

[1] Starting task A with input: 1
[514] Starting task A with input: 2
[1010] Finishing task A with input: 1
[1537] Finishing task A with input: 2
[1539] Starting task B with input: 2A
[2019] Starting task A with input: 3
[2542] Finishing task B with input: 2A
[3023] Finishing task A with input: 3
[3024] Starting task B with input: 3A
[4027] Finishing task B with input: 3A
[4028] Starting task C with input: 3AB
[5038] Finishing task C with input: 3AB
[5040] Final result: 3ABC
Projection
159 天前
@rabbbit 用生成器确实是一个好办法,在可被取消的时间点添加 yield 。暂停后只要不调用 next() 方法就不会继续执行,这样就需要自己写一个执行器,调度的时机可以自己灵活控制,像 co 那种执行器是自动执行的,async 函数也是自动执行的。
Projection
159 天前
我用生成器简单实现了一下,代码 https://pastebin.com/hi04KbPd

function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

const then = Date.now();

async function task(name, input) {
console.log(`[${Date.now() - then}] ${name}: start, input: ${input}`);
await delay(1_000);
console.log(`[${Date.now() - then}] ${name}: done, input: ${input}`);
return input + name;
}

let token = 0;
async function onUpdate(input) {
let current = ++token;
async function* gen() {
let result = await task('a', input);
yield;
result = await task('b', result);
yield;
result = await task('c', result);
console.log(`[${Date.now() - then}] final result: ${result}`);
return result;
}
for await (const _ of gen()) {
if (token !== current) break;
}
}

onUpdate('1');
await delay(500);
onUpdate('2');
await delay(2_000);
onUpdate('3');

输出结果:

[0] a: start, input: 1
[510] a: start, input: 2
[1006] a: done, input: 1
[1526] a: done, input: 2
[1527] b: start, input: 2a
[2516] a: start, input: 3
[2533] b: done, input: 2a
[3520] a: done, input: 3
[3520] b: start, input: 3a
[4528] b: done, input: 3a
[4530] c: start, input: 3ab
[5536] c: done, input: 3ab
[5537] final result: 3abc
paopjian
159 天前
这好像是个面试题,串行异步任务链的中止与继续,你看是这个么
function processTasks(...tasks) {
let isRunning = false; // 是否正在执行,初始化 false,不然第一次启动不了
const results = []; // 保存执行结果
let i = 0; // 当前任务索引
let prom = null;
return {
start() {
return new Promise(async (resolve, reject) => {
if (prom) {
// 结束了
prom.then(resolve, reject);
return;
}
if (isRunning) return;
isRunning = true;
while (i < tasks.length) {
try {
results.push(await tasks[i]());
} catch (err) {
isRunning = false;
reject(err);
prom = Promise.reject(err);
return;
}
i++;
if (!isRunning && i < tasks.length) return; //中断
}
// 全部执行完毕
isRunning = false;
resolve(results);
prom = Promise.resolve(results);
});
},
pause() {
console.log('暂停任务');
isRunning = false;
},
}
}
amlee
159 天前
https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal

有一个 AbortSignal ,似乎可以做到,看下上面那个链接里面的 implementing_an_abortable_api 这一节
iidear2015
158 天前
感觉你逻辑挺合理的,好像只能写个工具方法重构一下

```
// 一个 task 包含多个异步 job
// 每个 job 运行前需要校验当前 task 是否在运行
// 运行新 task 时取消之前的 task

const createScheduler = (jobs) => {
let runningTask = 0;

const run = async () => {
runningTask += 1;
const thisTask = runningTask;

for (let job of jobs) {
if (thisTask === runningTask) {
await job();
} else {
return;
}
}
};

return run;
};

const init = createScheduler([
() => sleep(500),
() => sleep(500),
() => sleep(500)
]);

```
jones2000
158 天前
做一个任务队列,输入一次,移除上一次剩余的没有完成的任务,再往队列里面加一组异步的任务, 异步的任务处理从任务队列里面取。
dvsilch
158 天前
@pursuer
@amlee
看了下,AbortSignal 大概也需要在没做支持的异步任务上套一层 Promise 或者每次异步结束前做状态检测,也不是很好

@rabbbit
@Projection
rxjs 之前也有看过,无奈确实是经验不够一眼瞎。目前看来对我而言修改起来最简单的方式,就是自己写一个 generator 然后来手动判定是否执行下一步了,至少把状态判断统一在了 for 循环里,少写不少东西

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1057003

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX