yangyoupeng

重构,将consumer和producer与pool解耦。

package com.zhaoonline.message.queue;
import org.apache.commons.pool.impl.GenericObjectPool;
import com.aliyun.mns.client.MNSClient;
public enum ClientLifeCycleManger {
INSTANCE;
private boolean alreadyInitiated=false;
private GenericObjectPool<MNSClient> pool =null;
/**
* Method name: reclaimClient <BR>
* Description: 回收这个client <BR>
* Remark: <BR>
* @param client
* @throws Exception void<BR>
*/
public void reclaimClient(MNSClient client) throws Exception{
if(pool != null){
pool.returnObject(client);
}
}
/**
* Method name: initPool <BR>
* Description: 将pool设置进入{@link ClientLifeCycleManger}
* 若pool已经存在返回false,不进行任何设置,若pool不存在返回true,设置pool<BR>
* Remark: <BR>
* @param pool
* @return boolean<BR>
*/
public boolean initPool(final GenericObjectPool<MNSClient> pool) {
if(alreadyInitiated){
return false;
}
if(this.pool ==null){
this.pool= pool;
alreadyInitiated=true;
}
return true;
}
public MNSClient borrowClient() throws Exception {
if(pool != null){
return pool.borrowObject();
}
return null;
}
public void close() throws Exception {
pool.close();
}
}
......@@ -19,24 +19,23 @@ public class CloudPullTopicConsumer {
private TopicMeta topicMeta =new TopicMeta();
private String queueName;
private GenericObjectPool<MNSClient> pool;
private QueueMeta queueMetaTemplate=new QueueMeta();
private CloudQueue queue=null;
private MNSClient mnsClient =null;
private boolean initiated=false;
public CloudPullTopicConsumer(String topic,GenericObjectPool<MNSClient> pool,String queue) throws Exception {
this(topic,pool,null,queue);
public CloudPullTopicConsumer(String topic,MNSClient client,String queue) throws Exception {
this(topic,client,null,queue);
}
public CloudPullTopicConsumer(String topic,GenericObjectPool<MNSClient> pool,QueueMeta queueMetaTemplate,String queue) throws Exception {
this(topic,null,pool,queueMetaTemplate,queue);
public CloudPullTopicConsumer(String topic,MNSClient client,QueueMeta queueMetaTemplate,String queue) throws Exception {
this(topic,null,client,queueMetaTemplate,queue);
}
public CloudPullTopicConsumer(String topic,TopicMeta topicMeta,GenericObjectPool<MNSClient> pool,QueueMeta queueMetaTemplate,String queue) throws Exception {
this(topic,topicMeta,pool,queueMetaTemplate,queue,true);
public CloudPullTopicConsumer(String topic,TopicMeta topicMeta,MNSClient client,QueueMeta queueMetaTemplate,String queue) throws Exception {
this(topic,topicMeta,client,queueMetaTemplate,queue,true);
}
public CloudPullTopicConsumer(String topic,TopicMeta topicMeta,GenericObjectPool<MNSClient> pool,QueueMeta queueMetaTemplate,String queueName,boolean initiate) throws Exception {
public CloudPullTopicConsumer(String topic,TopicMeta topicMeta,MNSClient client,QueueMeta queueMetaTemplate,String queueName,boolean initiate) throws Exception {
if(topicMeta != null){
this.topicMeta =topicMeta;
}
......@@ -47,13 +46,12 @@ public class CloudPullTopicConsumer {
}else{
this.queueMetaTemplate.setPollingWaitSeconds(30);
}
this.mnsClient=client;
initiated=initiate;
this.pool=pool;
init();
}
public void init() throws Exception{
mnsClient=pool.borrowObject();
queue=mnsClient.getQueueRef(this.queueName);
if(initiated){
if(queueNotExisted(queue)){
......@@ -116,7 +114,7 @@ public class CloudPullTopicConsumer {
public void close(){
if(mnsClient!=null){
try {
pool.returnObject(mnsClient);
ClientLifeCycleManger.INSTANCE.reclaimClient(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -37,13 +37,11 @@ public class CloudPullTopicProducer implements ProducerInt<MessageObject,TopicMe
private CloudPullTopic cloudPullTopic;
private GenericObjectPool<MNSClient> pool;
@Autowired
public CloudPullTopicProducer(CloudPullTopic cloudPullTopic,MNSClient mnsClient, GenericObjectPool<MNSClient> pool){
public CloudPullTopicProducer(CloudPullTopic cloudPullTopic,MNSClient mnsClient){
this.mnsClient = mnsClient;
this.cloudPullTopic = cloudPullTopic;
this.pool=pool;
startProducer();
}
private boolean start=false;
......@@ -65,7 +63,7 @@ public class CloudPullTopicProducer implements ProducerInt<MessageObject,TopicMe
public void close(){
if(mnsClient!=null){
try {
pool.returnObject(mnsClient);
ClientLifeCycleManger.INSTANCE.reclaimClient(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -23,7 +23,7 @@ import com.aliyun.mns.model.TopicMeta;
* @author zhaoonline)yangyoupeng
*/
public class CloudTopicProducerBuilder {
private ClientLifeCycleManger lifeCycleManger=ClientLifeCycleManger.INSTANCE;
private AliMNSConfig config=null;
private final GenericObjectPool<MNSClient> pool;
private static Logger logger=LoggerFactory.getLogger(CloudTopicProducerBuilder.class);
......@@ -34,6 +34,7 @@ public class CloudTopicProducerBuilder {
pool.setMaxActive(this.config.getPoolMaxActive());
pool.setMaxIdle(this.config.getPoolMaxActive());
pool.setMinIdle(this.config.getPoolMinIdle());
lifeCycleManger.initPool(pool);
logger.debug("init connect pool with config:[{}]",config);
}
......@@ -46,7 +47,8 @@ public class CloudTopicProducerBuilder {
* @throws Exception
*/
public P2PProducer buildP2PProducer(String queue) throws Exception{
P2PProducer producer=new P2PProducer(queue,pool);
MNSClient client=lifeCycleManger.borrowClient();
P2PProducer producer=new P2PProducer(queue,client);
return producer;
}
......@@ -59,7 +61,8 @@ public class CloudTopicProducerBuilder {
* @throws Exception
*/
public TopicProducer buildTopic(String topic) throws Exception{
TopicProducer producer=new TopicProducer(topic,pool);
MNSClient client=lifeCycleManger.borrowClient();
TopicProducer producer=new TopicProducer(topic,client);
return producer;
}
......@@ -72,7 +75,8 @@ public class CloudTopicProducerBuilder {
* @throws Exception
*/
public CloudPullTopicConsumer buildTopicPullConsumer(String topic,String queue) throws Exception{
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topic,pool,queue);
MNSClient client=lifeCycleManger.borrowClient();
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topic,client,queue);
return consumer;
}
......@@ -87,7 +91,8 @@ public class CloudTopicProducerBuilder {
* @throws Exception CloudPullTopicConsumer<BR>
*/
public CloudPullTopicConsumer buildTopicPullConsumer(String topicName, String queue, boolean initiate) throws Exception {
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topicName,null,pool,null,queue,initiate);
MNSClient client=lifeCycleManger.borrowClient();
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topicName,null,client,null,queue,initiate);
return consumer;
}
/**
......@@ -99,7 +104,8 @@ public class CloudTopicProducerBuilder {
* @throws Exception
*/
public CloudPullTopicConsumer buildTopicPullConsumer(String topic,TopicMeta topicMeta,QueueMeta queueMeta,String queue) throws Exception{
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topic,topicMeta, pool, queueMeta,queue);
MNSClient client=lifeCycleManger.borrowClient();
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topic,topicMeta, client, queueMeta,queue);
return consumer;
}
......@@ -114,7 +120,8 @@ public class CloudTopicProducerBuilder {
* @throws Exception
*/
public Consumer buildConsumer(String queue) throws Exception{
Consumer consumer=new Consumer(queue,pool);
MNSClient client=lifeCycleManger.borrowClient();
Consumer consumer=new Consumer(queue,client);
return consumer;
}
......@@ -148,7 +155,7 @@ public class CloudTopicProducerBuilder {
* @throws Exception
*/
public CloudPullTopicProducer buildCloudTopic(String topic,boolean createQueue,QueueMeta queueMetaTemplate,String... queueNameList) throws Exception{
MNSClient mnsClient =pool.borrowObject();
MNSClient mnsClient=lifeCycleManger.borrowClient();
TopicMeta topicMeta =new TopicMeta();
topicMeta.setTopicName(topic);
......@@ -158,14 +165,14 @@ public class CloudTopicProducerBuilder {
}
CloudPullTopic cloudPullTopic = mnsClient.createPullTopic(topicMeta, queueNameVector,createQueue,queueMetaTemplate);
CloudPullTopicProducer producer=new CloudPullTopicProducer(cloudPullTopic,mnsClient,pool);
CloudPullTopicProducer producer=new CloudPullTopicProducer(cloudPullTopic,mnsClient);
return producer;
}
@PreDestroy
public void close(){
try {
pool.close();
lifeCycleManger.close();
logger.info("pool has bean closed");
} catch (Exception e) {
logger.error("error when pool closed,cause by ",e.getMessage());
......
......@@ -41,14 +41,12 @@ public class Consumer {
*/
private CloudQueue queueForConsumer;
private GenericObjectPool<MNSClient> pool;
@Autowired
public Consumer(String queueName,GenericObjectPool<MNSClient> pool) throws Exception{
public Consumer(String queueName,MNSClient client) throws Exception{
this.queueName=queueName;
this.pool=pool;
this.mnsClient = pool.borrowObject();
this.mnsClient = client;
init();
}
......@@ -134,7 +132,7 @@ public class Consumer {
public void close(){
if(mnsClient!=null){
try {
pool.returnObject(mnsClient);
ClientLifeCycleManger.INSTANCE.reclaimClient(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -36,12 +36,10 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
private CloudQueue queue;
private GenericObjectPool<MNSClient> pool;
public P2PProducer(String queueName,GenericObjectPool<MNSClient> pool) throws Exception{
public P2PProducer(String queueName,MNSClient client) throws Exception{
this.queueName=queueName;
this.mnsClient = pool.borrowObject();
this.pool = pool;
this.mnsClient = client;
init();
}
......@@ -65,7 +63,7 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
public void close() {
if(mnsClient!=null){
try {
pool.returnObject(mnsClient);
ClientLifeCycleManger.INSTANCE.reclaimClient(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -21,12 +21,10 @@ public class TopicProducer implements ProducerInt<MessageObject,TopicMessage> {
private CloudTopic cloudTopic;
private boolean start=false;
private MNSClient mnsClient=null;
private GenericObjectPool<MNSClient> pool =null;
public TopicProducer(String topicName , GenericObjectPool<MNSClient> pool) throws Exception {
this.mnsClient=pool.borrowObject();
public TopicProducer(String topicName , MNSClient mnsClient) throws Exception {
this.mnsClient=mnsClient;
this.cloudTopic =mnsClient.getTopicRef(topicName);
this.start= true;
this.pool=pool;
}
......@@ -110,7 +108,7 @@ public class TopicProducer implements ProducerInt<MessageObject,TopicMessage> {
public void close() {
if(mnsClient!=null){
try {
pool.returnObject(mnsClient);
ClientLifeCycleManger.INSTANCE.reclaimClient(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
......