[go]golang 的协程池本应该是这样的

2023-09-12 15:56:11 +08:00
 shaoyie

看过了一下 star 比较高的协程池实现,还有字节开源的实现,完全是 java/c++之类的外行实现思路

协程/线程池,最基本的元件 就是 队列 + 协程/线程,M:N 模型

这两个组件在 go 里边天生就有啊,为什么再搞一套 task queue 呢?

控制队列容量:make(chan, cap) 第二参数就可以

想要控制协程/线程数量,再辅助一个 chan 就可以了,

代码实现如下,100 行搞定:

我把它放到 github 上 gopool 喜欢的老铁可以给个 star

// GoPool is a minimalistic goroutine pool that provides a pure Go implementation
type GoPool struct {
	noCopy

	queueLen atomic.Int32
	doTaskN  atomic.Int32
	workerN  atomic.Int32
	options  Options

	workerSem chan struct{}
	queue     chan func()
}

// NewGoPool provite fixed number of goroutines, reusable. M:N model
//
// M: the number of reusable goroutines,
// N: the capacity for asynchronous task queue.
func NewGoPool(opts ...Option) *GoPool {
	opt := setOptions(opts...)
	if opt.minWorkers <= 0 {
		panic("GoPool: min workers <= 0")
	}
	if opt.minWorkers > opt.maxWorkers {
		panic("GoPool: min workers > max workers")
	}
	p := &GoPool{
		options:   opt,
		workerSem: make(chan struct{}, opt.maxWorkers),
		queue:     make(chan func(), opt.queueCap),
	}
	for i := int32(0); i < p.options.minWorkers; i++ { // pre spawn
		p.workerSem <- struct{}{}
		go p.worker(func() {})
	}
	go p.shrink()
	return p
}

// QueueFree returns (capacity of task-queue - length of task-queue)
func (p *GoPool) QueueFree() int {
	return int(p.options.queueCap - p.queueLen.Load())
}

// Workers returns current the number of workers
func (p *GoPool) Workers() int {
	return int(p.workerN.Load())
}

// Go submits a task to this pool.
func (p *GoPool) Go(task func()) {
	if task == nil {
		panic("GoPool: Go task is nil")
	}
	select {
	case p.queue <- task:
		p.queueLen.Add(1)
	case p.workerSem <- struct{}{}:
		go p.worker(task)
	}
}

func (p *GoPool) worker(task func()) {
	p.workerN.Add(1)
	defer func() {
		<-p.workerSem
		p.workerN.Add(-1)
		if e := recover(); e != nil {
			if p.options.panicHandler != nil {
				p.options.panicHandler(e)
			}
		}
	}()

	for {
		task()
		task = <-p.queue
		if task == nil {
			break
		}
		p.doTaskN.Add(1)
		p.queueLen.Add(-1)
	}
}
func (p *GoPool) shrink() {
	ticker := time.NewTicker(p.options.shrinkPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			doTaskN := p.doTaskN.Load()
			p.doTaskN.Store(0)
			if doTaskN < p.options.tasksBelowN {
				closeN := p.workerN.Load() - p.options.minWorkers
				for closeN > 0 {
					p.queue <- nil
					closeN--
				}
			}
		}
	}
}
2173 次点击
所在节点    Go 编程语言
27 条回复
TanKuku
2023-09-13 14:48:35 +08:00
不懂就问,也没改 runtime 复用什么东西,就单纯控制数量也叫池了吗?
Pythoner666666
2023-09-13 15:15:27 +08:00
所有的池化技术,无论是 mysql redis 还是 http 的池化技术核心都是复用连接再然后是控制连接的数量不至于把 server 的连接资源都耗尽,私以为你这顶多算是控制并发。但是如果只需要控制并发你这未免也太繁琐了
shaoyie
2023-09-13 16:29:00 +08:00
@Pythoner666666 是的用个计数器也可以达到效果,但这样不就是更通用一点嘛,
huija
2023-09-13 19:00:19 +08:00
官方池化实现不给出来了么,sync.Pool ,如果用不到的,那就说明压根不需要池化(比如协程)
dyllen
2023-09-14 10:55:06 +08:00
@TanKuku 他这个代码控制了协程数量,也复用了协程,shrink 这个函数就是发消息结束超时生存的协程的,worker 里面没任务会阻塞等待任务来,直到 shrink 发了结束信号退出。
lysS
2023-09-15 15:13:37 +08:00
为什么要控制协程数量?即使要限制也是限制请求并发
index90
2023-09-18 18:42:57 +08:00
协程池与最大并发数控制傻傻分不清?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/973059

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX