考慮這樣的一個(gè)場(chǎng)景,當(dāng)前你有1000個(gè)任務(wù),要讓這1000個(gè)任務(wù)每隔幾分鐘觸發(fā)某個(gè)操作。要是實(shí)現(xiàn)這樣的需求,很多人第一想法就是弄一個(gè)定時(shí)器。但是1000個(gè)任務(wù)就是1000個(gè)定時(shí)器,一個(gè)定時(shí)器是一個(gè)線程。為了解決這個(gè)問題,就出現(xiàn)了時(shí)間輪算法。本篇文章將為您講述什么是時(shí)間輪算法,以及在Java中怎么用代碼實(shí)現(xiàn)時(shí)間輪算法。
時(shí)間輪
時(shí)間輪簡(jiǎn)介:時(shí)間輪方案將現(xiàn)實(shí)生活中的時(shí)鐘概念引入到軟件設(shè)計(jì)中,主要思路是定義一個(gè)時(shí)鐘周期(比如時(shí)鐘的12小時(shí))和步長(zhǎng)(比如時(shí)鐘的一秒走一次),當(dāng)指針每走一步的時(shí)候,會(huì)獲取當(dāng)前時(shí)鐘刻度上掛載的任務(wù)并執(zhí)行。
核心思想
- 一個(gè)環(huán)形數(shù)組存儲(chǔ)時(shí)間輪的所有槽(看你的手表),每個(gè)槽對(duì)應(yīng)當(dāng)前時(shí)間輪的最小精度
- 超過當(dāng)前時(shí)間輪最大表示范圍的會(huì)被丟到上層時(shí)間輪,上層時(shí)間輪的最小精度即為下層時(shí)間輪能表達(dá)的最大時(shí)間(時(shí)分秒概念)
- 每個(gè)槽對(duì)應(yīng)一個(gè)環(huán)形鏈表存儲(chǔ)該時(shí)間應(yīng)該被執(zhí)行的任務(wù)
- 需要一個(gè)線程去驅(qū)動(dòng)指針運(yùn)轉(zhuǎn),獲取到期任務(wù)
以下給出java 簡(jiǎn)單手寫版本實(shí)現(xiàn)
代碼實(shí)現(xiàn)
時(shí)間輪主數(shù)據(jù)結(jié)構(gòu)
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:31
*/
@Slf4j
public class TimeWheel {
/**
* 一個(gè)槽的時(shí)間間隔(時(shí)間輪最小刻度)
*/
private long tickMs;
/**
* 時(shí)間輪大小(槽的個(gè)數(shù))
*/
private int wheelSize;
/**
* 一輪的時(shí)間跨度
*/
private long interval;
private long currentTime;
/**
* 槽
*/
private TimerTaskList[] buckets;
/**
* 上層時(shí)間輪
*/
private volatile TimeWheel overflowWheel;
/**
* 一個(gè)timer只有一個(gè)delayqueue
*/
private DelayQueue<TimerTaskList> delayQueue;
public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
this.currentTime = currentTime;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.buckets = new TimerTaskList[wheelSize];
this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue;
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new TimerTaskList();
}
}
public boolean add(TimerTaskEntry entry) {
long expiration = entry.getExpireMs();
if (expiration < tickMs + currentTime) {
//到期了
return false;
} else if (expiration < currentTime + interval) {
//扔進(jìn)當(dāng)前時(shí)間輪的某個(gè)槽里,只有時(shí)間大于某個(gè)槽,才會(huì)放進(jìn)去
long virtualId = (expiration / tickMs);
int index = (int) (virtualId % wheelSize);
TimerTaskList bucket = buckets[index];
bucket.addTask(entry);
//設(shè)置bucket 過期時(shí)間
if (bucket.setExpiration(virtualId * tickMs)) {
//設(shè)好過期時(shí)間的bucket需要入隊(duì)
delayQueue.offer(bucket);
return true;
}
} else {
//當(dāng)前輪不能滿足,需要扔到上一輪
TimeWheel timeWheel = getOverflowWheel();
return timeWheel.add(entry);
}
return false;
}
private TimeWheel getOverflowWheel() {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
}
}
}
return overflowWheel;
}
/**
* 推進(jìn)指針
*
* @param timestamp
*/
public void advanceLock(long timestamp) {
if (timestamp > currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
this.getOverflowWheel().advanceLock(timestamp);
}
}
}
}
定時(shí)器接口
/** * 定時(shí)器 * @author apdoer * @version 1.0 * @date 2021/3/22 20:30 */ public interface Timer { /** * 添加一個(gè)新任務(wù) * * @param timerTask */ void add(TimerTask timerTask); /** * 推動(dòng)指針 * * @param timeout */ void advanceClock(long timeout); /** * 等待執(zhí)行的任務(wù) * * @return */ int size(); /** * 關(guān)閉服務(wù),剩下的無法被執(zhí)行 */ void shutdown(); }
定時(shí)器實(shí)現(xiàn)
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 20:33
*/
@Slf4j
public class SystemTimer implements Timer {
/**
* 底層時(shí)間輪
*/
private TimeWheel timeWheel;
/**
* 一個(gè)Timer只有一個(gè)延時(shí)隊(duì)列
*/
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
/**
* 過期任務(wù)執(zhí)行線程
*/
private ExecutorService workerThreadPool;
/**
* 輪詢delayQueue獲取過期任務(wù)線程
*/
private ExecutorService bossThreadPool;
public SystemTimer() {
this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
this.workerThreadPool = Executors.newFixedThreadPool(100);
this.bossThreadPool = Executors.newFixedThreadPool(1);
//20ms推動(dòng)一次時(shí)間輪運(yùn)轉(zhuǎn)
this.bossThreadPool.submit(() -> {
for (; ; ) {
this.advanceClock(20);
}
});
}
public void addTimerTaskEntry(TimerTaskEntry entry) {
if (!timeWheel.add(entry)) {
//已經(jīng)過期了
TimerTask timerTask = entry.getTimerTask();
log.info("=====任務(wù):{} 已到期,準(zhǔn)備執(zhí)行============",timerTask.getDesc());
workerThreadPool.submit(timerTask);
}
}
@Override
public void add(TimerTask timerTask) {
log.info("=======添加任務(wù)開始====task:{}", timerTask.getDesc());
TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());
timerTask.setTimerTaskEntry(entry);
addTimerTaskEntry(entry);
}
/**
* 推動(dòng)指針運(yùn)轉(zhuǎn)獲取過期任務(wù)
*
* @param timeout 時(shí)間間隔
* @return
*/
@Override
public synchronized void advanceClock(long timeout) {
try {
TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (bucket != null) {
//推進(jìn)時(shí)間
timeWheel.advanceLock(bucket.getExpiration());
//執(zhí)行過期任務(wù)(包含降級(jí))
bucket.clear(this::addTimerTaskEntry);
}
} catch (InterruptedException e) {
log.error("advanceClock error");
}
}
@Override
public int size() {
//todo
return 0;
}
@Override
public void shutdown() {
this.bossThreadPool.shutdown();
this.workerThreadPool.shutdown();
this.timeWheel = null;
}
}
存儲(chǔ)任務(wù)的環(huán)形鏈表
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:26
*/
@Data
@Slf4j
class TimerTaskList implements Delayed {
/**
* TimerTaskList 環(huán)形鏈表使用一個(gè)虛擬根節(jié)點(diǎn)root
*/
private TimerTaskEntry root = new TimerTaskEntry(null, -1);
{
root.next = root;
root.prev = root;
}
/**
* bucket的過期時(shí)間
*/
private AtomicLong expiration = new AtomicLong(-1L);
public long getExpiration() {
return expiration.get();
}
/**
* 設(shè)置bucket的過期時(shí)間,設(shè)置成功返回true
*
* @param expirationMs
* @return
*/
boolean setExpiration(long expirationMs) {
return expiration.getAndSet(expirationMs) != expirationMs;
}
public boolean addTask(TimerTaskEntry entry) {
boolean done = false;
while (!done) {
//如果TimerTaskEntry已經(jīng)在別的list中就先移除,同步代碼塊外面移除,避免死鎖,一直到成功為止
entry.remove();
synchronized (this) {
if (entry.timedTaskList == null) {
//加到鏈表的末尾
entry.timedTaskList = this;
TimerTaskEntry tail = root.prev;
entry.prev = tail;
entry.next = root;
tail.next = entry;
root.prev = entry;
done = true;
}
}
}
return true;
}
/**
* 從 TimedTaskList 移除指定的 timerTaskEntry
*
* @param entry
*/
public void remove(TimerTaskEntry entry) {
synchronized (this) {
if (entry.getTimedTaskList().equals(this)) {
entry.next.prev = entry.prev;
entry.prev.next = entry.next;
entry.next = null;
entry.prev = null;
entry.timedTaskList = null;
}
}
}
/**
* 移除所有
*/
public synchronized void clear(Consumer<TimerTaskEntry> entry) {
TimerTaskEntry head = root.next;
while (!head.equals(root)) {
remove(head);
entry.accept(head);
head = root.next;
}
expiration.set(-1L);
}
@Override
public long getDelay(TimeUnit unit) {
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
@Override
public int compareTo(Delayed o) {
if (o instanceof TimerTaskList) {
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
}
return 0;
}
}
存儲(chǔ)任務(wù)的容器entry
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:26
*/
@Data
class TimerTaskEntry implements Comparable<TimerTaskEntry> {
private TimerTask timerTask;
private long expireMs;
volatile TimerTaskList timedTaskList;
TimerTaskEntry next;
TimerTaskEntry prev;
public TimerTaskEntry(TimerTask timedTask, long expireMs) {
this.timerTask = timedTask;
this.expireMs = expireMs;
this.next = null;
this.prev = null;
}
void remove() {
TimerTaskList currentList = timedTaskList;
while (currentList != null) {
currentList.remove(this);
currentList = timedTaskList;
}
}
@Override
public int compareTo(TimerTaskEntry o) {
return ((int) (this.expireMs - o.expireMs));
}
}
任務(wù)包裝類(這里也可以將工作任務(wù)以線程變量的方式去傳入)
@Data
@Slf4j
class TimerTask implements Runnable {
/**
* 延時(shí)時(shí)間
*/
private long delayMs;
/**
* 任務(wù)所在的entry
*/
private TimerTaskEntry timerTaskEntry;
private String desc;
public TimerTask(String desc, long delayMs) {
this.desc = desc;
this.delayMs = delayMs;
this.timerTaskEntry = null;
}
public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
// 如果這個(gè)timetask已經(jīng)被一個(gè)已存在的TimerTaskEntry持有,先移除一個(gè)
if (timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
public TimerTaskEntry getTimerTaskEntry() {
return timerTaskEntry;
}
@Override
public void run() {
log.info("============={}任務(wù)執(zhí)行", desc);
}
}
以上就是關(guān)于手寫Java實(shí)現(xiàn)時(shí)間輪算法的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持W3Cschool。