V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
Gota
V2EX  ›  Go 编程语言

Go 1.21 新加的 log/slog 大家开始用了吗? 写了一个输出到阿里云日志的 writer 扩展, 顺带问个跟 slog 扩展相关的问题.

  •  1
     
  •   Gota ·
    gota33 · 212 天前 · 2932 次点击
    这是一个创建于 212 天前的主题,其中的信息可能已经有所发展或是发生改变。

    先是分享下我写的扩展

    扩展 writer 的地址: https://github.com/gota33/aliyun-log-writer

    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    

    用的时候把参数中的 os.Stdout 换成这个 writer 就行了.

    然后问两个问题

    一个是 slog 想 Hook 的话就两个接口可以入手, slog.Handlerio.Writer.

    我这里选择了 io.Writer, 主要是因为自己写 slog.Handler 的话, 得把 slog.commonHandler 里的逻辑再实现一遍, 实在有点麻烦, 有没有像 logrus.Hook 那样比较简单的实现方式?


    还有一个问题跟 channel 有关.

    有这样的工作队列, submit() 提交任务, stop() 阻止新任务提交, 并处理完存量任务后返回.

    var (
    	ErrClosed = errors.New("closed")
    	chQuit    = make(chan struct{})
    	chData    = make(chan int, 10)
    )
    
    // func start() { ... }
    
    func submit(n int) error {
    	select {
    	case <-chQuit:
    		return ErrClosed
    	case chData <- n:
    		return nil
    	}
    }
    
    func stop() {
    	close(chQuit)
    	close(chData)
    	
    	for n := range chData {
    		// process data
    		_ = n
    	}
    }
    

    但是 selectswitch 不一样, case 的选择不是有序的, 导致有时候会选到第二个 case 然后 panic.

    所以后来把 stop() 改成这样了

    func stop() {
    	close(chQuit)
    
    	ch := chData
    	chData = make(chan int)
    
    	for len(ch) > 0 {
    		// process data
    		select {
    		case n := <-ch:
    			_ = n
    		default:
    		}
    	}
    }
    

    这种情况大家一般是怎么处理的?

    40 条回复    2023-10-16 13:53:34 +08:00
    hallDrawnel
        1
    hallDrawnel  
       212 天前
    看你的逻辑是想阻止新任务提交,但是已经提交的任务要继续处理完是吧?

    如果可以可以控制上游,那应该把 close chan 返回回去,让上游自己停止提交。
    Gota
        2
    Gota  
    OP
       212 天前
    @hallDrawnel 上游是 io.Writer 的 Write(), 用户用 slog 写日志的时候触发, 所以控制不了.
    wentx
        3
    wentx  
       212 天前
    close(chQuit) 改成 chQuit <- struct{}{}

    试试
    hsfzxjy
        4
    hsfzxjy  
       212 天前 via Android
    chData 不要 close 呗
    Gota
        5
    Gota  
    OP
       212 天前
    #3 @wentx 只要 select 是无序的, 都有可能选到第二个. https://stackoverflow.com/questions/68650423/do-select-statements-guarantee-order-of-channel-selection

    #4 @hsfzxjy 结尾那套写法就没 close, 想看看有没有其他的处理方式.
    wentx
        6
    wentx  
       212 天前
    @Gota 不不不,你 chQuit 是无缓冲的 channel, 你在往里写的时候,会阻塞,等到 submit 函数里面
    case <-chQuit:
    return ErrClosed

    这段执行到,才会走到下一步 close(chData),这个时候 submit 函数已经退出,所以 submit 不会 panic
    wentx
        7
    wentx  
       212 天前
    @wentx 然后 stop 的最后再去 close(chQuit)
    Gota
        8
    Gota  
    OP
       212 天前
    @wentx 但 submit() 不一定只有一个线程在调用. 而且如果在 stop 清空存量任务的过程中, 有另一个 submit() 调用, 还是会走到第二个 case 的吧? 可能还有一个问题, 如果没有 submit() 直接调用 stop() 程序就卡住了.
    hsfzxjy
        9
    hsfzxjy  
       212 天前 via Android
    @wentx 要是此时没有 submit ,那 stop 就塞住了
    要是有多个 submit 在并发,那只有一个是正确的
    你这问题更大
    wentx
        10
    wentx  
       212 天前
    @Gota 这个是队列设计的问题了,如何保证 producer 关闭后,其他 goroutine 调用 submit 没有副作用。
    wentx
        11
    wentx  
       212 天前
    在这个场景下面,甚至都不需要使用 channel 来通知,你直接一个全局变量,在 submit 的时候,判断一下是否是 close 就可以了。
    Gota
        12
    Gota  
    OP
       212 天前
    @wentx 没听太明白, 可以具体描述下吗?
    wentx
        13
    wentx  
       212 天前
    @Gota
    ```
    var (
    ErrClosed = errors.New("closed")
    chQuit bool
    chData = make(chan int, 10)
    )

    // func start() { ... }

    func submit(n int) error {
    if chQuit {
    return ErrClosed
    }

    chData <- n:

    return nil
    }

    func stop() {
    chQuit = true
    close(chData)

    for n := range chData {
    // process data
    _ = n
    }
    }
    ```
    pkoukk
        14
    pkoukk  
       212 天前
    stop 里不要 close(chData)啊 , defer close(chData) 不就可以了?
    Gota
        15
    Gota  
    OP
       212 天前
    #13 @wentx 嗯... 并发环境下直接用 bool 值已经很危险了. 而且即使这样写, 程序卡住和 panic 的问题依然会存在.
    #14 @pkoukk defer 也不行, 哪怕 stop() 已经完全执行完了. 这时候调 submit() 还是有概率选到第二个 case.
    wentx
        16
    wentx  
       212 天前
    @Gota 会啥会卡住 和 panic 呢?
    并发的话,atomic.LoadXXX 或者 atomic.StoreXXX
    Gota
        17
    Gota  
    OP
       212 天前
    @wentx

    因为并发环境下函数执行随时会被挂起. 如果 submit 执行完 if 判断被挂起, 去执行 stop, 等恢复执行 submit 的时候就会 panic

    即使正常执行, 如果 submit 执行到 chData <- n 如果因为 buffer 满了开始等待, 此时执行 stop, 会 100% panic.
    pkoukk
        18
    pkoukk  
       212 天前
    @pkoukk #14 stop 都执行完了,为什么还有线程可以调用 submit 啊?
    通道关闭了,还尝试往通道写入数据也是一种 panic 行为啊
    我们一般的做法是用 context.Done 作为标志,因为 contenxt 是可以逐层传递的
    当 sumbit 这边收到 Done 的时候,生产者同时也应该停止了
    Gota
        19
    Gota  
    OP
       212 天前
    @pkoukk 因为这是个 Logger, 调用者从各个线程触发写日志的操作. 在主线程调用 stop() 的时候没法确保其他线程都提前停下来不写日志. 如果 writer 返回 ErrClose 的话 slog 是能处理的, 但直接 panic 掉就不行了.
    qing18
        20
    qing18  
       212 天前
    为啥要 close chData 呢?
    ```
    func stop() {
    select {
    case <-chQuit:
    return
    default:
    close(chQuit)
    }
    }
    ```
    Gota
        21
    Gota  
    OP
       212 天前
    @qing18 不 close(chData) 就是帖子末尾处的写法.
    这里的 submit 接口需要确保: 如果不返回错误的话, 写入的 data 是一定要被处理的.
    所以如果不 close 也不换成一个无缓冲 channel 的话, 会出现调用者认为数据成功提交了, 但实际上却没处理的情况.
    soap520
        22
    soap520  
       212 天前
    ```
    func submit(n int) error {
    select {
    case <-chQuit:
    return ErrClosed
    default:
    }

    chData <- n
    return nil

    }

    func stop() {
    close(chQuit)

    for n := range chData {
    // process data
    _ = n
    }

    close(chData)
    }
    ```

    看看这样行不行,
    submit 里面先判断一下 chQuit 是不是已经 close 了。
    stop 处理完再 close chData 。

    一种可能让人看起来有点担心的执行顺序是,1. submit 里, 判断 chQuit 还没关闭。2. stop 里,执行 close(chQuit)。3. submit 里,接着 chData <- n 。不过应该在你的用例中年问题不大。
    Gota
        23
    Gota  
    OP
       212 天前
    @soap520 你这里把 close(chData) 放到 for 循环之后执行, 那 for 循环就永远结束不了了.
    realpg
        24
    realpg  
       212 天前
    没有
    1.21 自己瞎鸡儿改 xml excel 功能都用不了,被迫退回 1.20
    Gota
        25
    Gota  
    OP
       212 天前
    @realpg 升到 1.21.1 试试呢? 一般我都等大版本之后的一个小版本才开始正式用, 最开始那个版本确实容易出一些小问题.
    soap520
        26
    soap520  
       212 天前
    @Gota 确实,我把 stop 改成这样是不是就可以了。
    ```
    func stop() {
    close(chQuit)

    for {
    select {
    case n := <-chData:
    _ = n
    default:
    close(chData)
    return
    }
    }
    }
    ```
    realpg
        27
    realpg  
       212 天前
    @Gota #25
    之前说了,下个小版本会合并修复
    还不是 golang 自己修的,excel 那个库大佬给提的 pr
    没有特别频繁检查版本的习惯,过两天再说
    Gota
        28
    Gota  
    OP
       212 天前
    @soap520 那就剩下 #22 里你自己提到的那个 panic 问题了. 这里的用例是 slog 的 hook, 所以 submit 可能会在任意线程中被调用, 数量和时机都是没办法控制的, 也就是说 submit 里那个过了 if 之后的挂起其实很容易触发.
    soap520
        29
    soap520  
       212 天前
    @Gota 明白了,那我把 stop 最后 close(chData)去掉是不是就行了。去掉之后看起来和你 1L 的方法就差不多了,只是没有重新给 chData 赋值(我也不清楚 slog hook 的用例里需不需要再给 chData 一个 channel )。
    如果要很“完美”的话,我除了弄一个锁把 submit 里的 read chDone, enqueue data 保护起来之外想不到更好的办法了。
    Gota
        30
    Gota  
    OP
       212 天前
    @soap520 哈哈, 异步相关的东西确实比较烧脑. 1L 重新赋值一个无缓冲 channel 是为了防止 stop 之后有数据进入 chData 却没人来处理, 随着主线程退出这份数据就丢掉了. 至于加锁, 不到万不得已最好别加, 否则每调一次 Log 整个应用都被锁一下, 就有点夸张了.
    nuk
        31
    nuk  
       212 天前
    显然需要一个临界区,close channel 的时候要等待其他所有线程离开临界区,不管用 channel 还是 lock 来实现代价都蛮大,单个 bool 或者单个 channel close 都不可能实现,感觉 stop 就直接 drop 掉 log 会比较容易简单。
    Gota
        32
    Gota  
    OP
       211 天前 via iPhone
    @nuk 直接丢数据肯定不行,正文末尾的写法也不用等其他线程呀。
    nuk
        33
    nuk  
       211 天前
    @Gota 但是这个也不能保证 close channel 之后没有线程去写啊。。
    Gota
        34
    Gota  
    OP
       211 天前 via iPhone
    @nuk 没有 close ,是换成了一个无缓冲的 channel ,从而确保能 select 到第一个 case 。
    mapleray
        35
    mapleray  
       209 天前
    @Gota #30 如果不想加锁,又不想丢日志,只能想办法把关闭操作抛给使用者了。

    目前使用方式 uber-go/atomic.Bool + 缓冲 channel ,退出时等待 channel 清理完才退出。
    paceewang1
        36
    paceewang1  
       203 天前
    这个场景,可以用乐观锁吧,atomic ,CAS
    Jrue0011
        37
    Jrue0011  
       198 天前
    读写锁的场景?
    stop 写锁更新状态
    submit 读锁判断状态
    Gota
        38
    Gota  
    OP
       198 天前
    @paceewang1 #36
    @Jrue0011 #37
    见 #30 的回复,帖子末尾的写法是不需要锁的
    paceewang1
        39
    paceewang1  
       194 天前
    @Gota CAS 也不是在 submit 里面加锁,我是指在 stop 里面加锁然后转换状态;相当于引入一个状态机而已,submit 只需要加一个状态判断就可以了,看了一下就和#13 的代码大致一样吧,但是 stop 方法先处理 chData 再关闭:
    ```
    func stop() {
    if ok := CAS(chQuit); !ok {
    // return error
    }

    for n := range chData {
    // process data
    _ = n
    }

    close(chData)
    }
    ```
    Gota
        40
    Gota  
    OP
       194 天前 via iPhone
    @paceewang1 见#23 楼和后续的回复,还是有点问题
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   917 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 22:30 · PVG 06:30 · LAX 15:30 · JFK 18:30
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.