一条面试题引发的思考--浅谈 Java 公平锁与内存模型

2019-03-21 15:52:51 +08:00
 samray

博客原文 一条面试题引发的思考--浅谈 Java 公平锁与内存模型

前言

春天来了,春招还会远么? 又到了春招的季节,随之而来的是各种的面试题。今天就看到组内大佬面试实习生的一道 Java 题目:

编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....

掉进坑里

出于好奇的心态,我花了点时间来尝试解决这个问题, 主要的难点是让线程顺序地如何顺序地输出,线程之间如何交换。很快就按着思路写出了一个版本,用 Lock 来控制线程的顺序,A,B,C 线程依次启动,因为 A 线程先启动,所以 A 线程会最先拿到锁,B,C 阻塞;但是 A 输出完字符串,释放锁,B 线程获得锁,C,A 线程阻塞; 依此循环:

public void Test(){
    private static Integer index = 0;

    Lock lock = new ReentrantLock();
	
	@Test
    public void testLock(){
        Thread threadA = work(i -> i % 3 == 0, () -> System.out.println("A"));
        Thread threadB = work(i -> i % 3 == 1, () -> System.out.println("B"));
        Thread threadC = work(i -> i % 3 == 2, () -> System.out.println("C"));
        threadA.start();
        threadB.start();
        threadC.start();
    }

    private Thread work(Predicate<Integer> condition, Runnable function) {
        return new Thread(() -> {
            while (index < 30) {
                lock.lock();
                if (condition.test(index)) {
                    function.run();
                    index++;
                }
                lock.unlock();
            }
        });
    }
}

输入结果如我预期那般,ABCABC 交替输出,也成功输出了 10 次,奇怪的是 A,B 却多输出了一次?

为什么会多输出一次,不是应该恰好是输出 30 次么, 为什么会多输出一次 A,B 真的百思不得其解. 所以我把index 也打印出来查看, 结果相当奇怪:

...
function.run();
System.out.println(index);
....

为什么 A 会是 30, B 会是 31, 不是有(index.intvalue<30) 的条件判断么, 为什么还会出现这样的数据?灵异事件?

解惑

灵异事件自然是不存在的,仔细分析了一番代码之后,发现了问题:

while (index.intValue() < 30) {  // 1
    lock.lock(); // 2
    if (condition.test(index.intValue())) {
        function.run();
        index++;
    }
    lock.unlock();
}

将 1,2 行的操作做了这三件事,如下:

  1. 线程读取 index 的值
  2. 比较 index 的值是否大于 30
  3. 如果小于 30, 尝试获取锁

换言之,当 index=29 时,线程 C 持有锁,但是锁只能阻止线程 A,线程 B 修改 index 的值,并不能阻止线程 A, 线程 B 在获取锁之前读取 index 的值,所以线程 A 读取 index=29, 并把值保持到线程的内部,如下图:

当线程 C 执行完,还没释放锁的时候,线程 A 的 index 值为 29 ;当线程 C 释放锁,线程 A 获取锁,进入同步块的时候,因为 Java 内存模型有内存可见性的要求, 兼之 Lock 的实现类实现了内存可见,所以线程 A 的 index 值会变成 30, 这就解析了为什么线程 A index=30 的时候能跳过(index.intValue<30)的判断条件,因为执行这个判断条件的时候线程 A index=29, 进入同步块之后变成了 30:

CPU 缓存

为什么每个线程都会持有一个 index 值呢?来看看下面的分级缓存图:

 _______________    ______________  
 |     CPU 1     |  |     CPU 2    |  
 |   _________   |  |   _________  |  
 |  | Level 1 |  |  |  | Level 1 | |  
 |  |   Cache |  |  |  |  Cache  | |  
 |  |         |  |  |  |         | |
 |  |_________|  |  |  |_________| |  
 |_______________|  |______________|
           | |              | |
           | |              | |
          _|_|______________|_|__
         |                       |
         |      MAIN MEMORY      | 
         |_______________________|

众所周知,在计算机的存储体系中,分布式存储系统比硬盘慢,硬盘比内存跑得慢,内存比 Cpu L3 level Cache 跑得慢,L3 Cache 比 L2 Cache 等等。空间越大,速度越慢,速度越快,空间越小,本质上就是空间与时间的取舍。例如,为了提高效率,就会把预计即将用到的变量index从主存缓存到 CPU 缓存,而 CPU 有多个核心,每个核心都缓存变量index。前文的问题本质就是 CPU 缓存的index与主存index不一致,而内存可见性说的就是强制 CPU 从主存获取变量index, 从而规避缓存不一致的问题

解决方案

把问题剖析清楚之后,解决方案就呼之欲出了:

while (index.intValue() < 30) {  // 1
    lock.lock(); // 2
	if(index>=30){
		continue;
	}
    if (condition.test(index.intValue())) {
        function.run();
        index++;
    }
    lock.unlock();
}

这种解决方法不禁让我想起单例模式里面的双重校验:

public static Singleton getSingleton() {
    if (instance == null) {                         //Single Checked
        synchronized (Singleton.class) {
            if (instance == null) {                 //Double Checked
                instance = new Singleton();
            }
        }
    }
    return instance ;
}

只是当时并不清楚 Double Checked 的作用,究竟解决了什么问题?只是知道不加这条语句就会造成初始化多个示例,的确是需要知其然知其所以然.

公平锁与非公平锁

前文说到,

这个程序是用 Lock 来控制线程的顺序,A,B,C 线程依次启动,因为 A 线程先启动,所以 A 线程会最先拿到锁,B,C 阻塞;但是 A 输出完字符串,释放锁,B 线程获得锁,C,A 线程阻塞; 依此循环。

粗看似乎没什么问题, 但是这里是存在着一个问题: 当线程 A 释放锁的时候,获取锁的是否一定是线程 B, 而不是线程 C, 线程 C 是否能够"插队"抢占锁? 这个就涉及到了公平锁和非公平锁的定义了: 公平锁: 线程 C 不能抢占,只能排队等待线程 B 获取并释放锁 非公平锁:线程 C 能抢占,抢到锁之后线程 B 只能继续等(有点惨!)

而 ReentrantLock 默认恰好是非公平锁, 查看源码可知:

/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

因此为了规避非公平锁抢占的问题, 上述的代码在同步块增加了判断条件:

 if (condition.test(index.intValue())) {
   ....
 }

只有符合条件的线程才能进行操作,否则就是线程自旋.(但是加锁+自旋实现起来,效率不会太高效!)

使用公平锁

如果使用公平锁,也可以不需要上述这样的判断条件,直接让线程顺序排队和唤醒: 通过让ReentrantLock成为公平锁. 方法也很简单, 只需要构造参数加上一个 boolean 值:

/**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

公平锁改造之后的代码如下:

public void Test(){
    private static Integer index = 0;
    Lock lock = new ReentrantLock(true);
  @Test
    public void testLock(){
        Thread threadA = work(() -> System.out.println("A"));
        Thread threadB = work(() -> System.out.println("B"));
        Thread threadC = work(() -> System.out.println("C"));
        threadA.setName("threadA");
        threadA.start();
        threadB.setName("threadB");
        threadB.start();
        threadC.setName("threadC");
        threadC.start();
        System.out.println();
    }

    private Thread work(Runnable function) {
        return new Thread(() -> {
            while (index.intValue() < 30) {
                lock.lock();
                if (index >= 30) {
                    continue;
                }
                function.run();
                index++;
                lock.unlock();
            }
        });
    }
}

后记

组内路过的 @碳素大佬看到我在研究这道问题的时候,给出了不一样的解决方案: 使用 AtomicInteger 作为控制手段,循环三十次,线程 A 在index%3==0时输出,线程 B 在index%3==1时输出, 线程 C 在index%3==2时输出, 不加锁, 不符合条件的线程作自旋。根据思路整理代码如下:

public void Test(){
      private static AtomicInteger index = new AtomicInteger(0);
   @Test
    public void testLock(){
        Thread threadA = work(i -> i % 3 == 0, () -> System.out.println("A"));
        Thread threadB = work(i -> i % 3 == 1, () -> System.out.println("B"));
        Thread threadC = work(i -> i % 3 == 2, () -> System.out.println("C"));
        threadA.setName("threadA");
        threadA.start();
        threadB.setName("threadB");
        threadB.start();
        threadC.setName("threadC");
        threadC.start();
        System.out.println();
    }

    private Thread work(Predicate<Integer> condition, Runnable function) {
        return new Thread(() -> {
            while (index.intValue() < 30) {
                if (condition.test(index.intValue())) {
                    function.run();
                    index.incrementAndGet();
                }
            }
        });
    }
}

就这样,把前文所有存在的问题都完美规避,实现还很优雅,效率目测也比加锁的方式高,碳总牛🍺!!!

5766 次点击
所在节点    程序员
55 条回复
zealot0630
2019-03-21 18:44:37 +08:00
16 楼理解正确,题目考察唤醒的效率,把题目中的 3 个线程换成 10000 个线程试试,使用 10000 个 cv 能精确唤醒下一个需要被唤醒的线程,而不是 10000 个线程去抢一把锁
lihongjie0209
2019-03-21 18:53:59 +08:00
@hhhsuan 还需要一个状态可以使 A 开始运行, 不然就是死锁了
zealot0630
2019-03-21 19:03:14 +08:00
@lihongjie0209 不需要,cv 本身是无状态的,你去找 example 看,cv.wait 是包在一个判断的 while 里面,由那个判断来控制状态
zealot0630
2019-03-21 19:04:14 +08:00
就是说,除了 cv 外,你还需要一个变量来记录,下面轮到谁了,先修改这个变量,再 signal 对应的 cv
zealot0630
2019-03-21 19:05:21 +08:00
而且这两个操作必须是原子的,解释了为什么 signal 只能在持有锁时候做
lihongjie0209
2019-03-21 19:07:55 +08:00
@zealot0630 cv 没有问题, 关键是 A 要特殊处理一下, 不然 A 在等待 C 而不是第一次直接运行, 那么就死锁了
zealot0630
2019-03-21 19:11:06 +08:00
@lihongjie0209 不会的,代码这样写:while(not A's turn) cv_a.wait(); A 第一次进来时候 A's turn 为真,根本不会进入等待
petelin
2019-03-21 19:11:25 +08:00
https://gist.github.com/Petelin/25ac1f93aaa5d9605bfdfccbeb729079


@lihongjie0209 java 太啰嗦了, 入口函数去通知 A 启动就可以了
@zealot0630 这样写可以吗? signal 好像要等, 对应的 wait 的时候才可以生效(我就偷懒利用 goroutine 启动完的时间差)
zealot0630
2019-03-21 19:13:18 +08:00
@lihongjie0209 看了你 20 楼的代码,说真的 也很难给你及格分。最严重的错误就是你需要把 lock/unlock 放到 while 外面。因为 cv.wait 时候会释放锁
petelin
2019-03-21 19:14:28 +08:00
@petelin 没说清楚, 就是必须要 condition 调用了 wait 之后在调用 singal 才能解除阻塞, 提前调用 singal 是不行的
zealot0630
2019-03-21 19:15:09 +08:00
@petelin 不及格。看一下我 29 楼说的。并且,假唤醒了解一下,在仔细看两遍示例代码。
lihongjie0209
2019-03-21 19:19:31 +08:00
@zealot0630

javadoc 不是这么说的

参考: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html
```
* <p>In all cases, before this method can return the current thread must
* re-acquire the lock associated with this condition. When the
* thread returns it is <em>guaranteed</em> to hold this lock.

```
zealot0630
2019-03-21 19:25:05 +08:00
@lihongjie0209 在 cv.wait 等待过程中,锁会被释放. 在等待成功后,会重新获取锁. 你再仔细理解一下.
lihongjie0209
2019-03-21 19:29:26 +08:00
@zealot0630
javadoc 给的 demo, 看起来没问题啊

```
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}


```
exonuclease
2019-03-21 19:33:44 +08:00
手痒来个 c++版本的。。。
#include <thread>
#include <atomic>
#include <vector>

using namespace std;

atomic_int counter = 0;
const vector<char> letters = { 'A','B','C' };
void worker(int target) {
int local_counter = 0;
while (local_counter < 10)
{
if (counter.load() % 3 == target) {
cout << letters[target] << endl;
++counter;
++local_counter;
}
}
}

int main()
{
thread a(worker, 0);
thread b(worker, 1);
thread c(worker, 2);
a.join();
b.join();
c.join();
return 0;
}
zealot0630
2019-03-21 19:56:27 +08:00
来抄答案吧,lock 放在 while 是效率考虑,虽然放在里面也可以,但是你上一个 unlock 紧接着下一次循环的 lock,你觉得这种代码有意义么?我们期望在 cv.wait 处等待,而不是在 rl.lock 地方等待

https://gist.github.com/kghost/585cb0d2f1c66ebc7d9af3d0a4fd8a42

https://scastie.scala-lang.org/k5ijcVnoTTGXtwLWqJQa2A
xrlin
2019-03-21 20:20:31 +08:00
我来个 go 的好了

```go
func main() {
channels := make([]chan bool, 3)
nextChannel := make(chan bool)

assets := [...]string{"A", "B", "C"}
for i := 0; i < len(channels); i++ {
channels[i] = make(chan bool)
go func(i int) {
for {

<-channels[i]
fmt.Print(assets[i])
nextChannel <- true

}
}(i)
}

for i := 0; i < 30; i++ {
channels[i%len(channels)] <- true
<-nextChannel
}

}
```
rwdy2008
2019-03-21 21:02:57 +08:00
rwdy2008
2019-03-21 21:07:49 +08:00
```
package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {
fmt.Println("Start...")

var x int32
done := false
lock := new(sync.Mutex)
cond := sync.NewCond(lock)

cond.L.Lock()

go func() {
for {
if x%3 == 0 {
fmt.Println("A", x)
atomic.AddInt32(&x, 1)
if x > 3*10-3 {
return
}
}
}
}()

go func() {
for {
if x%3 == 1 {
fmt.Println("B", x)
atomic.AddInt32(&x, 1)
if x > 3*10-2 {
return
}
}
}
}()

go func() {
for {
if x%3 == 2 {
fmt.Println("C", x)
atomic.AddInt32(&x, 1)
if x > 3*10-1 {
done = true
cond.Signal()
}
}
}
}()

for !done {
cond.Wait()
}

return
}
```
xrlin
2019-03-21 23:09:53 +08:00
@rwdy2008 #39 你的代码有问题,在 GOMAXPROCS 过小(比如 1 )时会导致阻塞,goroutine 执行是不确定的,但是你的 goroutine 里都是死循环,且没有等待信号量或者 channel。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/547045

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX