用 BlockingQueue 出现了一个无法解释的问题

2018-07-18 23:26:06 +08:00
 zhady009

想试用一下阻塞列队 做了个生产者和消费者的 demo 预期结果就是相互交替执行也就是 生产一个之后,消费一个

不允许连续生产或者连续消费

但是如果不让生产线程 sleep 就会无法实现交替执行的效果 我是没想到是什么原因

public static void main(String[] args) throws IOException {

        BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

        Producer p1 = new Producer(bq);
        p1.setName("producer01");
        Customer c1 = new Customer(bq);
        c1.setName("customer01");
        p1.start();
        c1.start();
    }

public class Producer extends Thread {

    private BlockingQueue<Integer> bq;
    public Producer(BlockingQueue<Integer> bq) {
        this.bq = bq;
    }

    @Override
    public void run() {
        while (true) {
            try {
                bq.put(produce());
                Thread.sleep(0,1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

    private Integer produce() {
        Integer number = (new Random().nextInt(100));
        System.out.println(getName() + ":produced =====> " + number);
        return number;
    }
}

public class Customer extends Thread {

    private BlockingQueue<Integer> bq;

    public Customer(BlockingQueue<Integer> bq) {
        this.bq = bq;
    }

    @Override
    public void run() {
        while (true) {
            try {
                consume();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void consume() throws InterruptedException {
        System.out.println(getName() + ":consumed:" + bq.take());
    }
}
2551 次点击
所在节点    Java
20 条回复
watzds
2018-07-19 00:07:35 +08:00
什么叫交替执行?看输出不准吧
chocotan
2018-07-19 00:09:02 +08:00
Thread.sleep(0,1) 实际上是 sleep 了 1ms 吧
bq.take()耗时小于 1ms,所以看起来是交替执行
去掉 sleep 之后,bq.take()拿到数据比循环到下一个 produce()时要慢,所以看起来不是交替执行
zhady009
2018-07-19 00:11:11 +08:00
producer01:produced =====> 63
customer01:consumed:63
producer01:produced =====> 70
customer01:consumed:70
producer01:produced =====> 16
customer01:consumed:16
producer01:produced =====> 25
customer01:consumed:25

像这样的如果不加 sleep 会如下,

producer01:produced =====> 70
producer01:produced =====> 16
customer01:consumed:70
customer01:consumed:16
producer01:produced =====> 25
customer01:consumed:25
sagaxu
2018-07-19 00:15:32 +08:00
take 和 put 是交替执行的,但 println 不是
zhady009
2018-07-19 00:18:41 +08:00
Thread.sleep(0,1) 是一纳秒吧 Thread.sleep(1)才是 1 毫秒

put 方法如果队列满了,将阻塞当前线程
take 方法列队为空,将阻塞当前线程
chocotan
2018-07-19 00:20:36 +08:00
@zhady009 你看一下这个方法的源码

```
if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
millis++;
}
sleep(millis);
```
zhady009
2018-07-19 00:21:05 +08:00
那如何让
System.out.println(getName() + ":consumed:" + bq.take());
变成原子性
lcorange
2018-07-19 00:21:48 +08:00
比如这句 System.out.println(getName() + ":consumed:" + bq.take());
可以保证一定是 bq.take()之后,生产者才能 bq.put(),这个可以保证顺序
但是外层的 System.out.println 函数你是无法保证他一定会紧接着 bq.take()后面执行,拖延到生产者 sysout 后也是有可能的
zhady009
2018-07-19 00:22:27 +08:00
@chocotan
没注意..确实是 1ms
pwrliang
2018-07-19 00:26:25 +08:00
我一开始也是认为 sysout 的问题,但是我统计了调用序列,也是交替的啊。
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {

public static void main(String[] args) throws IOException, InterruptedException {

BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

AtomicInteger seq =new AtomicInteger(0);

Producer p1 = new Producer(bq,seq);
p1.setName("producer01");
Customer c1 = new Customer(bq,seq);
c1.setName("customer01");
p1.start();
c1.start();
}

}

class Producer extends Thread {
AtomicInteger seq;
private BlockingQueue<Integer> bq;
public Producer(BlockingQueue<Integer> bq,AtomicInteger seq) {
this.bq = bq;
this.seq = seq;
}

@Override
public void run() {
while (true) {
try {
bq.put(produce());
// Thread.sleep(0,1);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

private Integer produce() {
Integer number = (new Random().nextInt(100));
int sid=seq.addAndGet(1);
System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
System.out.flush();
return number;
}
}

class Customer extends Thread {
AtomicInteger seq;
private BlockingQueue<Integer> bq;

public Customer(BlockingQueue<Integer> bq,AtomicInteger seq) {
this.bq = bq;
this.seq = seq;
}

@Override
public void run() {
while (true) {
try {
consume();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void consume() throws InterruptedException {
int sid=seq.addAndGet(1);
System.out.println("seq:" + sid + getName() + ":consumed:" + bq.take());
System.out.flush();
}
}





----------------------------------------


seq:2producer01:produced =====> 44
seq:3producer01:produced =====> 97
seq:1customer01:consumed:44
seq:4customer01:consumed:97
seq:5producer01:produced =====> 19
seq:7producer01:produced =====> 88
seq:6customer01:consumed:19
seq:8producer01:produced =====> 90
seq:9customer01:consumed:88
seq:10producer01:produced =====> 93
seq:11customer01:consumed:90
seq:12producer01:produced =====> 40
zhady009
2018-07-19 00:27:21 +08:00
@lcorange 但是我试了一下把 consume 方法弄成同步方法也不管用..
lcorange
2018-07-19 00:38:30 +08:00
@zhady009 这个是无解的,除非整个函数都包上锁,这时这个队列就变得毫无疑义了

如果按照命令的顺序拆分,生产者分成 P,消费者分成 C

P1 print number
P2 bq.put(number)
P3 print number
P4 bq.put(number)
P5 print number
P6 bq.put(number)

C1 bq.take()
C2 print number
C3 bq.take()
C4 print number
C5 bq.take()
C6 print number

当按照以下顺序执行的时候
P1 P2 P3 C1 C2 P4 C3 C4 ...就会出现你所说的两条日志
其实内部的 P2 C1 P4 C3 还是保证了两边的顺序的
cheneydog
2018-07-19 00:41:36 +08:00
我觉得是打印输出的问题,队列本身应该没问题,只是两个线程共用一个输出流 System.out ,结果无法控制。
lcorange
2018-07-19 00:43:22 +08:00
@pwrliang AtomicInteger LinkedBlockingQueue 只保证调用这两个对象的函数时能够保证原子性,但是整个 product 和 consume 函数上没有这样的锁,所以执行顺序是不能保证的
zhady009
2018-07-19 00:49:16 +08:00
@lcorange 懂了一半 另外一半不懂的是为什么 sleep 之后就可以达到预期结果
lcorange
2018-07-19 00:50:54 +08:00
@zhady009 只是运气好加系统负载不大,sleep 的时间里让 print 函数有机会执行,加大负载,长时间测试一样会出现这个现象
sagaxu
2018-07-19 09:50:50 +08:00
@zhady009 因为 sleep 改变了占空比,cpu 大部分时间是空闲的,错开了你的两组操作。试想一下,往平底锅里,同时扔 8 个鸡蛋,鸡蛋之间一定会有碰撞,发生空间的争抢,但是同时扔 8 粒芝麻,很大概率是散落不碰撞的。
reus
2018-07-19 12:42:42 +08:00
线程是并发执行的,当然不能保证交替执行。
pwrliang
2018-07-19 22:12:15 +08:00
这回可以了,要保证 put+sysout, take+sysout 是原子性的,只能加个全局锁。

public class Test {

public static void main(String[] args) throws IOException, InterruptedException {
Lock lock = new ReentrantLock();
BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

AtomicInteger seq =new AtomicInteger(0);

Producer p1 = new Producer(bq,seq,lock);
p1.setName("producer01");
Customer c1 = new Customer(bq,seq,lock);
c1.setName("customer01");
p1.start();
c1.start();
}

}

class Producer extends Thread {
AtomicInteger seq;
Lock lock;
private BlockingQueue<Integer> bq;
public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
this.bq = bq;
this.seq = seq;
this.lock = lock;
}

@Override
public void run() {
while (true) {
try {
if(bq.size()==1)continue;
lock.lock();
bq.put(produce());
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

private Integer produce() {
Integer number = (new Random().nextInt(100));
int sid=seq.addAndGet(1);
System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
System.out.flush();
return number;
}
}

class Customer extends Thread {
AtomicInteger seq;
private BlockingQueue<Integer> bq;
Lock lock;

public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
this.bq = bq;
this.seq = seq;
this.lock = lock;
}

@Override
public void run() {
while (true) {
try {
if (bq.size()==0)continue;
lock.lock();
consume();
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void consume() throws InterruptedException {
int tk = bq.take();
int sid=seq.addAndGet(1);
System.out.println("seq:" + sid + getName() + ":consumed:" + tk);
System.out.flush();
}
}

-------------------------------------------------

public class Test {

public static void main(String[] args) throws IOException, InterruptedException {
Lock lock = new ReentrantLock();
BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

AtomicInteger seq =new AtomicInteger(0);

Producer p1 = new Producer(bq,seq,lock);
p1.setName("producer01");
Customer c1 = new Customer(bq,seq,lock);
c1.setName("customer01");
p1.start();
c1.start();
}

}

class Producer extends Thread {
AtomicInteger seq;
Lock lock;
private BlockingQueue<Integer> bq;
public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
this.bq = bq;
this.seq = seq;
this.lock = lock;
}

@Override
public void run() {
while (true) {
try {
if(bq.size()==1)continue;
lock.lock();
bq.put(produce());
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

private Integer produce() {
Integer number = (new Random().nextInt(100));
int sid=seq.addAndGet(1);
System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
System.out.flush();
return number;
}
}

class Customer extends Thread {
AtomicInteger seq;
private BlockingQueue<Integer> bq;
Lock lock;

public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
this.bq = bq;
this.seq = seq;
this.lock = lock;
}

@Override
public void run() {
while (true) {
try {
if (bq.size()==0)continue;
lock.lock();
consume();
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void consume() throws InterruptedException {
int tk = bq.take();
int sid=seq.addAndGet(1);
System.out.println("seq:" + sid + getName() + ":consumed:" + tk);
System.out.flush();
}
}
pwrliang
2018-07-19 22:13:26 +08:00
@pwrliang 刚刚结果粘贴错了
--------------------------------------------------
seq:1producer01:produced =====> 45
seq:2customer01:consumed:45
seq:3producer01:produced =====> 20
seq:4customer01:consumed:20
seq:5producer01:produced =====> 78
seq:6customer01:consumed:78
seq:7producer01:produced =====> 45
seq:8customer01:consumed:45
seq:9producer01:produced =====> 90
seq:10customer01:consumed:90
seq:11producer01:produced =====> 57
seq:12customer01:consumed:57

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/472177

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX