yangyoupeng

添加service的LRUCache

......@@ -78,12 +78,14 @@ public class SendResponseFilter extends ZuulFilter {
private void writeResponse() throws IOException {
RequestContext context = RequestContext.getCurrentContext();
String response = context.getResponseBody();
LOG.debug("response [{}]",response);
HttpServletResponse servletResponse = context.getResponse();
servletResponse.setCharacterEncoding("UTF-8");
OutputStream outStream = servletResponse.getOutputStream();
//发送错误信息
if (response == null && context.getResponseDataStream() == null) {
Response responseObject=Response.failedResponse();
LOG.info("there is now response body or response steam,just send fail info [{}]",responseObject);
writeResponse(new ByteArrayInputStream(responseObject.toJson().getBytes(Charset.forName("UTF-8"))), outStream);
return;
}
......@@ -94,6 +96,7 @@ public class SendResponseFilter extends ZuulFilter {
if(context.getThrowable() !=null){
Response responseObject=Response.failedResponse();
responseObject.setMsg(context.getThrowable().getCause().getMessage());
LOG.info("there is exception when do request ,just send fail info [{}]",responseObject);
writeResponse(new ByteArrayInputStream(responseObject.toJson().getBytes(Charset.forName("UTF-8"))), outStream);
return;
}
......
......@@ -8,6 +8,7 @@ import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.apache.http.HttpResponse;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
......@@ -69,6 +70,9 @@ public class TransforFilter extends ZuulFilter implements InitializingBean,Dispo
RequestContext context = RequestContext.getCurrentContext();
HttpServletRequest request = context.getRequest();
LOGGER.info("recieve http request [{}]",request);
Map<String,Object> headerMap=new HashMap<String,Object>();
Enumeration<String> headers=request.getHeaderNames();
while(headers.hasMoreElements()){
......@@ -96,14 +100,14 @@ public class TransforFilter extends ZuulFilter implements InitializingBean,Dispo
} catch (Exception e) {
//对于异常。应该通知下异常处理
e.printStackTrace();
LOGGER.error("fail to read service info about {},because {} ",serviceName,e.getMessage());
LOGGER.error("fail to read service info about [{}],because [{}] ",serviceName,e.getMessage());
}
handlerServiceUnvaliable(context, serviceName, serviceInfo);
String servicetype=serviceInfo.getServiceType();
boolean enablecircuitBreak=commonConfig.getBoolean("gateway.cirecuitbreak.enable", true);
if(!StringUtils.isEmpty(servicetype)){
if(enablecircuitBreak){
......
......@@ -35,6 +35,12 @@ public class ServiceLifeCycleChecker implements LifeCycleChecker {
@Override
public void run() {
AbstractService service=serviceRef.get();
if(service ==null || serviceRef.get().isClosed() ){
LOGGER.info ("dubbo service is closed now");
throw new RuntimeException(String.format("throw an exception to terminate the thread,the service %s is closed ", serviceInfo));
}
Long startTime=service.getStartTime();
DateTime startDateTime=new DateTime(startTime);
......@@ -45,10 +51,6 @@ public class ServiceLifeCycleChecker implements LifeCycleChecker {
service.setClosed(true);
}
if(service ==null || serviceRef.get().isClosed() ){
LOGGER.info ("dubbo service is closed now");
throw new RuntimeException(String.format("throw an exception to terminate the thread,the service %s is closed ", serviceInfo));
}
try{
String status = (String) echoService.$echo("OK"); // 回声测试可用性
if(!status.equals("OK")){
......
......@@ -43,7 +43,9 @@ public class BeanConfiguration {
@Bean
public EhCacheFactoryBean ehcache4UserDetails(){
return new EhCacheFactoryBean();
EhCacheFactoryBean factory= new EhCacheFactoryBean();
factory.diskPersistent(false);
return factory;
}
@Bean
......
......@@ -17,10 +17,12 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.HttpServletRequest;
import org.apache.http.HttpRequest;
import org.apache.http.entity.ContentType;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.message.BasicHttpRequest;
import org.mockito.internal.matchers.ContainsExtraTypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
......@@ -44,6 +46,7 @@ import com.zhaoonline.support.gateway.config.GWConstants;
import com.zhaoonline.support.gateway.utils.Utils;
public class DubboService extends AbstractService<HttpServletRequest, HttpResponse> {
private static final String APPLICATION_JSON_MIME_TYPE = "application/json";
private ApiClassLoaderManager apiClassLoaderManager=ApiClassLoaderManager.INSTANCE;
private static final int UNKOWN_CONTENT_LENGTH = -1;
private static final Logger LOG = LoggerFactory.getLogger(DubboService.class);
......@@ -86,6 +89,12 @@ public class DubboService extends AbstractService<HttpServletRequest, HttpRespon
if(!isAvailable()){
throw new GatewayException(String.format("target service:%s is unavailable not ",servicInfo.getServiceName()));
}
final String contentType = request.getContentType();
if (contentType == null || !(contentType.startsWith(APPLICATION_JSON_MIME_TYPE))) {
throw new GatewayException(String.format("request context type must be set as %s ",APPLICATION_JSON_MIME_TYPE));
}
Object dubboservice = getAutomicReference().get();
//http访问请求中必须带有需要访问method的名称,因为dubbo是rpc协议。
String accessMethod = getAccessMethod(request);
......
package com.zhaoonline.support.gateway.service;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
public class ServiceLRUCache {
private ConcurrentHashMap<String, AbstractService> serviceLRUcache = null;
private Entry first;
private Entry last;
private int capacity = 0;
public ServiceLRUCache() {
this(200);
}
public ServiceLRUCache(int capacity) {
this.capacity = capacity;
this.serviceLRUcache = new ConcurrentHashMap<String, AbstractService>(200);
}
/**
* serialVersionUID:TODO
*/
private static final long serialVersionUID = -8224662589517146643L;
public synchronized AbstractService put(String key, AbstractService value) {
Entry entry = getEntry(key);
if (entry == null) {
if (serviceLRUcache.size() >= capacity) {
serviceLRUcache.remove(last.key);
removeLast();
}
entry = new Entry();
entry.key = key;
}else{
//因为覆盖了原来的key,所以要讲service的关闭,防止内存溢出,
//另外需要注意的是,此时内存中并不能立刻被销毁,因为在servicelifechecher中还存在对value的reference引用。
//
entry.value.setClosed(true);
entry.value=null;
}
entry.value = value;
moveToFirst(entry);
return serviceLRUcache.put(key, value);
}
public synchronized AbstractService get(String key) {
Entry entry = getEntry(key);
if (entry == null){
return null;
}
moveToFirst(entry);
return entry.value;
}
public synchronized void remove(String key) {
Entry entry = getEntry(key);
if (entry != null) {
closeService(entry);
if (entry.pre != null)
entry.pre.next = entry.next;
if (entry.next != null)
entry.next.pre = entry.pre;
if (entry == first)
first = entry.next;
if (entry == last)
last = entry.pre;
}
serviceLRUcache.remove(key);
}
private void closeService(Entry entry) {
entry.value.setClosed(true);
entry.value.destory();
entry.value=null;
}
private void removeLast() {
if (last != null) {
closeService(last);
last = last.pre;
if (last == null)
first = null;
else
last.next = null;
}
}
private void moveToFirst(Entry entry) {
if (entry == first)
return;
if (entry.pre != null)
entry.pre.next = entry.next;
if (entry.next != null)
entry.next.pre = entry.pre;
if (entry == last)
last = last.pre;
if (first == null || last == null) {
first = last = entry;
return;
}
entry.next = first;
first.pre = entry;
first = entry;
entry.pre = null;
}
private Entry getEntry(String key) {
AbstractService service = serviceLRUcache.get(key);
if (service == null) {
return null;
}
Entry entry = new Entry();
entry.key = key;
entry.value = service;
return entry;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Entry entry = first;
while (entry != null) {
sb.append(String.format("%s:%s ", entry.key, entry.value));
entry = entry.next;
}
return sb.toString();
}
static class Entry {
String key;
AbstractService value;
Entry pre;
Entry next;
}
public int getSize() {
return serviceLRUcache.size();
}
public Set<String> keySet(){
Set<String> keSet=new HashSet<String>();
KeySetView<String, AbstractService> keysSetView= serviceLRUcache.keySet();
Iterator<String> iter=keysSetView.iterator();
while(iter.hasNext()){
String key=iter.next();
keSet.add(key);
}
return keSet;
}
}
......@@ -17,7 +17,8 @@ public enum ServiceManager{
INSTANCE;
private ConcurrentHashMap<String , HTTPService> httpServiceCache=new ConcurrentHashMap<>();
private ConcurrentHashMap<String , DubboService> dubboServiceCache=new ConcurrentHashMap<>();
// private ConcurrentHashMap<String , DubboService> dubboServiceCache=new ConcurrentHashMap<>();
private ServiceLRUCache dubboServiceCache=new ServiceLRUCache();
private static final Logger LOG = LoggerFactory.getLogger(ServiceManager.class);
private LifeCycleCheckerManager lifeCycleCheckerManager=LifeCycleCheckerManager.INSTANCE;
......@@ -44,11 +45,19 @@ public enum ServiceManager{
entry.getValue().destory();
LOG.info("HTTP Service:{} has bean shutdown",entry.getKey());
}
Set<Entry<String, DubboService>> dubboEntrySet=dubboServiceCache.entrySet();
for(Entry<String, DubboService> entry:dubboEntrySet){
entry.getValue().destory();
LOG.info("Dubbo Service:{} has bean shutdown",entry.getKey());
Set<String> dubboEntrySet=dubboServiceCache.keySet();
for(String key:dubboEntrySet){
dubboServiceCache.remove(key);
LOG.info("Dubbo Service:{} has bean shutdown",key);
}
// Set<Entry<String, DubboService>> dubboEntrySet=dubboServiceCache.entrySet();
// for(Entry<String, DubboService> entry:dubboEntrySet){
// entry.getValue().destory();
// LOG.info("Dubbo Service:{} has bean shutdown",entry.getKey());
// }
}
/**
......@@ -59,24 +68,27 @@ public enum ServiceManager{
* @return DubboService<BR>
*/
public synchronized DubboService buildDubboServiceFrom(ServiceInfo serviceInfo,GateWayConfiguration config) {
DubboService cachedService = dubboServiceCache.get(serviceInfo.getServiceInterface());
String serviceName=serviceInfo.getServiceName();
DubboService cachedService = (DubboService) dubboServiceCache.get(serviceName);
if(cachedService == null){
LOG.info("no cache for dubbo service {},create new instance for service",serviceInfo.getServiceInterface());
LOG.info("no cache for dubbo service [{}],create new instance for service",serviceInfo.getServiceName());
DubboService dubboSerivce=new DubboService(serviceInfo);
try {
dubboSerivce.init();
monitorStatus(dubboSerivce,config);
} catch (Exception e) {
LOG.info("fail to init the dubbo service interface {},cause by {}",serviceInfo.getServiceInterface(),e.getMessage());
LOG.info("fail to init the dubbo service interface [{}],cause by [{}]",serviceInfo,e.getMessage());
e.printStackTrace();
return null;
}
dubboServiceCache.put(serviceInfo.getServiceInterface(), dubboSerivce);
LOG.info("cache dubbo service with key [{}]",serviceName);
dubboServiceCache.put(serviceName,dubboSerivce);
return dubboSerivce;
}else{
LOG.info("get dubbo service from cache with key [{}]",serviceName);
if(cachedService.isClosed()){
LOG.info("the cached dubbo service interface {} is closed by {},just return null",serviceInfo.getServiceInterface(),LifeCycleCheckerManager.class.getName());
dubboServiceCache.remove(serviceInfo.getServiceInterface());
dubboServiceCache.remove(serviceName);
cachedService=null;
}
}
......
......@@ -24,7 +24,7 @@ public static void main(String[] args) throws JsonProcessingException {
TransportClient clinet=ElasticClientFactory.createClient(config);
ESServiceInfoLoader esUserInfoReposity=new ESServiceInfoLoader(clinet);
ServiceInfo newServiceInfo = localHttpService("wxtest");
ServiceInfo newServiceInfo = dubboService("wxtest");
boolean indexResponse=esUserInfoReposity.addServiceInfo(newServiceInfo);
// ServiceInfo serviceInfo=esUserInfoReposity.findServiceInfo("dubbo2");
......@@ -48,7 +48,7 @@ private static ServiceInfo dubboService(String serviceName) {
HostPort hostport=new HostPort();
hostport.setProtocol("zookeeper");
hostport.setHost("192.168.0.205");
hostport.setHost("192.168.0.112");
hostport.setPort(2181);
List<HostPort> hostPorts=Arrays.asList(hostport);
servicInfo.setHostports(hostPorts);
......
package com.zhaoonline.support.gateway.service;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import javax.servlet.http.HttpServletRequest;
import org.apache.http.HttpResponse;
import org.junit.Assert;
import org.junit.Test;
import com.netflix.zuul.context.RequestContext;
public class TestServiceLRUCache {
@Test
public void test(){
ServiceLRUCache cahce=new ServiceLRUCache(3);
AbstractService service =new HTTPService(null);
AbstractService service1 =new HTTPService(null);
AbstractService service2 =new HTTPService(null);
AbstractService service3 =new HTTPService(null);
cahce.put("1", service);
cahce.put("2", service1);
cahce.put("3", service2);
Assert.assertFalse(service.isClosed());
Assert.assertFalse(service1.isClosed());
Assert.assertFalse(service2.isClosed());
Assert.assertEquals(3, cahce.getSize());
cahce.put("4", service3);
cahce.put("5", service3);
cahce.put("4", service3);
cahce.put("5", service3);
Assert.assertTrue(service.isClosed());
Assert.assertTrue(service1.isClosed());
Assert.assertFalse(service2.isClosed());
Assert.assertEquals(3, cahce.getSize());
}
}