V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
PungentSauce
V2EX  ›  Go 编程语言

看到公司其他同学写的 go 批量处理代码,这风骚的感觉像在平行世界一样,还能这么玩?

  •  
  •   PungentSauce · 43 天前 · 6025 次点击
    这是一个创建于 43 天前的主题,其中的信息可能已经有所发展或是发生改变。

    这里是一些脚本调用的地方,工具源码放在后面两个代码块了。

    util.TaskConsumer[[]string](10).
    		SetP(lineopt.IterExcel2("xxx.xlsx")).
    		SetC(func(index int, row []string) (err error) {
    			if index == 1 {
    				return
    			}
    			// .....
    			// 这里是逻辑处理函数
    			return
    		}).
    		Run()
    
    

    这是两个封装的函数的源码。

    package lineopt
    
    import (
    	"bufio"
    	"fmt"
    	"github.com/xuri/excelize/v2"
    	"iter"
    	"log/slog"
    	"os"
    )
    
    func IterLine2(filePath string) iter.Seq2[int, string] {
    	return func(yield func(int, string) bool) {
    		f, errF := os.OpenFile(filePath, os.O_RDONLY, 0666)
    		if errF != nil {
    			return
    		}
    		defer func(f *os.File) {
    			err := f.Close()
    			if err != nil {
    				fmt.Println(err)
    			}
    		}(f)
    		scanner := bufio.NewScanner(f)
    		index := 1
    		for scanner.Scan() {
    			line := scanner.Text()
    			if !yield(index, line) {
    				return
    			}
    			index += 1
    		}
    	}
    }
    
    func IterLine(filePath string) iter.Seq[string] {
    	return func(yield func(string) bool) {
    		for _, item := range IterLine2(filePath) {
    			if !yield(item) {
    				return
    			}
    		}
    	}
    }
    
    func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] {
    	return func(yield func(int, []string) bool) {
    		f, err := excelize.OpenFile(config.FilePath)
    		if err != nil {
    			slog.Error(err.Error())
    			return
    		}
    		defer f.Close()
    		targetSheet := config.TargetSheet
    		if targetSheet == "" {
    			targetSheet = f.GetSheetName(0)
    		}
    		rows, err := f.Rows(targetSheet)
    		if err != nil {
    			slog.Error(err.Error())
    			return
    		}
    		index := 1
    		for rows.Next() {
    			row, err := rows.Columns()
    			if err != nil {
    				slog.Error(err.Error())
    				return
    			}
    			if !yield(index, row) {
    				return
    			}
    			index += 1
    		}
    		return
    	}
    }
    
    func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {
    	return func(yield func([]string) bool) {
    		for _, value := range MapIterExcel2(config) {
    			if !yield(value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcel2(filePath string) iter.Seq2[int, []string] {
    	return func(yield func(int, []string) bool) {
    		for index, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
    			if !yield(index, value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcel(filePath string) iter.Seq[[]string] {
    	return func(yield func([]string) bool) {
    		for _, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
    			if !yield(value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {
    	return func(yield func(int, []string) bool) {
    		for index, value := range MapIterExcel2(ExcelTarget{
    			FilePath:    filePath,
    			TargetSheet: sheetName,
    		}) {
    			if !yield(index, value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {
    	return func(yield func([]string) bool) {
    		for _, value := range MapIterExcel2(ExcelTarget{
    			FilePath:    filePath,
    			TargetSheet: sheetName,
    		}) {
    			if !yield(value) {
    				return
    			}
    		}
    	}
    }
    
    package util
    
    import (
    	"dt/app/util/lineopt"
    	"errors"
    	"iter"
    	"sync"
    )
    
    func ChannelConsume[d any](queue chan d, job func(item d), number ...int) *sync.WaitGroup {
    	counter := 10
    	if len(number) == 1 && number[0] > 0 {
    		counter = number[0]
    	}
    	return StartTogether(func() {
    		for item := range queue {
    			job(item)
    		}
    	}, counter)
    }
    
    // Together 并行执行
    func Together(job func(), counter int) {
    	var wg sync.WaitGroup
    	for i := 1; i <= counter; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			job()
    		}()
    	}
    	wg.Wait()
    }
    
    func StartTogether(job func(), counter int) *sync.WaitGroup {
    	var wg sync.WaitGroup
    	for i := 1; i <= counter; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			job()
    		}()
    	}
    	return &wg
    }
    
    type chanData[d any] struct {
    	index int
    	data  d
    }
    
    func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d) (err error), number ...int) *sync.WaitGroup {
    	counter := 10
    	if len(number) == 1 && number[0] > 0 {
    		counter = number[0]
    	}
    	return StartTogether(func() {
    		for item := range queue {
    			err := job(item.index, item.data)
    			if errors.Is(err, lineopt.Stop) {
    				// 目前不可以直接停止,会导致消费者阻塞掉
    				//return
    			}
    		}
    	}, counter)
    }
    
    type ProducerConsumer[T any] struct {
    	consumerNumber int
    	queue          chan chanData[T]
    	p              iter.Seq2[int, T]
    	c              func(index int, item T) (err error)
    	once           sync.Once
    }
    
    func (itself *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {
    	itself.c = c
    	return itself
    }
    
    func (itself *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {
    	itself.p = p
    	return itself
    }
    
    // 生产者消费者都有可能发生阻塞,
    // 生产者阻塞的原因是因为 queue 容量不够了
    // 消费者阻塞的原因的是因为 queue 没有 close
    // 生产者只需要实现即可
    func (itself *ProducerConsumer[T]) do() {
    	task := ChannelConsume2(itself.queue, func(index int, item T) (err error) {
    		return itself.c(index, item)
    	}, itself.consumerNumber)
    	defer task.Wait()
    	defer close(itself.queue)
    	for index, v := range itself.p {
    		select {
    		case itself.queue <- chanData[T]{
    			index,
    			v,
    		}:
    			break
    			// 需要一个可以知道提前截止的操作
    		}
    	}
    
    }
    
    func (itself *ProducerConsumer[T]) Run() {
    	itself.once.Do(func() {
    		itself.do()
    	})
    }
    
    func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {
    	n := 1
    	if len(consumerNumber) > 0 {
    		n = consumerNumber[0]
    	}
    	return &ProducerConsumer[T]{
    		queue:          make(chan chanData[T], n),
    		consumerNumber: n,
    	}
    }
    
    
    28 条回复    2025-07-28 17:40:02 +08:00
    PungentSauce
        1
    PungentSauce  
    OP
       43 天前
    感觉调用的地方有点不像 go 了
    aladdinding
        2
    aladdinding  
       43 天前
    yield 乍一看以为是 py
    henix
        3
    henix  
       43 天前   ❤️ 1
    用了 Go 最新的 iterator 搞出了一些偏函数式风格的东西。似乎是为了方便并发处理

    P.S. 如果这是公司的代码,一般公司都不会允许随便发布到公开的地方
    RedisMasterNode
        4
    RedisMasterNode  
       43 天前
    P.S. 如果这是公司的代码,一般公司都不会允许随便发布到公开的地方

    或许没人管,或许也不是多好的代码,但是要有职业素养...
    vincentWdp
        5
    vincentWdp  
       43 天前
    一股 Java 味儿
    PungentSauce
        6
    PungentSauce  
    OP
       43 天前
    @RedisMasterNode @henix 同学的本地工具代码,不是生产环境用的。
    PungentSauce
        7
    PungentSauce  
    OP
       43 天前
    @PungentSauce 不是项目使用的。公司代码都是 java
    PungentSauce
        8
    PungentSauce  
    OP
       43 天前
    感觉比较 6 ,但是风格和其他看到的不大一样。
    Nanosk
        9
    Nanosk  
       43 天前   ❤️ 4
    这写法看到就烦 谁爱维护谁维护。。
    fumeboy
        10
    fumeboy  
       43 天前
    大哥。函数式编程都没见过?
    jheroy
        11
    jheroy  
       43 天前   ❤️ 1
    这种代码就是写的人感觉很爽,看的人想骂娘。
    kk2syc
        12
    kk2syc  
       43 天前
    链式编程……这不是基本功吗……反过来说,java 那套继承就很……
    archxm
        13
    archxm  
       43 天前
    前端风格吗?
    mightybruce
        14
    mightybruce  
       43 天前   ❤️ 1
    这代码没什么问题, 不知道你想说什么。
    lysShub
        15
    lysShub  
       43 天前   ❤️ 1
    垃圾代理,而且函数变量很影响性能
    strobber16
        16
    strobber16  
       43 天前
    这就是 go ,不爽不要来
    nkidgm
        17
    nkidgm  
       43 天前
    go 我都是用来做命令行小程序,业务层面的代码可不敢这样子写。
    kneo
        18
    kneo  
       43 天前
    有啥问题?
    iceheart
        19
    iceheart  
       42 天前 via Android   ❤️ 1
    过两年作者自己也看不懂了。
    Silicon
        20
    Silicon  
       42 天前
    这种风骚属于脱离生态后不得已而为之,带有强迫的属性,所以是其他「受害者」。
    上游先有一个预处理器把 excel 干掉,剩下的事情就好办很多……
    geebos
        21
    geebos  
    PRO
       42 天前   ❤️ 2
    写了好几年的 go ,很难接受这样的写法
    feedcode
        22
    feedcode  
       42 天前
    用了新的 feature, range over functions

    https://go.dev/blog/range-functions
    As of Go 1.23 it now supports ranging over functions that take a single argument. The single argument must itself be a function that takes zero to two arguments and returns a bool; by convention, we call it the yield function.

    ```
    func(yield func() bool)

    func(yield func(V) bool)

    func(yield func(K, V) bool)
    ```
    leowyzhuang
        23
    leowyzhuang  
       42 天前
    人和程序有一个能跑就行
    kfpenn
        24
    kfpenn  
       40 天前
    看了下这个新特性,所以这个东西只是让以后的库能有一个统一的遍历方法,但里面可能还是用的 scanner.Scan(),Rows.Next () 这些?
    bronyakaka
        25
    bronyakaka  
       40 天前
    因为 go 社区很多人吹 go 是函数式编程 结果啥函数式 api 都没有,可能还是泛型的问题
    bronyakaka
        26
    bronyakaka  
       40 天前
    可以看下社区模拟函数式 api 的 lo 库,基本上包含了楼主示例里这些封装
    leokun
        27
    leokun  
       40 天前
    我说 fp 是防御性编程的一种,大家没意见吧
    R136a1
        28
    R136a1  
       40 天前
    函数式编程没啥问题,但是 go 的函数式是真难看,通篇读下来我只看到一堆花括号在天上飞
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1471 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 16:47 · PVG 00:47 · LAX 09:47 · JFK 12:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.