SimpleJobConsumer.java
2.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*******************************************************************************
* Copyright (c) 2005, 2014 springside.github.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
*******************************************************************************/
package com.cjs.cms.util.redis.scheduler;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cjs.cms.util.redis.JedisTemplate;
import com.cjs.cms.util.redis.JedisTemplate.JedisAction;
import com.cjs.cms.util.redis.Threads;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Pool;
/**
 * Simple job consumer that takes jobs off a Redis list with the blocking brpop() API.
 *
 * A blocked brpop does not return on its own when the thread is interrupted, so a finite
 * timeout is always used. When shutting down the thread pool that runs this consumer, call
 * awaitTermination() with a wait longer than that timeout so the worker thread can finish.
 */
public class SimpleJobConsumer {

    /** Default number of seconds a single brpop call waits for a job. */
    public static final int DEFAULT_POPUP_TIMEOUT_SECONDS = 5;

    /** Milliseconds to pause after a Redis connection failure before returning to the caller. */
    public static final int DEFAULT_CONNECTION_RETRY_MILLS = 5000;

    private static final Logger logger = LoggerFactory.getLogger(SimpleJobConsumer.class);

    private final JedisTemplate jedisTemplate;
    private final String readyJobKey;
    private int popupTimeoutSecs = DEFAULT_POPUP_TIMEOUT_SECONDS;

    /**
     * Creates a consumer for the given job.
     *
     * @param jobName   job name; the Redis key to pop from is derived via Keys.getReadyJobKey
     * @param jedisPool connection pool wrapped by the internal JedisTemplate
     */
    public SimpleJobConsumer(String jobName, Pool<Jedis> jedisPool) {
        jedisTemplate = new JedisTemplate(jedisPool);
        readyJobKey = Keys.getReadyJobKey(jobName);
    }

    /**
     * Blocks until a job is available and returns it. If no job arrives within
     * popupTimeoutSecs (5 seconds by default), returns null.
     *
     * If a Redis connection exception occurs, it is logged, the thread pauses for
     * DEFAULT_CONNECTION_RETRY_MILLS and null is returned, so a caller polling in a loop
     * does not spin while Redis is unreachable.
     *
     * @return the popped job, or null on timeout or connection failure
     */
    public String popupJob() {
        List<String> nameValuePair = null;
        try {
            nameValuePair = jedisTemplate.execute(new JedisAction<List<String>>() {
                @Override
                public List<String> action(Jedis jedis) {
                    return jedis.brpop(popupTimeoutSecs, readyJobKey);
                }
            });
        } catch (JedisConnectionException e) {
            // Record the failure instead of dropping it silently; otherwise an outage is
            // indistinguishable from "no job available" when looking at this class alone.
            logger.warn("Redis connection failure while popping job from " + readyJobKey
                    + ", pausing " + DEFAULT_CONNECTION_RETRY_MILLS + " ms before returning null", e);
            Threads.sleep(DEFAULT_CONNECTION_RETRY_MILLS);
        }

        // No reply (timeout or connection failure): nothing to hand out.
        if (nameValuePair == null || nameValuePair.isEmpty()) {
            return null;
        }
        // brpop replies with [key, value]; the job is the second element.
        return nameValuePair.get(1);
    }

    /**
     * Sets how long a single brpop call waits for a job.
     *
     * @param popupTimeoutSecs timeout in seconds passed to brpop
     */
    public void setPopupTimeoutSecs(int popupTimeoutSecs) {
        this.popupTimeoutSecs = popupTimeoutSecs;
    }
}