Thread-safety issue, hoping someone more experienced can help

2022-12-14 23:07:53 +08:00
 yusheng88

Background

I want to write a utility class that merges requests into batches. The idea:

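  1. Put the request into a request queue and block the current thread t1 [ LockSupport.park ]
  2. Thread t2 collects a batch of requests and submits it to a thread pool
  3. Thread tn processes the batch, sets each return value, then wakes the blocked threads [t1...] one by one [ LockSupport.unpark(t1) ]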

The problem

While testing, when I print each thread's return value, the value is occasionally null.

Source code

The request wrapper object that gets passed around:

class BizTask {
    private Thread thread;
    private Object param;
    private Object response;

    public Object getParam() {
        return param;
    }

    public void setParam(Object param) {
        this.param = param;
    }

    public Object getResponse() {
        return response;
    }

    public void setResponse(Object response) {
        this.response = response;
    }

    public Thread getThread() {
        return thread;
    }

    public void setThread(Thread thread) {
        this.thread = thread;
    }
}

Utility class:

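import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

public class BatchHelper {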

    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100000), new ThreadPoolExecutor.CallerRunsPolicy());
    private final BlockingQueue<BizTask> requestQueue = new LinkedBlockingQueue<>();
    private final Consumer<List<BizTask>> function;

    public BatchHelper(Consumer<List<BizTask>> function) {
        this.function = function;
        threadPool.execute(this::autoDispatch);
    }

    public Object take(Object param) {
        BizTask task = new BizTask();
        task.setParam(param);
        task.setThread(Thread.currentThread());
        // synchronized (task) {
        try {
            requestQueue.put(task);
        } catch (InterruptedException ignored) {
        }
        // block until woken
        LockSupport.park();
        // }
        return task.getResponse();
    }

    public void autoDispatch() {
        while (true) {
            try {
                BizTask t1 = requestQueue.take();
                List<BizTask> tasks = new ArrayList<>(128);
                tasks.add(t1);
                BizTask t2 = requestQueue.poll();
                if (t2 == null) {
                    dispatch(tasks);
                    continue;
                }
                int sum = 1;
                while (t2 != null) {
                    tasks.add(t2);
                    sum = (sum + 1) & 127;
                    if (sum == 0) {
                        dispatch(tasks);
                        tasks = new ArrayList<>(128);
                    }
                    t2 = requestQueue.poll();
                }
                if (sum > 0) {
                    dispatch(tasks);
                }
                Thread.sleep(10);
            } catch (InterruptedException ignored) {
            }
        }
    }

    private void dispatch(List<BizTask> list) {
        threadPool.execute(() -> {
            // batch-process; sets each task's response
            function.accept(list);
            for (BizTask task : list) {
                // synchronized (task) {
                Thread lock = task.getThread();
                // wake the waiting thread
                LockSupport.unpark(lock);
                // }
            }
        });
    }
}
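The hand-rolled poll()/counter loop in autoDispatch (capping a batch at 128 via `sum = (sum + 1) & 127`) can also be written with BlockingQueue.drainTo(Collection, int), which moves up to a given number of already-queued elements in one call. This is a minimal sketch of that swapped-in approach, not the OP's code: the class DrainBatchDemo and the method nextBatch are made-up names, and unlike autoDispatch it does not sleep 10 ms between rounds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainBatchDemo {
    // Same cap as the `& 127` counter in autoDispatch (hypothetical constant name).
    static final int MAX_BATCH = 128;

    // Blocks until at least one element is available, then grabs whatever else
    // is already queued, up to MAX_BATCH elements in total.
    static <T> List<T> nextBatch(BlockingQueue<T> queue) throws InterruptedException {
        List<T> batch = new ArrayList<>(MAX_BATCH);
        batch.add(queue.take());
        queue.drainTo(batch, MAX_BATCH - 1);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 300; i++) queue.put(i);
        List<Integer> sizes = new ArrayList<>();
        while (!queue.isEmpty()) sizes.add(nextBatch(queue).size());
        System.out.println(sizes); // prints [128, 128, 44]
    }
}
```

Each returned list can then be handed to dispatch() as before.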

Test case:

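import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestDemo {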
    public static void main(String[] args) {
        BatchHelper batchHelper = new BatchHelper((tasks) -> {
            for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
                BizTask task = tasks.get(i);
                // System.out.println("requestId = " + requestId);
                task.setResponse(i);
            }
        });

        // simulate request threads
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 20, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            threadPool.execute(() -> {
                Object take = batchHelper.take(finalI + "");
                if (take == null) {
                    System.out.println("take = " + take);
                }
            });
        }


    }
}
2183 views
Node: Programmer
10 replies
yusheng88
2022-12-14 23:11:37 +08:00
Why would the return value ever be null?
TylerYY
2022-12-15 09:04:40 +08:00
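Could it be a visibility problem? After thread tn sets the return value and unparks the blocked thread, the woken thread might not immediately see the value that was set.
Try marking BizTask's response field volatile.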
yusheng88
2022-12-15 09:07:29 +08:00
@TylerYY I already tried that; take = null still shows up.
senninha
2022-12-15 11:21:09 +08:00
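Possible cause:
The LockSupport permit was already set beforehand, so the call to park returns immediately and resp is of course still null. In other words, the park and unpark calls are not paired one-to-one? Could LinkedBlockingQueue be the problem?
According to the docs, park also returns in these three cases: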
Some other thread invokes unpark with the current thread as the target; or
Some other thread interrupts the current thread; or
The call spuriously (that is, for no reason) returns.
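The first two causes can be reproduced deterministically on a single thread. The sketch below (a hypothetical class EarlyReturnDemo, with a stripped-down Task standing in for BizTask) waits exactly the way take() does, one park() followed by a read, and shows park() returning before any response was set. The third cause, a spurious return, cannot be triggered on demand, so it is not reproduced here.

```java
import java.util.concurrent.locks.LockSupport;

public class EarlyReturnDemo {
    // Stripped-down stand-in for BizTask: only the response field matters here.
    static class Task {
        volatile Object response;
    }

    // Same shape as BatchHelper.take(): park once, then read the response.
    static Object naiveWait(Task task) {
        LockSupport.park();
        return task.response;
    }

    public static void main(String[] args) {
        Task task = new Task();

        // Cause 1: a permit is already available (simulating an unpark that reached
        // this thread earlier), so park() consumes it and returns immediately.
        LockSupport.unpark(Thread.currentThread());
        System.out.println("stale permit -> " + naiveWait(task)); // prints "stale permit -> null"

        // Cause 2: the thread is interrupted; park() returns immediately and
        // leaves the interrupt status set.
        Thread.currentThread().interrupt();
        System.out.println("interrupted  -> " + naiveWait(task)); // prints "interrupted  -> null"
        System.out.println("flag still set: " + Thread.interrupted()); // prints "flag still set: true"
    }
}
```

Note that volatile on the response field does not help in either case: the value is read correctly, it just has not been written yet.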
yusheng88
2022-12-15 14:16:58 +08:00
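@senninha That's exactly what I find strange: the value is set before unpark, and take blocks before reading it, so I can't see how take = null can happen. I also tried printing the number of executions, and the count is correct.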
oldshensheep
2022-12-15 14:57:40 +08:00
It's probably due to this: a spurious wakeup.
Similar questions:
https://stackoverflow.com/questions/67118821/futuretask-get-method-may-distable-locksupport-park
https://stackoverflow.com/questions/1050592/do-spurious-wakeups-in-java-actually-happen
I simplified the OP's code logic.

if (task.data == null) {
    System.out.println(task);
    System.out.println(task);
}
The output of lines 56-58 in it sometimes looks like this:
Task{id=3678, data='null', thread=Thread[pool-1-thread-3,5,main]}
Task{id=3678, data='3678 FINISHED.', thread=Thread[pool-1-thread-3,5,main]}
So it must have been woken up early...
https://gist.github.com/oldshensheep/034044093ce9608ee3d02d7629c2bf81
GloryJie
2022-12-15 15:13:33 +08:00
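I printed time A right before setResponse is executed and time B when take == null, and got B < A: the thread was woken up before the value was even set. I think it's the spurious return mentioned above.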
senninha
2022-12-15 15:36:52 +08:00
@yusheng88 Someone below mentioned that park can return spuriously (that is, for no reason). The LockSupport.park documentation also says you need to re-check the condition:
Callers should re-check the conditions which caused the thread to park in the first place.
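Applied to BatchHelper, "re-check the condition" means take() should loop until its own task is actually finished instead of trusting a single park() return. A minimal sketch under these assumptions: a volatile `done` flag (a name introduced here, not in the original) is the condition; the response is written before `done` is set, so the volatile write/read makes it visible to the waiter; and the dispatcher uses drainTo instead of the poll loop. The interrupt handling follows the FIFOMutex sample in the LockSupport javadoc: remember the interrupt, keep waiting, restore the flag afterwards. The class name RecheckBatchHelper and the countNulls harness are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

public class RecheckBatchHelper {
    static class BizTask {
        final Thread thread = Thread.currentThread(); // the requester that will wait
        final Object param;
        Object response;        // written by the batch thread before `done`
        volatile boolean done;  // the condition the waiter re-checks

        BizTask(Object param) { this.param = param; }
    }

    private final BlockingQueue<BizTask> requestQueue = new LinkedBlockingQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final Consumer<List<BizTask>> function;

    RecheckBatchHelper(Consumer<List<BizTask>> function) {
        this.function = function;
        Thread dispatcher = new Thread(this::autoDispatch, "dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    public Object take(Object param) {
        BizTask task = new BizTask(param);
        requestQueue.add(task); // unbounded queue, never rejects
        boolean wasInterrupted = false;
        // park() may return early (stale permit, interrupt, spurious return),
        // so keep waiting until our own task is really done.
        while (!task.done) {
            LockSupport.park(this);
            if (Thread.interrupted()) wasInterrupted = true;
        }
        if (wasInterrupted) Thread.currentThread().interrupt();
        return task.response; // visible: read after the volatile read of `done`
    }

    private void autoDispatch() {
        while (true) {
            try {
                List<BizTask> batch = new ArrayList<>(128);
                batch.add(requestQueue.take());
                requestQueue.drainTo(batch, 127);
                workers.execute(() -> {
                    function.accept(batch);
                    for (BizTask t : batch) {
                        t.done = true;                // publish the response first...
                        LockSupport.unpark(t.thread); // ...then wake the waiter
                    }
                });
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Test harness modeled on TestDemo: counts how many requests saw null.
    static int countNulls(int requests) throws InterruptedException {
        RecheckBatchHelper helper = new RecheckBatchHelper(tasks -> {
            for (int i = 0; i < tasks.size(); i++) tasks.get(i).response = i;
        });
        ExecutorService requesters = Executors.newFixedThreadPool(20);
        AtomicInteger nulls = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(requests);
        for (int i = 0; i < requests; i++) {
            String param = String.valueOf(i);
            requesters.execute(() -> {
                if (helper.take(param) == null) nulls.incrementAndGet();
                finished.countDown();
            });
        }
        finished.await();
        requesters.shutdown();
        helper.workers.shutdown();
        return nulls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("null responses: " + countNulls(1000)); // prints "null responses: 0"
    }
}
```

With the loop, a stray permit costs at most one extra trip around the loop instead of returning a missing result.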
yusheng88
2022-12-15 15:41:25 +08:00
@oldshensheep Thanks, that's exactly the cause. I had read the javadoc but didn't understand what a spurious return meant 0.0
strayerxx
2022-12-21 15:14:15 +08:00
I'm wondering: LinkedBlockingQueue also seems to use park and unpark under the hood, and also on the current thread. Could the two be interfering with each other?
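On that point: LinkedBlockingQueue is built on ReentrantLock and Condition, which use AbstractQueuedSynchronizer and therefore LockSupport.park/unpark internally, and each thread has a single permit shared by everything that parks or unparks it. Whether a queue operation actually left a stray permit in this test was never established in the thread. One way to stop depending on the thread's permit at all is to give every request its own completion object. The sketch below swaps park/unpark for java.util.concurrent.CompletableFuture, whose join() does its own waiting internally; it is an alternative design, not what the OP wrote, and the names FutureBatchHelper, Request, batchFn (assumed to return one result per parameter, in order) and runDemo are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class FutureBatchHelper<P, R> {
    // One request: its parameter plus a future that the batch thread completes.
    private final class Request {
        final P param;
        final CompletableFuture<R> result = new CompletableFuture<>();
        Request(P param) { this.param = param; }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final Function<List<P>, List<R>> batchFn; // one result per param, same order

    public FutureBatchHelper(Function<List<P>, List<R>> batchFn) {
        this.batchFn = batchFn;
        Thread dispatcher = new Thread(this::dispatchLoop, "dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    public R take(P param) {
        Request request = new Request(param);
        queue.add(request);
        return request.result.join(); // returns only once this request's future is completed
    }

    private void dispatchLoop() {
        while (true) {
            List<Request> batch = new ArrayList<>(128);
            try {
                batch.add(queue.take());
            } catch (InterruptedException e) {
                return;
            }
            queue.drainTo(batch, 127);
            workers.execute(() -> complete(batch));
        }
    }

    private void complete(List<Request> batch) {
        List<P> params = new ArrayList<>(batch.size());
        for (Request r : batch) params.add(r.param);
        try {
            List<R> results = batchFn.apply(params);
            for (int i = 0; i < batch.size(); i++) batch.get(i).result.complete(results.get(i));
        } catch (RuntimeException e) {
            // Fail every still-pending request instead of leaving callers blocked.
            for (Request r : batch) r.result.completeExceptionally(e);
        }
    }

    public void shutdown() { workers.shutdown(); }

    // Fires n concurrent requests and returns their answers in request order.
    static List<String> runDemo(int n) {
        FutureBatchHelper<String, String> helper = new FutureBatchHelper<String, String>(params -> {
            List<String> out = new ArrayList<>(params.size());
            for (String p : params) out.add(p + " done");
            return out;
        });
        ExecutorService requesters = Executors.newFixedThreadPool(20);
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            String param = String.valueOf(i);
            pending.add(CompletableFuture.supplyAsync(() -> helper.take(param), requesters));
        }
        List<String> answers = new ArrayList<>();
        for (CompletableFuture<String> f : pending) answers.add(f.join());
        requesters.shutdown();
        helper.shutdown();
        return answers;
    }

    public static void main(String[] args) {
        List<String> answers = runDemo(100);
        System.out.println(answers.get(42));        // prints "42 done"
        System.out.println(answers.contains(null)); // prints "false"
    }
}
```

The trade-off is an extra object per request; in return, a wakeup can only mean "this request's result is ready".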

https://www.v2ex.com/t/902578
