golang 分享: 60 行代码巧妙实现一个高性能无 channel 任务队列

2023-03-03 20:12:39 +08:00
 Nazz

话不多说, 先上测试数据, 在各种负载下均有良好表现:

// small task
const (
	PoolSize   = 16
	BenchTimes = 1000
	N          = 1000
)

goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	    3302	    357841 ns/op	   55977 B/op	    2053 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	    4426	    319383 ns/op	   20000 B/op	    1173 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	    3026	    399899 ns/op	   16047 B/op	    1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	    4314	    259668 ns/op	   48028 B/op	    3000 allocs/op
PASS
// medium task
const (
	PoolSize   = 16
	BenchTimes = 1000
	N          = 10000
)

goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	    1491	    808853 ns/op	   57635 B/op	    2008 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	    1377	    870051 ns/op	   17266 B/op	    1029 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	     886	   1324236 ns/op	   16054 B/op	    1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	    1324	    836092 ns/op	   48000 B/op	    3000 allocs/op
PASS
// large task
const (
	PoolSize   = 16
	BenchTimes = 1000
	N          = 100000
)

goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	     193	   6026196 ns/op	   58162 B/op	    2004 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	     178	   6942255 ns/op	   17108 B/op	    1019 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	     174	   6300705 ns/op	   16157 B/op	    1002 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	     176	   7084957 ns/op	   48071 B/op	    2995 allocs/op
PASS

测试代码 Benchmark

代码实现

package bench

import (
	"sync"
)

type (
	WorkerQueue struct {
		mu             *sync.Mutex // 锁
		q              []Job       // 任务队列
		maxConcurrency int32       // 最大并发
		curConcurrency int32       // 当前并发
	}

	Job func()
)

// NewWorkerQueue 创建一个任务队列
func NewWorkerQueue(maxConcurrency int32) *WorkerQueue {
	return &WorkerQueue{
		mu:             &sync.Mutex{},
		maxConcurrency: maxConcurrency,
		curConcurrency: 0,
	}
}

// 获取一个任务
func (c *WorkerQueue) getJob(delta int32) Job {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.curConcurrency += delta
	if c.curConcurrency >= c.maxConcurrency {
		return nil
	}
	if n := len(c.q); n == 0 {
		return nil
	}
	var result = c.q[0]
	c.q = c.q[1:]
	c.curConcurrency++
	return result
}

// 递归地执行任务
func (c *WorkerQueue) do(job Job) {
	job()
	if nextJob := c.getJob(-1); nextJob != nil {
		go c.do(nextJob)
	}
}

// Push 追加任务, 有资源空闲的话会立即执行
func (c *WorkerQueue) Push(job Job) {
	c.mu.Lock()
	c.q = append(c.q, job)
	c.mu.Unlock()
	if item := c.getJob(0); item != nil {
		go c.do(item)
	}
}

如果觉得对你有帮助, 麻烦给 gws 点个赞吧:)

4686 次点击
所在节点    分享创造
29 条回复
ihciah
2023-03-03 22:40:45 +08:00
说实话这代码我是真没看懂。。
Mitt
2023-03-03 22:45:51 +08:00
不要 channel 反手加了个锁可还行
Glauben
2023-03-03 22:56:00 +08:00
感觉就是普通的做法,不太理解这样的写法有什么特别的,少的时间是从功能削减上得来的吧。
Nazz
2023-03-03 23:15:51 +08:00
@ihciah 递归
Nazz
2023-03-03 23:20:29 +08:00
@Glauben 加上 recover 结果也差不多
Nazz
2023-03-03 23:22:29 +08:00
@Mitt mutex 比 channel 轻量
littlewing
2023-03-04 00:15:29 +08:00
巧妙?
妹想到啊 妹想到啊
hsfzxjy
2023-03-04 00:32:04 +08:00
建议把 q 当成循环队列,复用前面空的位置,可以减少 alloc 次数
maocat
2023-03-04 00:58:38 +08:00
@hsfzxjy 既然循环队列有了,要不再加一个 sendq 和 recvq ,直接从对应的 g 上操作 \dog
voidmnwzp
2023-03-04 01:26:25 +08:00
这跟 go 有啥关系啊 没了 channel 任何语言都能更轻松实现啊
Trim21
2023-03-04 01:27:30 +08:00
只看标题我以为是没 mutex 的…
securityCoding
2023-03-04 02:30:21 +08:00
看看 ring buffer 无锁队列实现方式。。。
Nazz
2023-03-04 04:26:31 +08:00
@hsfzxjy 是一个优化点
Nazz
2023-03-04 04:28:30 +08:00
@voidmnwzp 你不妨说明白点具体是什么语言
Nazz
2023-03-04 04:51:15 +08:00
@hsfzxjy
@securityCoding
其实最完美的结构是 stack. 我想到一种优化方式,先 push, 然后 swap(q[0], q[n-1]), 最后 pop
Nazz
2023-03-04 05:01:22 +08:00
@Nazz 这种方式不能保证 fifo ,并发小的话还是 heap 好点
rrfeng
2023-03-04 07:42:04 +08:00
benchmark 没有对比 channel 的吗?
Nazz
2023-03-04 07:56:16 +08:00
@rrfeng 另外几种"协程池"都用了 channel
chuanqirenwu
2023-03-04 21:31:18 +08:00
gws 的思路是自己实现一个极简的 eventloop ,而不用 go 自带的协程机制,从而没有什么额外的开销,提高性能?
Nazz
2023-03-04 23:13:29 +08:00
@chuanqirenwu 同步模式没开额外协程,异步模式会开非常驻的协程,执行完任务就退出, 两种模式都没使用 channel.

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/920932

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX