一条面试题引发的思考 Go 版本

2019-04-07 09:42:15 +08:00
 bwangel

说明

前两天看到了这个帖子,https://www.v2ex.com/t/547045#reply53 感觉其中描述的问题很有意思,我就用 Go 把它提到的解决方案都实现了一遍,文章链接:一条面试题引发的关于 Go 语言中锁的思考,还请大家多多指教!

~~~正文分割线~~~

一条面试题引发的关于 Go 语言中锁的思考

前言

前两天在 V2EX 上看到了一篇博文,《一条经典面试题引发的思考》,感觉很有意思,我就用 Go 把它的答案实现了一遍。

问题描述及一个错误解法

问题如下:

编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....

一个错误解法如下:

     1	package main
     2	
     3	import (
     4		"fmt"
     5		"sync"
     6	)
     7	
     8	var mu sync.Mutex
     9	var index int
    10	var endSignal = make(chan struct{})
    11	
    12	func echoRoutine(routineIndex int, routineName string) {
    13		for index < 30 {
    14			mu.Lock()
    15			if index%3 == routineIndex {
    16				fmt.Println(index, routineName)
    17				index++
    18			}
    19			mu.Unlock()
    20		}
    21	
    22		endSignal <- struct{}{}
    23	}
    24	
    25	func main() {
    26		routineNames := []string{"A", "B", "C"}
    27	
    28		for idx, name := range routineNames {
    29			go echoRoutine(idx, name)
    30		}
    31	
    32		for _ = range routineNames {
    33			<-endSignal
    34		}
    35		//Output:
    36		//0 A
    37		//1 B
    38		//2 C
    39		//3 A
    40		//4 B
    41		//5 C
    42		//6 A
    43		//7 B
    44		//8 C
    45		//9 A
    46		//10 B
    47		//11 C
    48		//12 A
    49		//13 B
    50		//14 C
    51		//15 A
    52		//16 B
    53		//17 C
    54		//18 A
    55		//19 B
    56		//20 C
    57		//21 A
    58		//22 B
    59		//23 C
    60		//24 A
    61		//25 B
    62		//26 C
    63		//27 A
    64		//28 B
    65		//29 C
    66		//30 A
    67		//31 B
    68	}

从上面的输出可以看到,程序还额外输出了两次 A 和 B。首先说结论,这是因为检查条件index < 30 没有加锁。为了理解这个错误,首先我们需要了解一下 Go 的内存模型。

Go 语言的内存模型

首先说什么是 Go 的内存模型,在官方文档中是这样定义的:

The Go memory model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine.

Go 语言的内存模型规定了一个 Goroutine 可以看见另外一个 Goroutine 修改同一个变量的值的条件,这类似于 Java 内存模型中 内存可见性 问题。

Happens Before 原则

在 Go 语言中,编译器和处理器会对执行的指令进行重排序。Go 语言会保证的是,在单个 Goroutine 内,重排序后的执行效果和未进行重排序(即在代码中定义的执行顺序)的执行效果是一样。但是在多个 Goroutine 的运行环境下,由于重排序的原因,一个 Goroutine 观察到的执行顺序可能和另外一个 Goroutine 观察到的不一样。例如一个 Goroutine 执行a = 1; b = 2,另一个 Goroutine 可能会在a被赋值之前先观察到b被赋值了。

为了规范读和写的操作,Go 定义了 happens before 原则。对于变量读写操作,我们可以将其称作是事件。事件之间的顺序可以定义为三种,E1 发生在 E2 之前,E1 发生在 E2 之后,E1 和 E2 同时发生。

在单个 Goroutine 内,happens before 顺序就是程序执行的顺序,如果下面两个条件都被满足的话,变量v的读取事件r,可以观察到变量v的写入事件w

  1. r没有在w之前发生。
  2. w之后,r之前,没有另外一个变量v的写入事件w'发生。

总的来说,在单个 Goroutine 的环境下,读事件r会观察到最近的一个写事件w

在多个 Goroutine 的运行环境下,如果需要让v的读取事件r观察到其写入事件w。需要满足更严格的条件:

  1. w发生在r之前
  2. 任何针对共享变量v的写入事件都发生在w之前或者r之后。

因为在多个 Goroutine 的运行环境下,读写事件可能并行地发生,所以第一点更严格了一些,要求w必须在r之前发生,两者不能同时发生,且它们之间没有其他的写事件w'。 因此在多个 Goroutine 访问一个共享变量v的时候,我们必须使用同步事件去建立一个 happens before 条件来让读事件观察到目标写事件。

此外,还需要注意的是:

  1. 在变量v发生写事件之前,它被初始化成对应类型的零值。
  2. 大于一个机器字的变量的读写事件,会表现成 未指定顺序 的多次机器字大小的读写操作。

正确答案 V1

了解了 Go 内存模型中的 Happens Before 原则之后,我们接着再来分析上述的错误代码。

理解了这个错误之后,我们可以把代码稍微改一下,将index < 30这个比较操作也置于锁的保护中,就能够得到正确的结果了。

package main

import (
	"fmt"
	"sync"
)

var i int
var mu sync.Mutex
var endSignal = make(chan struct{})

func threadPrint(threadNum int, threadName string) {
	for {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			break
		}

		if i%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			i++
		}
		mu.Unlock()
	}

	endSignal <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name)
	}

	for _ = range names {
		<-endSignal
	}
}

正确答案 V2 -- 公平锁

上述代码是得到的结果是正确的,但是还存在一些问题。我们想要的执行顺讯是有序的, 但每次解锁的时候,都是 A, B, C 三个 Goroutine 上去抢锁资源。有时候某个 Goroutine 抢到了锁资源,但是其并不符合执行要求(i%3 != threadNum)。它就会将锁释放,然后让大家重新再来抢。

这样的争抢索锁资源造成了时间上的浪费,执行了一些不必要的操作。

为了避免这些浪费,我们可以参考 Java 中的公平锁,让解锁的顺序和上锁的顺序匹配,每次只有一个 Goroutine 获得锁资源,大家不必每次都去争抢锁资源。

package main

import (
	"fmt"
	"log"
	"sync"
)

type FailLock struct {
	mu        *sync.Mutex
	cond      *sync.Cond
	holdCount int
}

func NewFailLock() sync.Locker {
	mu := new(sync.Mutex)
	cond := sync.NewCond(mu)

	return &FailLock{
		holdCount: 0,
		mu:        mu,
		cond:      cond,
	}
}

func (fl *FailLock) Lock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	fl.holdCount++
	if fl.holdCount == 1 {
		return
	}

	fl.cond.Wait()
}

func (fl *FailLock) Unlock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	if fl.holdCount == 0 {
		log.Fatal("unlock of UnLocked mutex")
	}

	fl.holdCount--
	if fl.holdCount != 0 {
		fl.cond.Signal()
	}
}

var (
	end = make(chan struct{})
	i   int
)

func threadPrint(threadNum int, threadName string, mu sync.Locker) {
	for i < 30 {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			continue
		}
		if i < 3 && i%3 != threadNum {
			mu.Unlock()
			continue
		}

		fmt.Printf("%d: %s\n", i, threadName)
		i += 1
		mu.Unlock()
	}
	end <- struct{}{}
}

func main() {
	mu := NewFailLock()
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name, mu)
	}

	for _ = range names {
		<-end
	}
}

上述代码中需要注意两点:

  1. 由于 Goroutine 无法保证启动顺序,即我们无法保证最开始上锁的顺序是A,B,C这样的顺序,所以需要在64~67行加一个判断,程序刚开始执行时如果获得的锁不对,就不执行任何操作,重新获得锁。
  2. 由于可见性的原因,需要在60~63行上锁之后加一个判断,保证i的值是最新的值。

正确答案 V3 -- AtomicInt

除了自己手动加锁外,我们也可以使用 Go 的 atomic 包中提供的原子操作来完成上述功能。 每个 Goroutine 原子性地获得i的值,如果符合i % 3 == threadNum的条件,则执行操作,否则作自旋。代码如下:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	end = make(chan struct{})
	i   int32
)

func threadPrint(threadNum int32, threadName string) {

	for {
		v := atomic.LoadInt32((*int32)(&i))
		if v >= 30 {
			break
		}

		if v%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			atomic.AddInt32((*int32)(&i), int32(1))
		} else {
			continue
		}
	}
	end <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(int32(idx), name)
	}

	for _ = range names {
		<-end
	}
}

参考链接

9323 次点击
所在节点    Go 编程语言
83 条回复
jadeity
2019-04-07 13:38:21 +08:00
@bwangel 用到并行就是为了减少阻塞,并行同步就是要阻塞。如何挑选阻塞的时机,如何更优雅更有效率的阻塞,不管怎么问,都是在加上更多限制的条件下解决这个问题。
liyunlong41
2019-04-07 13:43:06 +08:00
技术贴 赞一个 之前刚好看到了 go 不满足内存一致性模型,不理解,现在稍微理解了点
art2cat
2019-04-07 13:48:48 +08:00
写的很好,但是你确定不是 fairlock 吗?
whoisghost
2019-04-07 14:03:55 +08:00
@whoisghost 楼主呀,看下我这评论呀,解法 3 是不是有问题?
bwangel
2019-04-07 14:31:11 +08:00
@art2cat 哇,谢谢老哥提醒。尴尬了,手滑打错了。
lazyfighter
2019-04-07 14:52:17 +08:00
在 go 语言入门的时候我记得有本书写的是能用锁的时候尽量用锁,因为 channel 还是依赖锁进行实现的
reus
2019-04-07 15:01:13 +08:00
@hjc4869 像你这样写的话,go 也不需要信号量。所以没人提信号量。

<script src="https://gist.github.com/reusee/57f30d177a688365e78ee9a12dac4468.js"></script>
georgetso
2019-04-07 15:34:31 +08:00
@ngn999 这就有意思了, 我怎么觉得这个问题和信号量没啥关系呢? 能详细说说为什么是考信号量吗?
hjc4869
2019-04-07 16:21:46 +08:00
@reus 这就是把 channel 当 semaphore 用……
而且 channel 本身实现用了 mutex,mutex 底层又是 semaphore,一层层包上来不如直接用 semaphore 直观。
Coey
2019-04-07 16:22:56 +08:00
归根结底是个并行无序转为串行有序的问题,遇到这种问题我一般是觉得没什么意思
当你不真正需要并行的时候,就不要用并行
另外,我认为用 FanIn/FanOut 和其他的几个解法并不是完全等价的,原因在于它对标准输出做了串行
应该有人见过多线程环境写文件导致文件混乱的吧,噗
hjc4869
2019-04-07 16:39:30 +08:00
@georgetso 这个问题本质上是在实现同步,而不是互斥。同步强调的是做事的先后顺序而不是多线程访问资源的冲突。虽然二者是包含关系并且可以互相实现,但是如果这里第一眼看到题就只是想到用 lock/mutex 而不是 semaphore/channel 去实现,那么显然是对后者的应用不熟,面试要挂的……
Lpl
2019-04-07 16:56:11 +08:00
@lazyfighter 怕是在开玩笑吧?你写一个两个 goroutine 同步的程序,channel 比 mutex 快至少一倍。
georgetso
2019-04-07 17:00:04 +08:00
@hjc4869 我同意你的分析, 这不是关于互斥的问题而是关于 happens-before 的问题. 但似乎 semaphore 是为了做资源控制而不是顺序同步吧?
hjc4869
2019-04-07 17:13:34 +08:00
@georgetso Semaphore 是一种同步的手段,可以用来做很多事情。你说的资源控制是一种用途(限制 N 个并发访问),我说的用来等待 /唤醒 thread 也是一个用途。
georgetso
2019-04-07 17:18:58 +08:00
@hjc4869 是的, 等待唤醒是这个题目的点. 我会用 condition_variable, 逻辑上更合理.
whoisghost
2019-04-07 17:31:05 +08:00
鉴于提示楼主“正确”解法 3 有问题两次被无视,为了避免其他人被楼主误导,本人只好替楼主修正所谓的“正确”解法 3:
Lpl
2019-04-07 17:59:23 +08:00
感觉我这个版本会更加简洁一点,本质上就是一个同步的问题。另外,mutex 和 channel 之间性能其实是有很大差别的,golang 的并发通信模型是“通信同步”( CSP )。mutex 就是单纯的一个互斥锁。

https://gist.github.com/penglongli/47395e7e75472a7da92787f3eb8e947d

```
var (
num = 10
)

func main() {
sig := make(chan int)
c1 := make(chan string)
c2 := make(chan string)
c3 := make(chan string)

var wg sync.WaitGroup
wg.Add(3)

go print(&wg, c1, "A")
go print(&wg, c2, "B")
go print(&wg, c3, "C")
go func() {
for {
select {
case <-sig:
// 检测到 sig 信号后退出 goroutine,避免出现 Deadlock
return
case str := <- c1: {
fmt.Println(str)
str = <- c2
fmt.Println(str)
str = <- c3
fmt.Println(str)
}
}
}
}()
wg.Wait()
sig <- 1
}

func print(wg *sync.WaitGroup, c chan string, str string) {
defer wg.Done()

for i := 0; i < num; i++ {
c <- str
}
}
```
Lpl
2019-04-07 18:02:26 +08:00
这道题目有好多种解法,我又想到了那个很“经典”的睡眠排序了,用在这里很切题。。
29EtwXn6t5wgM3fD
2019-04-07 18:15:32 +08:00
经典的使用信号量实现前驱关系的题目啊。
gamexg
2019-04-07 18:19:11 +08:00
@whoisghost #36 即使加了 gosched 面试时也够呛打及格分数。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/552620

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX