V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
zong400
V2EX  ›  Go 编程语言

channel 的关闭时机

  •  
  •   zong400 · 2022-12-14 11:56:04 +08:00 · 1763 次点击
    这是一个创建于 712 天前的主题,其中的信息可能已经有所发展或是发生改变。

    go 新人请教大佬一个关闭 channel 的问题,发送端逻辑是历遍一堆目录,把里面的文件发送到 chan ,递归方式实现。这里的 chan 关闭有什么方法。

    目前这个代码跑起来的问题是会一直阻塞,要手动关闭

    func main() {
    	var wg sync.WaitGroup
    
    	objchan := make(chan []string, 10)
    
    	wg.Add(1)
    	go func(och <-chan []string) {
    		defer wg.Done()
    		for objs := range och {
    			do_something(objs)
    		}
    	}(objchan)
    
    	for _, perfix := range []string{"test", "tc"} {
    		go Getfile(perfix, objchan)
    	}
    
    	wg.Wait()
    }
    
    func Getfile(dir string, filechan chan<- []string) {
    	// send files 
        ...
        filechan <- files
        // 子目录递归
        if dir {
        	go Getfile(dir, filechan)
        }
    }
    
    20 条回复    2022-12-20 21:01:45 +08:00
    bebop
        1
    bebop  
       2022-12-14 13:53:51 +08:00
    wangyu17455
        2
    wangyu17455  
       2022-12-14 14:03:58 +08:00
    for objs := range och 改成 for objs, ok := range och
    ok 会在 channel 关闭后变成 false
    wangyu17455
        3
    wangyu17455  
       2022-12-14 14:06:16 +08:00
    记错了,for 不能用这个写法,正常读取可以
    zong400
        4
    zong400  
    OP
       2022-12-14 14:33:18 +08:00
    @bebop 问题是递归,Getfile 不知道会跑多少次
    sduoduo233
        5
    sduoduo233  
       2022-12-14 15:11:46 +08:00
    感觉可以参考一下这个: https://stackoverflow.com/questions/13217547/tour-of-go-exercise-10-crawler ,每递归一次就 wg.Add(1)
    zong400
        6
    zong400  
    OP
       2022-12-14 16:14:39 +08:00
    @bebop
    @sduoduo233
    改成了在历遍时候 add ,但是结果有点奇怪,只能随机处理"test", "tc"中的一个。
    在 wg.Wait()后面 time.Sleep ,才能显示完整
    zong400
        7
    zong400  
    OP
       2022-12-14 16:34:19 +08:00
    发送端效率 》 接收端效率,所以发送端先关闭可能造成结果不完整?
    所以还是在接收端处理 chan 关闭比较好?
    bebop
        8
    bebop  
       2022-12-14 17:28:53 +08:00
    使用协程池,而不是每次都创建一个 chan 。
    和是不是递归没有关系,只要能把数据全部写到 chan 就行。

    func main() {
    poolNum := 10

    var wg sync.WaitGroup
    pool := make(chan string, poolNum)

    // 处理文件
    for i := 0; i < poolNum; i++ {
    wg.Add(1)

    go func(wg *sync.WaitGroup, ch <-chan string) {
    defer wg.Done()

    for filename := range ch {
    fmt.Println(filename)
    }
    }(&wg, pool)
    }

    // 遍历文件
    err := filepath.Walk(".",
    func(path string, info os.FileInfo, err error) error {
    if err != nil {
    return err
    }

    pool <- path
    return nil
    })
    if err != nil {
    log.Println(err)
    }

    close(pool)
    wg.Wait()
    }
    zjj19950716
        9
    zjj19950716  
       2022-12-14 17:30:52 +08:00
    @zong400 关闭的时候有数据的话,接收端也会先收完的,接收端关闭有 panic 的风险,你不知道什么时候关,你就每个 Getfile 里再用个 wg 碰到 dir 就 wg add 1 , 最顶级目录完成了就是完成了
    sibowen
        10
    sibowen  
       2022-12-14 17:53:34 +08:00
    ```golang

    import (
    "fmt"
    "io/ioutil"
    "os"
    "sync"
    )

    var DirPrefix string

    func main() {
    DirPrefix, _ = os.Getwd()
    DirPrefix += "/dir/"
    var wg sync.WaitGroup
    objchan := make(chan string, 10)
    wg.Add(1)
    go func(och <-chan string) {
    defer wg.Done()
    for objs := range och {
    fmt.Println(objs)
    }
    }(objchan)

    wg.Add(1)
    go func(och chan string) {
    defer wg.Done()
    var wgDir sync.WaitGroup
    for _, perfix := range []string{"test", "tc"} {
    wgDir.Add(1)
    go GetFile(perfix, och, &wgDir)
    }
    wgDir.Wait()
    close(objchan)
    }(objchan)
    wg.Wait()
    }

    func GetFile(dir string, fileChan chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    // send files
    dirNow := DirPrefix+dir
    files, _ := ioutil.ReadDir(dirNow)
    // 子目录递归
    for _, v := range files {
    filePath := DirPrefix+dir+"/"+v.Name()
    if IsDir(filePath) {
    wg.Add(1)
    go GetFile(filePath, fileChan, wg)
    } else {
    fileChan <- filePath
    }
    }
    }

    func IsDir(path string) bool{
    s, err := os.Stat(path)
    if err != nil {
    return false
    }
    return s.IsDir()
    }

    ```

    把读取文件的操作包装到单独的协程里;
    在读取操作完成后,close chan ;
    试试上面这段。
    zong400
        11
    zong400  
    OP
       2022-12-14 18:17:14 +08:00
    @sibowen
    @zjj19950716
    @bebop
    我试试,谢谢
    xingjue
        12
    xingjue  
       2022-12-14 20:50:24 +08:00
    您可以在 Getfile 函数中关闭 channel 。您可以在每次遍历子目录时,将 channel 传递给下一个 goroutine ,并在当前 goroutine 中关闭 channel 。例如:

    ```
    func Getfile(dir string, filechan chan<- []string) {
    // send files
    ...
    filechan <- files

    // 子目录递归
    if dir {
    // 关闭当前 goroutine 中的 channel
    close(filechan)
    // 在新 goroutine 中继续遍历子目录
    go Getfile(dir, filechan)
    }
    }
    ```
    这样,您就可以在遍历完一个子目录之后,关闭该目录中的 channel ,并在新 goroutine 中继续遍历子目录。这样,遍历完所有子目录后,您就可以在主函数中等待所有 goroutine 完成后退出程序。
    zong400
        13
    zong400  
    OP
       2022-12-15 09:23:27 +08:00
    @xingjue channel 能重新打开?
    zong400
        14
    zong400  
    OP
       2022-12-15 09:44:19 +08:00
    目前代码是这样,问题是为什么后面不加 sleep 就只能随机显示 test ,tc 其中一个的内容?

    ```
    func main() {
    var wg sync.WaitGroup

    objchan := make(chan []string, 10)

    go func(och <-chan []string) {
    for objs := range och {
    println(objs)
    }
    }(objchan)

    for _, perfix := range []string{"test", "tc"} {
    wg.Add(1)
    go Getfile(perfix, objchan, &wg)
    }

    wg.Wait()
    time.Sleep(1)
    }

    func Getfile(dir string, filechan chan<- []string, wg *sync.WaitGroup) {
    defer wg.Done()
    // send files
    ...
    filechan <- files
    // 子目录递归
    for _, dir := range dirs {
    wg.Add(1)
    go Getfile(dir, filechan)
    }
    }
    ```
    zong400
        15
    zong400  
    OP
       2022-12-15 09:56:41 +08:00
    @sibowen 按你改的写和上面的一样,需要加个 sleep ,不然就显示不全,我要处理的是对象存储,通过发 http 请求,是不是和 os 文件系统底层不一样导致你的代码不行

    ```
    wg.Add(1)
    go func(och chan<- []cos.Object) {
    defer wg.Done()
    var wgg sync.WaitGroup
    for _, perfix := range []string{"test", "tc"} {
    wgg.Add(1)
    go tools.GetObjs(cosClient, perfix, objchan, &wgg)
    }
    wgg.Wait()
    close(och)
    }(objchan)
    ```
    zong400
        16
    zong400  
    OP
       2022-12-15 11:47:21 +08:00
    用#1 介绍的协程池方法,目前可行
    zong400
        17
    zong400  
    OP
       2022-12-15 11:50:26 +08:00
    但是协程池 感觉复杂了一层,一定要这样?
    sibowen
        18
    sibowen  
       2022-12-15 17:26:01 +08:00
    @zong400 能具体描述下你的使用场景吗? 读取文件是 http 请求获取的?还是消费 chan 的地方要有 http 请求?还是什么
    zong400
        19
    zong400  
    OP
       2022-12-16 10:02:59 +08:00
    @sibowen 腾讯的对象存储,读写都是用 sdk 的
    yaott2020
        20
    yaott2020  
       2022-12-20 21:01:45 +08:00 via Android
    https://github.com/smallnest/chanx

    无限缓存 channel ,可以实现无限写,写完再读
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3041 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 14:18 · PVG 22:18 · LAX 06:18 · JFK 09:18
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.