请教一个并发设计问题

2023-05-08 07:37:55 +08:00
 swqslwl
我在做一个监测流量的项目。每秒会从数据源中获取 1w 条 json 格式的流量信息,我希望对这些流量进行分析,但是现在会出现丢数据的情况。

我的做法是
1.接受到数据后先传入 channelA
2.启动一个协程循环从 A 中读取数据存入切片 B
3.另起一个协程处理切片 B 的数据,同时在处理业务时利用 mutex 锁住 B

实际调试中发现,mutex 的次数会影响数据的丢失量
请问我这样设计是否有问题,是否会导致丢数据
4107 次点击
所在节点    Go 编程语言
39 条回复
leonshaw
2023-05-08 11:24:30 +08:00
conn 是什么协议?
把加放锁和处理数据的位置再标一下。
rrfeng
2023-05-08 11:37:29 +08:00
channel 里读 N 条出来直接处理掉,不要用切片缓存 /交互数据,就没这个问题了。

这个切片设计的根本没什么道理。
pkoukk
2023-05-08 11:39:19 +08:00
没理由 append B 加锁了还能丢数据啊
你可能丢数据的地方在 err := json.Unmarshal([]byte(<-A), &fs)
oldshensheep
2023-05-08 11:53:53 +08:00
看你最终的代码感觉没什么问题。

建议写个可以复现的 demo ,之前我也是出 bug ,感觉是用的第三方的库的问题。后来写了个可以复现的 demo ,发现是我代码的问题。

我有很多莫名其妙的 bug 都是在写 demo 的时候发现代码真正错误的地方。

比如说你这个代码,里面有网络连接,写数据库啥的,都给简化了,最终就是纯粹的逻辑代码,慢慢调试就发现问题了。
而且也方便别人运行调试。
ns09005264
2023-05-08 11:55:40 +08:00
handleData 里加锁处理数据,但是 txData 里 append 却没有加锁,
所以当 handleData 正在处理数据的时候,txData 还在往里面 append 数据,
等 handleData 处理完,清空了 B ,txData 在 handleData 处理数据的过程中所添加的数据也就被清除了。
没有给写入加锁只给读取加锁,等于没加锁。

另外你想用 handleData 异步处理数据,但是如果在 txData 里给 append 加锁,其实就等于同步处理数据了,没什么意义。考虑在 txData 里对数据进行分块或按时间进行分块,再将分块的数据传给 handleData ,连锁都不用。
8355
2023-05-08 12:08:29 +08:00
我的理解 handleData 这里完全没必要 也没必要用锁
可以把写库代码直接放到 appendB = append(B,fs) 位置执行
其次 db 本身是支持并发写库的,这里加锁意义不大,加了锁也都是在等待锁反而更慢
leonshaw
2023-05-08 13:47:13 +08:00
检查一下发送端的返回值。
如上面所说的,这样实现并没有并发。如果处理能力大于上游,同步处理就行;如果小于上游,最终结果就是一个协程在处理,一个在等锁,一个在等 channel 缓冲空间。
reliefe
2023-05-08 14:25:00 +08:00
这个问题根本应该在于多个线程操作同一个切片导致的,这里就会有很大不确定性。我问了 GPT-4 ,它给了很好的建议,把 B 换成 chan 而不是切片试试
```
var A = make(chan string, 1048576)
var B = make(chan flowStatistic, 1048576) // 使用带缓冲的 channel 而非切片
...

func txData() {
for {
var fs flowStatistic
err := json.Unmarshal([]byte(<-A), &fs)
...
B <- fs // 将 fs 传递给 handleData
}
}

func handleData() {
var buffer []flowStatistic
timer := time.NewTimer(5 * time.Second)

for {
select {
case fs := <-B:
buffer = append(buffer, fs)
case <-timer.C:
// 处理 buffer 中的数据
...
buffer = make([]flowStatistic, 0)
timer.Reset(5 * time.Second)
}
}
}
```
完整回复: https://flowus.cn/share/533684c0-2869-4507-8375-297103f09c77
PS: 顺便一提在我的小站就可以随时用 GPT-4 了, liaobots.com
quzard
2023-05-08 14:39:20 +08:00
```go
var fmutex = sync.Mutex{}
var A = make(chan string, 1048576)
var B = sync.Pool{
New: func() interface{} {
return make([]flowStatistic, 0, 10000)
},
}

func foo(){
go getData()
go txData()
go handleData()
}

// 接收数据
func getData(){
for {
// ...
data := conn.Read()
A<-data
}
}


func txData() {
for {
var fs flowStatistic
err := json.Unmarshal([]byte(<-A), &fs)
// ...

fmutex.Lock()
currB := B.Get().([]flowStatistic)
currB = append(currB, fs)
B.Put(currB)
fmutex.Unlock()
}
}

func handleData() {
for {
time.Sleep(5 * time.Second)
fmutex.Lock()
currB := B.Get().([]flowStatistic)

// 进行数据聚合和存储操作
// ...
// 清空 B
currB = currB[:0]
B.Put(currB)
fmutex.Unlock()
}
}

```
quzard
2023-05-08 14:40:52 +08:00
@quzard #29 怎么发送后格式就乱了呢
Anivial
2023-05-08 15:21:25 +08:00
感觉可以换一种思路,通过 time.Ticker 和 select 来代替锁保证缓存数据不会被互相抢占影响
for {
select {
case data := <-A:
...
B = append(B,fs)
case t := <-ticker.C: // ticker := time.NewTicker(5 * time.Second)
// 聚合处理数据
process(B)

// 清空 B 保留容量
B = B[:0:cap(B)]
}
}
piaodazhu
2023-05-08 15:28:16 +08:00
在楼主给的第二份代码其实也没有解决上面我提的那个问题,因为 goroutine1 在等待 goroutine2 放锁的时候,它栈里面的变量 B 就是旧的 B (底层指针不会变成你清空后新赋值的指针),所以 goroutine2 的清空操作 goroutine1 在这一次执行中是不可见的。

试试这样修改?
```
var fmutex = sync.Mutex{}
var A = make(chan string, 1048576)
var B_array = make([]flowStatistic,0) // <------
var B = &B // <------

func foo(){
go getData()
go txData()
go handleData()
}

//接受数据
func getData(){
for {
...
addr, err := net.ResolveUnixAddr("unixgram", sock)
conn, err := net.ListenUnixgram("unixgram", addr)
data := conn.ReadFromUnix()
A<-data
}
}

func txData(){
for{
var fs flowStatistic
err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析 A 传过来的数据
...
fmutex.Lock()
*B = append(*B,fs) // <------
fmutex.Unlock()
}
}

func handleData(){
//这里每 5 秒钟对 B 中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住 B ,处理完后清空 B 中数据并解锁
for{
time.Sleep(5 * time.Second)
fmutex.Lock()
...
*B = make([]flowStatistic,0) // <------
fmutex.Unlock()
}
}
```

感觉大概率是这里的问题
piaodazhu
2023-05-08 15:43:16 +08:00
@piaodazhu 不好意思看错了,B 不在栈上,上面这个请忽略。。。

另外,在 handleData()里面,可以在加锁之后:
fmutex.Lock()
tmp := B
B = make([]flowStatistic, 0)
fmutex.Unlock()
... // processing tmp

可以减少加锁时间,看不能减少或者消除数据丢失?
PythonYXY
2023-05-08 16:05:37 +08:00
数据量也不小了,感觉还是上 Flink 吧,基于滚动窗口+RocksDB 状态后端做实时分析。
picone
2023-05-08 16:44:18 +08:00
感觉代码没有问题,但是有些能优化的地方,可以改成无锁化
```go
func txData() {
ticker := time.NewTicker()
for {
select {
case <- ticker.C:
go func() // report your data
B = make()
case evt <- A:
B = append(B, evet)
case <-ctx.Done():
return
}
}
}
```
liuxu
2023-05-08 17:19:19 +08:00
第二条附言的代码应该没问题了,golang 所有基础类型都不是线程安全的,txData()在不断自动扩容 B ,而 handleData()拿到的是旧指针,处理完旧指针的数据清空新 B 指针,导致了旧指针和新 B 指针这段时间 append()的数据丢失

第一个附言等于没锁,handleData()内部没有线程安全问题,是单线程的,竞态出在 txData()的 append()和 handleData()的 B = make([]flowStatistic,0)之间
ccde8259
2023-05-08 20:33:03 +08:00
这种地方 mutex 写 slice 不如写 chan……
doraf
2023-05-09 09:43:00 +08:00
如果还有问题,能不能试试 atomic.Value 来存取 B 。
txData 和 handleData 之间,能不能使用 chan 来传递 flowStatistic 。
5 秒处理一次的话,在 txData 缓存数据,每 5 秒调用一次 go handleData 行不行(传递缓存数据给 handleData ),不知道语义还对不对。
要不要考虑考虑 kafka 、flink 这种。
xurh
2023-05-09 11:03:17 +08:00
我之前做爬虫收集数据也遇到过类似的问题,把数据聚合进行批量插入减少 io 。

我采用的 chan ,然后启动一个协程监听 chan ,当收集一定数量的数据或者时间满足,就把数据写入 db

```go

type DBWriter[T any] struct {
Size int
Interval time.Duration
done chan struct{}
ch chan T
insertDB func([]T, int)
}

func (w *DBWriter[T]) Start() {
ticker := time.NewTicker(w.Interval)
records := make([]T, 0, w.Size)
insert := func() {
if len(records) == 0 {
return
}
w.insertDB(records, w.Size)
records = make([]T, 0, w.Size)
}

for {
select {
case <-w.done:
insert()
return

case <-ticker.C:
insert()

case data := <-w.ch:
records = append(records, data)

if len(records) == w.Size {
insert()
}
}
}
}

```

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/938125

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX