TestHttpClientFactory.java
3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.zhaoonline.common.es;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Test;
import com.zhaoonline.common.json.TestDateObject;
public class TestHttpClientFactory {
@Test
public void testConnectEvict() throws InterruptedException{
String index="zhaoon2";
String type="test1";
ESClientConfiguration config=new ESClientConfiguration();
config.addHostPorts("192.168.0.162:9200");
ESHttpClient client= new ESHttpClient(index,type,config);
client.init();
List ids=new ArrayList<>();
List dataList=new ArrayList<>();
for(int i=0;i<100;i++){
TestDateObject o=new TestDateObject();
o.setId(i);
o.setDate(new Date(System.currentTimeMillis()));
dataList.add(o);
ids.add(i);
}
boolean result=false;
try {
result=client.bulkAddDoc(ids,dataList);
Assert.assertTrue(result);
} catch (UnsupportedOperationException | IOException e) {
e.printStackTrace();
}
Thread.sleep(10000);
try {
result=client.bulkAddDoc(ids,dataList);
Assert.assertTrue(result);
} catch (UnsupportedOperationException | IOException e) {
e.printStackTrace();
}
client.close();
try {
result=client.bulkAddDoc(ids,dataList);
Assert.assertTrue(result);
} catch (UnsupportedOperationException | IOException e) {
e.printStackTrace();
}
ESHttpClient client2= new ESHttpClient(index,type,config);
client2.init();
try {
result=client2.bulkAddDoc(ids,dataList);
} catch (UnsupportedOperationException | IOException e) {
e.printStackTrace();
}
Assert.assertTrue(result);
}
@Test
public void testMultiThread(){
int threadCount = 200;
CountDownLatch latch =new CountDownLatch(threadCount);
String index="zhaoon2";
String type="test1";
ESClientConfiguration config=new ESClientConfiguration();
config.addHostPorts("192.168.0.162:9200");
for(int i=0;i<threadCount;i++){
WriteThread wt= new WriteThread(latch,index,type,config,i);
Thread t=new Thread(wt);
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("it is ok to comme to this step");
}
class WriteThread implements Runnable{
CountDownLatch latch =null;
private String _index;
private String _type;
private ESClientConfiguration config;
private int threadid;
public WriteThread(CountDownLatch latch,String index,String type,ESClientConfiguration config, int i){
this.latch= latch;
this._index=index;
this._type=type;
this.config=config;
this.threadid=i;
}
@Override
public void run() {
System.out.println("thread "+threadid+" begin to running at:"+System.currentTimeMillis());
ESHttpClient client= new ESHttpClient( _index, _type, config);
client.init();
List ids=new ArrayList<>();
List dataList=new ArrayList<>();
for(int i=0;i<100;i++){
TestDateObject o=new TestDateObject();
o.setId(i);
o.setDate(new Date(System.currentTimeMillis()));
dataList.add(o);
ids.add(i);
}
boolean result=false;
try {
result=client.bulkAddDoc(ids,dataList);
System.out.println("thread "+threadid+" end to run result:"+result+" at:"+System.currentTimeMillis());
} catch (UnsupportedOperationException | IOException e) {
e.printStackTrace();
}
this.latch.countDown();
}
}
}