Implementing Delayed Tasks with Redis (Part 2)
Preface
The previous article implemented a simple first version of a delayed task using Redis's Sorted Set together with the scheduling framework Quartz, but it left two relatively important problems unsolved:
- Sharding.
- Monitoring.
This article fills in those two pieces. Prerequisite article: Implementing Delayed Tasks with Redis (Part 1).
Why Sharding Is Needed
Here is the dequeue script dequeue.lua again:
-- partly based on jesque's Lua scripts
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE returns a table like {'ok':'zset'}, so use next to do one round of iteration
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack expands a table into varargs
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
This script uses four commands: ZREVRANGEBYSCORE, ZREM, HMGET and HDEL (the cost of the TYPE command is negligible):
| Command | Time complexity | Parameter description |
| --- | --- | --- |
| ZREVRANGEBYSCORE | O(log(N)+M) | N is the total number of elements in the Sorted Set, M is the number of elements returned |
| ZREM | O(M*log(N)) | N is the total number of elements in the Sorted Set, M is the number of elements successfully removed |
| HMGET | O(L) | L is the number of fields returned |
| HDEL | O(L) | L is the number of fields to delete |
Next, analyse with a concrete scenario and concrete parameters. Suppose that in production the Sorted Set receives about 10,000 elements per hour (i.e. the business places 10,000 orders per hour). Since data is deleted from the Sorted Set and the Hash at the same time as it is dequeued, roughly 5,000 entries are resident in the two collections within a 30-minute window, i.e. N = 5000 in the table above. Suppose the query LIMIT is initially set to 100, i.e. M = 100, and let T denote the cost of one elementary Redis operation. The cost of processing the 5,000 entries then breaks down as follows:
| # | Set cardinality | ZREVRANGEBYSCORE | ZREM | HMGET | HDEL |
| --- | --- | --- | --- | --- | --- |
| 1 | 5000 | (log(5000) + 100) * T | 100 * log(5000) * T | 100T | 100T |
| 2 | 4900 | (log(4900) + 100) * T | 100 * log(4900) * T | 100T | 100T |
| 3 | 4800 | (log(4800) + 100) * T | 100 * log(4800) * T | 100T | 100T |
| ... | ... | ... | ... | ... | ... |
In theory, ZREM is the most expensive of the four commands the script uses, and the costs of both ZREVRANGEBYSCORE (O(log(N)+M)) and ZREM (O(M*log(N))) grow with log(N). Keeping the set cardinality N down therefore helps, to a degree, in reducing the execution time of the Lua script.
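As a back-of-the-envelope illustration of that effect (an estimate derived from the complexity table above, not a benchmark; the class name, the base-2 logarithm, and the uniform cost unit T are all assumptions), the per-batch cost formulas can be evaluated directly:

```java
public class DequeueCostEstimate {

    // Per-batch cost of dequeue.lua in units of T, following the complexity table above
    static double cost(int n, int m) {
        double logN = Math.log(n) / Math.log(2);
        double zrevrangebyscore = logN + m; // O(log(N) + M)
        double zrem = m * logN;             // O(M * log(N))
        double hmget = m;                   // O(M)
        double hdel = m;                    // O(M)
        return zrevrangebyscore + zrem + hmget + hdel;
    }

    public static void main(String[] args) {
        System.out.printf("N=5000, M=100: %.0fT%n", cost(5000, 100));
        System.out.printf("N=2500, M=100 (2 shards): %.0fT%n", cost(2500, 100));
    }
}
```

Halving N only shaves the log(N) factor, so the per-batch saving is modest; the larger benefit of sharding is that each shard's collections stay smaller and the shards can be drained in parallel.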
Sharding
Having analysed the time complexity of dequeue.lua, two sharding schemes are prepared:
- Scheme 1: a single Redis instance, with the data in the Sorted Set and Hash collections split into shards.
- Scheme 2: multiple Redis instances (which can be Sentinel or Cluster deployments), each applying the sharding of Scheme 1.
For simplicity, the examples below use a sharding count (shardingCount) of 2; in production the number of shards should be sized to the actual workload. By default, sharding is done by taking the long user ID field userId modulo the shard count, on the assumption that userId is uniformly distributed in the test data.
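The modulo routing just described can be sketched in a few lines; the key prefixes match the examples that follow, while the class name and the sample userId values are illustrative:

```java
public class ShardRouter {

    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";

    // Route a userId to the Sorted Set key and Hash key of its shard
    static String[] route(long userId) {
        long index = userId % SHARDING_COUNT;
        return new String[]{ORDER_QUEUE_PREFIX + index, ORDER_DETAIL_QUEUE_PREFIX + index};
    }

    public static void main(String[] args) {
        // userId 10086 -> 10086 % 2 = 0 -> shard 0
        System.out.println(String.join(" / ", route(10086L)));
    }
}
```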
Common entity:
@Data
public class OrderMessage {
private String orderId;
private BigDecimal amount;
private Long userId;
private String timestamp;
}
Delay queue interface:
public interface OrderDelayQueue {
void enqueue(OrderMessage message);
List<OrderMessage> dequeue(String min,String max,String offset,String limit,int index);
List<OrderMessage> dequeue(int index);
String enqueueSha();
String dequeueSha();
}
Sharding on a Single Redis Instance
Sharding on a single Redis instance is fairly simple; the diagram below illustrates it:
The queue implementation is as follows (some parameters are hard-coded; this is for reference only, do not copy it into production as-is):
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue,InitializingBean {
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
/**
* Sharding count
*/
private static final long SHARDING_COUNT = 2L;
private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
private final JedisProvider jedisProvider;
@Override
public void enqueue(OrderMessage message) {
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
args.add(String.valueOf(System.currentTimeMillis()));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
List<String> keys = Lists.newArrayList();
long index = message.getUserId() % SHARDING_COUNT;
keys.add(ORDER_QUEUE_PREFIX + index);
keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
try (Jedis jedis = jedisProvider.provide()) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(),keys,args);
}
}
@Override
public List<OrderMessage> dequeue(int index) {
// 30 minutes ago
String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
return dequeue(MIN_SCORE,maxScore,OFFSET,LIMIT,index);
}
@SuppressWarnings("unchecked")
@Override
public List<OrderMessage> dequeue(String min,String max,String offset,String limit,int index) {
List<String> args = new ArrayList<>();
args.add(min);
args.add(max);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
List<String> keys = Lists.newArrayList();
keys.add(ORDER_QUEUE_PREFIX + index);
keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
try (Jedis jedis = jedisProvider.provide()) {
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(),keys,args);
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e,OrderMessage.class));
}
}
}
return result;
}
@Override
public String enqueueSha() {
return ENQUEUE_LUA_SHA.get();
}
@Override
public String dequeueSha() {
return DEQUEUE_LUA_SHA.get();
}
@Override
public void afterPropertiesSet() throws Exception {
// load the Lua scripts
loadLuaScript();
}
private void loadLuaScript() throws Exception {
try (Jedis jedis = jedisProvider.provide()) {
ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
String luaContent = StreamUtils.copyToString(resource.getInputStream(),StandardCharsets.UTF_8);
String sha = jedis.scriptLoad(luaContent);
ENQUEUE_LUA_SHA.compareAndSet(null,sha);
resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
luaContent = StreamUtils.copyToString(resource.getInputStream(),StandardCharsets.UTF_8);
sha = jedis.scriptLoad(luaContent);
DEQUEUE_LUA_SHA.compareAndSet(null,sha);
}
}
}
The consumer's scheduled job implementation is as follows:
@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
private static final AtomicInteger COUNTER = new AtomicInteger();
/**
* Business worker thread pool
*/
private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
return thread;
});
@Autowired
private OrderDelayQueue orderDelayQueue;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// for simplicity, the shard index is passed via Quartz's job execution context
int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
LOGGER.info("Order message consumer job started, shardingIndex:[{}]...",shardingIndex);
List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
if (null != dequeue) {
final CountDownLatch latch = new CountDownLatch(1);
BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch,dequeue,shardingIndex));
try {
latch.await();
} catch (InterruptedException ignore) {
//ignore
}
}
LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...",shardingIndex);
}
@RequiredArgsConstructor
private static class ConsumeTask implements Runnable {
private final CountDownLatch latch;
private final List<OrderMessage> messages;
private final int shardingIndex;
@Override
public void run() {
try {
for (OrderMessage message : messages) {
LOGGER.info("shardingIndex:[{}], processing order message, content: {}",shardingIndex,JSON.toJSONString(message));
// simulate processing time
TimeUnit.MILLISECONDS.sleep(50);
}
} catch (Exception ignore) {
} finally {
latch.countDown();
}
}
}
}
The CommandLineRunner implementation that starts the jobs and writes test data is as follows:
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {
@Autowired
private Scheduler scheduler;
@Autowired
private JedisProvider jedisProvider;
@Override
public void run(String... args) throws Exception {
int shardingCount = 2;
// prepare test data
prepareOrderMessageData(shardingCount);
for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
scheduler.scheduleJob(task.getJobDetail(),task.getTrigger());
}
}
private void prepareOrderMessageData(int shardingCount) throws Exception {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
try (Jedis jedis = jedisProvider.provide()) {
List<OrderMessage> messages = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(i));
message.setOrderId("ORDER_ID_" + i);
message.setUserId((long) i);
message.setTimestamp(LocalDateTime.now().format(f));
messages.add(message);
}
for (OrderMessage message : messages) {
// 30 minutes ago
Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
long index = message.getUserId() % shardingCount;
jedis.hset("ORDER_DETAIL_QUEUE_" + index,message.getOrderId(),JSON.toJSONString(message));
jedis.zadd("ORDER_QUEUE_" + index,score,message.getOrderId());
}
}
}
private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
List<ConsumerTask> tasks = Lists.newArrayList();
for (int i = 0; i < shardingCount; i++) {
JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
.withIdentity("OrderMessageConsumer-" + i,"DelayTask")
.usingJobData("shardingIndex",i)
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("OrderMessageConsumerTrigger-" + i,"DelayTask")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
tasks.add(new ConsumerTask(jobDetail,trigger));
}
return tasks;
}
@Getter
@RequiredArgsConstructor
private static class ConsumerTask {
private final JobDetail jobDetail;
private final Trigger trigger;
}
}
Start the application; the output looks like this:
2019-08-28 00:13:20.648 INFO 50248 --- [ main] c.t.s.s.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : Order message consumer job started, shardingIndex:[0]...
2019-08-28 00:13:20.781 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : Order message consumer job started, shardingIndex:[1]...
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1], content: {"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0], content: {"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1], content: {"amount":97,"orderId":"ORDER_ID_97","userId":97}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0], content: {"amount":96,"orderId":"ORDER_ID_96","userId":96}
// ... lots of output omitted
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[0]...
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[1]...
// ... lots of output omitted
Sharding Across Multiple Redis Instances
Sharding on a single Redis instance still has a problem: a Redis instance processes client commands on a single thread, even when multiple client threads issue commands, as shown below:
In this situation, although sharding lowers the complexity of the commands in the Lua script, Redis's single-threaded command processing model can still become a potential performance bottleneck. It is therefore worth considering sharding across multiple Redis instances.
For simplicity, the example below uses two standalone Redis instances. The code is as follows:
// Jedis provider
@Component
public class JedisProvider implements InitializingBean {
private final Map<Long,JedisPool> pools = Maps.newConcurrentMap();
private JedisPool defaultPool;
@Override
public void afterPropertiesSet() throws Exception {
JedisPool pool = new JedisPool("localhost");
defaultPool = pool;
pools.put(0L,pool);
// this one is a Redis instance running in a VM
pool = new JedisPool("192.168.56.200");
pools.put(1L,pool);
}
public Jedis provide(Long index) {
return pools.getOrDefault(index,defaultPool).getResource();
}
}
// order message
@Data
public class OrderMessage {
private String orderId;
private BigDecimal amount;
private Long userId;
}
// order delay queue interface
public interface OrderDelayQueue {
void enqueue(OrderMessage message);
List<OrderMessage> dequeue(String min,String max,String offset,String limit,long index);
List<OrderMessage> dequeue(long index);
String enqueueSha(long index);
String dequeueSha(long index);
}
// delay queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue,InitializingBean {
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
private static final long SHARDING_COUNT = 2L;
private static final String ORDER_QUEUE = "ORDER_QUEUE";
private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
private static final ConcurrentMap<Long,String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
private static final ConcurrentMap<Long,String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();
private final JedisProvider jedisProvider;
@Override
public void enqueue(OrderMessage message) {
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
args.add(String.valueOf(System.currentTimeMillis()));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
List<String> keys = Lists.newArrayList();
long index = message.getUserId() % SHARDING_COUNT;
keys.add(ORDER_QUEUE);
keys.add(ORDER_DETAIL_QUEUE);
try (Jedis jedis = jedisProvider.provide(index)) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(index),keys,args);
}
}
@Override
public List<OrderMessage> dequeue(long index) {
// 30 minutes ago
String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
return dequeue(MIN_SCORE,maxScore,OFFSET,LIMIT,index);
}
@SuppressWarnings("unchecked")
@Override
public List<OrderMessage> dequeue(String min,String max,String offset,String limit,long index) {
List<String> args = new ArrayList<>();
args.add(min);
args.add(max);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
List<String> keys = Lists.newArrayList();
keys.add(ORDER_QUEUE);
keys.add(ORDER_DETAIL_QUEUE);
try (Jedis jedis = jedisProvider.provide(index)) {
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index),keys,args);
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e,OrderMessage.class));
}
}
}
return result;
}
@Override
public String enqueueSha(long index) {
return ENQUEUE_LUA_SHA.get(index);
}
@Override
public String dequeueSha(long index) {
return DEQUEUE_LUA_SHA.get(index);
}
@Override
public void afterPropertiesSet() throws Exception {
// load the Lua scripts
loadLuaScript();
}
private void loadLuaScript() throws Exception {
for (long i = 0; i < SHARDING_COUNT; i++) {
try (Jedis jedis = jedisProvider.provide(i)) {
ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
String luaContent = StreamUtils.copyToString(resource.getInputStream(),StandardCharsets.UTF_8);
String sha = jedis.scriptLoad(luaContent);
ENQUEUE_LUA_SHA.put(i,sha);
resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
luaContent = StreamUtils.copyToString(resource.getInputStream(),StandardCharsets.UTF_8);
sha = jedis.scriptLoad(luaContent);
DEQUEUE_LUA_SHA.put(i,sha);
}
}
}
}
// consumer
public class OrderMessageConsumer implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
private static final AtomicInteger COUNTER = new AtomicInteger();
// business worker thread pool
private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
return thread;
});
@Autowired
private OrderDelayQueue orderDelayQueue;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
LOGGER.info("Order message consumer job started, shardingIndex:[{}]...",shardingIndex);
List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
if (null != dequeue) {
// the countdown latch can be removed if the thread pool always has spare capacity
final CountDownLatch latch = new CountDownLatch(1);
businessWorkerPool.execute(new ConsumeTask(latch,dequeue,shardingIndex));
try {
latch.await();
} catch (InterruptedException ignore) {
// ignore
}
}
LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...",shardingIndex);
}
@RequiredArgsConstructor
private static class ConsumeTask implements Runnable {
private final CountDownLatch latch;
private final List<OrderMessage> messages;
private final long shardingIndex;
@Override
public void run() {
try {
for (OrderMessage message : messages) {
LOGGER.info("shardingIndex:[{}], processing order message, content: {}",shardingIndex,JSON.toJSONString(message));
// simulate 50 ms of processing time
TimeUnit.MILLISECONDS.sleep(50);
}
} catch (Exception ignore) {
} finally {
latch.countDown();
}
}
}
}
// configuration
@Configuration
public class QuartzConfiguration {
@Bean
public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
return new AutowiredSupportQuartzJobFactory();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setSchedulerName("RamScheduler");
factory.setAutoStartup(true);
factory.setJobFactory(autowiredSupportQuartzJobFactory);
return factory;
}
public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
}
@Override
protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
autowireCapableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
}
// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {
@Autowired
private Scheduler scheduler;
@Autowired
private JedisProvider jedisProvider;
@Override
public void run(String... args) throws Exception {
long shardingCount = 2;
prepareData(shardingCount);
for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
scheduler.scheduleJob(task.getJobDetail(),task.getTrigger());
}
}
private void prepareData(long shardingCount) {
for (long i = 0L; i < shardingCount; i++) {
Map<String,Double> z = Maps.newHashMap();
Map<String,String> h = Maps.newHashMap();
for (int k = 0; k < 100; k++) {
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(k));
message.setUserId((long) k);
message.setOrderId("ORDER_ID_" + k);
// 30 min ago
z.put(message.getOrderId(),Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
h.put(message.getOrderId(),JSON.toJSONString(message));
}
Jedis jedis = jedisProvider.provide(i);
jedis.hmset("ORDER_DETAIL_QUEUE",h);
jedis.zadd("ORDER_QUEUE",z);
}
}
private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
List<ConsumerTask> tasks = Lists.newArrayList();
for (long i = 0; i < shardingCount; i++) {
JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
.withIdentity("OrderMessageConsumer-" + i,"DelayTask")
.usingJobData("shardingIndex",i)
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("OrderMessageConsumerTrigger-" + i,"DelayTask")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
tasks.add(new ConsumerTask(jobDetail,trigger));
}
return tasks;
}
@Getter
@RequiredArgsConstructor
private static class ConsumerTask {
private final JobDetail jobDetail;
private final Trigger trigger;
}
}
Add a bootstrap class and start the application; the console output is as follows:
// ... lots of output omitted
2019-09-01 14:08:27.664 INFO 13056 --- [ main] c.t.multi.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : Order message consumer job started, shardingIndex:[1]...
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : Order message consumer job started, shardingIndex:[0]...
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1], content: {..."userId":99}
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0], content: {..."userId":99}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0], content: {..."userId":98}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1], content: {..."userId":98}
// ... lots of output omitted
2019-09-01 14:08:28.239 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[1]...
2019-09-01 14:08:28.240 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[0]...
// ... lots of output omitted
In production, a single-point Redis deployment should be avoided; a common approach is Sentinel with a tree-shaped master-replica topology (see the book "Redis Development and Operations" (《Redis開發與運維》)). A deployment with two Redis Sentinel setups is illustrated below:
What Needs to Be Monitored
We need reasonably real-time visibility into metrics such as how much data is backlogged in the delay-queue collections in Redis and roughly how long each dequeue takes, so we can tell whether the delay-queue module is running normally and whether there are performance bottlenecks. The concrete monitoring items should be tailored to actual needs; to keep the example manageable, only two are tracked here:
- the number of elements backlogged in the Sorted Set;
- the cost of each call to dequeue.lua.
The application reports the data in real time, relying on a monitoring stack built from spring-boot-starter-actuator, Prometheus and Grafana; if this stack is unfamiliar, see the two earlier articles:
Monitoring
Add the dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.2.0</version>
</dependency>
The Gauge meter type is used here to collect the monitoring data; add a monitoring class OrderDelayQueueMonitor:
// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {
private static final long SHARDING_COUNT = 2L;
private final ConcurrentMap<Long,AtomicLong> remain = Maps.newConcurrentMap();
private final ConcurrentMap<Long,AtomicLong> lua = Maps.newConcurrentMap();
private ScheduledExecutorService executor;
@Autowired
private JedisProvider jedisProvider;
@Override
public void afterPropertiesSet() throws Exception {
executor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r,"OrderDelayQueueMonitor");
thread.setDaemon(true);
return thread;
});
for (long i = 0L; i < SHARDING_COUNT; i++) {
AtomicLong l = new AtomicLong();
Metrics.gauge("order.delay.queue.lua.cost",Collections.singleton(Tag.of("index",String.valueOf(i))),l,AtomicLong::get);
lua.put(i,l);
AtomicLong r = new AtomicLong();
Metrics.gauge("order.delay.queue.remain",Collections.singleton(Tag.of("index",String.valueOf(i))),r,AtomicLong::get);
remain.put(i,r);
}
// report the number of entries remaining in the collections every five seconds
executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider),0,5,TimeUnit.SECONDS);
}
public void recordRemain(Long index,long count) {
remain.get(index).set(count);
}
public void recordLuaCost(Long index,long count) {
lua.get(index).set(count);
}
@RequiredArgsConstructor
private class MonitorTask implements Runnable {
private final JedisProvider jedisProvider;
@Override
public void run() {
for (long i = 0L; i < SHARDING_COUNT; i++) {
try (Jedis jedis = jedisProvider.provide(i)) {
recordRemain(i,jedis.zcount("ORDER_QUEUE","-inf","+inf"));
}
}
}
}
}
The original RedisOrderDelayQueue#dequeue() is modified as follows:
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue,InitializingBean {
// ... unchanged code omitted
private final OrderDelayQueueMonitor orderDelayQueueMonitor;
// ... unchanged code omitted
@Override
public List<OrderMessage> dequeue(String min,String max,String offset,String limit,long index) {
List<String> args = new ArrayList<>();
args.add(min);
args.add(max);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
List<String> keys = Lists.newArrayList();
keys.add(ORDER_QUEUE);
keys.add(ORDER_DETAIL_QUEUE);
try (Jedis jedis = jedisProvider.provide(index)) {
long start = System.nanoTime();
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index),args);
long end = System.nanoTime();
// record the dequeue cost, in microseconds
orderDelayQueueMonitor.recordLuaCost(index,TimeUnit.NANOSECONDS.toMicros(end - start));
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e,OrderMessage.class));
}
}
}
return result;
}
// ... unchanged code omitted
}
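The timing wrapper around evalsha can be distilled into a standalone sketch showing the System.nanoTime() measurement and the nanoseconds-to-microseconds conversion used when recording the script cost (the class and method names here are illustrative, and a sleep stands in for the Redis call):

```java
import java.util.concurrent.TimeUnit;

public class LuaCostTimer {

    // Run a task and return its wall-clock cost in microseconds
    static long costMicros(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        long micros = costMicros(() -> {
            try {
                // stand-in for the evalsha call
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("dequeue.lua cost: " + micros + "us");
    }
}
```

Microseconds are a reasonable unit here because a healthy Lua script call normally completes well under a millisecond; millisecond granularity would round most samples down to zero.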
A brief note on the remaining configuration.
application.yaml needs to expose the prometheus endpoint:
server:
port: 9091
management:
endpoints:
web:
exposure:
include: 'prometheus'
The Prometheus server is configured with a short scrape interval, tentatively 5 seconds:
# my global config
global:
scrape_interval: 5s # Set the scrape interval to every 5 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
# Alertmanager configuration
alerting:
alertmanagers:
- static_configs:
- targets:
# - alertmanager:9093
# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first_rules.yml"
# - "second_rules.yml"
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
metrics_path: '/actuator/prometheus'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['localhost:9091']
The basic Grafana panels are configured as follows:
Dequeue cost | order_delay_queue_lua_cost | shard-{{index}}
Order delay queue backlog | order_delay_queue_remain | shard-{{index}}
Finally, Grafana can be configured to refresh every 5 seconds, with the following result:
Again, monitoring items should usually be tailored to actual needs; frankly, monitoring work is often the most complex and tedious part.
Summary
This article walked through the concrete steps of sharding and monitoring a Redis-based delayed task in some detail. The core code is for reference only, and some specifics, such as the finer points of using Prometheus and Grafana, are not expanded on here for reasons of space. Frankly, selecting middleware and an architecture for a real scenario is not simple, and the initial implementation is usually not the hardest part: the bigger challenges come later, with optimisation and monitoring.
Attachments
- Markdown source: github.com/zjcscut/blo…
- Github Page: www.throwable.club/2019/09/01/…
- Coding Page: throwable.coding.me/2019/09/01/…
(End of article, c-3-d, 20190901. I was feeling unwell, so this one was a little delayed.)