yangyoupeng

EXcludeLock的并发测试,以及transaction的并发测试

......@@ -11,6 +11,12 @@
<dependencies>
<dependency>
<groupId>com.zhaoonline</groupId>
<artifactId>alpaca-config-zookeeper</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version><!--$NO-MVN-MAN-VER$ -->
......
package com.zhaoonline.redis.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import com.zhaoonline.redis.template.RedisTemplateFactory;
@Configuration
public class ClientConfig {
@Bean
@ConfigurationProperties("alpaca.components.redis")
public Config config(){
return new Config();
}
@Bean
@Autowired
public JedisConnectionFactory jedisConnectionFactory(Config config){
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName(config.getHost());
factory.setPort(config.getPort());
factory.setUsePool(true);
factory.afterPropertiesSet();
return factory;
}
@Bean
@Autowired
public RedisTemplateFactory redisTemplateFactory(JedisConnectionFactory jedisConnectionFactory){
RedisTemplateFactory templateFacotory=new RedisTemplateFactory();
templateFacotory.setConnectionFactory(jedisConnectionFactory);
return templateFacotory;
}
}
package com.zhaoonline.redis.config;
import java.lang.annotation.Annotation;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* class name:redistConfig <BR>
* @Description:搜索服务相关配置 <BR>
* Remark: <BR>
* @version 1.00 2016年10月24日
* @author zhaoonline.com
*/
@Component
@EnableAutoConfiguration
//@ConfigurationProperties("alpaca.components.redis")
public class Config {
private String host;
private int port;
public Config(){
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
......@@ -5,11 +5,15 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import com.zhaoonline.redis.template.DeSerializer;
import com.zhaoonline.redis.template.RedisTemplateFactory;
import com.zhaoonline.redis.transaction.TransactionValueOperation;
/**
* class name:ExcludeLock <BR>
......@@ -30,15 +34,16 @@ public class ExcludeLock {
private final String lockKeyPath;
private static final int ONE_SECOND = 1000;
public static final int DEFAULT_EXPIRY_TIME_MILLIS = Integer.getInteger("com.zhaoonline.redis.lock.expiry.millis",
60 * ONE_SECOND);
5 * ONE_SECOND);
public static final int DEFAULT_ACQUIRE_TIMEOUT_MILLIS = Integer
.getInteger("com.zhaoonline.redis.lock.acquiry.millis", 10 * ONE_SECOND);
.getInteger("com.zhaoonline.redis.lock.acquiry.millis", 3 * ONE_SECOND);
public static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = Integer
.getInteger("com.zhaoonline.redis.lock.acquiry.resolution.millis", 100);
.getInteger("com.zhaoonline.redis.lock.acquiry.resolution.millis", 500);
private final UUID lockUUID;
private Lock lock = null;
private static final Lock NO_LOCK = new Lock(new UUID(0l, 0l), 0l);
public ExcludeLock(RedisTemplateFactory factory, String lockName) {
this(factory, lockName, DEFAULT_ACQUIRE_TIMEOUT_MILLIS);
......@@ -55,6 +60,7 @@ public class ExcludeLock {
public ExcludeLock(RedisTemplateFactory factory, String lockName, int acquireTimeoutMillis, int expiryTimeMillis,
UUID uuid) {
redisTemplate = factory.createTemplate();
// redisTemplate.setEnableTransactionSupport(true);
deSerializer.setValueSerializer(redisTemplate.getValueSerializer());
lockExpiryInMillis = expiryTimeMillis;
acquiryTimeoutInMillis = acquireTimeoutMillis;
......@@ -66,11 +72,13 @@ public class ExcludeLock {
return lockKeyPath;
}
public boolean acquireLock() throws InterruptedException {
public boolean acquireLock() throws InterruptedException {
int timeout = acquiryTimeoutInMillis;
BoundValueOperations<String, String> valuOper = redisTemplate.boundValueOps(lockKeyPath);
while (timeout >= 0) {
final Lock newLock = asLock(System.currentTimeMillis() + lockExpiryInMillis);
BoundValueOperations<String, String> valuOper = redisTemplate.boundValueOps(lockKeyPath);
valuOper.expire(lockExpiryInMillis,TimeUnit.MILLISECONDS);
Boolean result = valuOper.setIfAbsent(newLock.toString());
if (result) {
this.lock = newLock;
......@@ -81,6 +89,7 @@ public class ExcludeLock {
final Lock currentLock = Lock.fromString(currentValueStr);
if (currentLock.isExpiredOrMine(lockUUID)) {
String oldValueStr = valuOper.getAndSet(newLock.toString());
valuOper.expire(lockExpiryInMillis,TimeUnit.MILLISECONDS);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
this.lock = newLock;
return true;
......
......@@ -4,7 +4,6 @@ import java.util.concurrent.TimeUnit;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.TimeoutUtils;
......@@ -12,12 +11,12 @@ import org.springframework.data.redis.core.TimeoutUtils;
import com.zhaoonline.redis.template.DeSerializer;
import com.zhaoonline.redis.template.RedisTemplateFactory;
import redis.clients.util.SafeEncoder;
/**
* class name:TransactionValueOperation <BR>
* class description: 所有的操作都是基于Transaction的<BR>
* Remark: <BR>
* Remark: * Description: 并发访问相同的key会出现异常错误"Incorrect number of transaction results"
* 请参考:http://redis.io/topics/transactions#Errors inside a transaction
* 异常可能出现在exec之前或者之后,出现在之前,会抛出异常,出现在之后,就会出现值错误<BR>
* @version 1.00 2016年8月22日
* @author zhaoonline)yangyoupeng
*/
......@@ -30,15 +29,15 @@ public class TransactionValueOperation<V> {
redisTemplate = factory.createTemplate();
deSerializer.setValueSerializer(redisTemplate.getValueSerializer());
deSerializer.setKeySerializer(redisTemplate.getKeySerializer());
redisTemplate.setEnableTransactionSupport(true);
}
public void setIfAbsent(String key,V value){
final byte[] rawKey = deSerializer.rawKey(key);
final byte[] rawValue = deSerializer.rawValue(value);
redisTemplate.execute(new RedisCallback<Object>() {
redisTemplate.execute(new RedisCallback<Void>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
public Void doInRedis(RedisConnection connection) throws DataAccessException {
connection.watch(rawKey);
connection.multi();
connection.setNX(rawKey, rawValue);
......@@ -52,9 +51,9 @@ public class TransactionValueOperation<V> {
public void set(String key,V value){
final byte[] rawValue = deSerializer.rawValue(value);
final byte[] rawKey = deSerializer.rawKey(key);
redisTemplate.execute(new RedisCallback<Object>() {
redisTemplate.execute(new RedisCallback<Void>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
public Void doInRedis(RedisConnection connection) throws DataAccessException {
connection.watch(rawKey);
connection.multi();
connection.set(rawKey, rawValue);
......@@ -64,12 +63,14 @@ public class TransactionValueOperation<V> {
}
});
}
public void set(String key,V value,long expireTimeount,TimeUnit unit){
final byte[] rawKey = deSerializer.rawKey(key);
final byte[] rawValue = deSerializer.rawValue(value);
redisTemplate.execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
redisTemplate.execute(new RedisCallback<Void>() {
public Void doInRedis(RedisConnection connection) throws DataAccessException {
potentiallyUsePsetEx(connection);
return null;
}
......@@ -94,8 +95,7 @@ public class TransactionValueOperation<V> {
}
return !failed;
}
}, true);
});
}
public V get(String key){
......
package com.zhaoonline.redis.application;
import java.util.concurrent.CountDownLatch;
import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import com.zhaoonline.alpaca.config.bootstrap.AlpacaApplication;
@SpringBootApplication
@ComponentScan(basePackages="com.zhaoonline.redis.config")
public class Application {
@Bean
public CountDownLatch closeLatch() {
return new CountDownLatch(1);
}
public static void main(String[] args) throws InterruptedException {
// SpringApplicationBuilder builder= new SpringApplicationBuilder().sources(Application.class).bannerMode(Banner.Mode.OFF).web(false);
// builder.registerShutdownHook(true);
ConfigurableApplicationContext context = AlpacaApplication.run(Application.class,false, args);
CountDownLatch closeLatch = context.getBean(CountDownLatch.class);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
closeLatch.countDown();
}
}));
closeLatch.await();
context.close();
System.exit(0);
}
}
package com.zhaoonline.redis.lock;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import com.zhaoonline.redis.template.RedisTemplateFactory;
import com.zhaoonline.redis.transaction.TransactionValueOperation;
public class TestThread {
private static String testLock="testLock";
RedisTemplateFactory factory=null;
JedisConnectionFactory connectionFactory=null;
@Before
public void prepare(){
connectionFactory=createSimpleFactory();
factory=new RedisTemplateFactory();
factory.setConnectionFactory(connectionFactory);
TransactionValueOperation<String> oper=new TransactionValueOperation<String>(factory);
oper.delete("lock" + ":"+testLock);
String re=oper.get("lock" + ":"+testLock);
System.out.println("----------"+re);
}
@Test
public void testRun(){
int num=500;
CountDownLatch latch=new CountDownLatch(num);
for(int i=0;i<num;i++){
RunThread t=new RunThread(latch,i,factory);
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
}
System.out.println("done");
}
static class RunThread extends Thread{
CountDownLatch latch;
RedisTemplateFactory factory;
int id;
public RunThread(CountDownLatch latch,int id,RedisTemplateFactory factory){
this.latch = latch;
this.factory= factory;
this.id=id;
}
@Override
public void run() {
ExcludeLock lock = new ExcludeLock(factory, testLock,4000,6000);
try {
boolean result=lock.acquireLock();
System.out.println("thread "+id+" acquireLock result:"+result);
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println("thread "+id+" run done");
latch.countDown();
// lock.release();
super.run();
}
}
private JedisConnectionFactory createSimpleFactory(){
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName("192.168.0.188");
factory.setPort(6377);
factory.setUsePool(true);
factory.afterPropertiesSet();
return factory;
}
@After
public void after(){
connectionFactory.destroy();
}
}
package com.zhaoonline.redis.transaction;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.TimeoutUtils;
public class TestTransaction2 {
@Test
......@@ -22,9 +24,10 @@ public class TestTransaction2 {
System.out.println("------1-------"+value1);
connection.watch(testKey);
connection.multi();
Long incrValue=connection.incrBy(testKey, 20);
connection.setEx(testKey, TimeoutUtils.toSeconds(100, TimeUnit.MICROSECONDS), testvalue1);
// Long incrValue=connection.incrBy(testKey, 20);
// value1 =testvalue1 ==null?null:new String(testvalue1,Charset.forName("UTF-8"));
System.out.println("------2-------"+incrValue);
// System.out.println("------2-------"+incrValue);
connection.exec();
connection.unwatch();
testvalue1=connection.get(testKey);
......
package com.zhaoonline.redis.transaction;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import com.zhaoonline.redis.lock.ExcludeLock;
import com.zhaoonline.redis.template.RedisTemplateFactory;
public class TestTransactionValueOperation {
RedisTemplateFactory factory=null;
JedisConnectionFactory connectionFactory=null;
private String key ="testMT:1";
private String value ="1";
@Before
public void prepare(){
connectionFactory=createSimpleFactory();
factory=new RedisTemplateFactory();
factory.setConnectionFactory(connectionFactory);
}
@Test
public void testsetIfAbsent(){
TransactionValueOperation<String> oper=new TransactionValueOperation<String>(factory);
oper.setIfAbsent("testkey:1", "111");
String value=oper.get("testkey:1");
Assert.assertEquals("111", value);
}
@Test
public void testset(){
TransactionValueOperation<String> oper=new TransactionValueOperation<String>(factory);
oper.setIfAbsent("testkey:2", "111");
String value=oper.get("testkey:2");
Assert.assertEquals("111", value);
oper.set("testkey:2", "112");
String value1=oper.get("testkey:2");
Assert.assertEquals("112", value1);
}
@Test
public void testsetWithExpire() throws InterruptedException{
TransactionValueOperation<String> oper=new TransactionValueOperation<String>(factory);
oper.delete("testkey:2");
oper.set("testkey:2", "112",2000, TimeUnit.MILLISECONDS);
String value=oper.get("testkey:2");
Assert.assertEquals("112", value);
Thread.sleep(2000);
String value1=oper.get("testkey:2");
Assert.assertNull(value1);
oper.set("testkey:3", "112",3, TimeUnit.SECONDS);
String value3=oper.get("testkey:3");
Assert.assertEquals("112", value3);
Thread.sleep(3000);
value3=oper.get("testkey:3");
Assert.assertNull(value3);
}
/**
* Method name: testMultiThread <BR>
* Description: 并发访问相同的key会出现异常错误"Incorrect number of transaction results"
* 请参考:http://redis.io/topics/transactions#Errors inside a transaction
* <BR>
* Remark: <BR> void<BR>
*/
@Test
public void testMultiThread(){
int num=2;
CountDownLatch latch=new CountDownLatch(num);
TransactionValueOperation<String> oper=new TransactionValueOperation<String>(factory);
for(int i=0;i<num;i++){
RunThread t=new RunThread(latch,key,""+i,i,oper);
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
}
System.out.println("done");
oper.delete(key);
}
static class RunThread extends Thread{
TransactionValueOperation<String> oper=null;
CountDownLatch latch;
RedisTemplateFactory factory;
int id;
private String key;
private String value;
public RunThread(CountDownLatch latch,String key,String value,int id,TransactionValueOperation<String> oper){
this.latch = latch;
this.id=id;
this.oper=oper;
this.key=key;
this.value=value;
}
@Override
public void run() {
oper.set(key, value);
String value=oper.get(key);
System.out.println("thread "+id+" get value:"+value);
latch.countDown();
super.run();
}
}
private JedisConnectionFactory createSimpleFactory(){
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName("192.168.0.188");
factory.setPort(6377);
factory.setUsePool(true);
factory.afterPropertiesSet();
return factory;
}
@After
public void after(){
connectionFactory.destroy();
}
}
spring:
application:
name: default
profiles:
active: dev
main:
web-environment: false
flyway:
enabled: false
zhao:
alpaca:
zookeeper:
connectString: 192.168.0.205:2181,192.168.0.203:2181
\ No newline at end of file