celery 任务代码大致如下,此任务未加 soft_time_limit 或 time_limit 其中 Session 是根据以下四行代码得来:
SQLALCHEMY_DATABASE_URI = 'mysql://xx'
some_engine = create_engine(SQLALCHEMY_DATABASE_URI, echo=False, pool_pre_ping=True)
session_factory = sessionmaker(autocommit=False, bind=some_engine) # autoflush=False,
Session = scoped_session(session_factory)
celery 任务大致代码:
'''
class NmapThread_(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
try:
ip = self.queue.get(block=False)
except SoftTimeLimitExceeded as e:
raise
except Exception as e:
logging.error(e, exc_info=True) # 记录线程退出
break
try:
ns = nmap.scan(ip) # 使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库
ns.run()
except SoftTimeLimitExceeded as e:
raise
except Exception as e:
logging.error(e, exc_info=True)
finally:
Session.remove()
self.queue.task_done()
class IpScan:
def __init__(self):
self.queue = Queue()
self.thread_count = 2
def run(self):
# 此处入队列的 IP 个数非常多,大概需要多线程运行 1 周
self.queue.put('1.1.1.1')
self.queue.put('2.2.2.2')
self.queue.put('3.3.3.3')
self.queue.put('4.4.4.4')
self.queue.put('5.5.5.5')
self.queue.put('6.6.6.6')
self.scan_start()
def scan_start(self):
for i in range(self.thread_count):
t = NmapThread_(self.queue)
t.setDaemon(True)
t.start()
self.queue.join()
def web_fingerprint_discern(*args, **kwargs):
print('web_fingerprint_discern begin!')
print(args)
print(kwargs)
@ce.task(name='default.test4queue', bind=True)
def test4queue(self):
ips = IpScan()
ips.run()
web_fingerprint_discern()
'''
现在的问题是多线程运行期间一直没有问题,直到最后多线程执行完(2019-08-31),抛出异常 break 跳出各线程之后,celry worker 就卡住假死了(2019-08-31),具体表现在:
然后我今天(2019-09-02)上班的时候强制 ctrl+c 之后,输出了一些日志,
大家可以看到, web_fingerprint_discern()的 3 条 print 语句,第 1 条发生 2019-08-31,后 2 条发生在 2019-09-02 我 ctrl+c 的时候,其他附加表现:
一直不能解决这个 celery 问题,由于时间原因也不方便换其他类 celery 架构,而且调试发现如果不是扫描任务就不会假死(当然测试时间肯定没有一周那么长,只有几分钟,所以如果完整模拟测试非常耗时,想寻找可能出现的问题点修复后再行测试)
求各位大佬帮忙解决,若能解决,50 红包奉上以表谢意。
1
hanssx OP celery 版本
``` (asset) [root@VM_9_196_centos asset]# celery --version 4.3.0 (rhubarb) ``` 多线程应该是结束了,我在日志中收到了 break 线程的 while 循环之前,打印出的日志,一共 30 个日志,我也启了 30 个线程。 ``` [2019-08-31 22:47:31] [ERROR] - (asscan.py:38) Traceback (most recent call last): File "/root/python/asset/scan/asscan.py", line 34, in run task_host = self.queue.get(block=False) File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get raise Empty _queue.Empty [2019-08-31 22:48:12] [ERROR] - (asscan.py:38) Traceback (most recent call last): File "/root/python/asset/scan/asscan.py", line 34, in run task_host = self.queue.get(block=False) File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get raise Empty _queue.Empty [2019-08-31 22:48:12] [WARNING] - web_fingerprint_discern begin! (log.py:235) [2019-09-02 10:21:21] [WARNING] - () (log.py:235) [2019-09-02 10:21:21] [WARNING] - {'task_uuid': '95c3dda4-cb04-11e9-a344-52540 ``` |
2
neoblackcap 2019-09-02 12:38:10 +08:00 1
没记错的话,celery 自身实现是对 fork 之类有限制的,所以你不应该在任务里面进行类似 fork 之类的操作,线程 pthread_create 同理了。
而且线程的支持我记得已经被 celery 自身抛弃的,所以应该是有缺陷的,建议不使用线程。 根据我以前的做法,我一般都是将网络 IO 与逻辑处理分离。celery 对 gevent 跟进程支持都相当好,因此我会选用个 gevent 处理所有网络 IO (网络 IO,通过 IO 复用,几百万个任务都可以轻松搞定,前提是不能有任何 CPU 密集型处理)。然后通过跟进程型任务结合,组成流水线,在 celery 对应 chain 操作。那么就可以稳定地运行。 因为 gevent 是处理网络是不堵塞的,所以你还是可以继续发任务给该 worker 可以参考一下 |
3
hanssx OP @neoblackcap 谢谢 neoblackcap 师父指点,我还有几点想请教师父,
1. 总体而言就是不使用线程而使用进程或 gevent,是吧? 2. 之前每个线程执行的内容是使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库,主要是两个方面:一是调用 busprocess.Popen(),也就是终究还是会使用子进程;二是会有入库操作,这个应该不算是 CPU 密集型处理?怎么界定 CPU 密集型处理呢? 3. 我这个 nmap 扫描应属于网络 IO 密集型,是不是使用 gevent 比较好? 4. 我对 gevent 不熟悉,不知改动量大不大,师父能提供一些更改的方法吗? |
4
sazima 2019-09-02 13:27:31 +08:00
用 task.apply_async() 可以吗, 同样是异步的.
|
6
neoblackcap 2019-09-02 16:30:44 +08:00 1
@hanssx cpu 密集型是相对的,关键是你的任务类型不能堵塞整个处理逻辑,凡是耗时长的,不需要 IO 的任务都是 IO 密集型
看了一下你用 subprocess.Popen 去调用 nmap,你如果要改的话,请使用 gevent 的网络接口实现你 nmap 的功能,如果不会的话,此方法无解,你还是另寻他法吧。 |
7
hanssx OP @neoblackcap 谢谢,gevent 实现 nmap 的功能基本不可能,必须得使用 subprocess.Popen 去调用 nmap,这种情况下,我使用多进程代替多线程可以吗?你之前说进程或者 gevent 都可以。
|
8
neoblackcap 2019-09-02 18:25:30 +08:00 via iPhone
@hanssx 不可以,可以的前提是你改得动网络请求的部分
|
9
lovedebug 2019-09-02 18:39:57 +08:00 via Android
我碰到的卡住都是程序自己的 bug,异常没抓住。
|
10
hanssx OP @neoblackcap 我不明白为啥不可以,我先试试用多进程,你也说了 celery 对线程支持有缺陷,网络请求的阻塞是必然的。
|
11
xixijun 2019-09-03 10:19:33 +08:00 1
我之前也写过 celery 调度 nmap 的扫描器,答案是可以的。
celery task 里面用 subprocess.Popen 调用 nmap celery 的启动用默认的 execute pool 即 prefork。 还要注意 soft time limit 的设置,超时时需要手动 kill nmap 子进程,防止孤儿进程和僵尸进程 |
12
hanssx OP @xixijun 感谢 xixijun 师父的回答,请问师父你说的可以,是指使用多进程来代替多线程吗?我这边扫的是公司全网,就是扫完之后 celery worker 就卡住假死了,具体详情可查看一下问题描述。
|
13
hanssx OP 2019-09-03 Update:
已经修改为 multiprocessing 多线程,但实际测试时,celery 不能直接使用 multiprocessing,解决方案参见 https://stackoverflow.com/questions/30624290/celery-daemonic-processes-are-not-allowed-to-have-children 因为使用 cavas 改动比较大,所以我直接使用得 import billiard as multiprocessing 目前运行中,持续观察。 |
14
hanssx OP 多进程可以解决这个问题,之前 @崔庆才师父说可能是 logging 死锁的问题,很有可能,待下一步确定。
|
15
hanssx OP |
16
neoblackcap 2019-09-08 12:53:43 +08:00 via iPhone 1
@hanssx 现在我想起来了,我建议你还用 nmap 的 Python 封装库,而不是直接用 subprocess,这样就比较少一些问题,好像叫 Python-nmap,搜一下就可以了。钱就不需要了
|
17
hanssx OP @neoblackcap 嗯,我使用得是你说的 nmap 的 Python 封装库,源码里面使用得也是 subprocess.Popen(),额,需要时可加我扣扣,随时欢迎师父加我。
|