V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
zhady009
V2EX  ›  Java

用 BlockingQueue 出现了一个无法解释的问题

  •  
  •   zhady009 · 2018-07-18 23:26:06 +08:00 · 2672 次点击
    这是一个创建于 2353 天前的主题,其中的信息可能已经有所发展或是发生改变。

    想试用一下阻塞列队 做了个生产者和消费者的 demo 预期结果就是相互交替执行也就是 生产一个之后,消费一个

    不允许连续生产或者连续消费

    但是如果不让生产线程 sleep 就会无法实现交替执行的效果 我是没想到是什么原因

    public static void main(String[] args) throws IOException {
    
            BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);
    
            Producer p1 = new Producer(bq);
            p1.setName("producer01");
            Customer c1 = new Customer(bq);
            c1.setName("customer01");
            p1.start();
            c1.start();
        }
    
    public class Producer extends Thread {
    
        private BlockingQueue<Integer> bq;
        public Producer(BlockingQueue<Integer> bq) {
            this.bq = bq;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    bq.put(produce());
                    Thread.sleep(0,1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
        private Integer produce() {
            Integer number = (new Random().nextInt(100));
            System.out.println(getName() + ":produced =====> " + number);
            return number;
        }
    }
    
    public class Customer extends Thread {
    
        private BlockingQueue<Integer> bq;
    
        public Customer(BlockingQueue<Integer> bq) {
            this.bq = bq;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    consume();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void consume() throws InterruptedException {
            System.out.println(getName() + ":consumed:" + bq.take());
        }
    }
    
    20 条回复    2018-07-19 22:13:26 +08:00
    watzds
        1
    watzds  
       2018-07-19 00:07:35 +08:00 via Android
    什么叫交替执行?看输出不准吧
    chocotan
        2
    chocotan  
       2018-07-19 00:09:02 +08:00
    Thread.sleep(0,1) 实际上是 sleep 了 1ms 吧
    bq.take()耗时小于 1ms,所以看起来是交替执行
    去掉 sleep 之后,bq.take()拿到数据比循环到下一个 produce()时要慢,所以看起来不是交替执行
    zhady009
        3
    zhady009  
    OP
       2018-07-19 00:11:11 +08:00
    producer01:produced =====> 63
    customer01:consumed:63
    producer01:produced =====> 70
    customer01:consumed:70
    producer01:produced =====> 16
    customer01:consumed:16
    producer01:produced =====> 25
    customer01:consumed:25

    像这样的如果不加 sleep 会如下,

    producer01:produced =====> 70
    producer01:produced =====> 16
    customer01:consumed:70
    customer01:consumed:16
    producer01:produced =====> 25
    customer01:consumed:25
    sagaxu
        4
    sagaxu  
       2018-07-19 00:15:32 +08:00 via Android
    take 和 put 是交替执行的,但 println 不是
    zhady009
        5
    zhady009  
    OP
       2018-07-19 00:18:41 +08:00
    Thread.sleep(0,1) 是一纳秒吧 Thread.sleep(1)才是 1 毫秒

    put 方法如果队列满了,将阻塞当前线程
    take 方法列队为空,将阻塞当前线程
    chocotan
        6
    chocotan  
       2018-07-19 00:20:36 +08:00
    @zhady009 你看一下这个方法的源码

    ```
    if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
    millis++;
    }
    sleep(millis);
    ```
    zhady009
        7
    zhady009  
    OP
       2018-07-19 00:21:05 +08:00
    那如何让
    System.out.println(getName() + ":consumed:" + bq.take());
    变成原子性
    lcorange
        8
    lcorange  
       2018-07-19 00:21:48 +08:00
    比如这句 System.out.println(getName() + ":consumed:" + bq.take());
    可以保证一定是 bq.take()之后,生产者才能 bq.put(),这个可以保证顺序
    但是外层的 System.out.println 函数你是无法保证他一定会紧接着 bq.take()后面执行,拖延到生产者 sysout 后也是有可能的
    zhady009
        9
    zhady009  
    OP
       2018-07-19 00:22:27 +08:00
    @chocotan
    没注意..确实是 1ms
    pwrliang
        10
    pwrliang  
       2018-07-19 00:26:25 +08:00
    我一开始也是认为 sysout 的问题,但是我统计了调用序列,也是交替的啊。
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class Test {

    public static void main(String[] args) throws IOException, InterruptedException {

    BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

    AtomicInteger seq =new AtomicInteger(0);

    Producer p1 = new Producer(bq,seq);
    p1.setName("producer01");
    Customer c1 = new Customer(bq,seq);
    c1.setName("customer01");
    p1.start();
    c1.start();
    }

    }

    class Producer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;
    public Producer(BlockingQueue<Integer> bq,AtomicInteger seq) {
    this.bq = bq;
    this.seq = seq;
    }

    @Override
    public void run() {
    while (true) {
    try {
    bq.put(produce());
    // Thread.sleep(0,1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

    private Integer produce() {
    Integer number = (new Random().nextInt(100));
    int sid=seq.addAndGet(1);
    System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
    System.out.flush();
    return number;
    }
    }

    class Customer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;

    public Customer(BlockingQueue<Integer> bq,AtomicInteger seq) {
    this.bq = bq;
    this.seq = seq;
    }

    @Override
    public void run() {
    while (true) {
    try {
    consume();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    private void consume() throws InterruptedException {
    int sid=seq.addAndGet(1);
    System.out.println("seq:" + sid + getName() + ":consumed:" + bq.take());
    System.out.flush();
    }
    }





    ----------------------------------------


    seq:2producer01:produced =====> 44
    seq:3producer01:produced =====> 97
    seq:1customer01:consumed:44
    seq:4customer01:consumed:97
    seq:5producer01:produced =====> 19
    seq:7producer01:produced =====> 88
    seq:6customer01:consumed:19
    seq:8producer01:produced =====> 90
    seq:9customer01:consumed:88
    seq:10producer01:produced =====> 93
    seq:11customer01:consumed:90
    seq:12producer01:produced =====> 40
    zhady009
        11
    zhady009  
    OP
       2018-07-19 00:27:21 +08:00
    @lcorange 但是我试了一下把 consume 方法弄成同步方法也不管用..
    lcorange
        12
    lcorange  
       2018-07-19 00:38:30 +08:00   ❤️ 1
    @zhady009 这个是无解的,除非整个函数都包上锁,这时这个队列就变得毫无疑义了

    如果按照命令的顺序拆分,生产者分成 P,消费者分成 C

    P1 print number
    P2 bq.put(number)
    P3 print number
    P4 bq.put(number)
    P5 print number
    P6 bq.put(number)

    C1 bq.take()
    C2 print number
    C3 bq.take()
    C4 print number
    C5 bq.take()
    C6 print number

    当按照以下顺序执行的时候
    P1 P2 P3 C1 C2 P4 C3 C4 ...就会出现你所说的两条日志
    其实内部的 P2 C1 P4 C3 还是保证了两边的顺序的
    cheneydog
        13
    cheneydog  
       2018-07-19 00:41:36 +08:00
    我觉得是打印输出的问题,队列本身应该没问题,只是两个线程共用一个输出流 System.out ,结果无法控制。
    lcorange
        14
    lcorange  
       2018-07-19 00:43:22 +08:00
    @pwrliang AtomicInteger LinkedBlockingQueue 只保证调用这两个对象的函数时能够保证原子性,但是整个 product 和 consume 函数上没有这样的锁,所以执行顺序是不能保证的
    zhady009
        15
    zhady009  
    OP
       2018-07-19 00:49:16 +08:00
    @lcorange 懂了一半 另外一半不懂的是为什么 sleep 之后就可以达到预期结果
    lcorange
        16
    lcorange  
       2018-07-19 00:50:54 +08:00
    @zhady009 只是运气好加系统负载不大,sleep 的时间里让 print 函数有机会执行,加大负载,长时间测试一样会出现这个现象
    sagaxu
        17
    sagaxu  
       2018-07-19 09:50:50 +08:00 via Android
    @zhady009 因为 sleep 改变了占空比,cpu 大部分时间是空闲的,错开了你的两组操作。试想一下,往平底锅里,同时扔 8 个鸡蛋,鸡蛋之间一定会有碰撞,发生空间的争抢,但是同时扔 8 粒芝麻,很大概率是散落不碰撞的。
    reus
        18
    reus  
       2018-07-19 12:42:42 +08:00
    线程是并发执行的,当然不能保证交替执行。
    pwrliang
        19
    pwrliang  
       2018-07-19 22:12:15 +08:00
    这回可以了,要保证 put+sysout, take+sysout 是原子性的,只能加个全局锁。

    public class Test {

    public static void main(String[] args) throws IOException, InterruptedException {
    Lock lock = new ReentrantLock();
    BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

    AtomicInteger seq =new AtomicInteger(0);

    Producer p1 = new Producer(bq,seq,lock);
    p1.setName("producer01");
    Customer c1 = new Customer(bq,seq,lock);
    c1.setName("customer01");
    p1.start();
    c1.start();
    }

    }

    class Producer extends Thread {
    AtomicInteger seq;
    Lock lock;
    private BlockingQueue<Integer> bq;
    public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if(bq.size()==1)continue;
    lock.lock();
    bq.put(produce());
    lock.unlock();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

    private Integer produce() {
    Integer number = (new Random().nextInt(100));
    int sid=seq.addAndGet(1);
    System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
    System.out.flush();
    return number;
    }
    }

    class Customer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;
    Lock lock;

    public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if (bq.size()==0)continue;
    lock.lock();
    consume();
    lock.unlock();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    private void consume() throws InterruptedException {
    int tk = bq.take();
    int sid=seq.addAndGet(1);
    System.out.println("seq:" + sid + getName() + ":consumed:" + tk);
    System.out.flush();
    }
    }

    -------------------------------------------------

    public class Test {

    public static void main(String[] args) throws IOException, InterruptedException {
    Lock lock = new ReentrantLock();
    BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

    AtomicInteger seq =new AtomicInteger(0);

    Producer p1 = new Producer(bq,seq,lock);
    p1.setName("producer01");
    Customer c1 = new Customer(bq,seq,lock);
    c1.setName("customer01");
    p1.start();
    c1.start();
    }

    }

    class Producer extends Thread {
    AtomicInteger seq;
    Lock lock;
    private BlockingQueue<Integer> bq;
    public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if(bq.size()==1)continue;
    lock.lock();
    bq.put(produce());
    lock.unlock();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

    private Integer produce() {
    Integer number = (new Random().nextInt(100));
    int sid=seq.addAndGet(1);
    System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
    System.out.flush();
    return number;
    }
    }

    class Customer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;
    Lock lock;

    public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if (bq.size()==0)continue;
    lock.lock();
    consume();
    lock.unlock();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    private void consume() throws InterruptedException {
    int tk = bq.take();
    int sid=seq.addAndGet(1);
    System.out.println("seq:" + sid + getName() + ":consumed:" + tk);
    System.out.flush();
    }
    }
    pwrliang
        20
    pwrliang  
       2018-07-19 22:13:26 +08:00
    @pwrliang 刚刚结果粘贴错了
    --------------------------------------------------
    seq:1producer01:produced =====> 45
    seq:2customer01:consumed:45
    seq:3producer01:produced =====> 20
    seq:4customer01:consumed:20
    seq:5producer01:produced =====> 78
    seq:6customer01:consumed:78
    seq:7producer01:produced =====> 45
    seq:8customer01:consumed:45
    seq:9producer01:produced =====> 90
    seq:10customer01:consumed:90
    seq:11producer01:produced =====> 57
    seq:12customer01:consumed:57
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1312 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 23:38 · PVG 07:38 · LAX 15:38 · JFK 18:38
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.