yangyoupeng

jar包同步,同步功能

Showing 32 changed files with 905 additions and 26 deletions
......@@ -286,6 +286,11 @@
<version>0.1</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.48</version>
</dependency>
</dependencies>
<dependencyManagement>
......
......@@ -20,7 +20,7 @@ public interface ApiJarHandler {
* @return
* @throws IOException Path<BR>
*/
public Path saveFile(File file) throws IOException;
public Path saveFile(File file) throws IOException;
public boolean deleteFile(File file) throws IOException;
......
package com.zhaoonline.support.gateway.apijar;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zhaoonline.support.gateway.client.HttpClientFactory;
import com.zhaoonline.support.gateway.config.GWConstants;
import com.zhaoonline.support.gateway.listener.JarEventRepository;
import com.zhaoonline.support.gateway.listener.NewJarAddEvent;
import com.zhaoonline.support.gateway.utils.Utils;
/**
* class ApiJarSyncer <BR>
* class description:该类的作用是为了与其他节点的jar包进行同步<BR>
* Remark: <BR>
* @version 1.00 2016年11月1日
* @author zhaoonline)yangyoupeng
*/
public class ApiJarSyncer {
private JarEventRepository repostiry;
private ApiManager localapiManager;
private Set<String> jarFileIndexOnlocal=new HashSet<String>();
private static final Thread monitorThread=createMonitorThread();
private static final AtomicReference<CloseableHttpClient> CLIENT = new AtomicReference<CloseableHttpClient>();
private static final Logger LOG = LoggerFactory.getLogger(ApiJarSyncer.class);
private static final Timer CONNECTION_MANAGER_TIMER = new Timer(true);
public ApiJarSyncer(JarEventRepository repostiry,ApiManager apiManager){
this.repostiry=repostiry;
this.localapiManager=apiManager;
}
public void syncJarFiles(){
List<JarFileInfo> resultList= repostiry.queryAllJarEvent();
//Repository中查询到jar如果属于自己的。我们要将之进行删除
Iterator<JarFileInfo> itr=resultList.iterator();
while(itr.hasNext()){
JarFileInfo jarFileInfo = itr.next();
if(repostiry.getHost().equalsIgnoreCase(jarFileInfo.getHost())){
itr.remove();
}
}
List<FileInfo> localFileInfoList=localapiManager.displayAllApiArchiveFile();
List<JarFileInfo> notExistJarInfos=notExistedJars(resultList,localFileInfoList);
doSyncJarFiles(notExistJarInfos);
}
public void doSyncJarFiles(List<JarFileInfo> notExistJarInfos){
int successSyncCount=0;
int failedSyncCount=0;
LOG.info("[{}] jar file need to sync ",notExistJarInfos.size());
for(JarFileInfo jarFileInfo:notExistJarInfos){
LOG.info("begin to sync jar file [{}]",jarFileInfo);
ApiUploadResult reults=new ApiUploadResult();
reults.setSuccess(false);
try{
reults=buildRequestAndDownLoad(jarFileInfo);
LOG.info("sync jar file [{}],result is [{}]",jarFileInfo,reults.isSuccess());
}catch(Exception e){
e.printStackTrace();
LOG.info("sync jar file [{}] failed due to",jarFileInfo,e.getMessage());
}
if(reults.isSuccess()){
successSyncCount++;
}else{
failedSyncCount++;
}
}
LOG.warn("total sync result: success:[{}],fail:[{}]",successSyncCount,failedSyncCount);
}
private ApiUploadResult buildRequestAndDownLoad(JarFileInfo jarFileInfo) {
String host=jarFileInfo.getHost();
int port=jarFileInfo.getPort();
RequestBuilder requestBuilder=RequestBuilder.get("/apiman/download").addParameter("file", jarFileInfo.getJarFileName());
HttpHost hostPort=new HttpHost(host,port);
File downloadedFile = executeDownload(jarFileInfo, requestBuilder, hostPort);
ApiUploadResult uploadResult= upload2Local(downloadedFile);
return uploadResult;
}
private File executeDownload(JarFileInfo jarFileInfo,RequestBuilder requestBuilder,HttpHost hostPort) {
ApiUploadResult uploadResult=new ApiUploadResult();
File archiveFile =null;
OutputStream out = null;
InputStream inputStream=null;
CloseableHttpResponse response=null;
try {
response=getClient().execute(hostPort, requestBuilder.build());
HttpEntity httpEntity=response.getEntity();
String headerKey = "Content-Disposition";
Header[] header=response.getHeaders(headerKey);
assert header !=null;
String header_fileName = "filename=";
int fileNameIdx=header[0].getValue().indexOf(header_fileName);
String downloadFileName=header[0].getValue().substring(fileNameIdx+header_fileName.length());
//截取到fileName为"***",两边有引号我们需要去掉
downloadFileName=downloadFileName.replace("\"", "");
assert downloadFileName.equals(jarFileInfo.getJarFileName());
inputStream=httpEntity.getContent();
File tempDir = Utils.createTempDir();
archiveFile = new File(tempDir, downloadFileName);
out = new BufferedOutputStream(new FileOutputStream(archiveFile));
IOUtils.copy(inputStream, out);
} catch (IOException e) {
e.printStackTrace();
uploadResult.setSuccess(false);
}finally{
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(out);
IOUtils.closeQuietly(response);
}
return archiveFile;
}
public ApiUploadResult upload2Local(File archiveFile){
NewJarAddEvent event=new NewJarAddEvent();
event.addAdditionInfo(GWConstants.NOT_ADDTOREPOSITRY, null);
ApiUploadResult uploadResult=localapiManager.handleJarFileWithEvent(archiveFile,event);
return uploadResult;
}
public List<JarFileInfo> notExistedJars(List<JarFileInfo> jarInfos,List<FileInfo> localFileInfoList){
indexFileNames(localFileInfoList);
List<JarFileInfo> resultList=new ArrayList<JarFileInfo>();
for(JarFileInfo jarInfo:jarInfos){
if(jarFileIndexOnlocal.contains(jarInfo.getJarFileName())){
continue;
}else{
resultList.add(jarInfo);
}
}
return resultList;
}
private void indexFileNames(List<FileInfo> localFileInfoList) {
for(FileInfo jarInfo:localFileInfoList){
if(jarFileIndexOnlocal.contains(jarInfo.getName())){
continue;
}else{
jarFileIndexOnlocal.add(jarInfo.getName());
}
}
}
static {
CONNECTION_MANAGER_TIMER.schedule(new TimerTask() {
@Override
public void run() {
try {
final HttpClient hc = CLIENT.get();
if (hc == null) return;
hc.getConnectionManager().closeExpiredConnections();
hc.getConnectionManager().closeIdleConnections(30, TimeUnit.SECONDS);
} catch (Throwable t) {
LOG.error("error closing expired connections", t);
}
}
}, 30000, 5000);
Runtime.getRuntime().addShutdownHook(monitorThread);
}
private static Thread createMonitorThread() {
Thread monitorThread =new Thread(new Runnable() {
@Override
public void run() {
final CloseableHttpClient hc = CLIENT.get();
if (hc != null){
try {
hc.close();
} catch (IOException e) {
e.printStackTrace();
};
}
CONNECTION_MANAGER_TIMER.cancel();
LOG.info("pool Manager check timer cancel running");
}
});
return monitorThread;
}
public CloseableHttpClient getClient(){
if(CLIENT.get() == null){
CLIENT.set(newClient());
}
return CLIENT.get();
}
private CloseableHttpClient newClient() {
return HttpClientFactory.newClient();
}
}
package com.zhaoonline.support.gateway.apijar;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ApiJarSyncerManager {
private static final Logger LOG = LoggerFactory.getLogger(ApiJarSyncerManager.class);
private ScheduledExecutorService executorService=Executors.newScheduledThreadPool(1);
private ApiJarSyncer syncer=null;
public ApiJarSyncerManager(ApiJarSyncer syncer) {
this.syncer =syncer;
}
private static long DELAY=TimeUnit.SECONDS.toMillis(5L);
private static long PERIOD=TimeUnit.SECONDS.toMillis(60L);
public void init() {
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
LOG.info("begain to run syncer other peers");
syncer.syncJarFiles();
LOG.info("end to run syncer other peers");
}
}, DELAY, PERIOD,TimeUnit.MILLISECONDS);
}
public void close() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
......@@ -108,6 +108,33 @@ public class ApiManager {
return result;
}
/**
* Method name: handleJarFileWithEvent <BR>
* Description: 该方法给#link{ApiJarScanner}使用<BR>
* Remark: <BR>
* @param archiveFile
* @return ApiUploadResult<BR>
*/
protected ApiUploadResult handleJarFileWithEvent(File archiveFile,NewJarAddEvent event) {
ApiUploadResult result=new ApiUploadResult();
try {
Path savedFilePath= apiJarhanlder.saveFile(archiveFile);
//不为空就表示正确
if(savedFilePath != null){
event.setNewJarPath(savedFilePath);
listeners.sendEvent(event);
}else{
result.setSuccess(false);
}
} catch (IOException e) {
LOG.error("exception happens when save jar file ine the file {},cause by {}",archiveFile,e.getMessage());
e.printStackTrace();
result.setSuccess(false);
}
return result;
}
private ApiUploadResult handleJarFile(File archiveFile) {
ApiUploadResult result=new ApiUploadResult();
try {
......
package com.zhaoonline.support.gateway.apijar;
public class JarFileInfo {
private String jarFileName;
private String host;
private int port;
public String getJarFileName() {
return jarFileName;
}
public void setJarFileName(String jarFileName) {
this.jarFileName = jarFileName;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String toString(){
StringBuilder strBuilder=new StringBuilder();
strBuilder.append("{").
append("\"").append("jarFileName").append("\"").append(":").append("\"").append(jarFileName).append("\"").append(",").
append("\"").append("host").append("\"").append(":").append("\"").append(host).append("\"").append(",").
append("\"").append("port").append("\"").append(":").append(port).append("}");
return strBuilder.toString();
}
}
......@@ -19,6 +19,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zhaoonline.support.gateway.config.GWConstants;
import com.zhaoonline.support.gateway.listener.GWEventListenerRegistry;
import com.zhaoonline.support.gateway.listener.NewJarAddEvent;
import com.zhaoonline.support.gateway.utils.Utils;
......@@ -125,7 +126,7 @@ public class LocalFileSystemApiJarHandler implements ApiJarHandler {
}
@Override
public Path saveFile(File file) throws IOException {
public synchronized Path saveFile(File file) throws IOException {
Path willsaveFile=getApiJarDir().resolve(file.getName());
if(Files.exists(willsaveFile)){
LOG.error("can not save the file {} to apijardirectory {} ,file already exists",file.getName(),apiJarDir);
......
package com.zhaoonline.support.gateway.client;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
public class HttpClientFactory {
private static Integer SOCKET_TIMEOUT =10000;
private static Integer CONNECTION_TIMEOUT=2000;
public static final CloseableHttpClient newClient() {
final HttpClientBuilder builder = HttpClientBuilder.create();
builder.setConnectionManager(newConnectionManager());
final RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(SOCKET_TIMEOUT)
.setConnectTimeout(CONNECTION_TIMEOUT).build();
builder.setDefaultRequestConfig(requestConfig);
CloseableHttpClient httpclient = builder.build();
return httpclient;
}
public static final HttpClientConnectionManager newConnectionManager() {
// 默认支持http和https协议
final PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(200);
cm.setDefaultMaxPerRoute(20);
cm.setValidateAfterInactivity(0);
return cm;
}
}
......@@ -22,4 +22,11 @@ public class GWConstants {
public static String INITIAL_STREAM_BUFFER_SIZE="zhaoonline.initial-stream-buffer-size";
public static int INITIAL_STREAM_BUFFER_SIZE_DEFAULT=1024;
public static final boolean SET_CONTENT_LENGTH = false;
public static final String NOT_ADDTOREPOSITRY = "NIP";
public static String DEFAUL_JAREVENT_INDEX="gwzhaoonline";
public static String DEFAUL_USER_INDEX="gwzhaoonline";
public static String DEFAUL_SERVICE_INDEX="gwzhaoonline";
}
......
......@@ -38,6 +38,14 @@ public class GateWayConfiguration {
GateWayConfiguration config= new GateWayConfiguration();
return config;
}
public String getString(String key) {
Object valObject=configMap.get(key);
if(valObject!=null){
return String.valueOf(valObject);
}
return null;
}
public Integer getInteger(String key, int defaultValue) {
Object valObject=configMap.get(key);
......
......@@ -2,19 +2,26 @@ package com.zhaoonline.support.gateway.listener;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URLClassLoader;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zhaoonline.support.gateway.apijar.ApiClassLoaderManager;
import com.zhaoonline.support.gateway.apijar.ClassLoaderExistException;
import com.zhaoonline.support.gateway.apijar.JarLoader;
import com.zhaoonline.support.gateway.config.GWConstants;
public class JarEventListener implements GWEventListener {
ApiClassLoaderManager apiClassLoaderManager=ApiClassLoaderManager.INSTANCE;
private static Logger LOG=LoggerFactory.getLogger(JarEventListener.class);
JarEventRepositoryFactory jarEventRepositoryFactory ;
JarEventRepository repositry;
public JarEventListener(JarEventRepositoryFactory jarEventRepositoryFactory) throws Exception{
this.jarEventRepositoryFactory =jarEventRepositoryFactory;
repositry =jarEventRepositoryFactory.jarEventRepository();
JarEventRepositoryFactorySingleton.INSTACE.setJarEventRepository(jarEventRepositoryFactory);
}
@Override
public String getName() {
return "jarEventLister";
......@@ -37,6 +44,11 @@ public class JarEventListener implements GWEventListener {
//之所以不用URLClassLoader,是因为集成了Dubbo,dubbo加载interfaceclass是从Thread.currentThread().getContextClassLoader(),加载class类的。所以我们使用
//JarLoader.addFile来加载jar
JarLoader.addFile(jarPath);
//从页面正常upload上来的jar,不包含{@link GWConstants.NOT_ADDTOREPOSITRY},要将event添加到Repository中
//由于节点之间做jar同步的时候,测试不需要添加到Repository中,GWConstants.NOT_ADDTOREPOSITRY的字段又{@see ApiJarScanner#upload2Local(File archiveFile)}来指定
if(!event.getEventSource().containsKey(GWConstants.NOT_ADDTOREPOSITRY)){
repositry.saveJarEvent(event);
}
LOG.info("success to load the jar {}",jarPathObject);
} catch (MalformedURLException e) {
e.printStackTrace();
......
package com.zhaoonline.support.gateway.listener;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.zhaoonline.support.gateway.apijar.JarFileInfo;
import com.zhaoonline.support.gateway.client.ElasticClientFactory;
import com.zhaoonline.support.gateway.client.ElasticConfiguration;
import com.zhaoonline.support.gateway.config.GWConstants;
import com.zhaoonline.support.gateway.config.GateWayConfiguration;
import com.zhaoonline.support.gateway.service.ServiceInfo;
import com.zhaoonline.support.gateway.utils.Utils;
/**
* class name:JarEventRepository <BR>
* class description: 将Jar发生上传、删除时候的event存储到某个仓库里面<BR>
* Remark: <BR>
* @version 1.00 2016年10月31日
* @author zhaoonline)yangyoupeng
*/
public class JarEventRepository {
private static final String JAR_PORT = "port";
private static final String JAR_HOST = "host";
private static final String JAR_FILE_NAME = "jarFileName";
private static Logger LOG=LoggerFactory.getLogger(JarEventRepository.class);
private static String DEFAUL_JAREVENT_TYPE="jarInfo";
private String host;
private String port;
private static final int MAX_ES_SIZE = 10000;//ES的最大size的值不能超过10000
private ElasticConfiguration elasticconfig ;
TransportClient client =null;
private GateWayConfiguration gatewatConfig;
public JarEventRepository(ElasticConfiguration elasticconfig,GateWayConfiguration gatewatConfig) throws Exception{
this.elasticconfig=elasticconfig;
this.gatewatConfig=gatewatConfig;
client = ElasticClientFactory.createClient(this.elasticconfig);
init();
}
public void init() throws Exception{
String host=this.gatewatConfig.getString(JAR_HOST);
String port=this.gatewatConfig.getString(JAR_PORT);
if(ObjectUtils.isEmpty(host) || ObjectUtils.isEmpty(port)){
throw new Exception("server.address or server.port can not be emty in application.properties");
}
this.host=host;
this.port=port;
}
public String getHost() {
return host;
}
public String getPort() {
return port;
}
/**
* Method name: saveJarEvent <BR>
* Description: 由JarEventListener调用,其中Jar的path已经存在 <BR>
* Remark: <BR>
* @param event
* @return boolean<BR>
* @throws JsonProcessingException
*/
public boolean saveJarEvent(GWEvent event) throws JsonProcessingException{
Map<String,Object> newInfo=new HashMap<String,Object>();
Object jarPathObject=event.getEventSource().get(GWEventConstants.JAR_PATH);
if(!ObjectUtils.isEmpty(jarPathObject)){
Path jarPath =(Path)jarPathObject;
String key=jarPath.toFile().getName();
newInfo.put(JAR_FILE_NAME, key);
newInfo.put(JAR_HOST, this.host);
newInfo.put(JAR_PORT, Integer.valueOf(this.port));
String docString=Utils.toJson(newInfo);
//我们使用文件名作为key
IndexResponse indexResponse= client.prepareIndex(GWConstants.DEFAUL_JAREVENT_INDEX, DEFAUL_JAREVENT_TYPE,key)
.setSource(docString).get();
if(indexResponse !=null){
LOG.info("success to jar event info:[{}],id:[{}]",newInfo,indexResponse.getId());
return true;
}
}
return false;
}
/**
* Method name: queryAllJarEvent <BR>
* Description: 我们使用10000作为查询所有的限定,是假设要是有10000个jar包,这个系统就已经有问题了。 <BR>
* Remark: <BR>
* @return List<JarFileInfo><BR>
*/
public List<JarFileInfo> queryAllJarEvent(){
String filterJarFileName="*";
SearchRequestBuilder requestBuilder=client.prepareSearch(GWConstants.DEFAUL_JAREVENT_INDEX).setTypes(DEFAUL_JAREVENT_TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.wildcardQuery(JAR_FILE_NAME, filterJarFileName))
.setFrom(0).setSize(MAX_ES_SIZE);
SearchResponse searchRes = requestBuilder
.execute().actionGet();
List<JarFileInfo> results= new ArrayList<JarFileInfo>();
SearchHits hits = searchRes.getHits();
SearchHit[] searchHits=hits.getHits() ;
LOG.info("find {} docs,with query string:[{}]",hits.getTotalHits(),requestBuilder);
if (searchHits != null) {
for(SearchHit hit:searchHits){
Map<String, Object> jarEventDoc = hit.getSource();
JarFileInfo jarFileInfo=setJarFileInfo(jarEventDoc);
results.add(jarFileInfo);
}
}
return results;
}
private JarFileInfo setJarFileInfo(Map<String, Object> jarEventDoc) {
JarFileInfo jarFileInfo=new JarFileInfo();
Object jarHost=jarEventDoc.get(JAR_HOST);
Object jarFileName=jarEventDoc.get(JAR_FILE_NAME);
Object jarPort=jarEventDoc.get(JAR_PORT);
if(!ObjectUtils.isEmpty(jarPort)){
if(jarPort instanceof Integer){
jarFileInfo.setPort((int) jarPort);
}
if(jarPort instanceof String){
jarFileInfo.setPort(Integer.valueOf(String.valueOf(jarPort)));
}
}
if(!ObjectUtils.isEmpty(jarFileName)){
String jarFileNameStr=String.valueOf(jarFileName);
jarFileInfo.setJarFileName(jarFileNameStr);
}
if(!ObjectUtils.isEmpty(jarHost)){
String jarHostStr= String.valueOf(jarHost);
jarFileInfo.setHost(jarHostStr);
}
return jarFileInfo;
}
}
package com.zhaoonline.support.gateway.listener;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import com.zhaoonline.support.gateway.client.ElasticConfiguration;
import com.zhaoonline.support.gateway.config.GateWayConfiguration;
public class JarEventRepositoryFactory {
private Environment environment;
private ElasticConfiguration elasticconfig;
public JarEventRepositoryFactory(Environment environment, ElasticConfiguration elasticconfig) {
this.environment=environment;
this.elasticconfig=elasticconfig;
}
public JarEventRepository jarEventRepository() throws Exception{
GateWayConfiguration gatewatConfig=new GateWayConfiguration();
String port = environment.getProperty("server.port");
String host = environment.getProperty("server.address");
Map<String,String> map=new HashMap<String,String>();
map.put("port", port);
map.put("host", host);
gatewatConfig.addProperties(map);
JarEventRepository rep=new JarEventRepository(elasticconfig, gatewatConfig);
return rep;
}
}
package com.zhaoonline.support.gateway.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public enum JarEventRepositoryFactorySingleton {
INSTACE;
private Logger LOG=LoggerFactory.getLogger(JarEventRepositoryFactorySingleton.class);
JarEventRepositoryFactory jarEventRepositoryFactory=null;
public void setJarEventRepository(JarEventRepositoryFactory jarEventRepositoryFactory){
if(this.jarEventRepositoryFactory==null){
this.jarEventRepositoryFactory=jarEventRepositoryFactory;
}else{
LOG.warn(" jarEventRepositoryFactory instance already existed,just ignore the newly added jarEventRepositoryFactory");
}
}
public JarEventRepositoryFactory getJarEventRepositoryFactory(){
return this.jarEventRepositoryFactory;
}
}
......@@ -2,13 +2,24 @@ package com.zhaoonline.support.gateway.listener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import com.zhaoonline.support.gateway.client.ElasticConfiguration;
@Configuration
public class ListenerConfiguration {
@Bean
public JarEventRepositoryFactory jarEventRepositoryFactory(Environment environment,ElasticConfiguration elasticconfig){
JarEventRepositoryFactory factory =new JarEventRepositoryFactory(environment,elasticconfig);
return factory;
}
@Bean
public GWEventListenerRegistry listenerRegistry(){
public GWEventListenerRegistry listenerRegistry(JarEventRepositoryFactory jarEventRepositoryFactory) throws Exception{
GWEventListenerRegistry registry=GWEventListenerRegistry.INSTANCE;
JarEventListener jarEventLister=new JarEventListener();
JarEventListener jarEventLister=new JarEventListener(jarEventRepositoryFactory);
RuntimeStatusListener statsListener =new RuntimeStatusListener();
registry.registerListener(jarEventLister);
registry.registerListener(statsListener);
......
......@@ -27,4 +27,11 @@ public class NewJarAddEvent implements GWEvent{
info.put(GWEventConstants.JAR_PATH, jarPath);
}
public void addAdditionInfo(String key,Object value){
if(!info.containsKey(key)){
info.put(key, value);
}
}
}
......
......@@ -24,7 +24,9 @@ public class LogbackEventAppender extends AppenderBase<ILoggingEvent> {
@Override
protected void append(ILoggingEvent eventObject) {
watcher.add(eventObject,eventObject.getTimeStamp());
if(eventObject.getLevel().isGreaterOrEqual(ch.qos.logback.classic.Level.WARN)){
watcher.add(eventObject,eventObject.getTimeStamp());
}
}
}
......
......@@ -109,7 +109,7 @@ public class LogbackWatcher extends LogWatcher<ILoggingEvent>{
@Override
public Map toLogInfo(ILoggingEvent event) {
Map loggerInfo = new HashMap();
Map<String, String> loggerInfo = new HashMap<String, String>();
loggerInfo.put("time",format.print(new DateTime(event.getTimeStamp())));
loggerInfo.put("level", event.getLevel().toString());
loggerInfo.put("logger", event.getLoggerName());
......
......@@ -8,7 +8,7 @@ public enum LoggerWatcherSingleton {
private LogWatcher logwather;
LoggerWatcherSingleton(){
config = new LogWatcherConfig(true, LogWatcher.LOGFRAME_LOGBACK, Level.WARN.toString(), 1000);
config = new LogWatcherConfig(true, LogWatcher.LOGFRAME_LOGBACK, Level.INFO.toString(), 1000);
logwather = LogWatcher.newRegisteredLogWatcher(config);
}
......
package com.zhaoonline.support.gateway.main;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.springframework.boot.Banner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
//import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.context.ConfigurableApplicationContext;
......@@ -11,12 +20,18 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.support.StandardServletEnvironment;
import com.beust.jcommander.JCommander;
import com.netflix.zuul.http.ZuulServlet;
import com.zhaoonline.support.gateway.servlet.ApiManagerServlet;
import com.zhaoonline.support.gateway.servlet.LoggingServlet;
import com.zhaoonline.support.gateway.servlet.ServerStatisticsServlet;
import com.zhaoonline.support.gateway.servlet.StatsServlet;
import com.zhaoonline.support.gateway.utils.IOUtils;
@Configuration
......@@ -90,7 +105,37 @@ public class Application {
}
public static void main(String[] args) throws InterruptedException {
SpringApplication app = new SpringApplication(Application.class);
SpringApplicationBuilder builder=new SpringApplicationBuilder(Application.class);
CommandWrapper commandWrapper=new CommandWrapper();
new JCommander(commandWrapper,args);
File file=commandWrapper.file;
if(!ObjectUtils.isEmpty(file)){
Properties properties=new Properties();
FileInputStream inputStream=null;
try {
inputStream=new FileInputStream(file);
properties.load(inputStream);
} catch (IOException e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(inputStream);
}
StandardServletEnvironment environmet=new StandardServletEnvironment();
Map<String, Object> pros=new HashMap<>();
Iterator<Object> keysetIter= properties.keySet().iterator();
while(keysetIter.hasNext()){
String key=String.valueOf(keysetIter.next());
String value=properties.getProperty(String.valueOf(key));
pros.put(key, value);
}
PropertySource<?> propertySource=new MapPropertySource("customConfig", pros);
environmet.getPropertySources().addFirst(propertySource);
builder.environment(environmet);
}
SpringApplication app =builder.build();
//SpringApplication app = new SpringApplication(Application.class);
app.setRegisterShutdownHook(false);
boolean isWeb=true;
app.setWebEnvironment(isWeb);
......
package com.zhaoonline.support.gateway.main;
import java.io.File;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.FileConverter;
public class CommandWrapper {
@Parameter(names = {"-file","-f"}, converter = FileConverter.class)
File file;
}
......@@ -15,12 +15,12 @@ import org.joda.time.DateTime;
import org.springframework.util.ObjectUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.zhaoonline.support.gateway.config.GWConstants;
import com.zhaoonline.support.gateway.utils.Utils;
public class ESUserInfoReposity implements UserInfoRepository{
private TransportClient client;
private static String DEFAUL_SERVICE_INDEX="zhaoonline";
private static String DEFAUL_SERVICE_TYPE="user";
public ESUserInfoReposity(TransportClient client){
......@@ -29,7 +29,7 @@ public class ESUserInfoReposity implements UserInfoRepository{
@Override
public UserInfo findUser(String userName) {
SearchResponse searchRes = client.prepareSearch(DEFAUL_SERVICE_INDEX).setTypes(DEFAUL_SERVICE_TYPE)
SearchResponse searchRes = client.prepareSearch(GWConstants.DEFAUL_USER_INDEX).setTypes(DEFAUL_SERVICE_TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(UserInfo.KEY_USERNAME,userName)))
......@@ -83,13 +83,13 @@ public class ESUserInfoReposity implements UserInfoRepository{
public IndexResponse addUser(UserInfo userInfo) throws JsonProcessingException {
String docString=Utils.toJson(userInfo);
IndexResponse indexResponse= client.prepareIndex(DEFAUL_SERVICE_INDEX, DEFAUL_SERVICE_TYPE)
IndexResponse indexResponse= client.prepareIndex(GWConstants.DEFAUL_USER_INDEX, DEFAUL_SERVICE_TYPE)
.setSource(docString).get();
return indexResponse;
}
public boolean deleteUserByID(String userID) {
DeleteResponse deleteResponse=client.prepareDelete(DEFAUL_SERVICE_INDEX, DEFAUL_SERVICE_TYPE, userID).get();
DeleteResponse deleteResponse=client.prepareDelete(GWConstants.DEFAUL_USER_INDEX, DEFAUL_SERVICE_TYPE, userID).get();
if(deleteResponse!=null){
return true;
}
......
......@@ -21,6 +21,7 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.zhaoonline.support.gateway.config.GWConstants;
import com.zhaoonline.support.gateway.utils.Utils;
import com.zhaoonline.support.gateway.web.service.APIInfoService;
......@@ -28,16 +29,14 @@ public class ESServiceInfoLoader implements ServiceInfoLoader{
private static final int MAX_ES_SIZE = 10000;//ES的最大size的值不能超过10000
private static Logger LOG=LoggerFactory.getLogger(ESServiceInfoLoader.class);
private TransportClient client;
private static String DEFAUL_SERVICE_INDEX="zhaoonline";
private static String DEFAUL_SERVICE_TYPE="service";
public ESServiceInfoLoader(TransportClient client){
this.client=client;
}
@Override
public ServiceInfo findServiceInfo(String serviceName) {
SearchRequestBuilder requestBuilder=client.prepareSearch(DEFAUL_SERVICE_INDEX).setTypes(DEFAUL_SERVICE_TYPE)
SearchRequestBuilder requestBuilder=client.prepareSearch(GWConstants.DEFAUL_SERVICE_INDEX).setTypes(DEFAUL_SERVICE_TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(ServiceInfo.KEY_SERVICENAME,serviceName)))
.setFrom(0).setSize(1);
......@@ -61,7 +60,7 @@ public class ESServiceInfoLoader implements ServiceInfoLoader{
if(!StringUtils.hasText(serviceName)){
filterSeriveName="*";
}
SearchRequestBuilder requestBuilder=client.prepareSearch(DEFAUL_SERVICE_INDEX).setTypes(DEFAUL_SERVICE_TYPE)
SearchRequestBuilder requestBuilder=client.prepareSearch(GWConstants.DEFAUL_SERVICE_INDEX).setTypes(DEFAUL_SERVICE_TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.wildcardQuery(ServiceInfo.KEY_SERVICENAME, filterSeriveName))
//.setQuery(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(ServiceInfo.KEY_SERVICENAME,filterSeriveName)))
......@@ -176,7 +175,7 @@ public class ESServiceInfoLoader implements ServiceInfoLoader{
String docString=Utils.toJson(newServiceInfo);
IndexResponse indexResponse= client.prepareIndex(DEFAUL_SERVICE_INDEX, DEFAUL_SERVICE_TYPE)
IndexResponse indexResponse= client.prepareIndex(GWConstants.DEFAUL_SERVICE_INDEX, DEFAUL_SERVICE_TYPE)
.setSource(docString).get();
LOG.info("add service info:[{}],result details:[{}]",newServiceInfo,indexResponse);
if(indexResponse !=null){
......@@ -195,7 +194,7 @@ public class ESServiceInfoLoader implements ServiceInfoLoader{
*/
@Override
public boolean deleteServiceInfoByID(String serviceID) {
DeleteResponse deleteResponse=client.prepareDelete(DEFAUL_SERVICE_INDEX, DEFAUL_SERVICE_TYPE, serviceID).get();
DeleteResponse deleteResponse=client.prepareDelete(GWConstants.DEFAUL_SERVICE_INDEX, DEFAUL_SERVICE_TYPE, serviceID).get();
if(deleteResponse!=null){
LOG.info("success to delete service info with id:[{}]",deleteResponse.getId());
return true;
......
......@@ -40,7 +40,7 @@ public class HTTPService extends AbstractService<HttpServletRequest, HttpRespons
private static final Logger LOG = LoggerFactory.getLogger(HTTPService.class);
private GateWayConfiguration config=null;
private static final AtomicReference<CloseableHttpClient> CLIENT = new AtomicReference<CloseableHttpClient>();
private static final Thread monitorThread=createMonitorThread();
private Integer SOCKET_TIMEOUT =10000;
private Integer CONNECTION_TIMEOUT=2000;
......@@ -60,8 +60,28 @@ public class HTTPService extends AbstractService<HttpServletRequest, HttpRespons
}
}
}, 30000, 5000);
Runtime.getRuntime().addShutdownHook(monitorThread);
}
private static Thread createMonitorThread() {
Thread monitorThread =new Thread(new Runnable() {
@Override
public void run() {
CONNECTION_MANAGER_TIMER.cancel();
final CloseableHttpClient hc = CLIENT.get();
if (hc != null){
try {
hc.close();
} catch (IOException e) {
e.printStackTrace();
};
}
LOG.info("pool Manager check timer cancel running");
}
});
return monitorThread;
}
public HTTPService(ServiceInfo serviceInfo){
this(serviceInfo,new GateWayConfiguration());
}
......
......@@ -25,11 +25,15 @@ import org.apache.http.entity.ContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zhaoonline.support.gateway.apijar.ApiJarSyncer;
import com.zhaoonline.support.gateway.apijar.ApiJarSyncerManager;
import com.zhaoonline.support.gateway.apijar.ApiManager;
import com.zhaoonline.support.gateway.apijar.ApiManagerConfiguration;
import com.zhaoonline.support.gateway.apijar.ApiResponse;
import com.zhaoonline.support.gateway.apijar.ApiUploadResult;
import com.zhaoonline.support.gateway.apijar.FileInfo;
import com.zhaoonline.support.gateway.listener.JarEventRepository;
import com.zhaoonline.support.gateway.listener.JarEventRepositoryFactorySingleton;
import com.zhaoonline.support.gateway.utils.Utils;
/**
......@@ -61,7 +65,8 @@ public class ApiManagerServlet extends HttpServlet {
private ApiManagerConfiguration config;
private ApiJarSyncerManager syncerManager;
@Override
public void init() throws ServletException {
super.init();
......@@ -77,6 +82,18 @@ public class ApiManagerServlet extends HttpServlet {
e.printStackTrace();
throw new ServletException(e);
}
JarEventRepository repostiry;
try {
repostiry = JarEventRepositoryFactorySingleton.INSTACE.getJarEventRepositoryFactory().jarEventRepository();
} catch (Exception e) {
e.printStackTrace();
throw new ServletException(e);
}
ApiJarSyncer syncer=new ApiJarSyncer(repostiry, apiManager);
syncerManager=new ApiJarSyncerManager(syncer);
syncerManager.init();
LOG.info("ApiManagerServlet started");
}
public void loadConfig(){
this.config = ApiManagerConfiguration.loadFrom(configFileName);
......@@ -303,4 +320,10 @@ public class ApiManagerServlet extends HttpServlet {
public boolean hasParam(HttpServletRequest request, String param) {
return HttpRequestUtils.hasParam(request, param);
}
@Override
public void destroy(){
syncerManager.close();
}
}
......
......@@ -27,7 +27,7 @@ public class LoggingServlet extends HttpServlet {
@Override
public void init(ServletConfig config){
loggerWatcherSingleton = LoggerWatcherSingleton.INSTANCE;
LOGGER.info("LoggingServlet started");
}
@Override
......
......@@ -29,7 +29,7 @@ public class IOUtils {
if (resourceLocation.startsWith(CLASSPATH_URL_PREFIX)) {
String path = resourceLocation.substring(CLASSPATH_URL_PREFIX.length());
Resource resource = new ClassPathResource(path);
System.out.println(resource.getURI());
//System.out.println(resource.getURI());
return resource;
// String description = "class path resource [" + path + "]";
// ClassLoader cld = Thread.currentThread().getContextClassLoader();
......
package com.zhaoonline.support.gateway.utils;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class NetworkUtils {
public static InetAddress[] getGlobalAddresses() throws SocketException {
List<InetAddress> list = new ArrayList<>();
for (NetworkInterface intf : getInterfaces()) {
if (intf.isUp()) {
for (InetAddress address : Collections.list(intf.getInetAddresses())) {
if (address.isLoopbackAddress() == false &&
address.isSiteLocalAddress() == false &&
address.isLinkLocalAddress() == false) {
list.add(address);
}
}
}
}
if (list.isEmpty()) {
throw new IllegalArgumentException("No up-and-running global-scope (public) addresses found, got " + getInterfaces());
}
return list.toArray(new InetAddress[list.size()]);
}
/** Return all interfaces (and subinterfaces) on the system */
public static List<NetworkInterface> getInterfaces() throws SocketException {
List<NetworkInterface> all = new ArrayList<>();
addAllInterfaces(all, Collections.list(NetworkInterface.getNetworkInterfaces()));
Collections.sort(all, new Comparator<NetworkInterface>() {
@Override
public int compare(NetworkInterface left, NetworkInterface right) {
return Integer.compare(left.getIndex(), right.getIndex());
}
});
return all;
}
/** Helper for getInterfaces, recursively adds subinterfaces to {@code target} */
private static void addAllInterfaces(List<NetworkInterface> target, List<NetworkInterface> level) {
if (!level.isEmpty()) {
target.addAll(level);
for (NetworkInterface intf : level) {
addAllInterfaces(target, Collections.list(intf.getSubInterfaces()));
}
}
}
/** Returns only the IPV4 addresses in {@code addresses} */
static InetAddress[] filterIPV4(InetAddress addresses[]) {
List<InetAddress> list = new ArrayList<>();
for (InetAddress address : addresses) {
if (address instanceof Inet4Address) {
list.add(address);
}
}
if (list.isEmpty()) {
throw new IllegalArgumentException("No ipv4 addresses found in " + Arrays.toString(addresses));
}
return list.toArray(new InetAddress[list.size()]);
}
}
......@@ -2,6 +2,8 @@ application.message=\u8D75\u6D8C\u5728\u7EBFAPI GateWay\u6B22\u8FCE\u60A8
spring.application.name=apigateway
server.port=80
server.address=192.168.0.162
#server.address=192.168.3.79
spring.mvc.view.prefix: /WEB-INF/view/
spring.mvc.view.suffix: .jsp
......
package com.zhaoonline.support.gateway.apijar;
import java.io.File;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.RequestBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.test.util.ReflectionTestUtils;
public class TestApiJarScanner {
@Test
public void testExecuteDownload(){
JarFileInfo jarFileInfo=new JarFileInfo();
jarFileInfo.setHost("192.168.0.162");
jarFileInfo.setPort(80);
jarFileInfo.setJarFileName("zhao-user-api-1.0.3.jar");
ApiJarSyncer fileSync=new ApiJarSyncer(null, null);
RequestBuilder requestBuilder=RequestBuilder.get("/apiman/download").addParameter("file", jarFileInfo.getJarFileName());
HttpHost hostPort=new HttpHost(jarFileInfo.getHost(),jarFileInfo.getPort());
File file=ReflectionTestUtils.invokeMethod(fileSync, "executeDownload",jarFileInfo,requestBuilder,hostPort);
Assert.assertEquals("zhao-user-api-1.0.3.jar", file.getName());
}
}
......@@ -4,10 +4,16 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -44,12 +50,16 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.embedded.EmbeddedServletContainerFactory;
import org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent;
import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.AnnotationConfigContextLoader;
......@@ -60,6 +70,7 @@ import com.zhaoonline.support.gateway.apijar.ApiManager;
import com.zhaoonline.support.gateway.apijar.ApiResponse;
import com.zhaoonline.support.gateway.apijar.TestJarLoader;
import com.zhaoonline.support.gateway.main.TestLoaderZuulServlet;
import com.zhaoonline.support.gateway.utils.NetworkUtils;
import com.zhaoonline.support.gateway.utils.Utils;
import javassist.CannotCompileException;
......@@ -275,7 +286,10 @@ public class TestApiManagerServlet {
app.setRegisterShutdownHook(false);
app.setWebEnvironment(true);
String[] args = new String[0];
app.run(args);
ConfigurableApplicationContext context=app.run(args);
System.out.println(context.getEnvironment().getProperty("local.server.port"));
System.out.println(context.getEnvironment().getProperty("server.address"));
}
private void createTestJarFile(String filePath) throws IOException, NotFoundException, CannotCompileException{
......@@ -287,6 +301,16 @@ public class TestApiManagerServlet {
TestJarLoader.createDynamicJarWithSimplePojo(properties, "com.test.User", filePath);
}
@Test
public void test() throws UnknownHostException, SocketException{
InetAddress addresses[] =NetworkUtils.getGlobalAddresses();
for(InetAddress address:addresses){
System.out.println(address.getHostAddress());
}
}
}
......
package com.zhaoonline.support.gateway.utils;
import java.net.InetAddress;
import java.net.SocketException;
import org.junit.Test;
public class TestNetworkUtils {
@Test
public void testgetGlobalAddresses() throws SocketException{
InetAddress addresses[] =NetworkUtils.getGlobalAddresses();
for(InetAddress address:addresses){
System.out.println(address.getHostAddress());
}
}
}