生产者消费者模型,在生产和消费同时进行的状态下,如何判断消费者应该结束(没有产品可以消费)

2018-05-10 10:59:17 +08:00
 Hopetree

说一下我的使用场景:

首先,我有一个列表,放了很多 URL,用来测试请求的,然后使用 queue 队列把它们添加到一个队列 q 里面备用(这在生产者消费者模型里面算是原材料吧)

然后,设置一个生产者,它其实也是一个消费者,消费原材料,然后把请求的结果添加到另一个队列 w 中,到这一步我觉得还没有什么问题,因为原材料的数量是一开始就订好了,所以当生产者消费的时候,我判断一下原材料是不是为空就可以退出生产,具体的代码是:

    def run(self):
        while not self.q.empty():
            url = self.q.get()
            do something

现在,生产者一直在生产,直到发现原材料为空,就停止了 while 循环

我又有一个消费者,它会消费生产者产生的请求,去解析请求的信息,但是问题来了,如果生产者和消费者同时开启线程的时候,消费者要怎么判断生产者已经没有生产产品了,该退出 while 循环了呢?我的消费者的 run 是这样的:

        while True:
            if self.w.empty():
                pass
            else:
                info = self.w.get()
                do something

很明显,消费者跟生产者不同,它不能去判断当 w 队列为空的时候跳出循环,因为 w 为空有可能只是消费者把当前的产品消费完了,生产者可能还在继续生产,需要等一下而已

所以,现在的问题是,我如果设置多个生产者和消费者线程的话,最后会完成所有的任务,但是消费者线程一直在 while 循环中,不会退出(因为没有退出的条件)

4861 次点击
所在节点    Python
21 条回复
troycheng
2018-05-10 11:07:51 +08:00
消费者没有内容消费的时候,应该阻塞等待,你这样写就变成了死循环的轮训。至于阻塞等待如何实现,方法很多,教科书上的概念叫 PV 原语,具体实现可以是锁,信号量等手段
Hopetree
2018-05-10 11:21:00 +08:00
@troycheng 我看到的例子要么就是设定了指定的生产数量,要么就是跟我的一样,一直在生产和消费,并没有看到可以判断生产者已经停止生产的例子,如果方便,麻烦给我发个例子我看看,谢谢
jfry
2018-05-10 11:36:48 +08:00
通过生产者和消费者共享一个变量即可,这个模型之前我实现过一个,你可以看一下这里:https://github.com/toaco/carry/blob/6d46dd65f3539ba9767281637bb59ad96b0a4b97/carry/task.py#L381
381 行是生产者生产结束之后的逻辑
404 行是消费者等待的时候处理的逻辑
这里也实现了生产者发生异常后通知消费者让消费者自行推出的逻辑.有需要的话也可以参考一下.
CSM
2018-05-10 11:40:47 +08:00
赞同一楼。
生产者可以在生产结束后往队列里添加一个特殊的标志,消费者收到后就可以退出了。
jfry
2018-05-10 11:45:43 +08:00
@CSM 好想法,不过该方法有个局限是无法让消费者立刻收到消息.比如生产者有紧急的信息需要通知给消费者,但是队列里面仍然有许多未消费的内容,这样消费者只能在一段时间后才能收到紧急信息.
troycheng
2018-05-10 11:59:36 +08:00
写 c++的,不过随手搜了一下,semphore,mutex 一类 IPC 的东西,python 里也有,https://blog.csdn.net/u014595589/article/details/53288168,可以看看是否有所帮助
CSM
2018-05-10 12:17:25 +08:00
@jfry 是有这个问题。还有就是需要消费者在拿到标志之后在退出前需要将标志重新加入队列,以通知其他正在等待的消费者退出。
hcymk2
2018-05-10 12:19:49 +08:00
@CSM
消费者要这么做才能在拿到标志之后在退出前将标志重新加入队列?
Wicked
2018-05-10 12:34:10 +08:00
@jfry 这些上层逻辑都可以通过 producer 和 consumer 之间的通讯方式实现
最简单的,用个共享的 atomic 标记,在 costumer 循环里面每处理完一条请求就检查一下这个标记,这就是你所说的“紧急消息”
本质上就是构建异步通讯模型,开源解决方案都很多,asio,zeromq,了解一下
CSM
2018-05-10 12:34:20 +08:00
@hcymk2 就和生产者往队列添加的方法一样啊 q.put
xuxueli
2018-05-10 12:52:39 +08:00
我有一款爬虫框架,异步多线程方式并行采集网页。遇到的问题和你一模一样。

这个问题我解决了,可以通过判断采集线程状态与队列状态来判断。源码如下:

http://www.xuxueli.com/xxl-crawler/#/
CSM
2018-05-10 12:59:58 +08:00
Hopetree
2018-05-10 14:17:48 +08:00
@CSM 你发的我打不开,不过我在 so 上面找了一个例子,这里的做法是在消费者线程 start()之后在队列里面插入一个元素,这个元素就是用来判断的,不过我有点没弄清楚这里为什么可以这样操作,可能是我对 python 的线程的阻塞不够清晰吧,还在找相关资料理解
Hopetree
2018-05-10 14:18:10 +08:00
CSM
2018-05-10 14:28:49 +08:00
@Hopetree gist 需要梯子。
你发的链接里那个回答是在生产者结束之后( producer.join()之后)才插入那个 None 作为标志的。
我估计你是没理解队列那个模块,如果 queue 为空,queue.get()是会阻塞的,直到有新的数据加入才会返回。
ilucio
2018-05-10 15:36:13 +08:00
生产者生产完了后给消费者发送一个 None,然后消费者者会退出执行了
ilucio
2018-05-10 15:38:11 +08:00
消费者那里的代码要改一下:
while True:
if self.w.empty():
pass
else:
info = self.w.get()
if info is not None:
do something
bany
2018-05-10 15:56:47 +08:00
一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。详细可以参考《 python cookbook 》第十二章 21.3 节 线程间通信。
Hopetree
2018-05-10 16:16:55 +08:00
@ilucio
@bany
感谢,我现在是使用的这种方式,只不过没有搞懂为什么是这样使用,目前还在理解
lyc1116
2018-05-10 17:24:11 +08:00
毒丸对象了解一下

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/453689

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX