有人能帮我看下我这么用 PoolingHttpClientConnectionManager 是线程安全的吗?

2017-09-10 23:11:05 +08:00
 hheedat

没写过 Java,心里不顶真;我看了网上的资料应该 ok,试了下也没发现什么问题;要并发的去发送一些请求,用到了连接池,我这样应该是线程安全的吧?

public class MyRequest {

    private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MyRequest.class);
    private final String REQUEST_URL;

    private ExecutorService executorService;
    private Queue<Map<String, String>> tasks;

    private static PoolingHttpClientConnectionManager cm; //<-----
    private static CloseableHttpClient httpClient;

    private MyRequest(int taskQueueSize, int executorCount, String requestURL, Map<String, Object> connConfig) {
        this.tasks = new ArrayBlockingQueue<>(taskQueueSize);
        this.executorService = Executors.newFixedThreadPool(executorCount);
        REQUEST_URL = requestURL;

        String proxyHost = connConfig.get("proxyHost").toString();
        int proxyPort = Integer.parseInt(connConfig.get("proxyPort").toString());

        cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(Integer.parseInt(connConfig.get("maxTotal").toString()));
        cm.setDefaultMaxPerRoute(Integer.parseInt(connConfig.get("defaultMaxPerRoute").toString()));

        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(Integer.parseInt(connConfig.get("connectTimeout").toString()))
                .setSocketTimeout(Integer.parseInt(connConfig.get("socketTimeout").toString()))
                .setConnectionRequestTimeout(Integer.parseInt(connConfig.get("cxxRxxTxout").toString()))
                .build();

        HttpClientBuilder httpClientBuilder = HttpClients.custom()
                .setConnectionManager(cm)
                .setDefaultRequestConfig(config);

        if (!proxyHost.equals("") && 0 != proxyPort) {
            httpClient = httpClientBuilder.setProxy(new HttpHost(proxyHost, proxyPort)).build();
        } else {
            httpClient = httpClientBuilder.build();
        }
    }

    private void addTask(Map<String, String> parameters) {
        tasks.offer(parameters);
    }

    private void flush() {
        List<Future> futures = this.tasks.stream()
                .map(this::delegate)
                .collect(Collectors.toList());

        futures.forEach((f) -> {
            try {
                f.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

        this.tasks.clear();
    }

    private Future delegate(Map<String, String> parameters) {
        return this.executorService.submit(() -> {
            doRequest(parameters, REQUEST_URL);
        });
    }

    private void doRequest(Map<String, String> parameters, String url) {
        CloseableHttpResponse resp = null; 
        HttpGet get = null; 
        try {
            URIBuilder builder = new URIBuilder(url);
            builder.addParameter("foo", parameters.get("bar"));

            get = new HttpGet(builder.build()); //<-----
            resp = httpClient.execute(get); //<-----

            if (resp.getStatusLine().getStatusCode() != 200) {
                LOGGER.warn("xxx");
            } else {
                LOGGER.info("xxx");
            }

            resp.close();

        } catch (URISyntaxException e) {
            LOGGER.warn("xxx" + e.getMessage());
        } catch (ClientProtocolException e) {
            LOGGER.warn("xxx" + e.getMessage());
        } catch (IOException e) {
            LOGGER.warn("xxx" + e.getMessage());
        } finally {
            if (resp != null) {
                try {
                    resp.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void shutdown() {
        this.executorService.shutdown();
    }

    public static void main(String args[]) throws IOException {

        String logPath = System.getProperty("readLogPath");
        String requestURL = System.getProperty("requestURL");
        int taskQueueSize = Integer.valueOf(System.getProperty("max.requests", "2000"));
        int executorCount = Integer.valueOf(System.getProperty("num.executors", "100"));
        int interval = Integer.valueOf(System.getProperty("max.interval", "500"));
        String proxyHost = System.getProperty("proxyHost", "");
        int proxyPort = Integer.parseInt(System.getProperty("proxyInt", "0"));

        int maxTotal = Integer.parseInt(System.getProperty("maxTotal", "5000"));
        int defaultMaxPerRoute = Integer.parseInt(System.getProperty("defaultMaxPerRoute", "1000"));
        int connectTimeout = Integer.parseInt(System.getProperty("connectTimeout", "1000"));
        int socketTimeout = Integer.parseInt(System.getProperty("socketTimeout", "3000"));
        int connectionRequestTimeout = Integer.parseInt(System.getProperty("connectionRequestTimeout", "3000"));

        Map<String, Object> connConfig = new HashMap<>();
        connConfig.put("proxyHost", proxyHost);
        connConfig.put("proxyPort", proxyPort);
        connConfig.put("maxTotal", maxTotal);
        connConfig.put("defaultMaxPerRoute", defaultMaxPerRoute);
        connConfig.put("connectTimeout", connectTimeout);
        connConfig.put("socketTimeout", socketTimeout);
        connConfig.put("connectionRequestTimeout", connectionRequestTimeout);

        try {

            MyRequest syncLog = new MyRequest(taskQueueSize, executorCount, requestURL, connConfig);

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(logPath);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));

            try {

                String line;
                int size = 0;
                long startTs = System.currentTimeMillis();
                line = br.readLine();

                while (line != null) {
                    JsonElement root = new JsonParser().parse(line);
                    Map<String, String> parameters = new HashMap<>();
                    JsonObject rootJson = root.getAsJsonObject();

                    for (Map.Entry entry : rootJson.entrySet()) {
                        parameters.put(entry.getKey().toString(),
                                rootJson.get(entry.getKey().toString()).getAsString());
                    }

                    syncLog.addTask(parameters);
                    ++size;

                    if (size >= taskQueueSize || (System.currentTimeMillis() - startTs) > interval) {
                        syncLog.flush();
                        size = 0;
                        startTs = System.currentTimeMillis();
                    }

                    line = br.readLine();
                }

                if (0 != size) {
                    syncLog.flush();
                }

            } catch (IOException e) {

                LOGGER.error("xxx" + e.getMessage());

            } finally {

                br.close();

            }

            syncLog.shutdown();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
3976 次点击
所在节点    Java
2 条回复
hand515
2017-09-11 07:49:47 +08:00
doRequest 的 resp.close()重复了吧,既然你都在 finally 里 close 了
hand515
2017-09-11 07:56:27 +08:00
还有就是 ArrayBlockingQueue 的用法有问题,你这用法还不如直接用 List 批量提交。
如果想用到 Queue,那直接开 N 个线程作为消费者线程。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/389624

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX