multiprocessing 的进程池,能否做到监控一些状态?

2019-10-24 00:17:21 +08:00
 qazwsxkevin
from multiprocessing import Pool

def doSomething(caseNumber):
    # do xxx use caseNumber.
    return Value

def SomeFunc(poolpool):
    #do xxx
    if xxx > 10:
        return True
    else:
        return False
    pass

if __name__ == '__main__':
    #建池
    Apool = Pool(10)

    # 向 Apool 加异步
    Apool.apply_async(doSomething, someArgs)
    Apool.apply_async(Afunc, someArgs)
    Apool.apply_async(Bfunc, someArgs)
    Apool.apply_async(Cfunc, someArgs)
    # Cret = Apool.apply_async(Cfunc, someArgs)
    #省略...

    #测试用,已注释,留着
    # Bret = Apool.apply_async(Bfunc, someArgs)
    
    # while True:
    for i in SomeSuit:
        #1、执行到此时,在这里能否判断出 Apool 进程池,此时有多少个进程(求数量)在跑?
        #2、在这里能否做到判断异步的 Bfunc 执行完毕没?
        #3、假设 2 的想法可以做到,并给出判断结果(True or False),
        #   在这里能否能马上拿到异步 Cfun 执行完毕后的返回值?我理解的线程池,是必须 close 和 join 完成后,统一出结果?
        pass

    print("结束 for SomeSuit.")

    Apool.close()
    Apool.join()

1、执行到此时,在这里能否判断出 Apool 进程池,此时有多少个进程(求数量)在跑?
2、在这里能否做到判断异步的 Bfunc 执行完毕没?
3、假设 2 的想法可以做到,并给出判断结果(True or False),
在这里能否能马上拿到异步 Cfun 执行完毕后的返回值?我理解的线程池,是必须 close 和 join 完成后,统一出结果?

3161 次点击
所在节点    Python
4 条回复
ClericPy
2019-10-24 00:41:25 +08:00
def apply_async(self, func, args=(), kwds={}, callback=None,
error_callback=None):
'''
Asynchronous version of `apply()` method.
'''
if self._state != RUN:
raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result


1 2 3 基本都有办法的, 在一切皆对象的 Python 里, 几乎所有玩意都能自省
看看源码去吧, 一点点说太麻烦了
比如 apply_async 方法的返回值就是 ApplyResult 对象, ApplyResult 对象里可以判断是否完成以及立刻取得结果
Apool 的 self._pool = [] 这里也可以看有多少
多看源码吧


友情提醒, 你这个用法已经过时了, 现在多进程多线程的池都建议使用 concurrent.futures 里面那俩, 借助很多语言都在流行的 Future 概念, 可以在同步代码里面把异步操作简化. 尤其是借助 callback 方式(虽然你上面的代码也可以用回调)也算不难理解
ClericPy
2019-10-24 00:43:02 +08:00
然后还有 close 和 join 忘了说
前者的意思是进程池已经关闭, 如果再添加新任务, 会直接抛错, 而不是真正关闭了所有进程
后者意思是, 主线程 /主进程 整个阻塞住, 直到进程池里的任务全都完成

你想直接拿那个结果, 别 join, 直接对那个提交后得到的对象使用 get 方法
qazwsxkevin
2019-10-26 17:25:50 +08:00
@ClericPy,有不明白的地方,concurrent.futures,比如:

```
eStatusSuit = []
e = futures.ProcessPoolExecutor(max_workers=5)
eStatus = e.submit(ProcessCaseID,someVarA ,someVarB)
eStatusSuit.append(eStatus)
#
eStatus = e.submit(ProcessCaseID,someVarC ,someVarD)
eStatusSuit.append(eStatus)
#
eStatus = e.submit(ProcessCaseID,someVarE ,someVarF)
eStatusSuit.append(eStatus)

#此时是向 e 提交了 3 个任务
#eStatus 对象,我看了一下,似乎是无法查看到 33 个任务具体状态,只能等待 eStatus 全体执行完毕,全部返回 eStatus.result()?
#eStatus.result()是个阻塞式,想不到怎么用。。。

#我是想建立能跑 5 个进程的可控队列,不知道这么干是否合适,还是有更方便的方式?

aExecutor = futures.ProcessPoolExecutor(max_workers=1)
bExecutor = futures.ProcessPoolExecutor(max_workers=1)
cExecutor = futures.ProcessPoolExecutor(max_workers=1)
dExecutor = futures.ProcessPoolExecutor(max_workers=1)
eExecutor = futures.ProcessPoolExecutor(max_workers=1)

然后做个
aExecutorStatus = aExecutor.submit(ProcessCaseID,someVarA ,someVarB)
bExecutorStatus = bExecutor.submit(ProcessCaseID,someVarC ,someVarD)
#省略...

#对各个 ExecutorStatus 的 running(),done()进行循环判断,哪个 False/True 了,就从 queue 里取任务提交过去,哪个失败了,再调度一下优先权
if aExecutorStatus.running():
xxx
#省略...

不知道是不是这样乱来的?
```
ClericPy
2019-10-26 18:26:31 +08:00
ProcessPoolExecutor 可以看做一个进程池执行器, 朝里面提交函数和参数以后, 会返回一个 Future, 这时候任务就开始执行了, 所以常见的用法就是:
1. 新建一个进程池执行器, 设置好并发数 pool = ProcessPoolExecutor(5)
2. futures = [pool.submit(func, var[0], var[1]) for var in var_list]
这时候任务都在后台派出的线程执行中
3. 然后就该等待任务完成了, 如果想要按执行结束的顺序来处理, 就
from concurrent.futures import as_completed
for future in as_completed(futures):
result = future.result(timeout=None)
如果无所谓完成顺序, 但是在意任务匹配顺序, 就
for future in futures:
result = future.result(timeout=None)
这里 timeout 可以配置成一个 float, 然后 try catch 住 timeouterror, 不过不确定多进程会不会杀死超时任务, 因为平时我大都用线程, 线程是肯定杀不死的...

如上, 并发的好处就体现出来了, 也就是说, 在没达到并发限制的情况下, 整个任务理论上完成耗时不会超过最慢任务的耗时, 虽然实际上会受并发限制和 CPU 数量影响
@qazwsxkevin

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/612299

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX