小白学完 channel 马上就不会了 —— 现在会了(大概)

2022-11-07 13:45:50 +08:00
 volvo007

之前的一篇主题发表于 180 天前 都说 go 简单 小白学完 channel 马上就不会了

原来主题的主要目标是遍历一个文件夹内的所有文件,并根据不同的文件后缀进行归类。我也去爆栈问过,但是回答都和我这样操作文件不安全相关,并没有正面解决这个问题。

言之种种,在我做了一些针对 channel 的练习之后,算是大概搞清楚了这个例子要怎么写。可能不是 best practice ,但希望能帮到大家,特别是之前一直关注那个帖子的朋友。(有 18 个收藏,所以我一定要把它搞懂不然对不起这 18 个收藏)

======== 在开始之前,我们稍微回顾一下之前的逻辑:

『函数 1 』 是一个简单、耗时很短的功能,它会不断生成一些中间量等待下游处理;『函数 2 』 则是一个复杂、耗时长的功能。如果单线程运行,那么『函数 1 』会长时间等待浪费很多时间。 我们期待通过 goroutine 来并发处理『函数 2 』以达到提升处理性能的目的。

我用的文件处理案例,根据大佬们所说会有线程安全问题 (多个线程可能会同时创建文件夹,应当加写锁),我们先忽略这个问题,主要还是把握后面的方法论哈

1 函数 getInfo (f []fs.FileInfo, c chan<- string) 通过遍历 []fs.FileInfo 结构,进行了一些简单判断后(例如忽略文件夹、.DS_store 这种),不断将文件名写入 c 这个 string channel

func getInfo(f []fs.FileInfo, c chan string) {
	for _, fs := range f {
		// 这个 if 只是用来简单忽略掉 文件夹或者 .DS_store 这种
		if fs.IsDir() || strings.HasPrefix(fs.Name(), ".") {
			continue
		} else {
			c <- fs.Name()
		}
	}
}

2 函数 dealInfo (path string, typeDict map[string]int, c <-chan string) 通过 range 方法,不断获取 c 之前保存的文件名,截取后缀之后,要么转入对应文件夹,要么创建新文件夹再转入

func dealInfo(path string, typeDict map[string]int, c chan string) {
	for name := range c {
		sp := strings.Split(name, ".")
		suffix := sp[len(sp)-1]

		if _, ok := typeDict[suffix]; ok {
			MoveFile(name, path, suffix)
		} else {
			CreateFolder(path, suffix)
			MoveFile(name, path, suffix)
			typeDict[suffix] = 1

			fmt.Println(name)
		}
	}
}

======== 到这里其实思路上是没有什么问题的,这里最关键的是没有注意到简单练习里不会提到的一个知识点:用 range 遍历 channel 的时候,需要主动 close channel. 否则 range 会阻塞 channel 直到 deadlock panic. 尽管所有 channel 会在 main channel 结束的时候被强制结束. (大概因为 range 遍历 channel 的时候没有错误处理?)

如果不用 range 的方式来遍历的话,我们需要写一个 if name, ok := <- c; ok { ... } 这样的东西放到一个死循环里面,也就是每次循环都要来手动判断一次 c 里面还有没有东西,没东西了我就跳出循环呗。显然 range 遍历的方式更优雅,但要考虑 close(c) 的时机。

第二个点则是如何 “并发” 处理 函数 2 。如果只用 go func(),最多只能实现两个 goroutine 之间的通信,所以我们引入了线程池 sync 库来解决这个问题——我们需要给每个 goroutine 加入到线程池里面,但在某个线程工作结束的时候又要把它从池子里面拿掉。最后,还需要一个 wait 函数来通知主线程等待这些线程工作结束。

具体来说,我们需要改写一下前面的『函数 1 』、『函数 2 』 了:

对于『函数 1 』,原始伪代码:

func getInfo(f []fs.FileInfo, c chan<- string){
	遍历 f { 处理后的 fineName 写入 c }
}

现在应当改写为:

func getInfo(f []fs.FileInfo, c chan<- string){
	// 后面要用 sync.Add 加入池子,所以这里要减去。加入和减去要匹配, 重要!
	defer wg.Done()
    
	遍历 f { 处理后的 fineName 写入 c }

	// 后面其他函数会用 range 来遍历,所以一定要 close ,重要!
	close(c)
}

对于『函数 2 』,由于会用多个 goroutine 并发,那么每一次都需要一个 wg.Add(1) 来加入线程池,所以每一次我们还要从『函数 2 』里减去这个线程

原函数 2 伪代码:

func dealInfo(path string, typeDict map[string]int, c <-chan string){
	for _, filename := range c {
		判断文件;
		处理文件;
	}
}

现在改写为:

func dealInfo(path string, typeDict map[string]int, c <-chan string){
	defer wg.Done()

	for filename := range c {
		判断文件;
		处理文件;
	}
}

非常简单,就是在循环前加一个 defer wg.Done() 就可以了。

最后,我们来写主函数的伪代码:

import ("sync", "fmt", "time", ... )

var wg sync.WaitGroup // 为了创建多线程并发,准备线程池

func getInfo( ... ) // 实现 func1

func dealInfo( ... ) // 实现 func2

func main(){
	c := make(chan string, 1000)
	start := time.Now()

	wg.Add(1)
	go getInfo(...)

	for i:=0; i<16; i++ {
		wg.Add(1)
		go dealInfo(...)
	}

	wg.Wait()
    
	fmt.Println("time: ", time.Since(start))
}

这里应该就能充分暴露前面改写过程中加入的奇怪东西的目的了 😄

可以发现,wg.Add(1) 之后,一定会紧跟一个带有 defer wg.Done() 的函数,来实现线程加减的匹配

而对于比较复杂的『函数 2 』 ,我们通过一个循环来加入 Ngoroutine 线程。wg.Add(1) 放在循环里面,同时每个 wg.Add() 都必然对应一个 defer wg.Done() 来匹配

最后,别忘了放一个 wg.Wait() 来通知主线程等待所有 wg 的线程执行完毕——它靠的就是不断 Add ,之后又不断 Done ,直到池子里线程归零的那一瞬来判断任务全部结束的。所以 AddDone 必须匹配

另外一个之前没有提到的小改动是,我们建立 c (chan string) 的时候,还给了它一些缓存。这样,由于 getInfo 处理得很快,就可以预存一些结果到 c 里面,在面对 16 个 go dealInfo 的时候,就能保证每个 dealInfo 总是能拿到东西来处理,就不会空闲等待了。这个 N ,我在哪看到资料说是最大 10000 个,好像可以通过配置修改。不过对于大部分的场景,如果要修改这个参数,不如优化代码才是正道

还有一个地方是,我们在循环加入『函数 2 』 goroutine 的时候,wg.Add(1) 放在了循环里面。由于我知道这里的循环会创建 16 个 goroutine ,所以我们也可以一开始就在循环外面 wg.Add(16) 把它一口气全加进去。由于每个循环有一个 defer wg.Done() ,所以最后线程池还是可以归零的。只是这样写如果后期要扩充数量的话会有点不好维护,还是每个循环 +1 ,N 则通过配置文件来提供更妥当。

通过这个例子,感觉自己算是摸到了一点 channel 使用的门路。也体会到了一些 『不要通过共享内存来通信,而应该通过通信来共享内存』的设计思路。

这里还有一个不错的例子,是关于并行获取 < N 的所有素数的。它用到了 3 个 channel 来处理 写入、计算、读取打印。通过这里例子,应该能对 close(channel) 的时机有更好的理解。例子,实现不是很严谨

对于 go 小白如我,这里也是班门弄斧。只是希望能够帮助到之前收藏我文章的朋友,或者其他入门 go 的小伙伴。

1771 次点击
所在节点    Go 编程语言
3 条回复
zjj19950716
2022-11-07 14:01:56 +08:00
这东西不叫线程池吧
volvo007
2022-11-07 14:16:34 +08:00
@zjj19950716 嗯嗯,好的,是叫 『等待组』吗?
volvo007
2022-11-07 23:57:34 +08:00
补充一下,WaitGroup 是一个计数信号量,可以用来记录并维护运行的 goroutine ,不是线程池。高手们看看笑笑就好,新手们记得自己脑子里做下替换哈

此外文章里还有些概念、描述上的问题(毕竟不是专业的)。我将测试过的完整代码发到这里大家可以自取

https://pastebin.com/aHCGYfEr

已知问题:由于一开始会有若干线程同时尝试创建文件夹,所以会有几个 "file existed" 错误。捕获打印之后可以继续执行代码

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/893302

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX