yangyoupeng

对ESHTTPClinet做了修改,修复重复创建poolmanager的对象的性能问题,

添加time定时清除http的idle,expire的connect
...@@ -4,17 +4,24 @@ import java.io.IOException; ...@@ -4,17 +4,24 @@ import java.io.IOException;
4 import java.util.ArrayList; 4 import java.util.ArrayList;
5 import java.util.List; 5 import java.util.List;
6 import java.util.Map; 6 import java.util.Map;
7 +import java.util.Timer;
8 +import java.util.TimerTask;
9 +import java.util.concurrent.TimeUnit;
10 +import java.util.concurrent.atomic.AtomicReference;
7 11
8 import org.apache.http.HttpEntity; 12 import org.apache.http.HttpEntity;
9 import org.apache.http.HttpHost; 13 import org.apache.http.HttpHost;
10 import org.apache.http.HttpRequest; 14 import org.apache.http.HttpRequest;
11 import org.apache.http.ParseException; 15 import org.apache.http.ParseException;
16 +import org.apache.http.client.HttpClient;
12 import org.apache.http.client.methods.CloseableHttpResponse; 17 import org.apache.http.client.methods.CloseableHttpResponse;
13 import org.apache.http.client.methods.RequestBuilder; 18 import org.apache.http.client.methods.RequestBuilder;
14 import org.apache.http.entity.ContentType; 19 import org.apache.http.entity.ContentType;
15 import org.apache.http.entity.StringEntity; 20 import org.apache.http.entity.StringEntity;
16 import org.apache.http.impl.client.CloseableHttpClient; 21 import org.apache.http.impl.client.CloseableHttpClient;
17 import org.apache.http.util.EntityUtils; 22 import org.apache.http.util.EntityUtils;
23 +import org.slf4j.Logger;
24 +import org.slf4j.LoggerFactory;
18 25
19 import com.fasterxml.jackson.core.JsonParseException; 26 import com.fasterxml.jackson.core.JsonParseException;
20 import com.fasterxml.jackson.core.JsonProcessingException; 27 import com.fasterxml.jackson.core.JsonProcessingException;
...@@ -38,14 +45,38 @@ import com.zhaoonline.common.json.JsonUtils; ...@@ -38,14 +45,38 @@ import com.zhaoonline.common.json.JsonUtils;
38 * @author zhaoonline)yangyoupeng 45 * @author zhaoonline)yangyoupeng
39 */ 46 */
40 public class ESHttpClient { 47 public class ESHttpClient {
48 + private static final Logger LOG = LoggerFactory.getLogger(ESHttpClient.class);
41 private static final String PATH_SEPERATOR = "/"; 49 private static final String PATH_SEPERATOR = "/";
42 private String _index; 50 private String _index;
43 private String _type; 51 private String _type;
44 - private CloseableHttpClient httpClient; 52 + private static final AtomicReference<CloseableHttpClient> httpClient = new AtomicReference<CloseableHttpClient>();
53 + private static final Thread monitorThread=createMonitorThread();
45 private ESClientConfiguration config; 54 private ESClientConfiguration config;
46 private String path; 55 private String path;
47 private List<HttpHost> hostsList=new ArrayList<HttpHost>(); 56 private List<HttpHost> hostsList=new ArrayList<HttpHost>();
48 57
58 +
59 + private static final Timer CONNECTION_MANAGER_TIMER = new Timer(true);
60 +
61 + static {
62 + CONNECTION_MANAGER_TIMER.schedule(new TimerTask() {
63 + @Override
64 + public void run() {
65 + try {
66 + final CloseableHttpClient hc = httpClient.get();
67 + if (hc == null) return;
68 + hc.getConnectionManager().closeExpiredConnections();
69 + hc.getConnectionManager().closeIdleConnections(30, TimeUnit.SECONDS);
70 + } catch (Throwable t) {
71 + LOG.error("error closing expired connections", t);
72 + }
73 + }
74 + }, 30000, 5000);
75 + Runtime.getRuntime().addShutdownHook(monitorThread);
76 + }
77 +
78 +
79 +
49 public ESHttpClient(String index,String type,ESClientConfiguration config){ 80 public ESHttpClient(String index,String type,ESClientConfiguration config){
50 this._index=index; 81 this._index=index;
51 this._type=type; 82 this._type=type;
...@@ -53,15 +84,31 @@ public class ESHttpClient { ...@@ -53,15 +84,31 @@ public class ESHttpClient {
53 this.config=config; 84 this.config=config;
54 } 85 }
55 86
87 + private static Thread createMonitorThread() {
88 + Thread monitorThread =new Thread(new Runnable() {
89 + @Override
90 + public void run() {
91 + CONNECTION_MANAGER_TIMER.cancel();
92 + LOG.info("pool Manager check timer cancel running");
93 + }
94 + });
95 + return monitorThread;
96 + }
56 private void concatPathWithIndexType() { 97 private void concatPathWithIndexType() {
57 this.path=_index+PATH_SEPERATOR+_type; 98 this.path=_index+PATH_SEPERATOR+_type;
58 } 99 }
59 -
60 -
61 public void init(){ 100 public void init(){
62 - httpClient =HttpClientFactory.newClient(); 101 + getClient();
63 List<String> hostPosts=config.getHostPorts(); 102 List<String> hostPosts=config.getHostPorts();
64 hostsList=buildHostPortListFromString(hostPosts); 103 hostsList=buildHostPortListFromString(hostPosts);
104 +
105 + }
106 +
107 + private CloseableHttpClient getClient() {
108 + if(httpClient.get() ==null){
109 + httpClient.set(HttpClientFactory.newClient());
110 + }
111 + return httpClient.get();
65 } 112 }
66 113
67 114
...@@ -253,7 +300,7 @@ public class ESHttpClient { ...@@ -253,7 +300,7 @@ public class ESHttpClient {
253 CloseableHttpResponse response=null; 300 CloseableHttpResponse response=null;
254 for(HttpHost httpHost:hostsList){ 301 for(HttpHost httpHost:hostsList){
255 try { 302 try {
256 - response= httpClient.execute(httpHost, request); 303 + response= getClient().execute(httpHost, request);
257 if(response !=null){ 304 if(response !=null){
258 return response; 305 return response;
259 } 306 }
...@@ -265,7 +312,8 @@ public class ESHttpClient { ...@@ -265,7 +312,8 @@ public class ESHttpClient {
265 } 312 }
266 313
267 public void close(){ 314 public void close(){
268 - IOUtils.closeStream(httpClient); 315 + IOUtils.closeStream(getClient());
316 + httpClient.set(null);
269 } 317 }
270 318
271 public boolean deleteDoc(String id) throws JsonParseException, JsonMappingException, UnsupportedOperationException, IOException { 319 public boolean deleteDoc(String id) throws JsonParseException, JsonMappingException, UnsupportedOperationException, IOException {
......
1 package com.zhaoonline.common.es; 1 package com.zhaoonline.common.es;
2 2
3 +
3 import org.apache.http.client.config.RequestConfig; 4 import org.apache.http.client.config.RequestConfig;
4 import org.apache.http.conn.HttpClientConnectionManager; 5 import org.apache.http.conn.HttpClientConnectionManager;
5 import org.apache.http.impl.client.CloseableHttpClient; 6 import org.apache.http.impl.client.CloseableHttpClient;
...@@ -8,8 +9,8 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; ...@@ -8,8 +9,8 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
8 9
9 public class HttpClientFactory { 10 public class HttpClientFactory {
10 private static Integer SOCKET_TIMEOUT =10000; 11 private static Integer SOCKET_TIMEOUT =10000;
11 - private static Integer CONNECTION_TIMEOUT=2000; 12 + private static Integer CONNECTION_TIMEOUT=4000;
12 - 13 +
13 public static final CloseableHttpClient newClient() { 14 public static final CloseableHttpClient newClient() {
14 final HttpClientBuilder builder = HttpClientBuilder.create(); 15 final HttpClientBuilder builder = HttpClientBuilder.create();
15 builder.setConnectionManager(newConnectionManager()); 16 builder.setConnectionManager(newConnectionManager());
...@@ -21,7 +22,6 @@ public class HttpClientFactory { ...@@ -21,7 +22,6 @@ public class HttpClientFactory {
21 CloseableHttpClient httpclient = builder.build(); 22 CloseableHttpClient httpclient = builder.build();
22 return httpclient; 23 return httpclient;
23 } 24 }
24 -
25 public static final HttpClientConnectionManager newConnectionManager() { 25 public static final HttpClientConnectionManager newConnectionManager() {
26 // 默认支持http和https协议 26 // 默认支持http和https协议
27 final PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); 27 final PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
......
...@@ -33,17 +33,14 @@ public class TestESClient { ...@@ -33,17 +33,14 @@ public class TestESClient {
33 33
34 ESHttpClient client= new ESHttpClient(index,type,config); 34 ESHttpClient client= new ESHttpClient(index,type,config);
35 client.init(); 35 client.init();
36 - 36 + boolean result=false;
37 -
38 for(int i=0;i<100;i++){ 37 for(int i=0;i<100;i++){
39 Map object=new HashMap<>(); 38 Map object=new HashMap<>();
40 object.put("testKey"+i, "testValue"); 39 object.put("testKey"+i, "testValue");
41 String docString=JsonUtils.toJson(object); 40 String docString=JsonUtils.toJson(object);
42 - boolean result=client.addDoc(""+i,docString); 41 + result=client.addDoc(""+i,docString);
43 } 42 }
44 - 43 + Assert.assertTrue(result);
45 -
46 -// Assert.assertTrue(result);
47 } 44 }
48 45
49 46
...@@ -72,7 +69,7 @@ public class TestESClient { ...@@ -72,7 +69,7 @@ public class TestESClient {
72 String index="zhaoon1"; 69 String index="zhaoon1";
73 String type="test1"; 70 String type="test1";
74 ESClientConfiguration config=new ESClientConfiguration(); 71 ESClientConfiguration config=new ESClientConfiguration();
75 - config.addHostPorts("127.0.0.1:9200"); 72 + config.addHostPorts("192.168.0.162:9200");
76 ESHttpClient client= new ESHttpClient(index,type,config); 73 ESHttpClient client= new ESHttpClient(index,type,config);
77 client.init(); 74 client.init();
78 String id="1"; 75 String id="1";
...@@ -260,7 +257,7 @@ public class TestESClient { ...@@ -260,7 +257,7 @@ public class TestESClient {
260 257
261 @Test 258 @Test
262 public void testBulkAddDate() throws UnsupportedOperationException, IOException{ 259 public void testBulkAddDate() throws UnsupportedOperationException, IOException{
263 - String index="zhaoon1"; 260 + String index="zhaoon2";
264 String type="test1"; 261 String type="test1";
265 ESClientConfiguration config=new ESClientConfiguration(); 262 ESClientConfiguration config=new ESClientConfiguration();
266 config.addHostPorts("192.168.0.162:9200"); 263 config.addHostPorts("192.168.0.162:9200");
......
1 +package com.zhaoonline.common.es;
2 +
3 +import java.io.IOException;
4 +import java.util.ArrayList;
5 +import java.util.Date;
6 +import java.util.List;
7 +import java.util.concurrent.CountDownLatch;
8 +
9 +import org.junit.Assert;
10 +import org.junit.Test;
11 +
12 +import com.zhaoonline.common.json.TestDateObject;
13 +
14 +public class TestHttpClientFactory {
15 +
16 + @Test
17 + public void testConnectEvict() throws InterruptedException{
18 +
19 + String index="zhaoon2";
20 + String type="test1";
21 + ESClientConfiguration config=new ESClientConfiguration();
22 + config.addHostPorts("192.168.0.162:9200");
23 + ESHttpClient client= new ESHttpClient(index,type,config);
24 + client.init();
25 + List ids=new ArrayList<>();
26 + List dataList=new ArrayList<>();
27 + for(int i=0;i<100;i++){
28 + TestDateObject o=new TestDateObject();
29 + o.setId(i);
30 + o.setDate(new Date(System.currentTimeMillis()));
31 + dataList.add(o);
32 + ids.add(i);
33 + }
34 + boolean result=false;
35 + try {
36 + result=client.bulkAddDoc(ids,dataList);
37 + Assert.assertTrue(result);
38 + } catch (UnsupportedOperationException | IOException e) {
39 + e.printStackTrace();
40 + }
41 + Thread.sleep(10000);
42 + try {
43 + result=client.bulkAddDoc(ids,dataList);
44 + Assert.assertTrue(result);
45 + } catch (UnsupportedOperationException | IOException e) {
46 + e.printStackTrace();
47 + }
48 + client.close();
49 + try {
50 + result=client.bulkAddDoc(ids,dataList);
51 + Assert.assertTrue(result);
52 + } catch (UnsupportedOperationException | IOException e) {
53 + e.printStackTrace();
54 + }
55 +
56 + ESHttpClient client2= new ESHttpClient(index,type,config);
57 + client2.init();
58 + try {
59 + result=client2.bulkAddDoc(ids,dataList);
60 + } catch (UnsupportedOperationException | IOException e) {
61 + e.printStackTrace();
62 + }
63 + Assert.assertTrue(result);
64 + }
65 +
66 +
67 + @Test
68 + public void testMultiThread(){
69 +
70 + int threadCount = 200;
71 + CountDownLatch latch =new CountDownLatch(threadCount);
72 + String index="zhaoon2";
73 + String type="test1";
74 + ESClientConfiguration config=new ESClientConfiguration();
75 + config.addHostPorts("192.168.0.162:9200");
76 + for(int i=0;i<threadCount;i++){
77 + WriteThread wt= new WriteThread(latch,index,type,config,i);
78 + Thread t=new Thread(wt);
79 + t.start();
80 + }
81 + try {
82 + latch.await();
83 + } catch (InterruptedException e) {
84 + e.printStackTrace();
85 + }
86 + System.out.println("it is ok to comme to this step");
87 + }
88 +
89 + class WriteThread implements Runnable{
90 + CountDownLatch latch =null;
91 + private String _index;
92 + private String _type;
93 + private ESClientConfiguration config;
94 + private int threadid;
95 + public WriteThread(CountDownLatch latch,String index,String type,ESClientConfiguration config, int i){
96 + this.latch= latch;
97 + this._index=index;
98 + this._type=type;
99 + this.config=config;
100 + this.threadid=i;
101 + }
102 +
103 + @Override
104 + public void run() {
105 + System.out.println("thread "+threadid+" begin to running at:"+System.currentTimeMillis());
106 + ESHttpClient client= new ESHttpClient( _index, _type, config);
107 + client.init();
108 + List ids=new ArrayList<>();
109 + List dataList=new ArrayList<>();
110 + for(int i=0;i<100;i++){
111 + TestDateObject o=new TestDateObject();
112 + o.setId(i);
113 + o.setDate(new Date(System.currentTimeMillis()));
114 + dataList.add(o);
115 + ids.add(i);
116 + }
117 + boolean result=false;
118 + try {
119 + result=client.bulkAddDoc(ids,dataList);
120 + System.out.println("thread "+threadid+" end to run result:"+result+" at:"+System.currentTimeMillis());
121 + } catch (UnsupportedOperationException | IOException e) {
122 + e.printStackTrace();
123 + }
124 + this.latch.countDown();
125 + }
126 + }
127 +}