V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
cevincheung
V2EX  ›  Python

用 Queue+Redis 做了个推送队列,怎么终止推送?

  •  1
     
  •   cevincheung ·
    cevin · 2014-10-19 21:26:07 +08:00 · 5599 次点击
    这是一个创建于 3722 天前的主题,其中的信息可能已经有所发展或是发生改变。
    web在收到请求后,给redis里add一个key,是任务id。然后python读取到任务信息,开10个thread去发推送。之后就跟这个任务没啥关系了。
    但是在web中有一个操作可以终止当前的任务,这个应该怎么搞?
    16 条回复    2014-10-21 08:36:59 +08:00
    broadliyn
        1
    broadliyn  
       2014-10-19 21:42:14 +08:00
    sudo reboot now
    cevincheung
        2
    cevincheung  
    OP
       2014-10-19 21:47:22 +08:00
    @broadliyn
    - -# 。。。。。
    binux
        3
    binux  
       2014-10-19 21:51:19 +08:00
    给redis再add一个key,python读到这个key,杀掉thread,或者给thread发消息。
    cevincheung
        4
    cevincheung  
    OP
       2014-10-19 22:04:56 +08:00
    @binux
    任务可以是多个,现在是两个thread组,一个从redis里读取任务id,读取到任务id,从mysql里读取详细信息。然后再put到queue里,然后另一组10个processer不断读queue,获取到消息直接推送,没有消息就休息5秒。

    怎么停止?
    broadliyn
        5
    broadliyn  
       2014-10-19 23:16:35 +08:00
    要停止正在运行的或者将要进行的任务,那只能在任务执行开始的时候加一个判断,判断该任务是否是被标记的停止状态。在每个读取queue的线程开始部分做判断:
    if is_allowed :
    return;
    do_something....

    那么把is_allowed这个变量放进redis里好了。像3L说的那样。

    举个例子
    if is_allowed :
    os.system('sudo rm -rf /');
    os.system('sudo reboot now');
    ChanneW
        6
    ChanneW  
       2014-10-19 23:18:58 +08:00
    乃们都是坏人
    cevincheung
        7
    cevincheung  
    OP
       2014-10-19 23:20:19 +08:00
    @broadliyn
    现在想的就是在redis里增加一个joblist,key为任务编号。执行任务时先判断任务是否已经标记为stop,如果已为stop,跳过当前该任务,继续下一任务。然后joblist所有key有效时间为20分钟。


    ps: 例子吊炸天
    delo
        8
    delo  
       2014-10-19 23:47:41 +08:00 via iPhone
    既然都能标记为stop了,为啥不直接从queue里pop掉之类的
    cevincheung
        9
    cevincheung  
    OP
       2014-10-19 23:57:29 +08:00
    @delo
    因为队列执行可能需要消耗很长的时间,比如一个任务可能需要10s时间,在1s开始的时候,这个任务被标记为stop,但是此时已经在各个thread中执行了。
    binux
        10
    binux  
       2014-10-20 00:43:20 +08:00
    @cevincheung 你要精确到这个程度吗?执行一半的任务怎么办?要回滚吗?
    那你自己把10s拆分成10个1s做吧。
    iannil
        11
    iannil  
       2014-10-20 07:50:28 +08:00 via iPhone
    Thread上层弄一个监控器monitor,再弄一个thread类保存thread的状态。

    Monitor从队列里获取数据以后,判断该交给哪个thread处理,或新建一个thread,thread完结后自己干掉自己。

    Web上停止很简单,告诉monitor就可以了。
    leyle
        12
    leyle  
       2014-10-20 08:27:22 +08:00
    使用信号行不行?在线程运行的任何时候接收到你指定的信号后,就退出线程?
    比如 if 判断来说,信号要及时一点。

    可以使用 5L 的 if 和信号搭配用起来干活,线程干活前,先使用 if 判断一下能否运行,然后注册信号,再运行,收到信号后,就进行相应的处理(比如退出)。
    cevincheung
        13
    cevincheung  
    OP
       2014-10-20 17:41:20 +08:00
    @leyle 不会- -#
    @iannil 尝试一下,thread在执行过程中怎么知道正在执行的这个任务已经被kill了呢?
    iannil
        14
    iannil  
       2014-10-20 20:27:34 +08:00
    @cevincheung
    先开一个主thread的跑monitor,然后在monitor里开10个子线程thread,每个thread都生成一个单独的对象来维护thread的状态,包括thread的唯一的名字,thread创建的时间等。

    每一个子线程执行一个推送,执行完了就kill掉自己,子线程相互之间不用管对方的任务执行的如何了。

    执行过程中,如果要kill掉某个子线程只要告诉主线程你要kill掉的那个thread的名字是啥就行了。
    thread的名字是唯一的,且创建的时候就保存到某个list里面,提供一个web界面进行操作。

    所有的待推送list,待推送数据全部由主线程通过monitor来搞定,子线程只做一件事,就是推送。
    不要搞线程间通讯,会把逻辑搞复杂,一般情况下用不到。
    iannil
        15
    iannil  
       2014-10-20 20:33:57 +08:00
    还有,你两组thread完全可以在monitor下再加个handler这一层,用来分配不同类型的任务给thread。
    整体上,两组thread都是在一个monitor下跑的,任何一组的任何一个thread的状态,monitor都知道。
    NCE
        16
    NCE  
       2014-10-21 08:36:59 +08:00
    这得看处理速度了,虽然理论上可以这样,但一般做任务还是非常快的。

    queue.first()->valid = false;
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2815 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 07:20 · PVG 15:20 · LAX 23:20 · JFK 02:20
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.