编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~
通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞
但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况
taskChan 的缓冲区为 1000 时,阻塞的日志如下:
[DEBUG]:增加目录,增加 wg, [1802], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1801], taskChan = [5]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [17]
[DEBUG]:任务完成,减小 wg, [1816], taskChan = [4]
[DEBUG]:增加目录,增加 wg, [1803], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1843], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [21]
[DEBUG]:任务完成,减小 wg, [1841], taskChan = [24]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [6]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [13]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1840], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [2]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1845], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1846], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1847], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1848], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1849], taskChan = [0]
如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下:
[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [3], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [0], taskChan = [0]
[INFO]:目录扫描完毕
[DEBUG]:func GetAllFilePath end
[DEBUG]:func StartScan end
[DEBUG]:func btnStartScanOnclick end
代码如下:
package core
import (
"DopliGo/logs"
"github.com/panjf2000/ants/v2"
"os"
"path/filepath"
"sync"
"sync/atomic"
)
func GetAllFilePath(rootPath string) {
//logs.IsLogDebug = false
logs.Debug("func GetAllFilePath start")
// 创建任务通道和结果通道
var taskChan = make(chan string, 1000000)
var resultChan = make(chan string, 1000000)
var wg sync.WaitGroup
var counter int64 = 0
// 创建生产者 goroutine 池
producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) {
produceTasks(i.(string), taskChan, resultChan, &counter, &wg)
})
logs.Debug("cap:%d", producerPool.Cap())
defer producerPool.Release()
taskChan <- rootPath
wg.Add(1) // 这里增加计数器
atomic.AddInt64(&counter, 1)
logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan))
// 启动生产者
go func() {
//defer logs.Debug("生产者退出")
for task := range taskChan {
err := producerPool.Invoke(task)
if err != nil {
logs.Error("failed to producerPool Invoke, err: %s", err)
return
}
}
}()
// 启动结果处理 goroutine
go func() {
//defer logs.Debug("消费者退出")
for result := range resultChan {
_ = result
}
}()
// 等待所有任务完成
wg.Wait()
close(resultChan)
close(taskChan)
logs.Info("目录扫描完毕")
logs.Debug("func GetAllFilePath end")
}
func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) {
defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done
// logs.Debug("func produceTasks start")
entries, err := os.ReadDir(rootPath)
if err != nil {
logs.Error("failed to read dir: %s , err: %s", rootPath, err)
return
}
for _, entry := range entries {
path := filepath.Join(rootPath, entry.Name())
if entry.IsDir() {
wg.Add(1)
atomic.AddInt64(counter, 1)
select {
case taskChan <- path:
// 发送成功
default:
// 发送失败,通道已满
logs.Error("写入通道失败...")
}
logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
} else {
resultChan <- path
}
}
atomic.AddInt64(counter, -1)
logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
//logs.Debug("func produceTasks end")
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.