yangyoupeng

修复bug

......@@ -127,7 +127,7 @@ public class CloudPullTopicProducer implements ProducerInt<MessageObject,TopicMe
!id.isEmpty()){
topicMessage.setMessageId(id);
}
topicMessage.setBaseMessageBody(msgObject.toJson());
topicMessage.setBaseMessageBody(msgObject.toStr());
TopicMessage result =cloudPullTopic.publishMessage(topicMessage);
return result;
}
......
......@@ -12,11 +12,14 @@ package com.zhaoonline.message.queue;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.SerializationUtils;
import com.aliyun.mns.model.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhaoonline.message.test.queue.KyroTestObject;
import com.zhaoonline.microservice.framework.serialization.KryoSerialize;
/**
......@@ -27,7 +30,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* @author zhaoonline)yangyoupeng
*/
public class MessageObject implements Serializable {
private static ObjectMapper objectMapper=new ObjectMapper();
// private static ObjectMapper objectMapper=new ObjectMapper();
private static KryoSerialize kryoSerialize=new KryoSerialize();
/**
* serialVersionUID:TODO
*/
......@@ -112,9 +116,16 @@ public class MessageObject implements Serializable {
*/
public <T> T toObject(Class<T> clazz){
try {
return objectMapper.readValue(message.getMessageBodyAsRawString(), clazz);
if(message == null){
return null;
}
byte[] bytes=Base64.decodeBase64(message.getMessageBodyAsRawBytes());
return kryoSerialize.deserialize(bytes, clazz);
// return objectMapper.readValue(message.getMessageBodyAsRawString(), clazz);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
......@@ -173,10 +184,13 @@ public class MessageObject implements Serializable {
* Remark: <BR>
* @return String<BR>
*/
public String toJson(){
public String toStr(){
try {
return objectMapper.writeValueAsString(insideObject);
} catch (JsonProcessingException e) {
// return objectMapper.writeValueAsString(insideObject);
byte[] serializedBytes=kryoSerialize.serialize(insideObject);
String encoderString =Base64.encodeBase64String(serializedBytes);
return encoderString;
} catch (IOException e) {
e.printStackTrace();
}
return null;
......
......@@ -76,7 +76,8 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
}
public Message send(String msgBody,String id) throws IOException {
Message msg =new Message(msgBody);
Message msg =new Message();
msg.setBaseMessageBody(msgBody);
if(null != id && !id.isEmpty()){
msg.setMessageId(id);
}
......@@ -107,7 +108,8 @@ public class P2PProducer implements ProducerInt<MessageObject,Message>{
*/
@Override
public Message send(MessageObject object) throws IOException {
Message msg =new Message(object.toJson());
Message msg =new Message();
msg.setBaseMessageBody(object.toStr());
String id = object.getId();
if(id != null && id.isEmpty() ){
msg.setMessageId(id);
......
......@@ -75,7 +75,7 @@ public class TopicProducer implements ProducerInt<MessageObject,TopicMessage> {
!id.isEmpty()){
topicMessage.setMessageId(id);
}
topicMessage.setBaseMessageBody(msgObject.toJson());
topicMessage.setBaseMessageBody(msgObject.toStr());
return cloudTopic.publishMessage(topicMessage);
}
......
......@@ -124,7 +124,7 @@ public class TransactionQueueProducer implements ProducerInt<MessageObject,Messa
@Override
public Message send(MessageObject t) throws IOException {
Message msg= new Message ();
msg.setBaseMessageBody(t.toJson());
msg.setBaseMessageBody(t.toStr());
msg.setMessageId(t.getId());
Message result=transQueue.sendTransMessage(msg, transOperation);
return result;
......
......@@ -4,7 +4,6 @@ import java.util.NoSuchElementException;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import org.junit.Assert;
import org.junit.Test;
......
package com.zhaoonline.message.queue;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.aliyun.mns.model.Message;
import com.zhaoonline.message.test.queue.KyroTestObject;
import com.zhaoonline.microservice.framework.serialization.KryoSerialize;
public class TestCloudPullTopicConsumer {
......@@ -39,7 +41,7 @@ public class TestCloudPullTopicConsumer {
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
String testQueue1 = "testQu1eue";
String testQueue2 = "testQ1ueu";
TopicProducer topic=builder.buildTopic(topicName);
// TopicProducer topic=builder.buildTopic(topicName);
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, testQueue1,testQueue2);
topicProducer.deleteTopic(true);
......@@ -105,4 +107,69 @@ public class TestCloudPullTopicConsumer {
}
@Test
public void testSendStringWithKyroBase64() throws Exception{
String topicName="testTopic1";
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
String testQueue1 = "testQu1eue";
String testQueue2 = "testQ1ueu";
TopicProducer topic=builder.buildTopic(topicName);
topic.deleteTopic();
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, testQueue1,testQueue2);
topic=builder.buildTopic(topicName);
KyroTestObject kyroTestObject =new KyroTestObject();
kyroTestObject.setId(1);
kyroTestObject.setName("hellow world1");
KryoSerialize kryoSerialize=new KryoSerialize();
byte[] serializedBytes=kryoSerialize.serialize(kyroTestObject);
// BASE64Encoder encoder=new sun.misc.BASE64Encoder();
// String encoderString=encoder.encode(serializedBytes);
String encoderString =Base64.encodeBase64String(serializedBytes);
topic.send(encoderString);
CloudPullTopicConsumer consumer1=builder.buildTopicPullConsumer(topicName, testQueue1);
Message msg1=consumer1.popMessage();
// BASE64Decoder decoder=new sun.misc.BASE64Decoder();
// byte[] bytes=decoder.decodeBuffer(msg1.getMessageBodyAsBase64());
byte[] bytes=Base64.decodeBase64(msg1.getMessageBodyAsRawBytes());
KyroTestObject deSerObject= kryoSerialize.deserialize(bytes, KyroTestObject.class);
Assert.assertEquals(kyroTestObject.getId(),deSerObject.getId());
Assert.assertEquals(kyroTestObject.getName(),deSerObject.getName());
topicProducer.deleteTopic(true);
}
@Test
public void testSendObjectWithKyroBase64() throws Exception{
String topicName="testTopic1";
//createQueue=false的是时候,只会创建topic,而不会创建queue,测试如果publish的时候就会报异常错误
String testQueue1 = "testQu1eue";
String testQueue2 = "testQ1ueu";
TopicProducer topic=builder.buildTopic(topicName);
topic.deleteTopic();
CloudPullTopicProducer topicProducer=builder.buildCloudTopic(topicName,true, testQueue1,testQueue2);
topic=builder.buildTopic(topicName);
KyroTestObject kyroTestObject =new KyroTestObject();
kyroTestObject.setId(1);
kyroTestObject.setName("hellow world1");
topic.send(kyroTestObject);
CloudPullTopicConsumer consumer1=builder.buildTopicPullConsumer(topicName, testQueue1);
MessageObject msg1=consumer1.popMessageObject();
KyroTestObject deSerObject= msg1.toObject(KyroTestObject.class);
Assert.assertEquals(kyroTestObject.getId(),deSerObject.getId());
Assert.assertEquals(kyroTestObject.getName(),deSerObject.getName());
topicProducer.deleteTopic(true);
}
}
......
package com.zhaoonline.message.queue;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
......@@ -8,6 +9,8 @@ import org.junit.rules.ExpectedException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.model.Message;
import com.zhaoonline.message.test.queue.KyroTestObject;
import com.zhaoonline.microservice.framework.serialization.KryoSerialize;
/**
* class name:TestP2PProducer <BR>
......@@ -43,14 +46,22 @@ public class TestP2PProducer {
System.out.println(message.getReceiptHandle());
Consumer consumer=builder.buildConsumer("p2pTestQueue");
MessageObject object= consumer.popMessageObject();
Assert.assertNotNull(object.getMessage());
System.out.println(object.getMessage().getReceiptHandle());
System.out.println("----first pop-----"+object.getMessage());
System.out.println("----first pop-----"+object.getMessage().getMessageBodyAsString());
System.out.println("----first pop-----"+object.getMessage().getMessageBodyAsRawString());
Assert.assertEquals("helloworld", object.getMessage().getMessageBodyAsRawString());
//由于send的时候。并没有走object接口,所以是不会对其进行序列化的。返回null
String string=object.toObject(String.class);
Assert.assertNull(string);
MessageObject object2= consumer.popMessageObject();
Assert.assertNull(object2.getMessage());
System.out.println("--second pop should null--"+object2.getMessage());
producer.deleteQueue();
......@@ -68,11 +79,14 @@ public class TestP2PProducer {
System.out.println(message.getReceiptHandle());
Consumer consumer=builder.buildConsumer("p2pTestQueue");
MessageObject object= consumer.peekMessageObject();
System.out.println(object.getMessage().getReceiptHandle());
Assert.assertNotNull(object.getMessage());
Assert.assertEquals("helloworld", object.getMessage().getMessageBodyAsString());
Assert.assertEquals("helloworld", object.toObject(String.class));
System.out.println("----first peek-----"+object.getMessage());
System.out.println("----first peek-----"+object.getMessage().getMessageBodyAsString());
......@@ -91,6 +105,76 @@ public class TestP2PProducer {
}
@Test
public void testSendStringWithKyroBase64() throws Exception{
P2PProducer producer=builder.buildP2PProducer("p2pTestQueue1");
producer.deleteQueue();
producer.createQueue();
KyroTestObject kyroTestObject =new KyroTestObject();
kyroTestObject.setId(1);
kyroTestObject.setName("hellow world1");
KryoSerialize kryoSerialize=new KryoSerialize();
byte[] serializedBytes=kryoSerialize.serialize(kyroTestObject);
// BASE64Encoder encoder=new sun.misc.BASE64Encoder();
// String encoderString=encoder.encode(serializedBytes);
String encoderString =Base64.encodeBase64String(serializedBytes);
producer.send(encoderString);
Consumer consumer=builder.buildConsumer("p2pTestQueue1");
Message msg1=consumer.popMessage();
// BASE64Decoder decoder=new sun.misc.BASE64Decoder();
// byte[] bytes=decoder.decodeBuffer(msg1.getMessageBodyAsRawBytes()());
byte[] bytes=Base64.decodeBase64(msg1.getMessageBodyAsRawBytes());
KyroTestObject deSerObject= kryoSerialize.deserialize(bytes, KyroTestObject.class);
Assert.assertEquals(kyroTestObject.getId(),deSerObject.getId());
Assert.assertEquals(kyroTestObject.getName(),deSerObject.getName());
producer.deleteQueue();
}
@Test
public void testSendObjectWithKyroBase64() throws Exception{
P2PProducer producer=builder.buildP2PProducer("p2pTestQueue1");
producer.deleteQueue();
producer.createQueue();
KyroTestObject kyroTestObject =new KyroTestObject();
kyroTestObject.setId(1);
kyroTestObject.setName("hellow world1");
producer.send(kyroTestObject);
Consumer consumer=builder.buildConsumer("p2pTestQueue1");
MessageObject msg1=consumer.popMessageObject();
// BASE64Decoder decoder=new sun.misc.BASE64Decoder();
// byte[] bytes=decoder.decodeBuffer(msg1.getMessageBodyAsRawBytes()());
KyroTestObject deSerObject= msg1.toObject(KyroTestObject.class);
Assert.assertEquals(kyroTestObject.getId(),deSerObject.getId());
Assert.assertEquals(kyroTestObject.getName(),deSerObject.getName());
producer.deleteQueue();
}
@Test
public void testTimeout() throws Exception{
P2PProducer producer=builder.buildP2PProducer("p2pTestQueue1");
producer.deleteQueue();
producer.createQueue();
Consumer consumer=builder.buildConsumer("p2pTestQueue1");
MessageObject msg1=consumer.popMessageObject(30);
Assert.assertNull(msg1.getMessage());
}
}
......