求教 celery worker 开启多进程,模拟进程池,进程意外结束变成僵尸进程。

2019-10-28 23:46:11 +08:00
 hanssx

celery worker 开启多进程来模拟进程池,每个子进程里面执行 requests.get()。大体代码如下:

celery task 定义

@ce.task(name='xxx', bind=True)
def period_external_nascan(self, *args, **kwargs):
        wfd = WebFingerprintDiscern(kwargs)
        wfd.run()

类定义

import billiard as multiprocessing

class WebFingerprintDiscernProcess(multiprocessing.Process):
    def __init__(self, config, queue, lock):  # rules, basic_infos,
        multiprocessing.Process.__init__(self)
        self.config = config
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            try:
                url = self.queue.get(block=False)
            except SoftTimeLimitExceeded as e:
                raise
            except queue.Empty as e:
                # logging.debug(e, exc_info=True)  # KLD 记录进程退出,多线程容易导致 logging 死锁
                break
            try:
                try:
                    wa = requests.get(url, xxx)
                    ...还有其他耗时操作,大概耗时 20 分钟。
                except SoftTimeLimitExceeded as e:
                    raise
                except Exception as e:
                    logging.error('update url(%s) state error, %s' % (url, e), exc_info=True)
            except SoftTimeLimitExceeded as e:
                raise
            except Exception as e:
                logging.error(e, exc_info=True)
            finally:
                self.queue.task_done()

class WebFingerprintDiscern:
    def __init__(self, config):
        self.config = config
        self.scan_list = json.loads(self.config['task_target'])
        self.queue = multiprocessing.JoinableQueue()
        self.lock = multiprocessing.Lock()
        self.process_count = 16  # 默认进程数

    def run(self):
        self.scan_list = sorted(self.scan_list)  # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
        for url in self.scan_list:
            self.queue.put(url)
        time.sleep(3)  # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html
        self.scan_start()

    def scan_start(self):
        """
        进程池扫描启动
        :return:
        """
        for i in range(self.process_count):
            t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock) 
            t.daemon = True
            t.start()
        self.queue.join()

启动第一个 celery worker

celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker1.state -n worker1@%h

启动第二个 celery worker

celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker2.state -n worker2@%h

每个 worker 里面接收到的 scan_list 大概 254 个域名,每个域名在子进程中 requests.get()之后还需要执行一段耗时约 20 分钟的分析操作,所以创建的 16 个子进程不可能很快就结束,需要跑一段时间。

现在的问题是同时运行两个 celery worker 之后的 2-3 分钟之内,部分进程就会意外结束(意外是指捕获不到任何异常或错误),变成僵尸进程,就像下面这种,有可能是因为 https://docs.python.org/zh-cn/3/library/multiprocessing.html,于是我在上面的代码中用了 time.sleep(3),但是仍然没有解决该问题。

[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+   10503 10509 [celery] <defunct>
Z+   10503 10510 [celery] <defunct>

等待较长一段时间后,发现 worker2 的 16 个子进程均变为僵尸进程,如下图,而 worker2 的任务迟迟没有结束,这个地方一直想不明白,16 个子进程已经全部变为僵尸进程,应该主进程就会结束了,没结束说明队列中还有值?因为最后有 self.queue.join(),但是如果队列中有值,子进程不可能全部结束,因为子进程里面调用了 self.queue.get(),可能是取出来之后没走到 task_done ?或者取的过程出现了问题?这个地方一直没想明白咋回事。

[root@VM_9_196_centos asset]# ps -ef | grep python
root     10485 21930  0 17:27 pts/4    00:00:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h
root     10503 10485  0 17:27 pts/4    00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h


root     22192 28529  0 15:01 pts/6    00:00:44 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root     22238 22192  0 15:01 pts/6    00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root     22245 22238  7 15:01 pts/6    00:30:23 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root     22258 22238  7 15:01 pts/6    00:30:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h

[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+   10503 10509 [celery] <defunct>
Z+   10503 10510 [celery] <defunct>
Z+   10503 10511 [celery] <defunct>
Z+   10503 10512 [celery] <defunct>
Z+   10503 10513 [celery] <defunct>
Z+   10503 10514 [celery] <defunct>
Z+   10503 10515 [celery] <defunct>
Z+   10503 10516 [celery] <defunct>
Z+   10503 10517 [celery] <defunct>
Z+   10503 10518 [celery] <defunct>
Z+   10503 10519 [celery] <defunct>
Z+   10503 10520 [celery] <defunct>
Z+   10503 10521 [celery] <defunct>
Z+   10503 10522 [celery] <defunct>
Z+   10503 10523 [celery] <defunct>
Z+   10503 10524 [celery] <defunct>
Z+   22238 22243 [celery] <defunct>
Z+   22238 22244 [celery] <defunct>
Z+   22238 22246 [celery] <defunct>
Z+   22238 22247 [celery] <defunct>
Z+   22238 22248 [celery] <defunct>
Z+   22238 22249 [celery] <defunct>
Z+   22238 22250 [celery] <defunct>
Z+   22238 22251 [celery] <defunct>
Z+   22238 22252 [celery] <defunct>
Z+   22238 22253 [celery] <defunct>
Z+   22238 22254 [celery] <defunct>
Z+   22238 22255 [celery] <defunct>
Z+   22238 22256 [celery] <defunct>
Z+   22238 22257 [celery] <defunct>
[root@VM_9_196_centos asset]# 

之前是使用的多线程,但是遇到了这个问题 https://www.v2ex.com/t/597143 ,而且不想去掉 logging,就改成了多进程。

问题:

  1. 同时运行两个 celery worker 之后的 2-3 分钟之内,部分进程就会意外结束(意外是指捕获不到任何异常或错误),变成僵尸进程,原因及解决办法? 价值:30RMB
  2. 发现 worker2 的 16 个子进程均变为僵尸进程,worker2 为啥迟迟没有结束?原因及解决方案? 价值:30RMB

目前实验结果: 如果只运行一个 celery worker 好像是没有问题的,刚开始子进程不会意外结束,持续观察中,猜测最后执行完也能够正常结束?

奖金发放规则: 每个问题优先解答者获奖金,如果对了一半,另外一个人对了一半,合力帮助解决,一人一半的奖金。

联系方式: 扣扣 9614 六二 392

非常感谢你的阅读,让我困扰了好久。

5860 次点击
所在节点    Python
29 条回复
hhbcarl
2019-10-29 00:02:24 +08:00
没仔细分析你的问题的根因,但是有个建议是既然用了 Celery,就应该用好 Celery。它是能够自己管理并行任务的,没必要自己去创建和管理进程。

详见: http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups
ClericPy
2019-10-29 00:53:58 +08:00
太长了... 挣钱机会留给学生吧...

celery 自从在公司里见识过内存泄漏的小坑以后, 基本不敢碰了

不过提到僵尸进程, 感觉有点像我以前碰到过的情况, 先确认下是僵尸进程还是孤儿进程, 两者不太一样.

我最后是用 psutil 粗暴查杀的... 不过之前用的方法可以给你参考下:

1. close_fds 参数
2. kill 子进程的时候一定要 wait, wait 超时(Python3 才有超时... Python2 自己用 timer 做)再去强杀
3. 考虑子进程里带上个 timer kill self 吧, 这个方法最蠢又最简单... 俗称蠢强蠢强的...
lolizeppelin
2019-10-29 09:03:33 +08:00
都是 linux 编程基础,和 python 关系不大
老老实实去熟悉 fork exec wait 等 linux 编程基础吧,这些基础支持不懂,无论你用什么多进程库都会遇到类似问题。

顺便...对于不熟悉 linux 编程也不想读 multiprocessing 源码的同学, 稍微复杂一点的代码都不要使用 multiprocessing
ytymf
2019-10-29 09:45:39 +08:00
multiprocessing 一定要确保父进程是单线程的,如果不能确保,可以尝试一下启动方式为 spawn 或者 forkserver 启动进程。
hanssx
2019-10-29 10:39:16 +08:00
@hhbcarl 谢谢师父,你说的这个 canvas 之前有了解,但是现在项目代码不太容易改,周末尝试一下。
----------------------------------------------------------------------------------------------------------------------------------------
@ClericPy 是僵尸进程,Z 代表 Zombie,僵尸进程产生的原因是因为子进程结束之后,父进程仍然存在且没有 wait 或 communicate 子进程,产生原因我倒是清楚,只是不明白为什么一开始就有子进程结束,因为队列里面的东西很多,不可能一开始子进程就结束的。
----------------------------------------------------------------------------------------------------------------------------------------
@ytymf “multiprocessing 一定要确保父进程是单线程的”,这个是啥意思呀,是确保父进程是单进程吧? celery worker 里面看确实是单进程。
ytymf
2019-10-29 13:18:43 +08:00
@hanssx 不,就是确保父进程是单线程的,也就是说你要 fork 的那个进程里不能有多个线程。可以了解一下 fork safe 这个概念,fork 出来的子进程并不会继承父进程的所有线程,会造成一些问题。
为了解决这个问题,multiprocessing 给出了 spwan 跟 forkserver 两个启动子进程的方式,这里摘抄一点官方文档:

Depending on the platform, multiprocessing supports three ways to start a process. These start methods are

spawn
The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process objects run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.

Available on Unix and Windows. The default on Windows and macOS.

fork
The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

Available on Unix only. The default on Unix.

forkserver
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.
ytymf
2019-10-29 13:27:02 +08:00
@hanssx 看 fork 的介绍,Note that safely forking a multithreaded process is problematic. 我的经验是,后果就是 fork 出的子进程会成为僵尸进程
hanssx
2019-10-29 13:36:44 +08:00
@ytymf 多谢指教,学到了。我这个主进程是 celery worker 产生的进程,我代码中并没有用线程,更没有用多线程。僵尸进程产生的原因应该子进程结束之后,父进程仍然存在且没有 wait 或 communicate 子进程,也就是没处理子进程发来的 SIGCHILD KILL 那个信号。
ytymf
2019-10-29 13:59:56 +08:00
@hanssx 很难说 celery worker 本身的实现里面没有线程,而且大概率是有线程的。你可以简单试试改下进程启动方法为 spawn,如果问题解决那就是这个问题。
hanssx
2019-10-29 16:35:57 +08:00
@ytymf 多谢师父,晚上我改一下试试。
lolizeppelin
2019-10-30 11:11:53 +08:00
@ytymf
fork 又不是 python 自己的玩意,都说了好好看 linux 编程基础,光看 python 文档搞得清楚个鬼

什么鬼 fork 出来的就变僵尸进程......
lolizeppelin
2019-10-30 11:16:06 +08:00
都说了, 不熟悉 linux 编程也不想读 multiprocessing 源码的同学, 稍微复杂一点的代码都不要使用 multiprocessing

好好把 linux 相关的父子进程,信号处理学学也就半天时间
回头你再看 multiprocessing 自然知道到底是什么问题,原理不了解和你说也会变更多基础问题疑问

好好学习下 linux 这部分基础知识真不需要那多时间的,论坛反而是浪费时间。
hanssx
2019-10-30 11:18:49 +08:00
@ytymf 试了一下,celery worker 直接启动不起来,考虑用 canvas 重构一下,还是感谢师父,学到了。
import billiard as multiprocessing
multiprocessing.set_start_method('spawn')
hanssx
2019-10-30 11:20:26 +08:00
@lolizeppelin 那个是这样的,本身使用 multiprocessing 没啥问题,在 celery worker 里面使用就会有问题,celery 也给了一个 billiard,算是 multiprocessing 的 patch。
lolizeppelin
2019-10-30 11:39:56 +08:00
这么说吧, multiprocessing 没有问题,你知道原因根本不用 patch

spwan 也就是是 fork exec 了一个新 python 进程专门执行部分代码避免当前进程的污染
但是这种绕圈的方式是不合理的和混乱的.

当然你觉得能用就行也无所谓.
hanssx
2019-10-30 11:56:27 +08:00
@lolizeppelin 额,spawn 模式启动不起来 celery worker,师父你知道哪儿的问题吗,能说一下吗。
你说的基础知识,我理解可能是信号处理,你是说子进程结束的时候发送 SIGCHILD KILL 之类的信号,如果父进程不处理就会使子进程变为僵尸进程是吧,这个我是知道的,我也避免了,加上了子进程.join()
lolizeppelin
2019-10-30 12:31:34 +08:00
好好学习下 linux 这部分基础知识真不需要那多时间的

先看信号
https://www.ibm.com/developerworks/cn/linux/l-ipc/part2/index1.html

再看僵尸进程
https://monkeysayhi.github.io/2018/12/05/%E6%B5%85%E8%B0%88Linux%E5%83%B5%E5%B0%B8%E8%BF%9B%E7%A8%8B%E4%B8%8E%E5%AD%A4%E5%84%BF%E8%BF%9B%E7%A8%8B/


最好把 ibm 文档理进程间通信都好好看一便,认真看多看几遍,看明白了,想明白了,再回头看你刚才帖子说“我知道的”部分是哪里错了,好好了解下父子进程共享了什么东西会导致什么问题

基础打牢了很多问题就解决了.真的不难,没搞清楚只能瞎问

后面可以 subprocess 的源码熟悉进程间通信和 fd 关闭在 python 的写法和处理方式
然后把 https://github.com/openstack/oslo.service/blob/master/oslo_service/service.py 的代码读透
然后有需要可以读下 multiprocessing 的源码,不需要通读你有上面的知识大致接到 multiprocessing 如何工作的即可

之后你不需要论坛理问什么 celery 多进程的问题了,搞懂原理了才能真正解决问题,因为这些问题都不是 python 的问题都是系统原理性问题
ytymf
2019-10-30 13:21:45 +08:00
@lolizeppelin 只是给楼主一个快速尝试看能不能解决问题的方法,确实是不推荐直接 spawn 的。你说的没错,是要看好基础,可是也得解决具体的问题才行。
ytymf
2019-10-30 13:31:21 +08:00
@hanssx billiard 我没用过,可能跟原生的 multiprocessing 有区别了,最终可以尝试下 forkserver。其实楼上说的有道理,你的用法有点点奇特了,即使用 spwan 或者 forkserver 能够启动,也不是最优雅的办法,还是重构成正经的方法比较好
lolizeppelin
2019-10-30 13:47:37 +08:00
你的快速方法明显是不对的

学习的时候遇到深度问题如果超过能力或者没有必要的确需要跳过,但是非常基础的问题习惯性跳过是不行的

多进程问题是非常基础也常用的知识,只要你用到多进程就会遇到相关问题,这次绕圈解决了下次一样有问题解决不了
老老实实把坑填了用的时间比这次绕圈解决长一点,但是你淌过去了以后就解决起其他多进程问题就有底了

python 残疾的多线程是必须多进程的,不搞懂以后坑的是自己

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/613838

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX