一条面试题引发的思考 Go 版本

2019-04-07 09:42:15 +08:00
 bwangel

说明

前两天看到了这个帖子,https://www.v2ex.com/t/547045#reply53 感觉其中描述的问题很有意思,我就用 Go 把它提到的解决方案都实现了一遍,文章链接:一条面试题引发的关于 Go 语言中锁的思考,还请大家多多指教!

~~~正文分割线~~~

一条面试题引发的关于 Go 语言中锁的思考

前言

前两天在 V2EX 上看到了一篇博文,《一条经典面试题引发的思考》,感觉很有意思,我就用 Go 把它的答案实现了一遍。

问题描述及一个错误解法

问题如下:

编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....

一个错误解法如下:

     1	package main
     2	
     3	import (
     4		"fmt"
     5		"sync"
     6	)
     7	
     8	var mu sync.Mutex
     9	var index int
    10	var endSignal = make(chan struct{})
    11	
    12	func echoRoutine(routineIndex int, routineName string) {
    13		for index < 30 {
    14			mu.Lock()
    15			if index%3 == routineIndex {
    16				fmt.Println(index, routineName)
    17				index++
    18			}
    19			mu.Unlock()
    20		}
    21	
    22		endSignal <- struct{}{}
    23	}
    24	
    25	func main() {
    26		routineNames := []string{"A", "B", "C"}
    27	
    28		for idx, name := range routineNames {
    29			go echoRoutine(idx, name)
    30		}
    31	
    32		for _ = range routineNames {
    33			<-endSignal
    34		}
    35		//Output:
    36		//0 A
    37		//1 B
    38		//2 C
    39		//3 A
    40		//4 B
    41		//5 C
    42		//6 A
    43		//7 B
    44		//8 C
    45		//9 A
    46		//10 B
    47		//11 C
    48		//12 A
    49		//13 B
    50		//14 C
    51		//15 A
    52		//16 B
    53		//17 C
    54		//18 A
    55		//19 B
    56		//20 C
    57		//21 A
    58		//22 B
    59		//23 C
    60		//24 A
    61		//25 B
    62		//26 C
    63		//27 A
    64		//28 B
    65		//29 C
    66		//30 A
    67		//31 B
    68	}

从上面的输出可以看到,程序还额外输出了两次 A 和 B。首先说结论,这是因为检查条件index < 30 没有加锁。为了理解这个错误,首先我们需要了解一下 Go 的内存模型。

Go 语言的内存模型

首先说什么是 Go 的内存模型,在官方文档中是这样定义的:

The Go memory model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine.

Go 语言的内存模型规定了一个 Goroutine 可以看见另外一个 Goroutine 修改同一个变量的值的条件,这类似于 Java 内存模型中 内存可见性 问题。

Happens Before 原则

在 Go 语言中,编译器和处理器会对执行的指令进行重排序。Go 语言会保证的是,在单个 Goroutine 内,重排序后的执行效果和未进行重排序(即在代码中定义的执行顺序)的执行效果是一样。但是在多个 Goroutine 的运行环境下,由于重排序的原因,一个 Goroutine 观察到的执行顺序可能和另外一个 Goroutine 观察到的不一样。例如一个 Goroutine 执行a = 1; b = 2,另一个 Goroutine 可能会在a被赋值之前先观察到b被赋值了。

为了规范读和写的操作,Go 定义了 happens before 原则。对于变量读写操作,我们可以将其称作是事件。事件之间的顺序可以定义为三种,E1 发生在 E2 之前,E1 发生在 E2 之后,E1 和 E2 同时发生。

在单个 Goroutine 内,happens before 顺序就是程序执行的顺序,如果下面两个条件都被满足的话,变量v的读取事件r,可以观察到变量v的写入事件w

  1. r没有在w之前发生。
  2. w之后,r之前,没有另外一个变量v的写入事件w'发生。

总的来说,在单个 Goroutine 的环境下,读事件r会观察到最近的一个写事件w

在多个 Goroutine 的运行环境下,如果需要让v的读取事件r观察到其写入事件w。需要满足更严格的条件:

  1. w发生在r之前
  2. 任何针对共享变量v的写入事件都发生在w之前或者r之后。

因为在多个 Goroutine 的运行环境下,读写事件可能并行地发生,所以第一点更严格了一些,要求w必须在r之前发生,两者不能同时发生,且它们之间没有其他的写事件w'。 因此在多个 Goroutine 访问一个共享变量v的时候,我们必须使用同步事件去建立一个 happens before 条件来让读事件观察到目标写事件。

此外,还需要注意的是:

  1. 在变量v发生写事件之前,它被初始化成对应类型的零值。
  2. 大于一个机器字的变量的读写事件,会表现成 未指定顺序 的多次机器字大小的读写操作。

正确答案 V1

了解了 Go 内存模型中的 Happens Before 原则之后,我们接着再来分析上述的错误代码。

理解了这个错误之后,我们可以把代码稍微改一下,将index < 30这个比较操作也置于锁的保护中,就能够得到正确的结果了。

package main

import (
	"fmt"
	"sync"
)

var i int
var mu sync.Mutex
var endSignal = make(chan struct{})

func threadPrint(threadNum int, threadName string) {
	for {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			break
		}

		if i%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			i++
		}
		mu.Unlock()
	}

	endSignal <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name)
	}

	for _ = range names {
		<-endSignal
	}
}

正确答案 V2 -- 公平锁

上述代码是得到的结果是正确的,但是还存在一些问题。我们想要的执行顺讯是有序的, 但每次解锁的时候,都是 A, B, C 三个 Goroutine 上去抢锁资源。有时候某个 Goroutine 抢到了锁资源,但是其并不符合执行要求(i%3 != threadNum)。它就会将锁释放,然后让大家重新再来抢。

这样的争抢索锁资源造成了时间上的浪费,执行了一些不必要的操作。

为了避免这些浪费,我们可以参考 Java 中的公平锁,让解锁的顺序和上锁的顺序匹配,每次只有一个 Goroutine 获得锁资源,大家不必每次都去争抢锁资源。

package main

import (
	"fmt"
	"log"
	"sync"
)

type FailLock struct {
	mu        *sync.Mutex
	cond      *sync.Cond
	holdCount int
}

func NewFailLock() sync.Locker {
	mu := new(sync.Mutex)
	cond := sync.NewCond(mu)

	return &FailLock{
		holdCount: 0,
		mu:        mu,
		cond:      cond,
	}
}

func (fl *FailLock) Lock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	fl.holdCount++
	if fl.holdCount == 1 {
		return
	}

	fl.cond.Wait()
}

func (fl *FailLock) Unlock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	if fl.holdCount == 0 {
		log.Fatal("unlock of UnLocked mutex")
	}

	fl.holdCount--
	if fl.holdCount != 0 {
		fl.cond.Signal()
	}
}

var (
	end = make(chan struct{})
	i   int
)

func threadPrint(threadNum int, threadName string, mu sync.Locker) {
	for i < 30 {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			continue
		}
		if i < 3 && i%3 != threadNum {
			mu.Unlock()
			continue
		}

		fmt.Printf("%d: %s\n", i, threadName)
		i += 1
		mu.Unlock()
	}
	end <- struct{}{}
}

func main() {
	mu := NewFailLock()
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name, mu)
	}

	for _ = range names {
		<-end
	}
}

上述代码中需要注意两点:

  1. 由于 Goroutine 无法保证启动顺序,即我们无法保证最开始上锁的顺序是A,B,C这样的顺序,所以需要在64~67行加一个判断,程序刚开始执行时如果获得的锁不对,就不执行任何操作,重新获得锁。
  2. 由于可见性的原因,需要在60~63行上锁之后加一个判断,保证i的值是最新的值。

正确答案 V3 -- AtomicInt

除了自己手动加锁外,我们也可以使用 Go 的 atomic 包中提供的原子操作来完成上述功能。 每个 Goroutine 原子性地获得i的值,如果符合i % 3 == threadNum的条件,则执行操作,否则作自旋。代码如下:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	end = make(chan struct{})
	i   int32
)

func threadPrint(threadNum int32, threadName string) {

	for {
		v := atomic.LoadInt32((*int32)(&i))
		if v >= 30 {
			break
		}

		if v%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			atomic.AddInt32((*int32)(&i), int32(1))
		} else {
			continue
		}
	}
	end <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(int32(idx), name)
	}

	for _ = range names {
		<-end
	}
}

参考链接

9342 次点击
所在节点    Go 编程语言
83 条回复
myself659
2019-04-07 23:36:52 +08:00
@tairan2006
看上一楼 gist 流式处理
tairan2006
2019-04-07 23:48:31 +08:00
@myself659 Emmm …虽然思路是大体一样的,还是建议你看一下我 42 楼写的代码…因为你这个编程风格可能会让面试官感觉有点不满。。
ethego
2019-04-08 00:38:56 +08:00
@georgetso 不懂你说暴力是什么意思,用锁的正确写法就是这样。
cokeboL
2019-04-08 02:06:47 +08:00
package main

import (
"fmt"
"github.com/temprory/util"
"sync"
)

func main() {
wg := sync.WaitGroup{}
cl := util.NewCorsLink("test", 10, 3)
for i := 0; i < 30; i++ {
idx := i
wg.Add(1)
cl.Go(func(task *util.LinkTask) {
defer wg.Done()

task.WaitPre()
if idx%3 == 0 {
fmt.Println("A")
} else if idx%3 == 1 {
fmt.Println("B")
} else {
fmt.Println("C")
}

task.Done(nil)
})
}
wg.Wait()
cl.Stop()
}
ShayneWang
2019-04-08 09:21:58 +08:00
插眼
findTheWay
2019-04-08 09:43:54 +08:00
技术贴
georgetso
2019-04-08 11:02:59 +08:00
@ethego 略有异议. 该方案的确实现了既定目标, 但 cpu 空转有些过分了. 简单任务你这么写没关系, 如果是计算密集型系统, 这么写是过不了 review 的.
Gea
2019-04-08 11:23:39 +08:00
仔细看一下
ethego
2019-04-08 14:25:52 +08:00
@georgetso 什么简单不简单任务,题目的要求都清晰地摆在这里了,就不要去假设什么需求。另外这道题用原子操作并不会比上锁省多少时间,别意淫,你写出来做 benchmark 就知道了。
ethego
2019-04-08 14:33:05 +08:00
@georgetso 另外你还要再学习一个。我虽然写了 for {} 但是并没有什么 CPU 空转,mutex 没有抢到锁会阻塞等待释放,这是基础知识。
georgetso
2019-04-08 16:17:48 +08:00
@ethego 这倒是有趣了. 该学习的我虚心学习, 不过我尝试在
mu.RLock()
这一句上面添加
counter++ // (先定义一个全局变量 var counter = 0)
fmt.Println(counter)
然后发现打印了 500 多行. 最后几行是:
521
522
523
524
486
29 C
526
525
518

如果 cpu 没有空转, 那不应该跑 500 多句 println 吧?
请赐教!
georgetso
2019-04-08 16:21:38 +08:00
@ethego 我认真学习一个后, 认为贵码能完成任务, 主要还是靠关键的三句
mu.Lock()
index++
mu.Unlock()
放在了 if 中. 这个 if 没有 else, 不过如果你把我上面回帖中的 ++ 和 print 放在这个 else 里, 就会发现 mutex 其实被抢到了 500 多次, 毕竟这是“读写锁”, 这是“基础知识”.
再请赐教!
hheedat
2019-04-08 16:27:38 +08:00
@ethego 抢到的情况不就空转了
exonuclease
2019-04-08 17:46:38 +08:00
我当时实现的 c++版本是每个线程用自己的计数器来判断是否退出的
atomic_int counter = 0;
const vector<char> letters = { 'A','B','C' };
void worker(int target) {
int local_counter = 0;
while (local_counter < 10)
{
if (counter.load() % 3 == target) {
cout << letters[target] << endl;
++counter;
++local_counter;
}
}
}

int main()
{
thread a(worker, 0);
thread b(worker, 1);
thread c(worker, 2);
a.join();
b.join();
c.join();
return 0;
}
ethego
2019-04-08 18:52:29 +08:00
@georgetso 当然我有误解你说的空转。不过对于这道题,你说的那点抢占开销比什么公平锁快多了。你自己动手 benchmark 试下就知道了,为了解决抢占的开销远比抢占本身大。
hheedat
2019-04-12 15:59:46 +08:00
楼主,V2 公平锁这里有个错误,贴出我的一次执行结果


```
0: A
1: B
2: C
3: B
4: A
5: C
6: B
7: A
8: C
9: B
10: A
11: C
12: B
13: A
14: C
15: B
16: A
17: C
18: B
19: A
20: C
21: B
22: A
23: C
24: B
25: A
26: C
27: B
28: A
29: C
```


原因在于 ```if i < 3 && i%3 != threadNum {``` 这里



应该把 i<3 这个条件去掉,你这里加这个是为了保证第一轮按照 ABC 输出,所以只在 i<3 的时候校验了,但是后面的也应该全部校验。因为即使没有收到条件变量的通知,调用其方法的 goroutine 也是有可能被唤醒的。
hheedat
2019-04-12 16:53:48 +08:00
func threadPrint(threadNum int, threadName string, mu sync.Locker) {
for i < 9 {
fmt.Println("......", threadNum, threadName, "S-1")
mu.Lock()
if i >= 9 {
mu.Unlock()
continue
}
if i < 3 && i%3 != threadNum {
fmt.Println("......", threadNum, threadName, "S-2")
mu.Unlock()
continue
}

fmt.Printf("%d: %s\n", i, threadName)
i += 1
fmt.Println("......", threadNum, threadName, "S-3")
mu.Unlock()
}
end <- struct{}{}
}

...... 0 A S-1
0: A
...... 0 A S-3
...... 0 A S-1
...... 0 A S-2
...... 0 A S-1
...... 0 A S-2
...... 2 C S-1
...... 1 B S-1
...... 2 C S-2
...... 2 C S-1
1: B
...... 1 B S-3
...... 1 B S-1
2: C
...... 2 C S-3
...... 2 C S-1
...... 0 A S-1
3: B
...... 1 B S-3
...... 1 B S-1
4: C
...... 2 C S-3
...... 2 C S-1
5: A
...... 0 A S-3
...... 0 A S-1
6: B
...... 1 B S-3
...... 1 B S-1
7: C
...... 2 C S-3
...... 2 C S-1
8: A
...... 0 A S-3


打印一些状态可以看出一些端倪
hheedat
2019-04-12 17:15:29 +08:00
https://golang.org/pkg/sync/#Cond.Wait
其实还有一个问题,把 sync.Wait 用 defer 这么封装是否会有问题? wait 在阻塞的时候会先解锁,唤醒的时候会先加锁,现在唤醒之后立马就释放锁了,相当于锁的是 fl.holdCount,而不是 i,这样可能会出问题吧,i++的时候
bwangel
2019-04-14 17:23:21 +08:00
@hheedat 你好,感谢你的回复,你反馈的问题确实存在。出现`BAC`的原因是这样的(为了格式更好看一些,我把内容写在了 Gist 上,希望你拥有 跨越长城 的能力):

https://gist.github.com/bwangelme/1d204647f4658007043f348a61f37936

你说的第二个问题,`i` 确实没有被 mutex 保护。但是由于每个 Goroutine 执行 `i++` 的时候都会首先获取 `holdCount` 的值,如果 holdCount 的值不为 1,那么这个 Goroutine 就会阻塞。所以可以确保同一时刻只会有一个 Goroutine 执行 `i++`
bwangel
2019-04-14 18:11:37 +08:00
@mengzhuo 老哥,默默地把你取消拉黑了。后来发现你吐槽的是对的,这个问题不能用锁解决。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/552620

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX