go 的 channel 的一个疑问

2017-03-20 20:51:41 +08:00
 weiweiwitch

我们现在项目中使用 channel 来实现多个协程间通讯时遇到一个问题,想请教下这里的大牛。

我们有多个生产端协程(数千个)会不断的产生数据,塞入到 channel 中,然后一个消费端协程不断的从 channel 中获取数据。

现在,我们的多个生产端协程会不定时的产生短暂的峰值,这个峰值的量很大,而 channel 是有容量的。当 channel 的容量满了后,生产端就阻塞住了。这影响到了生产端的正常逻辑的执行。

我们考虑过使用 select 和 default 的方式来避免阻塞。但如何在 channel 有空余容量时,生产端协程及时将积压的消息再次推入 channel ?

由于消费端逻辑的特殊性,我们无法创建多个消费端协程来提高消费的速度。

1940 次点击
所在节点    Go 编程语言
13 条回复
Muninn
2017-03-20 21:10:17 +08:00
这和 golang 没关系 传统的队列一样的 只能加大缓冲或者提高消费能力

评估下内存够用不 不够了需要改架构持久化
weiweiwitch
2017-03-20 21:28:28 +08:00
@Muninn 内存是够的。我们现在暂时使用了 go-datastructures 的无边界 queue 来缓解这个问题。只是这个数据结构和其他 channel 没法太好的搭配使用。

绝大部分时间,生产端的产生速度是很缓慢的,所以为了偶尔的波峰为 channel 分配巨量的缓冲,感觉比较浪费。
znood
2017-03-20 21:31:11 +08:00
如 1 楼所说,加大缓冲,如果消费能力不足的情况下最好在 channel 和消费者中间加个持久化队列,如 kafka ,如果对延迟要求不是很高可以直接把 channel 换成 kafka
pkking
2017-03-20 21:36:06 +08:00
可以借鉴下拥塞算法
PhilC
2017-03-20 22:06:27 +08:00
你可以看看 nsq 的代码,用 select ,当 channel 满了就写到文件里
jiumingmao
2017-03-20 22:25:11 +08:00
使用 channel 的 channel a , a 不满的时候生产者定义一个长度为 1 的子 channel b ,往 b 中放一个元素,然后放到 a ; b 满的时候,生产者定义一个长度比较大(需要估计一下峰值大概多大)的子 channel c ,然后数据放入 c ,直接 a 不满,把 c 放入 a 。
jiumingmao
2017-03-20 22:27:21 +08:00
不过 channel 都会有一个问题,进程挂了就啥都没有了,使用 kafka 可以防止数据丢失。
iot
2017-03-20 22:48:43 +08:00
生产者写入 channel 时候能不能判断下, 如果快满了就再创建一个更大的 channel 替换旧的
ghbai
2017-03-21 08:38:42 +08:00
gocrawl(开源爬虫类库)的一种方案
https://github.com/PuerkitoBio/gocrawl/blob/master/popchannel.go

```
type popChannel chan []*URLContext
// The stack function ensures the specified URLs are added to the pop channel
// with minimal blocking (since the channel is stacked, it is virtually equivalent
// to an infinitely buffered channel).
func (pc popChannel) stack(cmd ...*URLContext) {
toStack := cmd
for {
select {
case pc <- toStack:
return
case old := <-pc:
// Content of the channel got emptied and is now in old, so append whatever
// is in toStack to it, so that it can either be inserted in the channel,
// or appended to some other content that got through in the meantime.
toStack = append(old, toStack...)
}
}
}
```
weiweiwitch
2017-03-21 09:17:45 +08:00
@ghbai 如果我理解的没问题的话,这个方案是无法保证同一个生产者产生的 cmd 被有序的消费。
ghbai
2017-03-21 12:44:15 +08:00
@weiweiwitch 对,是不能保证有序的。
khowarizmi
2017-03-21 15:13:59 +08:00
在你的 unbounded queue 前后接上两个 channel ,然后用两个 worker 搬数据,伪装成 unbounded channel 。
gwind
2017-03-30 21:28:22 +08:00
好比一条 TCP 连接达到最大吞吐,你再塞就没有意义。
建议考虑下 ZeroMQ, nanomsg 等,重新定义模型。
纯 golang 的 nanomsg : https://github.com/go-mangos/mangos

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/348943

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX