多线程消费, 队列没有清空就退出了, 请教原因, 谢谢!

2022-01-18 17:42:56 +08:00
 vegetableChick

我想通过 ThreadPoolExecutor 使用多个线程来消耗 redis 队列。但进程在队列没有消耗完的情况下退出了

下面是实现代码

from concurrent.futures import ThreadPoolExecutor

import redis
from redis import Redis



pool = redis.ConnectionPool(
    max_connections=settings.REDIS_POOL_MAX_CLIENT,
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=8,
    decode_responses=True
)

redis_con = Redis(connection_pool=pool)


class BasicTask(object):
     def __init__(self,
                 consume_queue_name=None,
                 thread_num=50):
        self.consume_queue_name = consume_queue_name
        self.thread_num = thread_num

    def _consume(self):
        try:
            with ThreadPoolExecutor(max_workers=self.thread_num) as e:
                e.map(self._do_request, range(0, self.thread_num))
        except Exception as e:
            self.logger.error(f"[consume error]: {e}")

    def _do_request(self, _):
        try:
            with redis_con as redis_conn:
                while 1:
                    account_id_info = redis_conn.rpop(self.consume_queue_name)
                    if account_id_info:
                        try:
                            # django orm save db
                            ...

                        except Exception as e:
                            import traceback as tb
                            tb.print_exc()
                            self.logger.error(f"[consume error]: {e}. ")

                    else:
                        break

        except Exception as e:
            self.logger.error(f"[Unexpected Error: {e}]")
            import traceback as tb
            tb.print_exc()

    def run(self):
        self._consume()



# run 
BasicTask(consume_queue_name="base_list_queue").run()

请问 bug 写在哪里了? 感谢大佬

python3.7.3

2753 次点击
所在节点    Python
2 条回复
MoYi123
2022-01-18 17:51:13 +08:00
唯一的可能性就是 if account_id_info: 后面的 break 了吧, 不然都是有日志的.
wuwukai007
2022-01-18 18:52:44 +08:00
不是应该 brpop 之后在开启多线程 执行任务吗,怎么多线程 brpop 了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/829047

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX