yangyoupeng

新增CloudPullTopicConsumer已经测试例,作用是检查队列是否存在、若不存在就创建

package com.zhaoonline.message.queue;
import java.util.Vector;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
public class CloudPullTopicConsumer {
private static Logger logger=LoggerFactory.getLogger(CloudPullTopicConsumer.class);
private TopicMeta topicMeta =new TopicMeta();
private String queueName;
private GenericObjectPool<MNSClient> pool;
private QueueMeta queueMetaTemplate=new QueueMeta();
private CloudQueue queue=null;
private MNSClient mnsClient =null;
private boolean initiated=false;
public CloudPullTopicConsumer(String topic,GenericObjectPool<MNSClient> pool,String queue) throws Exception {
this(topic,pool,null,queue);
}
public CloudPullTopicConsumer(String topic,GenericObjectPool<MNSClient> pool,QueueMeta queueMetaTemplate,String queue) throws Exception {
this(topic,null,pool,queueMetaTemplate,queue);
}
public CloudPullTopicConsumer(String topic,TopicMeta topicMeta,GenericObjectPool<MNSClient> pool,QueueMeta queueMetaTemplate,String queue) throws Exception {
this(topic,topicMeta,pool,queueMetaTemplate,queue,true);
}
public CloudPullTopicConsumer(String topic,TopicMeta topicMeta,GenericObjectPool<MNSClient> pool,QueueMeta queueMetaTemplate,String queueName,boolean initiate) throws Exception {
if(topicMeta != null){
this.topicMeta =topicMeta;
}
this.topicMeta.setTopicName(topic);
this.queueName = queueName;
if(queueMetaTemplate != null){
this.queueMetaTemplate=queueMetaTemplate;
}else{
this.queueMetaTemplate.setPollingWaitSeconds(30);
}
initiated=initiate;
this.pool=pool;
init();
}
public void init() throws Exception{
mnsClient=pool.borrowObject();
queue=mnsClient.getQueueRef(this.queueName);
if(initiated){
if(queueNotExisted(queue)){
logger.info("queue:[{}] is not existed going to create it with queueMeta:[{}].",queue,queueMetaTemplate);
subScribeToTopic(queue);
}
}
}
public boolean queueExisted(){
return queue.isQueueExist();
}
private void subScribeToTopic(CloudQueue queue) throws Exception {
Vector<String> queueNameVector=new Vector<>(1);
queueNameVector.add(queueName);
CloudPullTopic topic = mnsClient.createPullTopic(topicMeta, queueNameVector, true, queueMetaTemplate);
logger.info("create a new queue:[{}] with queueMeta for topic:[{}] with topicMeta:[{}]",queue,queueMetaTemplate,topic,topicMeta);
}
private boolean queueNotExisted(CloudQueue queue) {
return !queue.isQueueExist();
}
public Message popMessage(){
return queue.popMessage();
}
public MessageObject popMessageObject(){
Message result=queue.popMessage();
return MessageObject.wrapMessage(result);
}
public Message popMessage(int timeount){
return queue.popMessage(timeount);
}
public MessageObject popMessageObject(int timeount){
Message result=queue.popMessage(timeount);
return MessageObject.wrapMessage(result);
}
public Message peekMessage() {
return queue.peekMessage();
}
public MessageObject peekMessageObject() {
Message result=queue.peekMessage();
return MessageObject.wrapMessage(result);
}
public void deleteAlreadyGetMessage(Message result) {
if(result !=null){
queue.deleteMessage(result.getReceiptHandle());
}
}
public void close(){
if(mnsClient!=null){
try {
pool.returnObject(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
......@@ -145,6 +145,22 @@ public class CloudPullTopicProducer implements ProducerInt<MessageObject,TopicMe
return cloudPullTopic.getQueueNameList();
}
/**
* Method name: deleteTopic <BR>
* Description: 该delete方法的作用是需要做重点说明:
* 调用deleteTopic(true) ,要想连同topic和queue一起删除,需要在构建CloudPullTopicProducer的使用
* {@code
* boolean createQueue=true;
* CloudPullTopicProducer cloudPullTopicProducer = CloudTopicProducerBuilder.buildCloudTopic(topicName,createQueue, 队列名1,队列名2)
* 或者使用MNSClient来创建:
* boolean createQueue=true;
* CloudPullTopic cloudPullTopic = mnsClient.createPullTopic(topicMeta, queueNameVector,createQueue,queueMetaTemplate);
* }
*直接只能删除初始化时输入的队列名。如上面的["队列名1,队列名2"] 或者是queueNameVector
* <BR>
* Remark: <BR>
* @param needDeleteQueues void<BR>
*/
public void deleteTopic(boolean needDeleteQueues){
cloudPullTopic.delete(needDeleteQueues);
}
......
......@@ -64,6 +64,48 @@ public class CloudTopicProducerBuilder {
}
/**
* Method name: buildTopicPullConsumer <BR>
* Description: topicPullConsumer在检查queue是否存在,不存在就会创建.topicMeta和queue都采用默认值 <BR>
* Remark: <BR>
* @param queue
* @return Consumer<BR>
* @throws Exception
*/
public CloudPullTopicConsumer buildTopicPullConsumer(String topic,String queue) throws Exception{
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topic,pool,queue);
return consumer;
}
/**
* Method name: buildTopicPullConsumer <BR>
* Description: <BR>
* Remark: <BR>
* @param topicName
* @param queue
* @param initiate 是否需要立刻init,如果为false,那么需要手动调用{@link CloudPullTopicConsumer#init()}方法,主要供测试用
* @return
* @throws Exception CloudPullTopicConsumer<BR>
*/
public CloudPullTopicConsumer buildTopicPullConsumer(String topicName, String queue, boolean initiate) throws Exception {
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topicName,null,pool,null,queue,initiate);
return consumer;
}
/**
* Method name: buildTopicPullConsumer <BR>
* Description: topicPullConsumer在检查queue是否存在,不存在就会创建。指定使用topMeta以及queueMeta<BR>
* Remark: <BR>
* @param queue
* @return Consumer<BR>
* @throws Exception
*/
public CloudPullTopicConsumer buildTopicPullConsumer(String topic,TopicMeta topicMeta,QueueMeta queueMeta,String queue) throws Exception{
CloudPullTopicConsumer consumer=new CloudPullTopicConsumer(topic,topicMeta, pool, queueMeta,queue);
return consumer;
}
/**
* Method name: buildConsumer <BR>
* Description: 构建queue的consumer <BR>
* Remark: <BR>
......@@ -129,4 +171,6 @@ public class CloudTopicProducerBuilder {
logger.error("error when pool closed,cause by ",e.getMessage());
}
}
}
......
......@@ -70,21 +70,34 @@ public class Consumer {
* @param timeount
* @return Message<BR>
*/
public MessageObject popMessage(int timeount){
public Message popMessage(int timeount){
return queueForConsumer.popMessage(timeount);
}
public MessageObject popMessageObject(int timeount){
Message result=queueForConsumer.popMessage(timeount);
return MessageObject.wrapMessage(result);
}
/**
* Method name: popMessage <BR>
* Description: pop完了之后会自动删除message。 <BR>
* Remark: <BR>
* @return MessageObject<BR>
*/
public MessageObject popMessage(){
public Message popMessage(){
return queueForConsumer.popMessage();
}
public MessageObject popMessageObject(){
Message result=queueForConsumer.popMessage();
return MessageObject.wrapMessage(result);
}
// /**
// * Method name: popMessageAndDelete <BR>
// * Description: 获取消息之后,删除队列中的消息,此时才能继续消费下一条消息,若队列中暂时没有消息,则等待timeout时间 <BR>
......@@ -143,12 +156,14 @@ public class Consumer {
* Remark: <BR>
* @return MessageObject<BR>
*/
public MessageObject peekMessage() {
public MessageObject peekMessageObject() {
Message result=queueForConsumer.peekMessage();
return MessageObject.wrapMessage(result);
}
public Message peekMessage() {
return queueForConsumer.peekMessage();
}
}
......
package com.zhaoonline.message.queue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.aliyun.mns.model.Message;
public class TestCloudPullTopicConsumer {
CloudTopicProducerBuilder builder=null;
AliMNSConfig config=null;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void prepare(){
config=new AliMNSConfig();
config.setAccountEndpoint("http://1384553363882565.mns.cn-hangzhou.aliyuncs.com");
config.setAccessId("QfXG4pxBPbUEVk0p");
config.setAccessKey("SI56mM6jEHE0pRzb7oawKeX8CiXC83");
builder=new CloudTopicProducerBuilder(config);
}
/**
* Method name: testCheckQueueIsExisted <BR>
* Description: 测试queue是否已经存在 <BR>
* Remark: <BR>
* @throws Exception void<BR>
*/
@Test
public void testCheckQueueIsExisted() throws Exception{
String topicName="testTopic1";
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
String testQueue1 = "testQu1eue";
String testQueue2 = "testQ1ueu";
TopicProducer topic=builder.buildTopic(topicName);
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, testQueue1,testQueue2);
topicProducer.deleteTopic(true);
boolean initiate=false;
CloudPullTopicConsumer consumer=builder.buildTopicPullConsumer(topicName, testQueue1,initiate);
Assert.assertFalse(consumer.queueExisted());
topicProducer.deleteTopic(true);
//
CloudPullTopicConsumer consumer2=builder.buildTopicPullConsumer(topicName, testQueue1,true);
Assert.assertTrue(consumer2.queueExisted());
topicProducer.deleteTopic(true);
}
/**
* Method name: testAddNewQueue2ExitedTopic <BR>
* Description: 新增一个队列到已经存在的topic上面去 <BR>
* Remark: <BR>
* @throws Exception void<BR>
*/
@Test
public void testAddNewQueue2ExitedTopic() throws Exception{
String topicName="testTopic1";
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
String testQueue1 = "testQu1eue";
String testQueue2 = "testQ1ueu";
TopicProducer topic=builder.buildTopic(topicName);
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, testQueue1,testQueue2);
topicProducer.deleteTopic(true);
//创建一个topic,并且他包含一个testQueue1;
CloudPullTopicProducer topicProducer2=builder.buildCloudTopic(topicName,true, testQueue1);
topicProducer2.send("hellow world1");
CloudPullTopicConsumer consumer1=builder.buildTopicPullConsumer(topicName, testQueue1);
Message msg1=consumer1.popMessage();
Assert.assertEquals("hellow world1", msg1.getMessageBodyAsRawString());
//新建一个consumer。他消费testQueue2,这个testQueue2并不存在
CloudPullTopicConsumer consumer2=builder.buildTopicPullConsumer(topicName, testQueue2);
topicProducer2.send("hellow world2");
Message msg2_1=consumer1.popMessage();
Message msg2_2=consumer2.popMessage();
Assert.assertEquals("hellow world2", msg2_1.getMessageBodyAsRawString());
Assert.assertEquals("hellow world2", msg2_2.getMessageBodyAsRawString());
topicProducer.deleteTopic(true);
}
}
......@@ -42,14 +42,14 @@ public class TestP2PProducer {
Message message=producer.send("helloworld");
System.out.println(message.getReceiptHandle());
Consumer consumer=builder.buildConsumer("p2pTestQueue");
MessageObject object= consumer.popMessage();
MessageObject object= consumer.popMessageObject();
Assert.assertNotNull(object.getMessage());
System.out.println(object.getMessage().getReceiptHandle());
System.out.println("----first pop-----"+object.getMessage());
System.out.println("----first pop-----"+object.getMessage().getMessageBodyAsString());
MessageObject object2= consumer.popMessage();
MessageObject object2= consumer.popMessageObject();
Assert.assertNull(object2.getMessage());
System.out.println("--second pop should null--"+object2.getMessage());
......@@ -68,7 +68,7 @@ public class TestP2PProducer {
System.out.println(message.getReceiptHandle());
Consumer consumer=builder.buildConsumer("p2pTestQueue");
MessageObject object= consumer.peekMessage();
MessageObject object= consumer.peekMessageObject();
System.out.println(object.getMessage().getReceiptHandle());
......