请教 concurrent.Future()线程池重启的问题

2020-07-08 10:07:32 +08:00
 uti6770werty

v2 的跟帖似乎无法用 markdown 语法? 所以就重新开个帖子请教这个问题。。。

# coding=utf-8
import codecs
import difflib
import os.path
import re
import time
import string
import chardet
import shutil
import copy
from concurrent.futures import ProcessPoolExecutor

import datetime

def PrintCount(PName):
    if PName == '甲':
        DelayTime = 0.9
    if PName == '乙':
        DelayTime = 1.2
    if PName == '丙':
        DelayTime = 1.7
    if PName == '丁':
        DelayTime = 2
    if PName == '戊':
        DelayTime = 2.5

    countt = 0
    while True:
        countt += 1
        time.sleep(DelayTime)
        print(f'{PName}池:->',f'{countt}')


if __name__ == '__main__':

    StartTime = time.clock()
    FutureDict = {}
    FutureRetDict = {}
    FutureTimeRecoderDict = {}

    PoolNameList = ["甲", "乙", "丙", "丁", "戊"]

    # 初始化进程池
    for i in range(len(PoolNameList)):
        # 进程
        FutureDict.update({PoolNameList[i]: ProcessPoolExecutor(max_workers=1)})
        # 进程 Ret
        FutureRetDict.update({PoolNameList[i]: futures.Future()})
        # 进程启动时间
        FutureTimeRecoderDict.update({PoolNameList[i]: None})

    # 模拟测试清空进程池的信号
    closeflag = True

    # 开始工作
    while True:
        ProcessNum = 0

        # 增加任务
        for ProcessName,FutureRet in FutureRetDict.items():

            # 模拟 40 秒后终结 [乙] 进程池
            if closeflag == True:
                if time.clock() - StartTime >= 40: # 在启动 40 秒后触发
                    print(f"开始强制结束 [乙] 进程池")
                    for pid, process in FutureDict['乙']._processes.items():
                        process.terminate()
                        FutureDict['乙'].shutdown()
                    closeflag = False
                    time.sleep(15)

            # 如果进程池在运行
            if FutureRet.running() == True:
                pass
            else:
                # 增加任务
                FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
                print(f'{ProcessName} 进程池提交了开始.')
                time.sleep(2)
                break
        time.sleep(3)

40 秒之后,乙进程池的确被停了,但是再向乙进程池提交任务的时候,会提示:

甲池:-> 57
甲池:-> 58
丙池:-> 25
戊池:-> 13
甲池:-> 59
丁池:-> 19
甲池:-> 60
丙池:-> 26
Traceback (most recent call last):
File "D:/TestForMu.py", line 80, in <module>
FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\concurrent\futures\process.py", line 452, in submit
raise BrokenProcessPool('A child process terminated '
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
甲池:-> 61
戊池:-> 14
丁池:-> 20
丙池:-> 27

应该要做些什么事情,再次启用?
原谅我这样设计流程,也许不是科学的。。。^_^ 谢谢大家解答!

2490 次点击
所在节点    Python
6 条回复
Latin
2020-07-08 11:05:08 +08:00
标题不严谨
已经停止少了一个,肯定有一个提交不了
或者重新实例化一个进程池提交一个乙进程
linw1995
2020-07-08 11:05:11 +08:00
pool 就是为了复用多个进程,创建那么多个 pool 干嘛……只要一个就好了
uti6770werty
2020-07-08 12:48:13 +08:00
@Latin 哦,原来是这样的,请教一个问题,被.shutdown()后的进程池,一直存在吗,系统会自动回收资源吗?

@linw1995 流程的设计,max_workers=1,想法上来讲,一池一个比较好控制,有其中一个原因是,执行函数有些不稳定,一个进程崩溃了,如 @Latin 大大所说,推倒,再实例一个就应该 OK 了,如果一个池里有 N 个,很难收敛,也不好统计和校验殃及了多少个进程需要重来。。。
uti6770werty
2020-07-08 12:51:38 +08:00
@linw1995 还有一个就是,Pool 实例自带了 Pool 中止的方法,如果一个 Pool 有 N 个进程,就放飞了。。。
uti6770werty
2020-07-08 19:31:59 +08:00
报告一下,下午做了个测试,在 process.terminate()后,对 Pool.shutdown(),系统看到进程池开的进程消失了,重新开实例继续也没问题,系统资源应该是被回收了。
oahebky
2020-07-16 09:31:51 +08:00
看了这个帖子楼主的逻辑。

楼主自己已经有答案了。对于怎么解决问题我就不多说了。

======

说一个我个人看这个贴子的程序设计的看法。
我觉得用 concurrent.futures.ProcessPoolExecutor 来达到楼主的目的,有种拿着锤子看什么问题都是钉子的感觉。

其实担心子进程异常死掉(比如 hang 死)之类的,不如直接使用 multiprocessing.Process 来创建子进程执行 work 。


既然是 concurrent.futures 包里面的类,其实已经说明了,这个类是为了 future 思想设计的。
说得简单点就是即提供高度封装的多进程 API,更重要的是为了兼容异步编程,和其它异步库同用。


楼主的程序设计其实重点不在异步思想上面( future ),所以楼主的这种并发编程其实可以考虑其它库(非 concurrent.futures ),因为在考虑到对子进程的“精细控制”上,concurrent.futures.ProcessPoolExecutor 提供的 API 实质上并不是很洽合这种场景。

当然,这是我个人的一点看法,仅供参考。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/688123

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX