JobDispatcher.java
4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*******************************************************************************
* Copyright (c) 2005, 2014 springside.github.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
*******************************************************************************/
package com.cjs.cms.util.redis.scheduler;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.cjs.cms.util.redis.JedisScriptExecutor;
import com.cjs.cms.util.redis.Threads;
import com.cjs.cms.util.redis.Threads.WrapExceptionRunnable;
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;
/**
 * Periodic job dispatcher. At a fixed interval it runs a Lua script that, per the original
 * author's description, moves due jobs from the scheduled-job sorted set into the ready-job list
 * and, in reliable mode, puts jobs from the lock-job set whose execution has timed out back into
 * the ready-job list.
 *
 * <p>The scheduling thread pool can either be created by this class ({@link #start()}) or be
 * supplied by the caller and shared ({@link #start(ScheduledExecutorService)}).
 *
 * <p>Setters only affect a dispatcher that has not been started yet as far as the script path and
 * the interval are concerned, because both are read once inside {@code start}.
 *
 * @author calvin
 */
public class JobDispatcher implements Runnable {

    public static final String DEFAULT_DISPATCH_LUA_FILE = "classpath:/redis/dispatch.lua";
    public static final long DEFAULT_INTERVAL_MILLIS = 1000;
    public static final boolean DEFAULT_RELIABLE = false;
    public static final long DEFAULT_JOB_TIMEOUT_SECONDS = 60;

    /** Pool created by {@link #start()}; stays {@code null} when an external pool is used. */
    private ScheduledExecutorService internalScheduledThreadPool;

    /** Handle of the periodic dispatch task; {@code null} while the dispatcher is not running. */
    private ScheduledFuture<?> dispatchJob;

    private long intervalMillis = DEFAULT_INTERVAL_MILLIS;
    private boolean reliable = DEFAULT_RELIABLE;
    private long jobTimeoutSecs = DEFAULT_JOB_TIMEOUT_SECONDS;

    private final JedisScriptExecutor scriptExecutor;
    private String scriptPath = DEFAULT_DISPATCH_LUA_FILE;

    private final String jobName;

    /** Redis keys handed to the Lua script as KEYS, in the order the script is given them. */
    private final List<String> keys;

    /**
     * Creates a dispatcher for the given job name.
     *
     * @param jobName   name of the job; used to derive the Redis keys passed to the script
     * @param jedisPool connection pool used by the script executor
     */
    public JobDispatcher(String jobName, Pool<Jedis> jedisPool) {
        this.jobName = jobName;

        String scheduledJobKey = Keys.getScheduledJobKey(jobName);
        String readyJobKey = Keys.getReadyJobKey(jobName);
        String dispatchCounterKey = Keys.getDispatchCounterKey(jobName);
        String lockJobKey = Keys.getLockJobKey(jobName);
        String retryCounterKey = Keys.getRetryCounterKey(jobName);

        keys = Arrays.asList(scheduledJobKey, readyJobKey, dispatchCounterKey, lockJobKey,
                retryCounterKey);

        scriptExecutor = new JedisScriptExecutor(jedisPool);
    }

    /**
     * Starts dispatching on a single-threaded scheduler pool created and owned by this instance.
     * The pool is shut down again by {@link #stop()}.
     */
    public void start() {
        internalScheduledThreadPool = Executors.newScheduledThreadPool(1,
                Executors.defaultThreadFactory());
        start(internalScheduledThreadPool);
    }

    /**
     * Starts dispatching on the supplied scheduler pool. The pool is not shut down by
     * {@link #stop()}; its lifecycle remains with the caller.
     *
     * @param scheduledThreadPool pool on which the periodic dispatch task is scheduled
     */
    public void start(ScheduledExecutorService scheduledThreadPool) {
        // Load the script before scheduling so the first run already has it available.
        scriptExecutor.loadFromFile(scriptPath);

        dispatchJob = scheduledThreadPool.scheduleAtFixedRate(new WrapExceptionRunnable(this), 0,
                intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops dispatching. If the thread pool was created by this instance it is shut down as well,
     * waiting at most 5 seconds. Calling this method before {@code start}, after a failed
     * {@code start}, or more than once is a no-op for the parts that are not running.
     */
    public void stop() {
        // Guard against stop() without a successful start(): dispatchJob is still null then.
        if (dispatchJob != null) {
            dispatchJob.cancel(false);
            dispatchJob = null;
        }

        if (internalScheduledThreadPool != null) {
            Threads.normalShutdown(internalScheduledThreadPool, 5, TimeUnit.SECONDS);
            // Drop the reference so a later stop() does not act on the dead pool again.
            internalScheduledThreadPool = null;
        }
    }

    /**
     * Executes the Lua dispatch script with the current time, the reliable flag and the job
     * timeout (in seconds) as arguments.
     */
    @Override
    public void run() {
        long currTime = System.currentTimeMillis();
        List<String> args = Arrays.asList(String.valueOf(currTime), String.valueOf(reliable),
                String.valueOf(jobTimeoutSecs));
        scriptExecutor.execute(keys, args);
    }

    /**
     * Sets a non-default script path, in Spring {@code Resource} path style.
     *
     * @param scriptPath location of the Lua dispatch script
     */
    public void setScriptPath(String scriptPath) {
        this.scriptPath = scriptPath;
    }

    /**
     * Sets a dispatch interval other than the default of 1 second.
     *
     * @param intervalMillis interval between dispatch runs, in milliseconds
     */
    public void setIntervalMillis(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /**
     * Sets whether reliable mode is enabled.
     *
     * @param reliable {@code true} to enable reliable mode
     */
    public void setReliable(boolean reliable) {
        this.reliable = reliable;
    }

    /**
     * Sets a job execution timeout other than the default of 1 minute; used in reliable mode.
     *
     * @param jobTimeoutSecs job execution timeout, in seconds
     */
    public void setJobTimeoutSecs(long jobTimeoutSecs) {
        this.jobTimeoutSecs = jobTimeoutSecs;
    }
}