求教 celery worker 开启多进程,模拟进程池,进程意外结束变成僵尸进程。

2019-10-28 23:46:11 +08:00
 hanssx

celery worker 开启多进程来模拟进程池,每个子进程里面执行 requests.get()。大体代码如下:

celery task 定义

@ce.task(name='xxx', bind=True)
def period_external_nascan(self, *args, **kwargs):
        wfd = WebFingerprintDiscern(kwargs)
        wfd.run()

类定义

import billiard as multiprocessing

class WebFingerprintDiscernProcess(multiprocessing.Process):
    def __init__(self, config, queue, lock):  # rules, basic_infos,
        multiprocessing.Process.__init__(self)
        self.config = config
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            try:
                url = self.queue.get(block=False)
            except SoftTimeLimitExceeded as e:
                raise
            except queue.Empty as e:
                # logging.debug(e, exc_info=True)  # KLD 记录进程退出,多线程容易导致 logging 死锁
                break
            try:
                try:
                    wa = requests.get(url, xxx)
                    ...还有其他耗时操作,大概耗时 20 分钟。
                except SoftTimeLimitExceeded as e:
                    raise
                except Exception as e:
                    logging.error('update url(%s) state error, %s' % (url, e), exc_info=True)
            except SoftTimeLimitExceeded as e:
                raise
            except Exception as e:
                logging.error(e, exc_info=True)
            finally:
                self.queue.task_done()

class WebFingerprintDiscern:
    def __init__(self, config):
        self.config = config
        self.scan_list = json.loads(self.config['task_target'])
        self.queue = multiprocessing.JoinableQueue()
        self.lock = multiprocessing.Lock()
        self.process_count = 16  # 默认进程数

    def run(self):
        self.scan_list = sorted(self.scan_list)  # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
        for url in self.scan_list:
            self.queue.put(url)
        time.sleep(3)  # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html
        self.scan_start()

    def scan_start(self):
        """
        进程池扫描启动
        :return:
        """
        for i in range(self.process_count):
            t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock) 
            t.daemon = True
            t.start()
        self.queue.join()

启动第一个 celery worker

celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker1.state -n worker1@%h

启动第二个 celery worker

celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker2.state -n worker2@%h

每个 worker 里面接收到的 scan_list 大概 254 个域名,每个域名在子进程中 requests.get()之后还需要执行一段耗时约 20 分钟的分析操作,所以创建的 16 个子进程不可能很快就结束,需要跑一段时间。

现在的问题是同时运行两个 celery worker 之后的 2-3 分钟之内,部分进程就会意外结束(意外是指捕获不到任何异常或错误),变成僵尸进程,就像下面这种,有可能是因为 https://docs.python.org/zh-cn/3/library/multiprocessing.html,于是我在上面的代码中用了 time.sleep(3),但是仍然没有解决该问题。

[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+   10503 10509 [celery] <defunct>
Z+   10503 10510 [celery] <defunct>

等待较长一段时间后,发现 worker2 的 16 个子进程均变为僵尸进程,如下图,而 worker2 的任务迟迟没有结束,这个地方一直想不明白,16 个子进程已经全部变为僵尸进程,应该主进程就会结束了,没结束说明队列中还有值?因为最后有 self.queue.join(),但是如果队列中有值,子进程不可能全部结束,因为子进程里面调用了 self.queue.get(),可能是取出来之后没走到 task_done ?或者取的过程出现了问题?这个地方一直没想明白咋回事。

[root@VM_9_196_centos asset]# ps -ef | grep python
root     10485 21930  0 17:27 pts/4    00:00:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h
root     10503 10485  0 17:27 pts/4    00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h


root     22192 28529  0 15:01 pts/6    00:00:44 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root     22238 22192  0 15:01 pts/6    00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root     22245 22238  7 15:01 pts/6    00:30:23 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root     22258 22238  7 15:01 pts/6    00:30:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h

[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+   10503 10509 [celery] <defunct>
Z+   10503 10510 [celery] <defunct>
Z+   10503 10511 [celery] <defunct>
Z+   10503 10512 [celery] <defunct>
Z+   10503 10513 [celery] <defunct>
Z+   10503 10514 [celery] <defunct>
Z+   10503 10515 [celery] <defunct>
Z+   10503 10516 [celery] <defunct>
Z+   10503 10517 [celery] <defunct>
Z+   10503 10518 [celery] <defunct>
Z+   10503 10519 [celery] <defunct>
Z+   10503 10520 [celery] <defunct>
Z+   10503 10521 [celery] <defunct>
Z+   10503 10522 [celery] <defunct>
Z+   10503 10523 [celery] <defunct>
Z+   10503 10524 [celery] <defunct>
Z+   22238 22243 [celery] <defunct>
Z+   22238 22244 [celery] <defunct>
Z+   22238 22246 [celery] <defunct>
Z+   22238 22247 [celery] <defunct>
Z+   22238 22248 [celery] <defunct>
Z+   22238 22249 [celery] <defunct>
Z+   22238 22250 [celery] <defunct>
Z+   22238 22251 [celery] <defunct>
Z+   22238 22252 [celery] <defunct>
Z+   22238 22253 [celery] <defunct>
Z+   22238 22254 [celery] <defunct>
Z+   22238 22255 [celery] <defunct>
Z+   22238 22256 [celery] <defunct>
Z+   22238 22257 [celery] <defunct>
[root@VM_9_196_centos asset]# 

之前是使用的多线程,但是遇到了这个问题 https://www.v2ex.com/t/597143 ,而且不想去掉 logging,就改成了多进程。

问题:

  1. 同时运行两个 celery worker 之后的 2-3 分钟之内,部分进程就会意外结束(意外是指捕获不到任何异常或错误),变成僵尸进程,原因及解决办法? 价值:30RMB
  2. 发现 worker2 的 16 个子进程均变为僵尸进程,worker2 为啥迟迟没有结束?原因及解决方案? 价值:30RMB

目前实验结果: 如果只运行一个 celery worker 好像是没有问题的,刚开始子进程不会意外结束,持续观察中,猜测最后执行完也能够正常结束?

奖金发放规则: 每个问题优先解答者获奖金,如果对了一半,另外一个人对了一半,合力帮助解决,一人一半的奖金。

联系方式: 扣扣 9614 六二 392

非常感谢你的阅读,让我困扰了好久。

5870 次点击
所在节点    Python
29 条回复
ytymf
2019-10-30 13:55:27 +08:00
@lolizeppelin 楼主给的题目是有限定的,是个命题作文,那就是如何在 celery worker 下启动子进程,这个问题要怎么解决?即使通晓了你给的所有知识,遇到这个具体的问题不还是得看 celery 具体的实现,找出冲突的共享资源?如果父进程是楼主自己写的,我一定会推荐他搞明白。可面对的是一个庞杂的第三方库,又是具体的命题作文,那要怎么回答呢?
hanssx
2019-10-30 14:11:25 +08:00
@ytymf 确实是的,这个父进程是 celery worker 的,如果不用 celery,这套东西是没问题的,我修改了如下代码:
```
def run(self):
self.scan_list = sorted(self.scan_list) # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
for url in self.scan_list:
self.queue.put(url)
time.sleep(3) # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html
self.scan_start()
```
改为
```
def run(self):
self.scan_list = sorted(self.scan_list) # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
scan_list_len = len(self.scan_list)
# self.total_domain_cnt = len(self.scan_list)
for url in self.scan_list:
self.queue.put(url)
while self.queue.qsize() != scan_list_len:
time.sleep(3) # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html
self.scan_start()
```

```
def scan_start(self):
"""
进程池扫描启动
:return:
"""
for i in range(self.process_count):
t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock)
t.daemon = True
t.start()
self.queue.join()
```
改为
```
def scan_start(self):
"""
进程池扫描启动
:return:
"""
process_list = []
for i in range(self.process_count):
# As far as possible one should try to avoid shifting large amounts of data between processes.
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock)
t.daemon = True
t.start()
process_list.append(t)
for p in process_list:
p.join()
self.queue.join()
```
不过我的用法确实太奇怪,celery 官方是不推荐在里面使用多进程的,billiard 看 issue 里面好像也要换的意思。
hanssx
2019-10-30 14:13:05 +08:00
这回复不支持 markdown 也有点难受。
lolizeppelin
2019-10-30 14:17:34 +08:00
无论什么扩展,最后都是要 fork

搞清楚多进程知识以后, 是不是 celery 都没关系,知道什么时候退出,什么时候回收就是

哪有那么麻烦, 只要你处理好异常,os._exit 的时候我管你是 celery 还是 flask 还是 dj
ytymf
2019-10-30 14:32:03 +08:00
@lolizeppelin 现实中是麻烦的,如果父进程不是 fork safe 的,就是很难处理,不是任何父进程都可以很安全地被 fork 的(与 python 无关),参考
https://stackoverflow.com/questions/6078712/is-it-safe-to-fork-from-within-a-thread

不知道是不是这个原因,python3.8 中 macos 下的 multiprocessing 的默认启动方式也被改成了 spwan,之前是 fork。
lolizeppelin
2019-10-30 14:52:51 +08:00
没有区别, posix_spawn 就是 fork+exec,以前没有 posix_spawn 系统调用而已

搞清楚了原理,自然知道如何避免 celery 有可能带来的负面影响

multiprocessing 本来就是就是单次脚本中让你快速多进程跑代码的库,自己里面生线程还有管道 /socket 通信,里面不说多复杂但进程肯定干净,混合到复杂代码里基础不牢问题都不知道出哪,本来就不适合用到服务中。
特别是 multiprocessing 进程池中可复用的进程除非你能确定进程不被污染,否则跑起来就是自己找坑。

所以我早说了, 对于不熟悉 linux 编程也不想读 multiprocessing 源码的同学, 稍微复杂一点的代码都不要使用 multiprocessing。
lolizeppelin
2019-10-30 14:53:18 +08:00
是进程肯定不干净
ytymf
2019-10-30 17:19:48 +08:00
@lolizeppelin “搞清楚了原理,自然知道如何避免 celery 有可能带来的负面影响”, 这一点具体要怎么做呢,读 celery 的源代码么
hanssx
2019-11-01 18:47:05 +08:00
我之前写的用 join 的方法不对,join 方法虽然也有 wait 的功能,但是如果单个进程 join 还好,多个进程的话可能在 join 之前进程就已经结束变为僵尸进程,现在的方法是处理 SIGCHILD 信号,这样保证不会有僵尸进程了,至于还会不会有其他问题,下下周再跑程序时反馈,感谢各位的帮助。如果有更好的办法,请告知,红包以表谢意。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/613838

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX