concurrent.futures 如何从外部停止进程池里的其中一个进程,再进行启用?

2019-11-24 23:53:19 +08:00
 qazwsxkevin

处理函数写得不太稳健,会吊死而导致
FutureRetList[i].running()一直认为是 True
粗暴做了个超时认定
看了 concurrent.futures 的一些方法,看不出有什么方法能在外部把进程池里某个进程推倒?

#是否进行多线程处理
MultiProcFlag == True
#定义进程数量
ProcessAmount = 12


    if MultiProcFlag == True:
        FutureList = []
        FutureRetList = []
        FutureStartTimeList = []
        FutureProcSuit = []

        for i in range(ProcessAmount):
            FutureList.append(futures.ProcessPoolExecutor(max_workers=1))

        for i in range(ProcessAmount):
            FutureRetList.append(futures.Future())

        for i in range(ProcessAmount):
            FutureProcSuit.append('0')

        for i in range(ProcessAmount):
            FutureStartTimeList.append(time.time())
        
        pass
 


            # 多进程处理
            if MultiProcFlag == True:
                insertPoolFlag = False # 在 while 循环中判断是否成功加入进程池
                while insertPoolFlag == False:
                    for i in range(ProcessAmount):
                        Process_i = str(i + 1)
                        if FutureRetList[i].running() == True:
                            #进行时间计算,如果某进程超时,关闭进程,重新提交任务
                            tmpTime = (time.clock() - FutureStartTimeList[i])
                            # 如果进程持续时间超过 15 分钟,认定任务失败,需要重新进行
                            if tmpTime > 900:
                                FutureList[i].shutdown() # <-恐怕不是这样乱来的?
                                #FutureList[i].xxxx? # <-- how?
                                time.sleep(5)
                                FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,FutureProcSuit[i],countt,ErrorLogFilePath)
                                print("进程池:[" + Process_i + "] [重新提交]了: [" + FutureProcIDs[i] + "]",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")
                            else:
                                print("进程池:[" + Process_i + "] 已经运行了 [" + '[%.2f] 秒' % (time.clock() - FutureStartTimeList[i]))
                            time.sleep(1)
                            pass
                        else: # 见到有空闲的进程就提交任务
                            FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)
                            FutureStartTimeList[i] = time.time()
                            FutureProcSuit[i] = Task[j] # 记下这个任务,准备在失败的时候,再调出进行重新提交,反正是死磕到任务成功为止
                            print("进程池:[" + Process_i + "] 提交了: [" + countt + "] 是第 [" + str(countt) + "] 个任务.",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")
                            insertPoolFlag = True
                            break
                            pass
                        time.sleep(2)
            else: # 单进程处理
            	ProcessRet = SProcessFunc("1",TaskDict,Task[j],countt,ErrorLogFilePath)
786 次点击
所在节点    Python
8 条回复
ManjusakaL
2019-11-25 01:11:57 +08:00
直接记录 PID,然后 kill 不就完了。。。
qazwsxkevin
2019-11-25 08:43:05 +08:00
@ManjusakaL 可以对线程启动的内容纪录 PID ? 假如各个进程跑的都是是 Chrome.exe ,12 个进程跑满,内存会有 N 个 python.exe ,N 个 Chrome.exe ,怎么分辨谁是谁? 能零失误杀进程?
以前只试过用 psutil.pids()来获取 PID 杀进程。。。
ManjusakaL
2019-11-25 10:45:44 +08:00
@qazwsxkevin 如果怕误杀进程的话,可以在提交任务的时候,传入一个 unique 的参数,然后开个队列,将 unique 和 pid 回传,然后在 main process 做一个 map,直接对想杀的组合发送 SIGKILL 就行了
qazwsxkevin
2019-11-25 11:17:49 +08:00
@ManjusakaL 明白思路了,是一个办法。。。,暂时无法理解一个“unique 的参数”?是什么,得琢磨琢磨。。。^_^

看看有没有其它更好的办法?
Latin
2019-11-25 11:28:42 +08:00
executor = ProcessPoolExecutor(1)
executor.submit(xxx,xxx)
pid = list(executor._processes.keys())[0]

就是记录 然后调用 shell 命令 Kill 找了好久,没有优雅的解决方案
ManjusakaL
2019-11-25 11:35:07 +08:00
@qazwsxkevin 就是一个唯一的标识,可以理解是任务 ID,这样你可以将具体的任务和 PID 绑定
ClericPy
2019-11-25 14:35:34 +08:00
qazwsxkevin
2019-11-25 17:17:36 +08:00
做了一个简单测试,情况不妙,估计不好。。。

else: # 见到有空闲的进程就提交任务
FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)
FutureStartTimeList[i] = time.time()
FutureProcSuit[i] = Task[j] # 记下这个任务,准备在失败的时候,再调出进行重新提交,反正是死磕到任务成功为止
print("进程池:[" + Process_i + "] 提交了: [" + countt + "] 是第 [" + str(countt) + "] 个任务.",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")


pid = (list(FutureList[i]._processes.keys()))[0]
print(pid)
time.sleep(20)
#进行 20 秒左右后杀进程
exeCstr = "taskkill -f -pid " + str(pid)
os.system(exeCstr)
time.sleep(10)
#再次提交
FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)




在杀进程后,直接就抛出异常了,再次提交也是不行的,直接报 1 码结束了主程序,整体结束。

outut:

Traceback (most recent call last):
File "D:/Work//SPFromDB.py", line 309, in <module>
FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\concurrent\futures\process.py", line 452, in submit
raise BrokenProcessPool('A child process terminated '
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

进程已结束,退出代码 1

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/622743

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX