yangyoupeng

重构对阿里云消息服务的调用,以及增加相关测试。

Showing 23 changed files with 1280 additions and 656 deletions
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: AliMNSClinetFactory.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月8日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
/**
* class name:AliMNSClinetFactory <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月8日
* @author zhaoonline)yangyoupeng
*/
@Configuration
public class AliMNSClientFactory {
@Autowired
private AliMNSPropertiesConfigBean aliMNSPropertiesConfigBean;
@Bean
public MNSClient mNSClient(){
CloudAccount account = new CloudAccount(aliMNSPropertiesConfigBean.getSecurityAccessKey(), aliMNSPropertiesConfigBean.getSecuritySecretKey(), aliMNSPropertiesConfigBean.getEndPoint());
MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全
return client;
}
}
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: AliMNSPropertiesConfigBean.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月8日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
/**
* class name:AliMNSPropertiesConfigBean <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月8日
* @author zhaoonline)yangyoupeng
*/
public class AliMNSConfig {
private String accessId;
private String accessKey;
private String accountEndpoint;
private String securityToken="";
private int poolMaxActive=8;
private int poolMaxIdle=2;
private int poolMinIdle=1;
public String getAccessId() {
return accessId;
}
public void setAccessId(String accessId) {
this.accessId = accessId;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getAccountEndpoint() {
return accountEndpoint;
}
public void setAccountEndpoint(String accountEndpoint) {
this.accountEndpoint = accountEndpoint;
}
public String getSecurityToken() {
return securityToken;
}
public void setSecurityToken(String securityToken) {
this.securityToken = securityToken;
}
public int getPoolMaxActive() {
return poolMaxActive;
}
public void setPoolMaxActive(int poolMaxActive) {
this.poolMaxActive = poolMaxActive;
}
public int getPoolMaxIdle() {
return poolMaxIdle;
}
public void setPoolMaxIdle(int poolMaxIdle) {
this.poolMaxIdle = poolMaxIdle;
}
public int getPoolMinIdle() {
return poolMinIdle;
}
public void setPoolMinIdle(int poolMinIdle) {
this.poolMinIdle = poolMinIdle;
}
public String toString(){
StringBuilder sbuilder=new StringBuilder();
sbuilder.append("{\"").append("accessId").append("\":\"").append(accessId).append("\",")
.append("accessKey").append("\":\"").append(accessKey).append("\",")
.append("accountEndpoint").append("\":\"").append(accountEndpoint).append("\",")
.append("securityToken").append("\":\"").append(securityToken).append("\",")
.append("poolMaxActive").append("\":").append(poolMaxActive).append(",")
.append("poolMaxIdle").append("\":").append(poolMaxIdle).append(",")
.append("poolMinIdle").append("\":").append(poolMinIdle).append("")
.append("}");
return sbuilder.toString();
}
}
package com.zhaoonline.message.queue;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* class name:AliMNSConfigBean <BR>
* class description: 阿里云消息服务的bean <BR>
* Remark: <BR>
* @version 1.00 2016年11月28日
* @author zhaoonline)yangyoupeng
*/
@Configuration
public class AliMNSConfigBean {
@Bean
@ConfigurationProperties("alpaca.components.alpaca-aliyun-mns2")
public AliMNSConfig aliyunConfig(){
return new AliMNSConfig();
}
@Bean
public CloudTopicProducerBuilder aliyunMnsBuilder(AliMNSConfig aliyunConfig){
return new CloudTopicProducerBuilder(aliyunConfig);
}
}
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: AliMNSFactory.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月4日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
import java.util.Vector;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
/**
* class name:AliMNSFactory <BR>
* class description:阿里云的消息服务的的工厂类 <BR>
* Remark: <BR>
* @version 1.00 2016年8月4日
* @author zhaoonline)yangyoupeng
*/
public class AliMNSFactory {
public static CloudTopic getTopicRef( MNSClient mnsClient,String topic){
CloudTopic cloudTopic = mnsClient.getTopicRef(topic);
return cloudTopic;
}
public static CloudPullTopic createCloudPullTopic(MNSClient mnsClient,String topic,Vector<String> queueNameList){
Vector<CloudQueue> queueList = new Vector<CloudQueue>();
for(String queueName:queueNameList){
CloudQueue queue = mnsClient.getQueueRef(queueName);
queueList.add(queue);
}
CloudTopic cT=getTopicRef(mnsClient,topic);
CloudPullTopic pullTopic = new CloudPullTopic(cT, queueNameList, queueList);
return pullTopic;
}
}
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: AliMNSPropertiesConfigBean.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月8日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* class name:AliMNSPropertiesConfigBean <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月8日
* @author zhaoonline)yangyoupeng
*/
@Component
@ConfigurationProperties("alpaca.components.aliyun")
public class AliMNSPropertiesConfigBean {
//@Value("${component.mq.aliyun.topic}")
private String topic;
//@Value("${component.mq.aliyun.securityAccessKey}")
private String securityAccessKey;
//@Value("${component.mq.aliyun.securitySecretKey}")
private String securitySecretKey;
//@Value("${component.mq.aliyun.endPoint}")
private String endPoint;
//@Value("${component.mq.aliyun.topicQueue1}")
private String topicQueue1;
/**
* Method name: getTopic <BR>
* Description: please write your description <BR>
* @return String <BR>
*/
public String getTopic() {
return topic;
}
/**
* Method name: setTopic <BR>
* Description: please write your description <BR>
* @param topic String <BR>
*/
public void setTopic(String topic) {
this.topic = topic;
}
/**
* Method name: getSecurityAccessKey <BR>
* Description: please write your description <BR>
* @return String <BR>
*/
public String getSecurityAccessKey() {
return securityAccessKey;
}
/**
* Method name: setSecurityAccessKey <BR>
* Description: please write your description <BR>
* @param securityAccessKey String <BR>
*/
public void setSecurityAccessKey(String securityAccessKey) {
this.securityAccessKey = securityAccessKey;
}
/**
* Method name: getSecuritySecretKey <BR>
* Description: please write your description <BR>
* @return String <BR>
*/
public String getSecuritySecretKey() {
return securitySecretKey;
}
/**
* Method name: setSecuritySecretKey <BR>
* Description: please write your description <BR>
* @param securitySecretKey String <BR>
*/
public void setSecuritySecretKey(String securitySecretKey) {
this.securitySecretKey = securitySecretKey;
}
/**
* Method name: getEndPoint <BR>
* Description: please write your description <BR>
* @return String <BR>
*/
public String getEndPoint() {
return endPoint;
}
/**
* Method name: setEndPoint <BR>
* Description: please write your description <BR>
* @param endPoint String <BR>
*/
public void setEndPoint(String endPoint) {
this.endPoint = endPoint;
}
/**
* Method name: getTopicQueue1 <BR>
* Description: please write your description <BR>
* @return String <BR>
*/
public String getTopicQueue1() {
return topicQueue1;
}
/**
* Method name: setTopicQueue1 <BR>
* Description: please write your description <BR>
* @param topicQueue1 String <BR>
*/
public void setTopicQueue1(String topicQueue1) {
this.topicQueue1 = topicQueue1;
}
}
package com.zhaoonline.message.queue;
import org.apache.commons.pool.PoolableObjectFactory;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.DefaultMNSClient;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
/**
* class name:ClientPool <BR>
* class description: 阿里云client的链接对象池 <BR>
* Remark: <BR>
* @version 1.00 2016年11月28日
* @author zhaoonline)yangyoupeng
*/
public class ClientPoolFactory implements PoolableObjectFactory<MNSClient> {
AliMNSConfig config=null;
CloudAccount account =null;
private ClientConfiguration aliyunconfig =null;
public ClientPoolFactory(AliMNSConfig config){
this.config=config;
this.account = new CloudAccount(config.getAccessId(), config.getAccessKey(), config.getAccountEndpoint());
}
@Override
public MNSClient makeObject() throws Exception {
String accountEndpoint = account.getAccountEndpoint();
if(aliyunconfig == null){
aliyunconfig = new ClientConfiguration();
}
MNSClient mnsClient = new DefaultMNSClient(accountEndpoint, config.getAccessId(), config.getAccessKey(),aliyunconfig);
return mnsClient;
}
@Override
public void destroyObject(MNSClient client) throws Exception {
client.close();
}
@Override
public boolean validateObject(MNSClient client) {
return client.isOpen();
}
@Override
public void activateObject(MNSClient client) throws Exception {
}
@Override
public void passivateObject(MNSClient obj) throws Exception {
}
}
\ No newline at end of file
......@@ -10,10 +10,13 @@
package com.zhaoonline.message.queue;
import java.io.IOException;
import java.util.Vector;
import java.util.List;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.RawTopicMessage;
......@@ -26,47 +29,29 @@ import com.aliyun.mns.model.TopicMessage;
* @version 1.00 2016年8月3日
* @author zhaoonline)yangyoupeng
*/
public class CloudProducer implements ProducerInt<MessageObject,TopicMessage>{
public class CloudPullTopicProducer implements ProducerInt<MessageObject,TopicMessage>{
private static final Logger logger = LoggerFactory.getLogger(CloudProducer.class);
private static final Logger logger = LoggerFactory.getLogger(CloudPullTopicProducer.class);
private MNSClient mnsClient;
private String topic;
private CloudPullTopic cloudPullTopic;
private GenericObjectPool<MNSClient> pool;
private TopicQueueNameBuilder queueNameBuilder;
public CloudProducer(MNSClient mnsClient,TopicQueueNameBuilder queueNameBuilder){
@Autowired
public CloudPullTopicProducer(CloudPullTopic cloudPullTopic,MNSClient mnsClient, GenericObjectPool<MNSClient> pool){
this.mnsClient = mnsClient;
this.queueNameBuilder = queueNameBuilder;
this.cloudPullTopic = cloudPullTopic;
this.pool=pool;
startProducer();
}
private boolean start=false;
private void startProducer() {
start=true;
}
private Vector<String> queueNamelist=null;
/**
* cloudPullTopic:这是个广播topic.
*/
private CloudPullTopic cloudPullTopic =null;
/**
* Method name: init <BR>
* Description: init方法当topic不存在的时候就会抛出ServiceException异常 <BR>
* Remark: <BR> void<BR>
*/
public void init(){
topic=queueNameBuilder.getTopic();
queueNamelist=queueNameBuilder.getQueueNameList();
cloudPullTopic= AliMNSFactory.createCloudPullTopic(mnsClient, topic, queueNamelist);
startProducer();
}
/**
* Method name: getCloudPullTopic <BR>
* Description: please write your description <BR>
......@@ -76,15 +61,51 @@ public class CloudProducer implements ProducerInt<MessageObject,TopicMessage>{
return cloudPullTopic;
}
@Override
public void close(){
if(mnsClient!=null){
mnsClient.close();
try {
pool.returnObject(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
}
start=false;
}
public TopicMessage send(String msgBody) throws IOException {
return send(msgBody,null,null);
}
public TopicMessage send(String msgBody,String id,String tag) throws IOException {
TopicMessage topicMessage= new RawTopicMessage();
topicMessage.setBaseMessageBody(msgBody);
if(null != id && !id.isEmpty()){
topicMessage.setMessageId(id);
}
if(null != tag && !tag.isEmpty()){
topicMessage.setMessageTag(id);
}
return cloudPullTopic.publishMessage(topicMessage);
}
public TopicMessage send(Object object) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
return send(msgWrap);
}
public TopicMessage send(Object object,String id) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
msgWrap.setId(id);
return send(msgWrap);
}
public TopicMessage send(Object object,String tag,String id) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
msgWrap.setId(id);
msgWrap.setTag(tag);
return send(msgWrap);
}
/**
* @Override
* @see com.zhaoonline.message.queue.ProducerInt#send(java.lang.Object) <BR>
......@@ -96,17 +117,38 @@ public class CloudProducer implements ProducerInt<MessageObject,TopicMessage>{
* @throws IOException
*/
@Override
public TopicMessage send(MessageObject object) throws IOException {
public TopicMessage send(MessageObject msgObject) throws IOException {
TopicMessage topicMessage= new RawTopicMessage();
topicMessage.setMessageBody(object.insideObjectToBytesWithKryo());
topicMessage.setMessageTag(object.getTag());
topicMessage.setMessageId(object.getId());
String tag=msgObject.getTag();
if(null != tag &&
!tag.isEmpty()){
topicMessage.setMessageTag(tag);
}
String id=msgObject.getId();
if(null != id &&
!id.isEmpty()){
topicMessage.setMessageId(id);
}
topicMessage.setBaseMessageBody(msgObject.toJson());
TopicMessage result =cloudPullTopic.publishMessage(topicMessage);
return result;
}
/**
* Method name: getQueueNameList <BR>
* Description: 获取topic中存在的queue的名称 <BR>
* Remark: <BR>
* @return List<String><BR>
*/
public List<String> getQueueNameList(){
return cloudPullTopic.getQueueNameList();
}
public void deleteTopic(boolean needDeleteQueues){
cloudPullTopic.delete(needDeleteQueues);
}
/**
* @Override
* @see com.zhaoonline.message.queue.ProducerInt#isStart() <BR>
......@@ -119,4 +161,8 @@ public class CloudProducer implements ProducerInt<MessageObject,TopicMessage>{
public boolean isStart() {
return start;
}
public MNSClient getMnsClient() {
return mnsClient;
}
}
......
package com.zhaoonline.message.queue;
import java.util.Vector;
import javax.annotation.PreDestroy;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
/**
* class name:CloudTopicProducer <BR>
* class description:阿里云的Topic 的Producer的builder
* <BR>
* Remark: <BR>
* @version 1.00 2016年11月28日
* @author zhaoonline)yangyoupeng
*/
public class CloudTopicProducerBuilder {
private AliMNSConfig config=null;
private final GenericObjectPool<MNSClient> pool;
private static Logger logger=LoggerFactory.getLogger(CloudTopicProducerBuilder.class);
@Autowired
public CloudTopicProducerBuilder(AliMNSConfig config){
this.config=config;
pool= new GenericObjectPool<MNSClient>(new ClientPoolFactory(this.config));
pool.setMaxActive(this.config.getPoolMaxActive());
pool.setMaxIdle(this.config.getPoolMaxActive());
pool.setMinIdle(this.config.getPoolMinIdle());
logger.debug("init connect pool with config:[{}]",config);
}
/**
* Method name: buildTopic <BR>
* Description: 创建阿里云topic <BR>
* Remark: <BR>
* @param topic
* @return TopicProducer<BR>
* @throws Exception
*/
public P2PProducer buildP2PProducer(String queue) throws Exception{
P2PProducer producer=new P2PProducer(queue,pool);
return producer;
}
/**
* Method name: buildTopic <BR>
* Description: 创建阿里云topic <BR>
* Remark: <BR>
* @param topic
* @return TopicProducer<BR>
* @throws Exception
*/
public TopicProducer buildTopic(String topic) throws Exception{
TopicProducer producer=new TopicProducer(topic,pool);
return producer;
}
/**
* Method name: buildConsumer <BR>
* Description: 构建queue的consumer <BR>
* Remark: <BR>
* @param queue
* @return Consumer<BR>
* @throws Exception
*/
public Consumer buildConsumer(String queue) throws Exception{
Consumer consumer=new Consumer(queue,pool);
return consumer;
}
/**
* Method name: buildCloudTopic <BR>
* Description: 创建广播拉取消息模型
* https://help.aliyun.com/document_detail/34483.html?spm=5176.doc27434.6.709.WmlU7f <BR>
* Remark:让主题将消息先推送到队列,然后由消费者从队列拉取消息。这样既可以做到1对多的广播消息,又不需要暴露消费者的地址 <BR>
* @param topic
* @param createQueue 是否需要创建队列。
* @param queueNameList 队列的名称
* @return CloudPullTopicProducer<BR>
* @throws Exception
*/
public CloudPullTopicProducer buildCloudTopic(String topic,boolean createQueue,String... queueNameList) throws Exception{
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
return buildCloudTopic(topic, createQueue, queueMetaTemplate, queueNameList);
}
/**
* Method name: buildCloudTopic <BR>
* Description: buildCloudTopic <BR>
* Remark: <BR>
* @param topic
* @param createQueue
* @param queueMetaTemplate 创建队列的模板元数据
* @param queueNameList
* @return CloudPullTopicProducer<BR>
* @throws Exception
*/
public CloudPullTopicProducer buildCloudTopic(String topic,boolean createQueue,QueueMeta queueMetaTemplate,String... queueNameList) throws Exception{
MNSClient mnsClient =pool.borrowObject();
TopicMeta topicMeta =new TopicMeta();
topicMeta.setTopicName(topic);
Vector<String> queueNameVector=new Vector<>(queueNameList.length);
for(String queueName:queueNameList){
queueNameVector.add(queueName);
}
CloudPullTopic cloudPullTopic = mnsClient.createPullTopic(topicMeta, queueNameVector,createQueue,queueMetaTemplate);
CloudPullTopicProducer producer=new CloudPullTopicProducer(cloudPullTopic,mnsClient,pool);
return producer;
}
@PreDestroy
public void close(){
try {
pool.close();
logger.info("pool has bean closed");
} catch (Exception e) {
logger.error("error when pool closed,cause by ",e.getMessage());
}
}
}
......@@ -10,6 +10,9 @@
package com.zhaoonline.message.queue;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.springframework.beans.factory.annotation.Autowired;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
......@@ -37,12 +40,16 @@ public class Consumer {
* queueForConsumer:队列的消费者
*/
private CloudQueue queueForConsumer;
private GenericObjectPool<MNSClient> pool;
public Consumer(String queueName,MNSClient mnsClient){
@Autowired
public Consumer(String queueName,GenericObjectPool<MNSClient> pool) throws Exception{
this.queueName=queueName;
this.mnsClient = mnsClient;
this.pool=pool;
this.mnsClient = pool.borrowObject();
init();
}
/**
......@@ -51,14 +58,14 @@ public class Consumer {
* Remark: <BR>
* @return ZhaoMQConsumer<BR>
*/
public Consumer init(){
private Consumer init(){
queueForConsumer = mnsClient.getQueueRef(queueName);
return this;
}
/**
* Method name: popMessage <BR>
* Description: 只获取消息,并不删除消息,若队列中暂时没有消息,则等待timeout时间 <BR>
* Description: 若队列中暂时没有消息,则等待timeout时间 <BR>
* Remark: <BR>
* @param timeount
* @return Message<BR>
......@@ -68,45 +75,80 @@ public class Consumer {
return MessageObject.wrapMessage(result);
}
public MessageObject popMessage(){
Message result=queueForConsumer.popMessage();
return MessageObject.wrapMessage(result);
}
/**
* Method name: popMessageAndDelete <BR>
* Description: 获取消息之后,删除队列中的消息,此时才能继续消费下一条消息,若队列中暂时没有消息,则等待timeout时间 <BR>
* Method name: popMessage <BR>
* Description: pop完了之后会自动删除message。 <BR>
* Remark: <BR>
* @param timeount
* @return Message<BR>
* @return MessageObject<BR>
*/
public MessageObject popMessageAndDelete(int timeount){
Message result=queueForConsumer.popMessage(timeount);
deleteAlreadyGetMessage(result);
return MessageObject.wrapMessage(result);
}
public MessageObject popMessageAndDelete(){
Message result=queueForConsumer.popMessage();
deleteAlreadyGetMessage(result);
return MessageObject.wrapMessage(result);
public MessageObject popMessage(){
Message result=queueForConsumer.popMessage();
return MessageObject.wrapMessage(result);
}
// /**
// * Method name: popMessageAndDelete <BR>
// * Description: 获取消息之后,删除队列中的消息,此时才能继续消费下一条消息,若队列中暂时没有消息,则等待timeout时间 <BR>
// * Remark: <BR>
// * @param timeount
// * @return Message<BR>
// */
// public MessageObject popMessageAndDelete(int timeount){
// Message result=queueForConsumer.popMessage(timeount);
// deleteAlreadyGetMessage(result);
// return MessageObject.wrapMessage(result);
// }
//
// public MessageObject popMessageAndDelete(){
// Message result=queueForConsumer.popMessage();
// deleteAlreadyGetMessage(result);
// return MessageObject.wrapMessage(result);
// }
/**
* Method name: deleteAlreadyGetMessage <BR>
* Description: deleteAlreadyGetMessage <BR>
* Description: 供手动删除消息,因为peek返回的message的getReceiptHandle是null
* 删除是会抛出异常提示The receipt handle you provide is not valid
* <BR>
* Remark: <BR>
* @param result void<BR>
*/
private void deleteAlreadyGetMessage(Message result) {
public void deleteAlreadyGetMessage(Message result) {
if(result !=null){
queueForConsumer.deleteMessage(result.getReceiptHandle());
}
}
public void close(){
if(mnsClient!=null){
mnsClient.close();
try {
pool.returnObject(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* Method name: peekMessage <BR>
* Description: peek方法不会删除消息,但是peek方法可能会有些延时,所以需要
* {@code
* while(true){
* MessageObject object=peekMessage()
* if(object.getMessage() !=null){
* break;
* }
* }
* }等待拿到返回消息。注意无法调用调用{@link #deleteAlreadyGetMessage()}方法 来删除消息
* 因为peek返回的message的getReceiptHandle是null<BR>
* Remark: <BR>
* @return MessageObject<BR>
*/
public MessageObject peekMessage() {
Message result=queueForConsumer.peekMessage();
return MessageObject.wrapMessage(result);
}
}
......
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: MNSProperties.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月8日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
/**
* class name:MNSProperties <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月8日
* @author zhaoonline)yangyoupeng
*/
public class MNSProperties {
}
......@@ -15,8 +15,8 @@ import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import com.aliyun.mns.model.Message;
import com.zhaoonline.microservice.framework.serialization.KryoSerialize;
import com.zhaoonline.microservice.framework.serialization.ZhaoSerialize;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
......@@ -27,9 +27,7 @@ import com.zhaoonline.microservice.framework.serialization.ZhaoSerialize;
* @author zhaoonline)yangyoupeng
*/
public class MessageObject implements Serializable {
private static ZhaoSerialize serialization=new KryoSerialize() ;
private static ObjectMapper objectMapper=new ObjectMapper();
/**
* serialVersionUID:TODO
*/
......@@ -42,6 +40,20 @@ public class MessageObject implements Serializable {
*/
private Object insideObject;
private String id;
/**
* Method name: getId <BR>
* Description: getId <BR>
* Remark: <BR>
* @return String<BR>
*/
public String getId() {
return id;
}
public void setId(String id){
this.id=id;
}
/**
* Method name: MessageObject<BR>
* Description: 我们不开放构造函数,只能通过静态函数来完成对象构建<BR>
......@@ -84,9 +96,25 @@ public class MessageObject implements Serializable {
* @throws ClassNotFoundException
* @throws IOException T<BR>
*/
public <T> T msgToObjectWithKryo(Class<T> clazz) throws ClassNotFoundException, IOException{
if(!isMessageNull()){
return serialization.deserialize(message.getMessageBodyAsRawBytes(), clazz);
// public <T> T msgToObjectWithKryo(Class<T> clazz) throws ClassNotFoundException, IOException{
// if(!isMessageNull()){
// return serialization.deserialize(message.getMessageBodyAsRawBytes(), clazz);
// }
// return null;
// }
/**
* Method name: toObject <BR>
* Description: 将返回的消息(如果是json),转换为对象 <BR>
* Remark: <BR>
* @param clazz
* @return T<BR>
*/
public <T> T toObject(Class<T> clazz){
try {
return objectMapper.readValue(message.getMessageBodyAsRawString(), clazz);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
......@@ -135,23 +163,24 @@ public class MessageObject implements Serializable {
* @return byte []<BR>
* @throws IOException
*/
public byte[] insideObjectToBytesWithKryo() throws IOException {
return serialization.serialize(insideObject);
}
// public byte[] insideObjectToBytesWithKryo() throws IOException {
// return serialization.serialize(insideObject);
// }
private String id;
/**
* Method name: getId <BR>
* Description: getId <BR>
* Method name: toJson <BR>
* Description: 将内部封装的object转换为json <BR>
* Remark: <BR>
* @return String<BR>
*/
public String getId() {
return id;
public String toJson(){
try {
return objectMapper.writeValueAsString(insideObject);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
public void setId(String id){
this.id=id;
}
}
......
......@@ -11,9 +11,13 @@ package com.zhaoonline.message.queue;
import java.io.IOException;
import org.apache.commons.pool.impl.GenericObjectPool;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.TopicMessage;
/**
* class name:P2PProducer <BR>
......@@ -31,13 +35,17 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
private MNSClient mnsClient;
private CloudQueue queue;
private GenericObjectPool<MNSClient> pool;
public P2PProducer(String queueName,MNSClient mnsClient){
public P2PProducer(String queueName,GenericObjectPool<MNSClient> pool) throws Exception{
this.queueName=queueName;
this.mnsClient = mnsClient;
this.mnsClient = pool.borrowObject();
this.pool = pool;
init();
}
public void init(){
private void init(){
queue=mnsClient.getQueueRef(this.queueName);
startProducer();
}
......@@ -56,10 +64,38 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
@Override
public void close() {
if(mnsClient!=null){
mnsClient.close();
try {
pool.returnObject(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public Message send(String msgBody) throws IOException {
return send(msgBody,null);
}
public Message send(String msgBody,String id) throws IOException {
Message msg =new Message(msgBody);
if(null != id && !id.isEmpty()){
msg.setMessageId(id);
}
return queue.putMessage(msg);
}
public Message send(Object object) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
return send(msgWrap);
}
public Message send(Object object,String id) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
msgWrap.setId(id);
return send(msgWrap);
}
/**
* @Override
* @see com.zhaoonline.message.queue.ProducerInt#send(java.lang.Object) <BR>
......@@ -72,8 +108,11 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
*/
@Override
public Message send(MessageObject object) throws IOException {
Message msg =new Message(object.insideObjectToBytesWithKryo());
msg.setMessageId(object.getId());
Message msg =new Message(object.toJson());
String id = object.getId();
if(id != null && id.isEmpty() ){
msg.setMessageId(id);
}
return queue.putMessage(msg);
}
......@@ -89,4 +128,12 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
public boolean isStart() {
return start;
}
public void deleteQueue(){
queue.delete();
}
public void createQueue(){
queue.create();
}
}
......
package com.zhaoonline.message.queue;
import java.io.IOException;
import org.apache.commons.pool.impl.GenericObjectPool;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.TopicMeta;
/**
* class name:TopicProducer <BR>
* class description: 发送topic信息 <BR>
* Remark: <BR>
* @version 1.00 2016年11月28日
* @author zhaoonline)yangyoupeng
*/
public class TopicProducer implements ProducerInt<MessageObject,TopicMessage> {
private CloudTopic cloudTopic;
private boolean start=false;
private MNSClient mnsClient=null;
private GenericObjectPool<MNSClient> pool =null;
public TopicProducer(String topicName , GenericObjectPool<MNSClient> pool) throws Exception {
this.mnsClient=pool.borrowObject();
this.cloudTopic =mnsClient.getTopicRef(topicName);
this.start= true;
this.pool=pool;
}
public TopicMessage send(String msgBody) throws IOException {
return send(msgBody,null,null);
}
public TopicMessage send(String msgBody,String id,String tag) throws IOException {
TopicMessage topicMessage= new RawTopicMessage();
topicMessage.setBaseMessageBody(msgBody);
if(null != id && !id.isEmpty()){
topicMessage.setMessageId(id);
}
if(null != tag && !tag.isEmpty()){
topicMessage.setMessageTag(id);
}
return cloudTopic.publishMessage(topicMessage);
}
public TopicMessage send(Object object) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
return send(msgWrap);
}
public TopicMessage send(Object object,String id) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
msgWrap.setId(id);
return send(msgWrap);
}
public TopicMessage send(Object object,String tag,String id) throws IOException {
MessageObject msgWrap=MessageObject.wrapObject(object);
msgWrap.setId(id);
msgWrap.setTag(tag);
return send(msgWrap);
}
@Override
public TopicMessage send(MessageObject msgObject) throws IOException {
TopicMessage topicMessage= new RawTopicMessage();
String tag=msgObject.getTag();
if(null != tag &&
!tag.isEmpty()){
topicMessage.setMessageTag(tag);
}
String id=msgObject.getId();
if(null != id &&
!id.isEmpty()){
topicMessage.setMessageId(id);
}
topicMessage.setBaseMessageBody(msgObject.toJson());
return cloudTopic.publishMessage(topicMessage);
}
/**
* Method name: creatTopic <BR>
* Description: 创建默认的topic <BR>
* Remark: <BR>
* @return String<BR>
*/
public String creatTopic(){
return cloudTopic.create();
}
/**
* Method name: creatTopic <BR>
* Description: 根据元数据创建topic <BR>
* Remark: <BR>
* @param meta
* @return String<BR>
*/
public String creatTopic(TopicMeta meta){
return cloudTopic.create(meta);
}
public void deleteTopic(){
cloudTopic.delete();
}
@Override
public void close() {
if(mnsClient!=null){
try {
pool.returnObject(mnsClient);
} catch (Exception e) {
e.printStackTrace();
}
}
start=false;
}
@Override
public boolean isStart() {
return start;
}
}
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: QueueNameBuilder.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月4日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
import java.util.Vector;
/**
* class name:QueueNameBuilder <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月4日
* @author zhaoonline)yangyoupeng
*/
public class TopicQueueNameBuilder {
private String topic;
private Vector<String> queueNameList;
/**
* Method name: getQueueNameList <BR>
* Description: please write your description <BR>
* @return Vector<String> <BR>
*/
public Vector<String> getQueueNameList() {
return queueNameList;
}
/**
* Method name: setQueueNameList <BR>
* Description: please write your description <BR>
* @param queueNameList Vector<String> <BR>
*/
public void setQueueNameList(Vector<String> queueNameList) {
this.queueNameList = queueNameList;
}
/**
* Method name: getTopic <BR>
* Description: getTopic <BR>
* Remark: <BR>
* @return String<BR>
*/
public String getTopic() {
return topic;
}
/**
* Method name: setTopic <BR>
* Description: setTopic <BR>
* Remark: <BR>
* @param topic void<BR>
*/
public void setTopic(String topic) {
this.topic =topic;
}
}
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: TopicQueueNameBuilderFactory.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月8日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
import java.util.Vector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* class name:TopicQueueNameBuilderFactory <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月8日
* @author zhaoonline)yangyoupeng
*/
@Configuration
public class TopicQueueNameBuilderFactory {
@Autowired
private AliMNSPropertiesConfigBean aliMNSPropertiesConfigBean;
@Bean
public TopicQueueNameBuilder queueNameBuilder(){
TopicQueueNameBuilder builder=new TopicQueueNameBuilder();
builder.setTopic(aliMNSPropertiesConfigBean.getTopic());
Vector<String> queueNameList = new Vector<String>();
String queue=aliMNSPropertiesConfigBean.getTopicQueue1();
queueNameList.add(queue);
builder.setQueueNameList(queueNameList);
return builder;
}
}
......@@ -124,7 +124,7 @@ public class TransactionQueueProducer implements ProducerInt<MessageObject,Messa
@Override
public Message send(MessageObject t) throws IOException {
Message msg= new Message ();
msg.setBaseMessageBody(t.insideObjectToBytesWithKryo());
msg.setBaseMessageBody(t.toJson());
msg.setMessageId(t.getId());
Message result=transQueue.sendTransMessage(msg, transOperation);
return result;
......@@ -142,8 +142,5 @@ public class TransactionQueueProducer implements ProducerInt<MessageObject,Messa
public boolean isStart() {
return start;
}
}
......
package com.zhaoonline.message.queue;
import java.util.NoSuchElementException;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import org.junit.Assert;
import org.junit.Test;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.DefaultMNSClient;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
public class TestClientPool {
@Test(expected=NoSuchElementException.class)
public void testCreatePool() throws Exception{
AliMNSConfig config=new AliMNSConfig();
config.setAccountEndpoint("http://1384553363882565.mns.cn-hangzhou.aliyuncs.com");
config.setAccessId("QfXG4pxBPbUEVk0p");
config.setAccessKey("SI56mM6jEHE0pRzb7oawKeX8CiXC83");
final GenericObjectPool<MNSClient> mtPool =
new GenericObjectPool<MNSClient>(new TestSimpleFactory(config));
Assert.assertEquals(false, mtPool.isClosed());
int maxActive=3;
mtPool.setMaxActive(maxActive);
//三秒
mtPool.setMaxWait(3000);
Assert.assertEquals(maxActive, mtPool.getMaxActive());
MNSClient client1=mtPool.borrowObject();
MNSClient client2=mtPool.borrowObject();
MNSClient client3=mtPool.borrowObject();
//获取第四个client的时候应该报错或者有异常
MNSClient client4=mtPool.borrowObject();
}
@Test
public void testCreatePoolEvitAConnection() throws Exception{
AliMNSConfig config=new AliMNSConfig();
config.setAccountEndpoint("http://1384553363882565.mns.cn-hangzhou.aliyuncs.com");
config.setAccessId("QfXG4pxBPbUEVk0p");
config.setAccessKey("SI56mM6jEHE0pRzb7oawKeX8CiXC83");
final GenericObjectPool<MNSClient> mtPool =
new GenericObjectPool<MNSClient>(new TestSimpleFactory(config));
Assert.assertEquals(false, mtPool.isClosed());
int maxActive=3;
mtPool.setMaxActive(maxActive);
//三秒
mtPool.setMaxWait(3000);
Assert.assertEquals(maxActive, mtPool.getMaxActive());
MNSClient client1=mtPool.borrowObject();
MNSClient client2=mtPool.borrowObject();
MNSClient client3=mtPool.borrowObject();
mtPool.returnObject(client1);
//获取第四个client的时候应该报错或者有异常
MNSClient client4=mtPool.borrowObject();
Assert.assertNotNull(client4);
}
static class TestSimpleFactory implements PoolableObjectFactory<MNSClient> {
AliMNSConfig config=null;
CloudAccount account =null;
private ClientConfiguration aliyunconfig =null;
public TestSimpleFactory(AliMNSConfig config){
this.config=config;
this.account = new CloudAccount(config.getAccessId(), config.getAccessKey(), config.getAccountEndpoint());
}
@Override
public MNSClient makeObject() throws Exception {
String accountEndpoint = account.getAccountEndpoint();
if(aliyunconfig == null){
aliyunconfig = new ClientConfiguration();
}
MNSClient mnsClient = new DefaultMNSClient(accountEndpoint, config.getAccessId(), config.getAccessKey(),aliyunconfig);
return mnsClient;
}
@Override
public void destroyObject(MNSClient client) throws Exception {
client.close();
}
@Override
public boolean validateObject(MNSClient client) {
return client.isOpen();
}
@Override
public void activateObject(MNSClient client) throws Exception {
}
@Override
public void passivateObject(MNSClient obj) throws Exception {
}
}
}
package com.zhaoonline.message.queue;
import java.io.IOException;
import java.util.Vector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.TopicMeta;
import com.zhaoonline.message.test.queue.KyroTestObject;
public class TestCloudProduceRef {
private static final String TEST_TOPIC_QUEUE = "testTopicQueue1";
CloudTopic topic=null;
MNSClient client =null;
CloudPullTopic cloudPullTopic=null;
@Before
public void prepare(){
// AliMNSPropertiesConfigBean aliMNSPropertiesConfigBean=new AliMNSPropertiesConfigBean();
// aliMNSPropertiesConfigBean.setSecuritySecretKey("SI56mM6jEHE0pRzb7oawKeX8CiXC83");
// aliMNSPropertiesConfigBean.setEndPoint( "https://1384553363882565.mns.cn-hangzhou.aliyuncs.com/");
// aliMNSPropertiesConfigBean.setSecurityAccessKey("QfXG4pxBPbUEVk0p");
CloudAccount account = new CloudAccount("QfXG4pxBPbUEVk0p", "SI56mM6jEHE0pRzb7oawKeX8CiXC83", "http://1384553363882565.mns.cn-hangzhou.aliyuncs.com");
client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全
topic= client.getTopicRef("testTopic1");
topic.delete();
CloudQueue cloudQueue= client.getQueueRef(TEST_TOPIC_QUEUE);
cloudQueue.delete();
Vector<String> queueNameList=new Vector<>(1);
queueNameList.add(TEST_TOPIC_QUEUE);
TopicMeta topicMeta=new TopicMeta();
topicMeta.setTopicName("testTopic1");
QueueMeta queryMetaTemplate=new QueueMeta();
cloudPullTopic=client.createPullTopic(topicMeta, queueNameList,true,queryMetaTemplate);
}
@Test
public void testPushWithTopicRef() throws IOException, ClassNotFoundException{
KyroTestObject o = new KyroTestObject();
o.setId(1);
o.setName("Kyro Hello");
// MessageObject messageObject = MessageObject.wrapObject(o);
TopicMessage topicMessage= new RawTopicMessage();
// topicMessage.setMessageBody("111111111");
topicMessage.setBaseMessageBody("111111111");
TopicMessage topicMessage2= topic.publishMessage(topicMessage);
CloudQueue cloudQueue= client.getQueueRef(TEST_TOPIC_QUEUE);
//等待十秒
Message popedMsg=cloudQueue.popMessage(10);
// KyroTestObject o2= MessageObject.wrapMessage(popedMsg).msgToObjectWithKryo(KyroTestObject.class);
// Assert.assertEquals(o.getId(), o2.getId());
// byte[] bodys=popedMsg.getMessageBodyAsBytes();
// System.out.println(popedMsg.getMessageBodyAsRawString());
Assert.assertEquals("111111111", popedMsg.getMessageBodyAsRawString());
topicMessage2= cloudPullTopic.publishMessage(topicMessage);
Message popedMsgAgain=cloudQueue.popMessage(10);
Assert.assertEquals("111111111", popedMsgAgain.getMessageBodyAsRawString());
// KyroTestObject o3= MessageObject.wrapMessage(popedMsgAgain).msgToObjectWithKryo(KyroTestObject.class);
// Assert.assertEquals(o.getId(), o3.getId());
cloudQueue.deleteMessage(popedMsgAgain.getReceiptHandle());
Message popedMsgAfterDelete=cloudQueue.popMessage(10);
System.out.println(popedMsgAfterDelete);
}
@After
public void clean(){
topic.delete();
}
}
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: TestCloudProducer.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月8日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.queue;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
import org.hamcrest.core.IsEqual;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.TopicMessage;
import com.zhaoonline.alpaca.config.bootstrap.AlpacaApplication;
import com.zhaoonline.message.queue.CloudProducer;
import com.zhaoonline.message.queue.Consumer;
import com.zhaoonline.message.queue.MessageObject;
import com.zhaoonline.message.queue.TopicQueueNameBuilder;
import com.zhaoonline.message.test.queue.KyroTestObject;
/**
* class name:TestCloudProducer <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月8日
* @author zhaoonline)yangyoupeng
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring-test.xml")
@EnableAutoConfiguration
@ComponentScan(basePackages={"com.zhaoonline.message.queue"})
public class TestCloudProducer {
private MNSClient mnsClient;
private TopicQueueNameBuilder queueNameBuilder;
@BeforeClass
public static void prepare(){
mockMnsPropertyFromZK();
}
/**
* Method name: mockMnsPropertyFromZK <BR>
* Description: mockMnsPropertyFromZK <BR>
* Remark: <BR> void<BR>
*/
private static void mockMnsPropertyFromZK() {
System.getProperties().setProperty("component.mq.aliyun.topic", "alpaca-test");
System.getProperties().setProperty("component.mq.aliyun.securityAccessKey", "QfXG4pxBPbUEVk0p");
System.getProperties().setProperty("component.mq.aliyun.securitySecretKey", "SI56mM6jEHE0pRzb7oawKeX8CiXC83");
System.getProperties().setProperty("component.mq.aliyun.endPoint", "http://1384553363882565.mns.cn-hangzhou.aliyuncs.com");
System.getProperties().setProperty("component.mq.aliyun.topicQueue1", "consumer001");
}
@Test
public void initCloudProducer() throws IOException, ClassNotFoundException{
ConfigurableApplicationContext context = AlpacaApplication.run(TestCloudProducer.class, "aa","aa");
mnsClient =context.getBean(MNSClient.class);
assertThat(mnsClient, notNullValue());
queueNameBuilder =context.getBean(TopicQueueNameBuilder.class);
assertThat(queueNameBuilder, notNullValue());
assertThat(queueNameBuilder.getTopic(), IsEqual.equalTo("alpaca-test"));
CloudProducer producer= new CloudProducer(mnsClient,queueNameBuilder);
producer.init();
assertThat(producer.getCloudPullTopic(), notNullValue());
assertThat(producer.isStart(), IsEqual.equalTo(true));
KyroTestObject o = new KyroTestObject();
o.setId(1);
o.setName("Kyro Hello");
MessageObject messageObject = MessageObject.wrapObject(o);
TopicMessage result = producer.send(messageObject);
Consumer consumer= new Consumer("consumer001",mnsClient);
consumer.init();
MessageObject consumeredObject = consumer.popMessage();
KyroTestObject kyroTestObject = consumeredObject.msgToObjectWithKryo(KyroTestObject.class);
// System.out.println(kyroTestObject.getId());
}
}
package com.zhaoonline.message.queue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.model.Message;
/**
* class name:TestP2PProducer <BR>
* class description: <BR>
* Remark: <BR>
* @version 1.00 2016年11月28日
* @author zhaoonline)yangyoupeng
*/
public class TestP2PProducer {
CloudTopicProducerBuilder builder=null;
AliMNSConfig config=null;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void prepare(){
config=new AliMNSConfig();
config.setAccountEndpoint("http://1384553363882565.mns.cn-hangzhou.aliyuncs.com");
config.setAccessId("QfXG4pxBPbUEVk0p");
config.setAccessKey("SI56mM6jEHE0pRzb7oawKeX8CiXC83");
builder=new CloudTopicProducerBuilder(config);
}
@Test
public void testSendAndPopMesage() throws Exception{
P2PProducer producer=builder.buildP2PProducer("p2pTestQueue");
producer.deleteQueue();
producer.createQueue();
Message message=producer.send("helloworld");
System.out.println(message.getReceiptHandle());
Consumer consumer=builder.buildConsumer("p2pTestQueue");
MessageObject object= consumer.popMessage();
Assert.assertNotNull(object.getMessage());
System.out.println(object.getMessage().getReceiptHandle());
System.out.println("----first pop-----"+object.getMessage());
System.out.println("----first pop-----"+object.getMessage().getMessageBodyAsString());
MessageObject object2= consumer.popMessage();
Assert.assertNull(object2.getMessage());
System.out.println("--second pop should null--"+object2.getMessage());
producer.deleteQueue();
}
@Test
public void testSendAndPeekMesage() throws Exception{
P2PProducer producer=builder.buildP2PProducer("p2pTestQueue");
producer.deleteQueue();
producer.createQueue();
Message message=producer.send("helloworld");
System.out.println(message.getReceiptHandle());
Consumer consumer=builder.buildConsumer("p2pTestQueue");
MessageObject object= consumer.peekMessage();
System.out.println(object.getMessage().getReceiptHandle());
Assert.assertNotNull(object.getMessage());
System.out.println("----first peek-----"+object.getMessage());
System.out.println("----first peek-----"+object.getMessage().getMessageBodyAsString());
//删除失败,因为peek返回的message的getReceiptHandle是空,所以无法删除
thrown.expect(ServiceException.class);
consumer.deleteAlreadyGetMessage(message);
// MessageObject object2= consumer.peekMessage();
// Assert.assertNotNull(object2.getMessage());
// System.out.println("--second peek should not null--"+object2.getMessage());
// MessageObject object3= consumer.peekMessage();
// Assert.assertNull(object3.getMessage());
// System.out.println("--third peek is null after delete manually--"+object3.getMessage());
producer.deleteQueue();
}
}
package com.zhaoonline.message.queue;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.TopicMeta;
import com.zhaoonline.message.test.queue.KyroTestObject;
public class TestTopicProducer {
CloudTopicProducerBuilder builder=null;
AliMNSConfig config=null;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void prepare(){
config=new AliMNSConfig();
config.setAccountEndpoint("http://1384553363882565.mns.cn-hangzhou.aliyuncs.com");
config.setAccessId("QfXG4pxBPbUEVk0p");
config.setAccessKey("SI56mM6jEHE0pRzb7oawKeX8CiXC83");
builder=new CloudTopicProducerBuilder(config);
}
/**
* Method name: testPublishMessage <BR>
* Description: 会抛出异常错误:TopicNotExist <BR>
* Remark: <BR> void<BR>
* @throws Exception
*/
@Test(expected=ServiceException.class)
public void testPublishMessageNotExisted() throws Exception {
TopicProducer topicProducer=builder.buildTopic("testTopic2");
topicProducer.deleteTopic();
TopicMessage msg;
msg = topicProducer.send("this is a test");
System.out.println(msg);
}
/**
* Method name: testCreateTopic <BR>
* Description: 返回topic的url <BR>
* Remark: <BR> void<BR>
* @throws Exception
*/
@Test
public void testCreateTopic() throws Exception{
String topicName="testTopic2";
TopicProducer topicProducer=builder.buildTopic(topicName);
topicProducer.deleteTopic();
TopicMeta meta= new TopicMeta();
meta.setTopicName(topicName);
String topicUrl=config.getAccountEndpoint()+"/topics/"+topicName;
Assert.assertEquals(topicUrl, topicProducer.creatTopic(meta));
//create again.重复create没有异常抛出
String again=topicProducer.creatTopic(meta);
Assert.assertEquals(topicUrl,again);
topicProducer.deleteTopic();
//在TopicProducer创建不同的topicNamel;抛出异常
TopicMeta meta2= new TopicMeta();
String topicName2="testTopic3";
meta2.setTopicName(topicName2);
String topicUrl2=config.getAccountEndpoint()+"/topics/"+topicName2;
//TopicName conflict between meta topic name and topic url offered
thrown.expect(ClientException.class);
topicProducer.creatTopic(meta2);
}
/**
* Method name: testPublishMessageAfteCreateButNoQueue <BR>
* Description: 只创建topic而没有创建topic下属的queue,实际上就是只是采用了topic的sub/publish的功能:
* {@link https://help.aliyun.com/document_detail/32450.html?spm=5176.doc27431.6.635.Hm16st}
* 这是一种被动接受的消息只有在subscribe的后,才能收到消息 <BR>
* Remark:testPublishMessageAfteCreateButNoQueuen能够发送成功 <BR>
* @throws Exception
*/
@Test
public void testPublishMessageAfteCreateButNoQueue() throws Exception{
String topicName="testTopic2";
TopicProducer topicProducer=builder.buildTopic(topicName);
topicProducer.deleteTopic();
TopicMeta meta= new TopicMeta();
meta.setTopicName(topicName);
String topicUrl=config.getAccountEndpoint()+"/topics/"+topicName;
Assert.assertEquals(topicUrl, topicProducer.creatTopic(meta));
TopicMessage msg= topicProducer.send("this is a test","1");
System.out.println(msg);
}
/**
* Method name: testPublishMessageAfteCreateWithQueue <BR>
* Description: 采用基于topic的CloudPullTopic
* {@link https://help.aliyun.com/document_detail/34483.html?spm=5176.doc27434.6.709.WmlU7f} <BR>
* Remark: <BR>
* @throws Exception
*/
@Test
public void testPublishMessageAfteCreateWithQueue() throws Exception{
String topicName="testTopic2";
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, "testQueue1");
topicProducer.deleteTopic(true);
topicProducer=builder.buildCloudTopic(topicName,true, "testQueue1");
TopicMessage msg= topicProducer.send("this is a test","1","testTag");
//MessageID:18AF66F22CD8C6A-1-158A948D08F-200000001,MessageMD5:54B0C58C7CE9F2A8B551351102EE0938,
System.out.println(msg);
Consumer consumer=builder.buildConsumer("testQueue1");
MessageObject object= consumer.popMessage();
//MessageID:54153599B8D016BF-1-158A948DAA3-200000001,MessageMD5:54B0C58C7CE9F2A8B551351102EE0938,RequestID:583BB887048A930DC9816DB8,MessageBody:"���ƭz�",ReceiptHandle:"1-ODU4OTkzNDU5My0xNDgwMzA4OTAzLTEtOA==",DequeueCount:"1",EnqueueTime:"Mon Nov 28 12:54:33 CST 2016",FirstDequeueTime:"Mon Nov 28 12:54:33 CST 2016",NextVisibleTime:"Mon Nov 28 12:55:03 CST 2016",Priority:"8"
System.out.println(object.getMessage());
System.out.println(object.getMessage().getMessageBodyAsRawString());
topicProducer.deleteTopic(true);
}
@Test
public void publishObjectAndPop() throws Exception{
String topicName="testTopic2";
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, "testQueue1");
topicProducer.deleteTopic(true);
topicProducer=builder.buildCloudTopic(topicName,true, "testQueue1");
KyroTestObject testObject =new KyroTestObject();
testObject.setId(1);
testObject.setName("helloworld");
TopicMessage msg= topicProducer.send(testObject,"1","testTag");
//MessageID:18AF66F22CD8C6A-1-158A948D08F-200000001,MessageMD5:54B0C58C7CE9F2A8B551351102EE0938,
System.out.println(msg);
Consumer consumer=builder.buildConsumer("testQueue1");
MessageObject object= consumer.popMessage();
//MessageID:54153599B8D016BF-1-158A948DAA3-200000001,MessageMD5:54B0C58C7CE9F2A8B551351102EE0938,RequestID:583BB887048A930DC9816DB8,MessageBody:"���ƭz�",ReceiptHandle:"1-ODU4OTkzNDU5My0xNDgwMzA4OTAzLTEtOA==",DequeueCount:"1",EnqueueTime:"Mon Nov 28 12:54:33 CST 2016",FirstDequeueTime:"Mon Nov 28 12:54:33 CST 2016",NextVisibleTime:"Mon Nov 28 12:55:03 CST 2016",Priority:"8"
System.out.println("----first Pop------"+object.getMessage());
KyroTestObject testObjectFromAliyunMns= object.toObject(KyroTestObject.class);
Assert.assertEquals(testObject.getId(), testObjectFromAliyunMns.getId());
Assert.assertEquals(testObject.getName(), testObjectFromAliyunMns.getName());
//第二次pop。返回的是null。返回之间需要等待timeout
MessageObject object2= consumer.popMessage(4);
Assert.assertNull(object2.getMessage());
System.out.println("-------second pop----------"+object2.getMessage());
topicProducer.deleteTopic(true);
}
/**
* Method name: publishObjectAndPeek <BR>
* Description: peek方法不会删除消息,但是peek方法可能会有些延时,所以需要while等待拿到返回消息。如果需要删除,就需要调用delete方法<BR>
* Remark: <BR>
* @throws Exception void<BR>
*/
@Test
public void publishObjectAndPeek() throws Exception{
String topicName="testTopic2";
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, "testQueue1");
topicProducer.deleteTopic(true);
topicProducer=builder.buildCloudTopic(topicName,true, "testQueue1");
KyroTestObject testObject =new KyroTestObject();
testObject.setId(1);
testObject.setName("helloworld");
TopicMessage msg= topicProducer.send(testObject,"1","testTag");
//MessageID:18AF66F22CD8C6A-1-158A948D08F-200000001,MessageMD5:54B0C58C7CE9F2A8B551351102EE0938,
System.out.println(msg);
Consumer consumer=builder.buildConsumer("testQueue1");
MessageObject object= null;
int count =1000;
while( count > 0){
object= consumer.peekMessage();
if(object.getMessage() !=null){
break;
}
Thread.sleep(count);
//count = count-1;
}
//MessageID:54153599B8D016BF-1-158A948DAA3-200000001,MessageMD5:54B0C58C7CE9F2A8B551351102EE0938,RequestID:583BB887048A930DC9816DB8,MessageBody:"���ƭz�",ReceiptHandle:"1-ODU4OTkzNDU5My0xNDgwMzA4OTAzLTEtOA==",DequeueCount:"1",EnqueueTime:"Mon Nov 28 12:54:33 CST 2016",FirstDequeueTime:"Mon Nov 28 12:54:33 CST 2016",NextVisibleTime:"Mon Nov 28 12:55:03 CST 2016",Priority:"8"
System.out.println(object.getMessage());
KyroTestObject testObjectFromAliyunMns= object.toObject(KyroTestObject.class);
Assert.assertEquals(testObject.getId(), testObjectFromAliyunMns.getId());
Assert.assertEquals(testObject.getName(), testObjectFromAliyunMns.getName());
//第二次pop。返回的是null。返回之间需要等待timeout
MessageObject object2= consumer.peekMessage();
Assert.assertNotNull(object2);
KyroTestObject testObjectFromAliyunMns2= object2.toObject(KyroTestObject.class);
Assert.assertEquals(testObject.getId(), testObjectFromAliyunMns2.getId());
Assert.assertEquals(testObject.getName(), testObjectFromAliyunMns2.getName());
System.out.println("-------second peek----------"+object2.getMessage());
topicProducer.deleteTopic(true);
}
}
package com.zhaoonline.message.queue;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import com.zhaoonline.alpaca.config.bootstrap.AlpacaApplication;
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(basePackageClasses={AliMNSConfigBean.class})
public class UsageExample {
public static void main(String[] args) {
AlpacaApplication.run(UsageExample.class, args);
}
}
/*
* All rights Reserved, Copyright (C) ZhaoOnline LIMITED 2016
* FileName: TestPropertiesLoader.java
* Version: $Revision$
* Package Name:com.zhaoonline.message.queue
* Modify record:
* NO. | Date | Name | Content
* 1 | 2016年8月4日 | zhaoonline)yangyoupeng | original version
*/
package com.zhaoonline.message.test.queue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.BeansException;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.zhaoonline.alpaca.config.bootstrap.AlpacaApplication;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import org.hamcrest.core.IsEqual;
/**
* class name:TestPropertiesLoader <BR>
* class description: please write your description <BR>
* Remark: <BR>
* @version 1.00 2016年8月4日
* @author zhaoonline)yangyoupeng
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring-test.xml")
@EnableAutoConfiguration
@ComponentScan(basePackageClasses={com.zhaoonline.message.test.queue.MockProperitesLoader.class,com.zhaoonline.message.test.queue.MockProperitesFactory.class})
public class TestPropertiesLoader implements ApplicationContextAware{
private ConfigurableApplicationContext configcontext;
/**
* Method name: testLoadGenericPropertyFromZK <BR>
* Description: 从ZK上面获取属性的key的方式。key的命名方式:"component"+Component.getName()+@ConfigurationProperties("component.testComponet")+属性字段名 <BR>
* @see com.zhaoonline.alpaca.config.bootstrap.Component#getName()
* @see com.zhaoonline.message.test.queue.MockProperitesLoader 中@ConfigurationProperties("component.testComponet")
* @see com.zhaoonline.message.test.queue.MockProperitesLoader#property
* 需要说明的是这里的MockProperitesLoader是一个POJO,
* 其中的所有属性可一个对应成为一个配置项。需要的是该POJO的代表的配置项用@ConfigurationProperties("component.testComponet")来指定一个前缀
* Remark: <BR>void<BR>
*/
@Test
public void testLoadGenericPropertyFromZK(){
System.setProperty("logging.config", "classpath:META/log/logback.xml");
ConfigurableApplicationContext context = AlpacaApplication.run(TestPropertiesLoader.class, "aa","aa");
String aa =context.getEnvironment().getProperty("component.testComponent.component.testComponent.property");
assertThat(aa, notNullValue());
// MockProperitesLoader mockProperitesLoader = context.getBean(MockProperitesLoader.class);
// assertThat(mockProperitesLoader.getProperty(),nullValue());
MockProperitesFactory configFactory=(MockProperitesFactory) context.getBean(MockProperitesFactory.class);
System.out.println(configFactory.getAccessKey());
assertThat(aa,IsEqual.equalTo(configFactory.getAccessKey()));
System.out.println(aa);
}
/**
* @Override
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) <BR>
* Method name: setApplicationContext <BR>
* Description: please write your description <BR>
* Remark: <BR>
* @param applicationContext
* @throws BeansException <BR>
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
configcontext= (ConfigurableApplicationContext) applicationContext;
}
public static void main(String[] args) {
System.setProperty("logging.config", "classpath:logback.xml");
ConfigurableApplicationContext context = AlpacaApplication.run(TestPropertiesLoader.class, "aa","aa");
String aa =context.getEnvironment().getProperty("component.testComponent.component.testComponent.property");
assertThat(aa, notNullValue());
// MockProperitesLoader mockProperitesLoader = context.getBean(MockProperitesLoader.class);
// System.out.println(mockProperitesLoader.getProperty());
// assertThat(mockProperitesLoader.getProperty(),nullValue());
MockProperitesFactory configFactory=(MockProperitesFactory) context.getBean(MockProperitesFactory.class);
System.out.println(configFactory.getAccessKey());
assertThat(aa,IsEqual.equalTo(configFactory.getAccessKey()));
System.out.println(aa);
}
}