// TestHttpClientFactory.java (3.36 KB)
package com.zhaoonline.common.es;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.junit.Assert;
import org.junit.Test;

import com.zhaoonline.common.json.TestDateObject;

/**
 * Integration tests for {@link ESHttpClient}.
 *
 * <p>Both tests talk to the host configured in {@link #ES_HOST_PORT}, so they can only
 * pass when that host is reachable from the machine running the tests.
 */
public class TestHttpClientFactory {

	/** Host and port handed to {@code ESClientConfiguration.addHostPorts}. */
	private static final String ES_HOST_PORT = "192.168.0.162:9200";

	/** Index name passed to every {@link ESHttpClient} created by these tests. */
	private static final String INDEX = "zhaoon2";

	/** Type name passed to every {@link ESHttpClient} created by these tests. */
	private static final String TYPE = "test1";

	/** Number of documents (and ids) in one bulk request. */
	private static final int DOCS_PER_BULK = 100;

	/** Pause between the first and the second write of {@link #testConnectEvict()}. */
	private static final long IDLE_PAUSE_MILLIS = 10000L;

	/** Number of concurrent writers started by {@link #testMultiThread()}. */
	private static final int WRITER_THREADS = 200;

	/**
	 * Writes one batch, pauses, writes again through the same client, closes the client,
	 * and finally writes through a freshly created client.
	 *
	 * <p>Every write except the one issued on the already-closed client must return
	 * {@code true}; an exception from those writes fails the test instead of being
	 * printed and ignored. Each step is checked against its own return value, so a
	 * failure in a later step can no longer hide behind an earlier success.
	 *
	 * <p>NOTE(review): judging by the test name, the pause is presumably meant to outlast
	 * the client's idle-connection eviction; confirm against the client configuration.
	 *
	 * @throws InterruptedException if the thread is interrupted during the pause
	 */
	@Test
	@SuppressWarnings("rawtypes")
	public void testConnectEvict() throws InterruptedException {
		ESClientConfiguration config = newConfig();
		List ids = new ArrayList<>();
		List docs = new ArrayList<>();
		fillBatch(ids, docs);

		ESHttpClient client = new ESHttpClient(INDEX, TYPE, config);
		client.init();
		assertBulkAddSucceeds("first write", client, ids, docs);

		Thread.sleep(IDLE_PAUSE_MILLIS);
		assertBulkAddSucceeds("write after the idle pause", client, ids, docs);

		client.close();
		// What a closed client should do is not visible from this file, so this step keeps
		// its lenient handling: an exception is only reported, but if the call does
		// return, it has to return true.
		try {
			Assert.assertTrue("write on the closed client returned false",
					client.bulkAddDoc(ids, docs));
		} catch (UnsupportedOperationException | IOException e) {
			e.printStackTrace();
		}

		// Closing the first client must not prevent a new client from working.
		// NOTE(review): client2 is never closed, same as before; confirm whether it should be.
		ESHttpClient client2 = new ESHttpClient(INDEX, TYPE, config);
		client2.init();
		assertBulkAddSucceeds("write through a fresh client", client2, ids, docs);
	}

	/**
	 * Starts {@link #WRITER_THREADS} threads that each create their own client from one
	 * shared configuration and send one bulk request, then waits for all of them.
	 *
	 * <p>The test fails if the waiting thread is interrupted or if any writer did not
	 * get {@code true} back from its bulk request.
	 */
	@Test
	public void testMultiThread() {
		CountDownLatch latch = new CountDownLatch(WRITER_THREADS);
		ESClientConfiguration config = newConfig();
		List<WriteThread> writers = new ArrayList<>(WRITER_THREADS);
		for (int i = 0; i < WRITER_THREADS; i++) {
			WriteThread writer = new WriteThread(latch, INDEX, TYPE, config, i);
			writers.add(writer);
			new Thread(writer).start();
		}

		try {
			latch.await();
		} catch (InterruptedException e) {
			// Restore the flag for the test runner and report the run as failed: not all
			// writers have been waited for, so the outcome is unknown.
			Thread.currentThread().interrupt();
			throw new AssertionError("interrupted while waiting for the writer threads", e);
		}
		System.out.println("it is ok to comme to this step");

		int failed = 0;
		for (WriteThread writer : writers) {
			if (!writer.succeeded) {
				failed++;
			}
		}
		Assert.assertEquals("writer threads whose bulk write did not succeed", 0, failed);
	}

	/** Builds a configuration that points at {@link #ES_HOST_PORT}. */
	private static ESClientConfiguration newConfig() {
		ESClientConfiguration config = new ESClientConfiguration();
		config.addHostPorts(ES_HOST_PORT);
		return config;
	}

	/**
	 * Appends {@link #DOCS_PER_BULK} documents to {@code docs} and the matching ids
	 * ({@code 0 .. DOCS_PER_BULK - 1}) to {@code ids}.
	 *
	 * <p>The lists stay raw because the parameter types of {@code bulkAddDoc} are not
	 * visible from this file.
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	private static void fillBatch(List ids, List docs) {
		for (int i = 0; i < DOCS_PER_BULK; i++) {
			TestDateObject doc = new TestDateObject();
			doc.setId(i);
			doc.setDate(new Date());
			docs.add(doc);
			ids.add(i);
		}
	}

	/**
	 * Sends one bulk request and fails the test unless it returns {@code true}.
	 *
	 * @param step   short description of the step, used in the failure message
	 * @param client client to write through
	 * @param ids    document ids
	 * @param docs   documents, in the same order as {@code ids}
	 * @throws AssertionError if the call throws or returns {@code false}
	 */
	@SuppressWarnings("rawtypes")
	private static void assertBulkAddSucceeds(String step, ESHttpClient client, List ids, List docs) {
		boolean written;
		try {
			written = client.bulkAddDoc(ids, docs);
		} catch (UnsupportedOperationException | IOException e) {
			// Keep the cause so the report shows why the write failed.
			throw new AssertionError(step + ": bulkAddDoc threw " + e, e);
		}
		Assert.assertTrue(step + ": bulkAddDoc returned false", written);
	}

	/**
	 * One concurrent writer: creates its own client, sends a single bulk request,
	 * records the outcome and always counts the latch down.
	 */
	class WriteThread implements Runnable {
		private final CountDownLatch latch;
		private final String index;
		private final String type;
		private final ESClientConfiguration config;
		private final int threadid;

		/**
		 * {@code true} only when the bulk request returned {@code true}. Written before
		 * the latch is counted down and read by the test after {@code await()} returns.
		 */
		private volatile boolean succeeded;

		/**
		 * @param latch  latch counted down exactly once when {@link #run()} finishes
		 * @param index  index name for the client
		 * @param type   type name for the client
		 * @param config client configuration shared by all writers
		 * @param i      writer number, used only in the log output
		 */
		public WriteThread(CountDownLatch latch,String index,String type,ESClientConfiguration config, int i){
			this.latch = latch;
			this.index = index;
			this.type = type;
			this.config = config;
			this.threadid = i;
		}

		@Override
		@SuppressWarnings("rawtypes")
		public void run() {
			try {
				System.out.println("thread "+threadid+" begin to running at:"+System.currentTimeMillis());
				// NOTE(review): the client is not closed here, same as before; whether
				// close() is safe while sibling clients are still writing is not visible
				// from this file.
				ESHttpClient client = new ESHttpClient(index, type, config);
				client.init();
				List ids = new ArrayList<>();
				List docs = new ArrayList<>();
				fillBatch(ids, docs);
				boolean result = client.bulkAddDoc(ids, docs);
				System.out.println("thread "+threadid+" end to run result:"+result+" at:"+System.currentTimeMillis());
				succeeded = result;
			} catch (UnsupportedOperationException | IOException e) {
				e.printStackTrace();
			} finally {
				// Count down on every exit path; otherwise an unchecked exception in this
				// thread would leave the test blocked in await() forever.
				latch.countDown();
			}
		}
	}
}