[go]golang 的协程池本应该是这样的

2023-09-12 15:56:11 +08:00
 shaoyie

看过了一下 star 比较高的协程池实现,还有字节开源的实现,完全是 java/c++之类的外行实现思路

协程/线程池,最基本的元件 就是 队列 + 协程/线程,M:N 模型

这两个组件在 go 里边天生就有啊,为什么再搞一套 task queue 呢?

控制队列容量:make(chan, cap) 第二参数就可以

想要控制协程/线程数量,再辅助一个 chan 就可以了,

代码实现如下,100 行搞定:

我把它放到 github 上 gopool 喜欢的老铁可以给个 star

// GoPool is a minimalistic goroutine pool that provides a pure Go implementation
type GoPool struct {
	noCopy

	queueLen atomic.Int32
	doTaskN  atomic.Int32
	workerN  atomic.Int32
	options  Options

	workerSem chan struct{}
	queue     chan func()
}

// NewGoPool provite fixed number of goroutines, reusable. M:N model
//
// M: the number of reusable goroutines,
// N: the capacity for asynchronous task queue.
func NewGoPool(opts ...Option) *GoPool {
	opt := setOptions(opts...)
	if opt.minWorkers <= 0 {
		panic("GoPool: min workers <= 0")
	}
	if opt.minWorkers > opt.maxWorkers {
		panic("GoPool: min workers > max workers")
	}
	p := &GoPool{
		options:   opt,
		workerSem: make(chan struct{}, opt.maxWorkers),
		queue:     make(chan func(), opt.queueCap),
	}
	for i := int32(0); i < p.options.minWorkers; i++ { // pre spawn
		p.workerSem <- struct{}{}
		go p.worker(func() {})
	}
	go p.shrink()
	return p
}

// QueueFree returns (capacity of task-queue - length of task-queue)
func (p *GoPool) QueueFree() int {
	return int(p.options.queueCap - p.queueLen.Load())
}

// Workers returns current the number of workers
func (p *GoPool) Workers() int {
	return int(p.workerN.Load())
}

// Go submits a task to this pool.
func (p *GoPool) Go(task func()) {
	if task == nil {
		panic("GoPool: Go task is nil")
	}
	select {
	case p.queue <- task:
		p.queueLen.Add(1)
	case p.workerSem <- struct{}{}:
		go p.worker(task)
	}
}

func (p *GoPool) worker(task func()) {
	p.workerN.Add(1)
	defer func() {
		<-p.workerSem
		p.workerN.Add(-1)
		if e := recover(); e != nil {
			if p.options.panicHandler != nil {
				p.options.panicHandler(e)
			}
		}
	}()

	for {
		task()
		task = <-p.queue
		if task == nil {
			break
		}
		p.doTaskN.Add(1)
		p.queueLen.Add(-1)
	}
}
func (p *GoPool) shrink() {
	ticker := time.NewTicker(p.options.shrinkPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			doTaskN := p.doTaskN.Load()
			p.doTaskN.Store(0)
			if doTaskN < p.options.tasksBelowN {
				closeN := p.workerN.Load() - p.options.minWorkers
				for closeN > 0 {
					p.queue <- nil
					closeN--
				}
			}
		}
	}
}
2172 次点击
所在节点    Go 编程语言
27 条回复
csh010101
2023-09-12 18:47:52 +08:00
好好好
Nazz
2023-09-12 19:16:27 +08:00
应该是这样的:

```go
type channel chan struct{}

func (c channel) add() { c <- struct{}{} }

func (c channel) done() { <-c }

func (c channel) Go(f func()) {
c.add()
go func() {
f()
c.done()
}()
}
```
lan171
2023-09-12 19:25:55 +08:00
都协程了还要池化?
ylc
2023-09-13 00:26:40 +08:00
@lan171 并发太大时,代码没写好的话会造成协程数激增,池化能控制数量,而且性能也能得到提升
lovelylain
2023-09-13 08:24:12 +08:00
@Nazz 看上去好像也行,OP 代码这么多是完善了哪些方面?
shaoyie
2023-09-13 08:32:40 +08:00
@lovelylain 这不算多吧,这已经算是最精简的了吧,我就是因为看到别人实现的太复杂了,所以写了一个,功能就是可以定时收缩,加了几个计数而已
shaoyie
2023-09-13 08:35:24 +08:00
@shaoyie 抱歉回复错了
Nazz
2023-09-13 08:47:56 +08:00
@lovelylain 最精简的版本,没有之一
Mohanson
2023-09-13 09:46:44 +08:00
XY 问题.

都协程了就没必要池化, 协程模型需要的是速率控制. 用令牌桶算法 10 行代码就能解决问题. 谈起协程就自动类比为进程, 然后想当然认为应该有一个"协程池", "完全是 java/c++之类的外行实现思路"
dyllen
2023-09-13 10:42:52 +08:00
之前看了有的框架有协程池化的组件,看里面的测试结果,除了内存占用优势,性能上没任何优势。
keakon
2023-09-13 10:54:11 +08:00
@shaoyie @Nazz 当队列满的时候,Go() 会阻塞调用线程,而不是加到任务队列里立刻返回,后面空闲了再由 worker 线程去执行。
troywinter
2023-09-13 13:49:06 +08:00
一个 semaphore 就能解决的问题,不需要写这么多代码
shaoyie
2023-09-13 13:50:03 +08:00
@dyllen 是的,不会有肉眼可见的性能提升,至于协程复用啥的只是减少一些内存分配而已,主要还是控制数量
shaoyie
2023-09-13 13:50:44 +08:00
@troywinter 有 sem 不也得有个 job 队列吗
shaoyie
2023-09-13 13:53:18 +08:00
@Mohanson 只是一个小品,总有它存在的应用场景。
kneo
2023-09-13 13:57:01 +08:00
国内大厂自己轮的东西基本都是性能驱动。如果不是为了解决他们自己生产环境里遇见的性能问题,应该不会搞这么个东西出来。
ZSeptember
2023-09-13 14:05:33 +08:00
学过 Go 的都能写出这个代码,,但是并不一定能写出那些复杂的 pool 。
你这个本质上是一个并发控制而已,最基本的优雅关闭也没处理。
shaoyie
2023-09-13 14:33:21 +08:00
@kneo 是的,自己造的轮子肯定是为了解决自己生产环境的问题,不用考虑通用性
shaoyie
2023-09-13 14:37:32 +08:00
@ZSeptember 也不一定,有些人的实现思路就够 gopher ,自己实现一套条件变量 + queue ,我是觉得没必要,go 天生就带这些东西。另外,你说的优雅关闭 是指关闭 pool 吗?我是觉得没必要,本来 go pool 就不是很必须,再加上一个动态的 pool 就更没必要了
bv
2023-09-13 14:43:28 +08:00
@Nazz 代码就应该简单直接

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/973059

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX