channel 的关闭时机

2022-12-14 11:56:04 +08:00
 zong400

go 新人请教大佬一个关闭 channel 的问题,发送端逻辑是历遍一堆目录,把里面的文件发送到 chan ,递归方式实现。这里的 chan 关闭有什么方法。

目前这个代码跑起来的问题是会一直阻塞,要手动关闭

func main() {
	var wg sync.WaitGroup

	objchan := make(chan []string, 10)

	wg.Add(1)
	go func(och <-chan []string) {
		defer wg.Done()
		for objs := range och {
			do_something(objs)
		}
	}(objchan)

	for _, perfix := range []string{"test", "tc"} {
		go Getfile(perfix, objchan)
	}

	wg.Wait()
}

func Getfile(dir string, filechan chan<- []string) {
	// send files 
    ...
    filechan <- files
    // 子目录递归
    if dir {
    	go Getfile(dir, filechan)
    }
}
1759 次点击
所在节点    Go 编程语言
20 条回复
bebop
2022-12-14 13:53:51 +08:00
wangyu17455
2022-12-14 14:03:58 +08:00
for objs := range och 改成 for objs, ok := range och
ok 会在 channel 关闭后变成 false
wangyu17455
2022-12-14 14:06:16 +08:00
记错了,for 不能用这个写法,正常读取可以
zong400
2022-12-14 14:33:18 +08:00
@bebop 问题是递归,Getfile 不知道会跑多少次
sduoduo233
2022-12-14 15:11:46 +08:00
感觉可以参考一下这个: https://stackoverflow.com/questions/13217547/tour-of-go-exercise-10-crawler ,每递归一次就 wg.Add(1)
zong400
2022-12-14 16:14:39 +08:00
@bebop
@sduoduo233
改成了在历遍时候 add ,但是结果有点奇怪,只能随机处理"test", "tc"中的一个。
在 wg.Wait()后面 time.Sleep ,才能显示完整
zong400
2022-12-14 16:34:19 +08:00
发送端效率 》 接收端效率,所以发送端先关闭可能造成结果不完整?
所以还是在接收端处理 chan 关闭比较好?
bebop
2022-12-14 17:28:53 +08:00
使用协程池,而不是每次都创建一个 chan 。
和是不是递归没有关系,只要能把数据全部写到 chan 就行。

func main() {
poolNum := 10

var wg sync.WaitGroup
pool := make(chan string, poolNum)

// 处理文件
for i := 0; i < poolNum; i++ {
wg.Add(1)

go func(wg *sync.WaitGroup, ch <-chan string) {
defer wg.Done()

for filename := range ch {
fmt.Println(filename)
}
}(&wg, pool)
}

// 遍历文件
err := filepath.Walk(".",
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

pool <- path
return nil
})
if err != nil {
log.Println(err)
}

close(pool)
wg.Wait()
}
zjj19950716
2022-12-14 17:30:52 +08:00
@zong400 关闭的时候有数据的话,接收端也会先收完的,接收端关闭有 panic 的风险,你不知道什么时候关,你就每个 Getfile 里再用个 wg 碰到 dir 就 wg add 1 , 最顶级目录完成了就是完成了
sibowen
2022-12-14 17:53:34 +08:00
```golang

import (
"fmt"
"io/ioutil"
"os"
"sync"
)

var DirPrefix string

func main() {
DirPrefix, _ = os.Getwd()
DirPrefix += "/dir/"
var wg sync.WaitGroup
objchan := make(chan string, 10)
wg.Add(1)
go func(och <-chan string) {
defer wg.Done()
for objs := range och {
fmt.Println(objs)
}
}(objchan)

wg.Add(1)
go func(och chan string) {
defer wg.Done()
var wgDir sync.WaitGroup
for _, perfix := range []string{"test", "tc"} {
wgDir.Add(1)
go GetFile(perfix, och, &wgDir)
}
wgDir.Wait()
close(objchan)
}(objchan)
wg.Wait()
}

func GetFile(dir string, fileChan chan string, wg *sync.WaitGroup) {
defer wg.Done()
// send files
dirNow := DirPrefix+dir
files, _ := ioutil.ReadDir(dirNow)
// 子目录递归
for _, v := range files {
filePath := DirPrefix+dir+"/"+v.Name()
if IsDir(filePath) {
wg.Add(1)
go GetFile(filePath, fileChan, wg)
} else {
fileChan <- filePath
}
}
}

func IsDir(path string) bool{
s, err := os.Stat(path)
if err != nil {
return false
}
return s.IsDir()
}

```

把读取文件的操作包装到单独的协程里;
在读取操作完成后,close chan ;
试试上面这段。
zong400
2022-12-14 18:17:14 +08:00
@sibowen
@zjj19950716
@bebop
我试试,谢谢
xingjue
2022-12-14 20:50:24 +08:00
您可以在 Getfile 函数中关闭 channel 。您可以在每次遍历子目录时,将 channel 传递给下一个 goroutine ,并在当前 goroutine 中关闭 channel 。例如:

```
func Getfile(dir string, filechan chan<- []string) {
// send files
...
filechan <- files

// 子目录递归
if dir {
// 关闭当前 goroutine 中的 channel
close(filechan)
// 在新 goroutine 中继续遍历子目录
go Getfile(dir, filechan)
}
}
```
这样,您就可以在遍历完一个子目录之后,关闭该目录中的 channel ,并在新 goroutine 中继续遍历子目录。这样,遍历完所有子目录后,您就可以在主函数中等待所有 goroutine 完成后退出程序。
zong400
2022-12-15 09:23:27 +08:00
@xingjue channel 能重新打开?
zong400
2022-12-15 09:44:19 +08:00
目前代码是这样,问题是为什么后面不加 sleep 就只能随机显示 test ,tc 其中一个的内容?

```
func main() {
var wg sync.WaitGroup

objchan := make(chan []string, 10)

go func(och <-chan []string) {
for objs := range och {
println(objs)
}
}(objchan)

for _, perfix := range []string{"test", "tc"} {
wg.Add(1)
go Getfile(perfix, objchan, &wg)
}

wg.Wait()
time.Sleep(1)
}

func Getfile(dir string, filechan chan<- []string, wg *sync.WaitGroup) {
defer wg.Done()
// send files
...
filechan <- files
// 子目录递归
for _, dir := range dirs {
wg.Add(1)
go Getfile(dir, filechan)
}
}
```
zong400
2022-12-15 09:56:41 +08:00
@sibowen 按你改的写和上面的一样,需要加个 sleep ,不然就显示不全,我要处理的是对象存储,通过发 http 请求,是不是和 os 文件系统底层不一样导致你的代码不行

```
wg.Add(1)
go func(och chan<- []cos.Object) {
defer wg.Done()
var wgg sync.WaitGroup
for _, perfix := range []string{"test", "tc"} {
wgg.Add(1)
go tools.GetObjs(cosClient, perfix, objchan, &wgg)
}
wgg.Wait()
close(och)
}(objchan)
```
zong400
2022-12-15 11:47:21 +08:00
用#1 介绍的协程池方法,目前可行
zong400
2022-12-15 11:50:26 +08:00
但是协程池 感觉复杂了一层,一定要这样?
sibowen
2022-12-15 17:26:01 +08:00
@zong400 能具体描述下你的使用场景吗? 读取文件是 http 请求获取的?还是消费 chan 的地方要有 http 请求?还是什么
zong400
2022-12-16 10:02:59 +08:00
@sibowen 腾讯的对象存储,读写都是用 sdk 的
yaott2020
2022-12-20 21:01:45 +08:00
https://github.com/smallnest/chanx

无限缓存 channel ,可以实现无限写,写完再读

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/902426

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX