yangyoupeng

添加bulkAdd方法以及测试例子

......@@ -8,6 +8,7 @@ import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.ParseException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.entity.ContentType;
......@@ -16,8 +17,10 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhaoonline.common.Utils.IOUtils;
import com.zhaoonline.common.es.bean.BulkResponse;
import com.zhaoonline.common.es.bean.DeleteResponse;
import com.zhaoonline.common.es.bean.GetResponse;
import com.zhaoonline.common.es.bean.IndexResponse;
......@@ -277,4 +280,94 @@ public class ESHttpClient {
IOUtils.closeStream(response);
return deleteResponse.isSuccess();
}
/**
* Method name: bulkAddDoc <BR>
* Description: 批量添加指定ID <BR>
* Remark: <BR>
* @param ids
* @param dataList
* @return
* @throws UnsupportedOperationException
* @throws IOException boolean<BR>
*/
public boolean bulkAddDoc(List ids,List dataList) throws UnsupportedOperationException, IOException {
if(!validateBulkInput(ids, dataList)){
throw new IllegalArgumentException("the ids size is not equal to dataList ");
};
StringBuilder builder=buildBulkIndexScript(ids,dataList);
if(builder.length()==0){
return false;
}
return bulkAdd(builder.toString());
}
private boolean validateBulkInput(List ids, List dataList) {
if(ids !=null || dataList != null){
return false;
}
if(ids.size() != dataList.size() ){
return false;
}
return true;
}
/**
* Method name: bulkAddDoc <BR>
* Description: 批量添加自动生成ID <BR>
* Remark: <BR>
* @param dataList
* @return
* @throws UnsupportedOperationException
* @throws IOException boolean<BR>
*/
public boolean bulkAddDoc(List dataList) throws UnsupportedOperationException, IOException {
StringBuilder builder=buildBulkIndexScript(null,dataList);
if(builder.length()==0){
return false;
}
return bulkAdd(builder.toString());
}
public boolean bulkAdd(String bulkScripts) throws ParseException, IOException{
String bulkPath=path+PATH_SEPERATOR+"_bulk";
CloseableHttpResponse response= sendPostRequest(bulkScripts,bulkPath,this.hostsList);
if(response ==null){
return false;
}
HttpEntity responseEntity=response.getEntity();
BulkResponse bulkResponse=JsonUtils.toObject(responseEntity.getContent(), BulkResponse.class);
IOUtils.closeStream(response);
return !bulkResponse.isErrors();
}
public StringBuilder buildBulkIndexScript(List ids,List dataList) throws JsonProcessingException {
StringBuilder bulkBuilder=new StringBuilder();
if(ids !=null && ids.size() !=0){
for(int i=0;i<ids.size();i++){
StringBuilder itemBuilder=new StringBuilder();
itemBuilder.append("{ \"index\" :").append("{ \"_index\" : \"").
append(this._index).append("\"").append(",\"_type\" :\"").append(this._type).append("\"")
.append(",\"_id\" :\"").append(ids.get(i)).append("\"")
.append("} }").append("\n");
String objectjson=JsonUtils.toJson(dataList.get(i));
itemBuilder.append(objectjson).append("\n");
bulkBuilder.append(itemBuilder);
}
}else {
for(Object object:dataList){
StringBuilder itemBuilder=new StringBuilder();
itemBuilder.append("{ \"create\" :").append("{ \"_index\" : \"").
append(this._index).append("\"").append(",\"_type\" :\"").append(this._type).append("\"")
//.append(",\"_id\" :\"").append(1).append("\"")
.append("} }").append("\n");
String objectjson=JsonUtils.toJson(object);
bulkBuilder.append(itemBuilder);
}
}
return bulkBuilder;
}
}
......
package com.zhaoonline.common.es.bean;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class BulkResponse {
private long took;
private boolean errors;
private List items;
public long getTook() {
return took;
}
public void setTook(long took) {
this.took = took;
}
public boolean isErrors() {
return errors;
}
public void setErrors(boolean errors) {
this.errors = errors;
}
public List getItems() {
return items;
}
public void setItems(List items) {
this.items = items;
}
}
package com.zhaoonline.common.es.bean;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@JsonIgnoreProperties(ignoreUnknown = true)
public class IndexResponse extends CommonESResponse{
@JsonProperty("created")
......
package com.zhaoonline.common.es;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -160,6 +162,93 @@ public class TestESClient {
client.close();
}
@Test
public void testBuilderBulkScript() throws JsonProcessingException{
String index="zhaoon1";
String type="test1";
ESClientConfiguration config=new ESClientConfiguration();
config.addHostPorts("127.0.0.1:9200");
ESHttpClient client= new ESHttpClient(index,type,config);
List dataList=new ArrayList<>();
for(int i=0;i<1;i++){
Map object=new HashMap<>();
object.put("testKey"+i, "testValue");
dataList.add(object);
}
StringBuilder string=client.buildBulkIndexScript(null,dataList);
//
StringBuilder expectBuilder=new StringBuilder();
expectBuilder.append("{ \"create\" :").append("{ \"_index\" : \"").
append("zhaoon1").append("\"").append(",\"_type\" :\"").append("test1").append("\"").append("} }").append("\n");
Map object=new HashMap<>();
object.put("testKey0", "testValue");
String objectjson=JsonUtils.toJson(object);
expectBuilder.append(objectjson).append("\n");
Assert.assertEquals(expectBuilder.toString(), string.toString());
}
@Test
public void testBulkAddAutoId() throws UnsupportedOperationException, IOException{
String index="zhaoon1";
String type="test1";
ESClientConfiguration config=new ESClientConfiguration();
config.addHostPorts("127.0.0.1:9200");
ESHttpClient client= new ESHttpClient(index,type,config);
client.init();
List dataList=new ArrayList<>();
for(int i=0;i<100;i++){
Map object=new HashMap<>();
object.put("testKey"+i, "testValue");
dataList.add(object);
}
boolean result=client.bulkAddDoc(dataList);
Assert.assertTrue(result);
}
@Test(expected=IllegalArgumentException.class)
public void testBulkAddIdsNotMatchDataList() throws UnsupportedOperationException, IOException{
String index="zhaoon1";
String type="test1";
ESClientConfiguration config=new ESClientConfiguration();
config.addHostPorts("127.0.0.1:9200");
ESHttpClient client= new ESHttpClient(index,type,config);
client.init();
List ids=new ArrayList<>();
List dataList=new ArrayList<>();
for(int i=0;i<100;i++){
Map object=new HashMap<>();
object.put("testKey"+i, "testValue");
dataList.add(object);
}
boolean result=client.bulkAddDoc(ids,dataList);
}
@Test
public void testBulkAdd() throws UnsupportedOperationException, IOException{
String index="zhaoon1";
String type="test1";
ESClientConfiguration config=new ESClientConfiguration();
config.addHostPorts("127.0.0.1:9200");
ESHttpClient client= new ESHttpClient(index,type,config);
client.init();
List ids=new ArrayList<>();
List dataList=new ArrayList<>();
for(int i=0;i<100;i++){
Map object=new HashMap<>();
object.put("testKey"+i, "testValue");
dataList.add(object);
ids.add(i);
}
boolean result=client.bulkAddDoc(ids,dataList);
Assert.assertTrue(result);
}
}
......