我想实现一个合并请求工具类,思路:
测试时发现,打印每个线程的返回值时,偶尔会遇到返回值为 null 。
传递的请求封装对象:
class BizTask {
private Thread thread;
private Object param;
private Object response;
public Object getParam() {
return param;
}
public void setParam(Object param) {
this.param = param;
}
public Object getResponse() {
return response;
}
public void setResponse(Object response) {
this.response = response;
}
public Thread getThread() {
return thread;
}
public void setThread(Thread thread) {
this.thread = thread;
}
}
工具类:
public class BatchHelper {
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100000), new ThreadPoolExecutor.CallerRunsPolicy());
private final BlockingQueue<BizTask> requestQueue = new LinkedBlockingQueue<>();
private final Consumer<List<BizTask>> function;
public BatchHelper(Consumer<List<BizTask>> function) {
this.function = function;
threadPool.execute(this::autoDispatch);
}
public Object take(Object param) {
BizTask task = new BizTask();
task.setParam(param);
task.setThread(Thread.currentThread());
// synchronized (task) {
try {
requestQueue.put(task);
} catch (InterruptedException ignored) {
}
// 阻塞
LockSupport.park();
// }
return task.getResponse();
}
public void autoDispatch() {
while (true) {
try {
BizTask t1 = requestQueue.take();
List<BizTask> tasks = new ArrayList<>(128);
tasks.add(t1);
BizTask t2 = requestQueue.poll();
if (t2 == null) {
dispatch(tasks);
continue;
}
int sum = 1;
while (t2 != null) {
tasks.add(t2);
sum = (sum + 1) & 127;
if (sum == 0) {
dispatch(tasks);
tasks = new ArrayList<>(128);
}
t2 = requestQueue.poll();
}
if (sum > 0) {
dispatch(tasks);
}
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
}
private void dispatch(List<BizTask> list) {
threadPool.execute(() -> {
// 批量处理,设置返回值
function.accept(list);
for (BizTask task : list) {
// synchronized (task) {
Thread lock = task.getThread();
// 唤醒
LockSupport.unpark(lock);
// }
}
});
}
}
测试案例:
public class TestDemo {
public static void main(String[] args) {
BatchHelper batchHelper = new BatchHelper((tasks) -> {
for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
BizTask task = tasks.get(i);
// System.out.println("requestId = " + requestId);
task.setResponse(i);
}
});
// 模拟请求线程
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 20, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 100; i++) {
int finalI = i;
threadPool.execute(() -> {
Object take = batchHelper.take(finalI + "");
if (take == null) {
System.out.println("take = " + take);
}
});
}
}
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.