看过了一下 star 比较高的协程池实现,还有字节开源的实现,完全是 java/c++之类的外行实现思路
协程/线程池,最基本的元件 就是 队列 + 协程/线程,M:N 模型
这两个组件在 go 里边天生就有啊,为什么再搞一套 task queue 呢?
控制队列容量:make(chan, cap) 第二参数就可以
想要控制协程/线程数量,再辅助一个 chan 就可以了,
代码实现如下,100 行搞定:
我把它放到 github 上 gopool 喜欢的老铁可以给个 star
// GoPool is a minimalistic goroutine pool that provides a pure Go implementation
type GoPool struct {
noCopy
queueLen atomic.Int32
doTaskN atomic.Int32
workerN atomic.Int32
options Options
workerSem chan struct{}
queue chan func()
}
// NewGoPool provite fixed number of goroutines, reusable. M:N model
//
// M: the number of reusable goroutines,
// N: the capacity for asynchronous task queue.
func NewGoPool(opts ...Option) *GoPool {
opt := setOptions(opts...)
if opt.minWorkers <= 0 {
panic("GoPool: min workers <= 0")
}
if opt.minWorkers > opt.maxWorkers {
panic("GoPool: min workers > max workers")
}
p := &GoPool{
options: opt,
workerSem: make(chan struct{}, opt.maxWorkers),
queue: make(chan func(), opt.queueCap),
}
for i := int32(0); i < p.options.minWorkers; i++ { // pre spawn
p.workerSem <- struct{}{}
go p.worker(func() {})
}
go p.shrink()
return p
}
// QueueFree returns (capacity of task-queue - length of task-queue)
func (p *GoPool) QueueFree() int {
return int(p.options.queueCap - p.queueLen.Load())
}
// Workers returns current the number of workers
func (p *GoPool) Workers() int {
return int(p.workerN.Load())
}
// Go submits a task to this pool.
func (p *GoPool) Go(task func()) {
if task == nil {
panic("GoPool: Go task is nil")
}
select {
case p.queue <- task:
p.queueLen.Add(1)
case p.workerSem <- struct{}{}:
go p.worker(task)
}
}
func (p *GoPool) worker(task func()) {
p.workerN.Add(1)
defer func() {
<-p.workerSem
p.workerN.Add(-1)
if e := recover(); e != nil {
if p.options.panicHandler != nil {
p.options.panicHandler(e)
}
}
}()
for {
task()
task = <-p.queue
if task == nil {
break
}
p.doTaskN.Add(1)
p.queueLen.Add(-1)
}
}
func (p *GoPool) shrink() {
ticker := time.NewTicker(p.options.shrinkPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
doTaskN := p.doTaskN.Load()
p.doTaskN.Store(0)
if doTaskN < p.options.tasksBelowN {
closeN := p.workerN.Load() - p.options.minWorkers
for closeN > 0 {
p.queue <- nil
closeN--
}
}
}
}
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.