The celery worker spawns multiple processes to simulate a process pool, and each child process runs requests.get(). The code looks roughly like this:
Celery task definition:
@ce.task(name='xxx', bind=True)
def period_external_nascan(self, *args, **kwargs):
    wfd = WebFingerprintDiscern(kwargs)
    wfd.run()
Class definitions:
import json
import logging
import queue
import time

import billiard as multiprocessing
import requests
from celery.exceptions import SoftTimeLimitExceeded


class WebFingerprintDiscernProcess(multiprocessing.Process):
    def __init__(self, config, queue, lock):  # rules, basic_infos,
        multiprocessing.Process.__init__(self)
        self.config = config
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            try:
                url = self.queue.get(block=False)
            except SoftTimeLimitExceeded:
                raise
            except queue.Empty as e:
                # logging.debug(e, exc_info=True)  # KLD log the process exit; logging deadlocks easily under multithreading
                break
            try:
                try:
                    wa = requests.get(url, xxx)
                    # ... plus other time-consuming work, roughly 20 minutes in total
                except SoftTimeLimitExceeded:
                    raise
                except Exception as e:
                    logging.error('update url(%s) state error, %s' % (url, e), exc_info=True)
            except SoftTimeLimitExceeded:
                raise
            except Exception as e:
                logging.error(e, exc_info=True)
            finally:
                self.queue.task_done()
class WebFingerprintDiscern:
    def __init__(self, config):
        self.config = config
        self.scan_list = json.loads(self.config['task_target'])
        self.queue = multiprocessing.JoinableQueue()
        self.lock = multiprocessing.Lock()
        self.process_count = 16  # default number of processes

    def run(self):
        self.scan_list = sorted(self.scan_list)  # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
        for url in self.scan_list:
            self.queue.put(url)
        time.sleep(3)  # KLD after putting an object on an empty queue there may be a tiny delay before the queue's empty() returns False; see https://docs.python.org/zh-cn/3/library/multiprocessing.html
        self.scan_start()

    def scan_start(self):
        """
        Start the process-pool scan.
        :return:
        """
        for i in range(self.process_count):
            t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock)
            t.daemon = True
            t.start()
        self.queue.join()
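As an aside: the time.sleep(3) workaround above exists because, per the multiprocessing docs linked in the comment, empty() can briefly return True right after a put(). A common way to sidestep that race entirely (a sketch, not the original code) is to enqueue one sentinel per child and use a plain blocking get(), so neither the sleep nor the Empty handling is needed:

    # producer side, in WebFingerprintDiscern.run(), replacing the sleep:
    for url in self.scan_list:
        self.queue.put(url)
    for _ in range(self.process_count):
        self.queue.put(None)  # one shutdown sentinel per child process

    # consumer side, in WebFingerprintDiscernProcess.run():
    while True:
        url = self.queue.get()  # blocking get: no Empty race, no sleep
        if url is None:  # sentinel received: mark it done and exit cleanly
            self.queue.task_done()
            break
        # ... handle url as before, with task_done() in the finally block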
Start the first celery worker:
celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker1.state -n worker1@%h
Start the second celery worker:
celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker2.state -n worker2@%h
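(Reading the ps output further down: with --concurrency=1 each celery master forks exactly one prefork pool child, so 10503 is the pool child of master 10485 and 22238 of master 22192, and the 16 scan processes are in turn children of the pool child, which is why every zombie shows ppid 10503 or 22238.)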
Each worker receives a scan_list of roughly 254 domains. For each domain, after the requests.get() the child process still has to run an analysis step that takes about 20 minutes, so the 16 child processes cannot finish quickly; they have to run for quite a while.
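Back-of-the-envelope: 254 URLs across 16 processes is about 16 URLs per process; at roughly 20 minutes each, a worker's task should need on the order of 5 hours to drain its queue.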
The problem: within 2-3 minutes of running both celery workers at the same time, some of the child processes end unexpectedly ("unexpectedly" meaning no exception or error can be caught) and become zombie processes, like the ones below. I suspected the queue quirk described at https://docs.python.org/zh-cn/3/library/multiprocessing.html, which is why the code above uses time.sleep(3), but that did not solve it.
[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+ 10503 10509 [celery] <defunct>
Z+ 10503 10510 [celery] <defunct>
After waiting a while longer, all 16 of worker2's child processes have become zombies, as shown below, yet worker2's task never finishes. This is the part I cannot figure out: once all 16 children are zombies, the main process should be able to finish. That it doesn't suggests the queue still holds items, since run() ends with self.queue.join(). But if the queue still held items, the children could not all have exited, because each one loops on self.queue.get(). Maybe an item was taken off the queue but execution never reached task_done()? Or the get() itself went wrong? I still can't work out what is happening here (a diagnostic sketch follows the process listing below).
[root@VM_9_196_centos asset]# ps -ef | grep python
root 10485 21930 0 17:27 pts/4 00:00:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h
root 10503 10485 0 17:27 pts/4 00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h
root 22192 28529 0 15:01 pts/6 00:00:44 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root 22238 22192 0 15:01 pts/6 00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root 22245 22238 7 15:01 pts/6 00:30:23 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root 22258 22238 7 15:01 pts/6 00:30:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+ 10503 10509 [celery] <defunct>
Z+ 10503 10510 [celery] <defunct>
Z+ 10503 10511 [celery] <defunct>
Z+ 10503 10512 [celery] <defunct>
Z+ 10503 10513 [celery] <defunct>
Z+ 10503 10514 [celery] <defunct>
Z+ 10503 10515 [celery] <defunct>
Z+ 10503 10516 [celery] <defunct>
Z+ 10503 10517 [celery] <defunct>
Z+ 10503 10518 [celery] <defunct>
Z+ 10503 10519 [celery] <defunct>
Z+ 10503 10520 [celery] <defunct>
Z+ 10503 10521 [celery] <defunct>
Z+ 10503 10522 [celery] <defunct>
Z+ 10503 10523 [celery] <defunct>
Z+ 10503 10524 [celery] <defunct>
Z+ 22238 22243 [celery] <defunct>
Z+ 22238 22244 [celery] <defunct>
Z+ 22238 22246 [celery] <defunct>
Z+ 22238 22247 [celery] <defunct>
Z+ 22238 22248 [celery] <defunct>
Z+ 22238 22249 [celery] <defunct>
Z+ 22238 22250 [celery] <defunct>
Z+ 22238 22251 [celery] <defunct>
Z+ 22238 22252 [celery] <defunct>
Z+ 22238 22253 [celery] <defunct>
Z+ 22238 22254 [celery] <defunct>
Z+ 22238 22255 [celery] <defunct>
Z+ 22238 22256 [celery] <defunct>
Z+ 22238 22257 [celery] <defunct>
[root@VM_9_196_centos asset]#
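The diagnostic sketch mentioned above (a minimal sketch assuming the classes shown earlier; the logging line is illustrative): keep references to the children and join() them instead of relying only on queue.join(). join() reaps each child, so it never lingers as a zombie, and it exposes exitcode, which separates a clean exit (0) from an uncaught exception (1) and from death by signal N (-N; a -9 would point at SIGKILL, for example from the OOM killer, which would also explain why nothing is ever caught):

    def scan_start(self):
        procs = []
        for _ in range(self.process_count):
            p = WebFingerprintDiscernProcess(self.config, self.queue, self.lock)
            p.daemon = True
            p.start()
            procs.append(p)
        for p in procs:
            p.join()  # reaps the child, so it does not remain a zombie
            # exitcode: 0 = clean exit, 1 = uncaught exception, -N = killed by signal N
            logging.error('child %s exited with exitcode %s', p.pid, p.exitcode)
        # self.queue.join() would block forever if a child died between
        # get() and task_done(); joining the processes avoids that hang

This would also fit the observed symptom: queue.join() waits for task_done() calls, and a child killed by a signal after get() but before task_done() leaves the unfinished-task counter nonzero forever, so the parent hangs on join() while its dead children sit unreaped as zombies.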
I was using multithreading before, but ran into this problem: https://www.v2ex.com/t/597143 . Since I did not want to drop the logging, I switched to multiprocessing.
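For what it's worth, the standard library has a pattern aimed at exactly that logging problem: route records through a queue with logging.handlers.QueueHandler and let a single QueueListener in the parent do the real I/O, so no handler lock is ever shared across processes. A minimal sketch (the setup function and handler choice are illustrative, not from the original code):

    import logging
    import logging.handlers
    import billiard as multiprocessing

    log_queue = multiprocessing.Queue()

    def setup_child_logging():
        # call at the top of each child's run(): records are put on the
        # queue instead of being written out by the child itself
        root = logging.getLogger()
        root.handlers = [logging.handlers.QueueHandler(log_queue)]
        root.setLevel(logging.INFO)

    # in the parent, one listener thread performs all the actual writes
    listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()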
Questions:
Current experimental result: running just one celery worker appears to be fine, and the child processes do not die unexpectedly at the start. I am still watching it; my guess is that it will also finish and exit normally?
Bounty rules: for each question, the first person to answer it gets the bounty; if one person gets half right and someone else gets the other half, so they solve it together, the bounty is split between the two.
Contact: QQ 9614 six-two 392
Thanks a lot for reading; this has had me stuck for a long time.