App下載

什么是時(shí)間輪? 手寫Java實(shí)現(xiàn)時(shí)間輪算法

猿友 2021-07-15 15:11:19 瀏覽數(shù) (7230)
反饋

考慮這樣的一個(gè)場(chǎng)景,當(dāng)前你有1000個(gè)任務(wù),要讓這1000個(gè)任務(wù)每隔幾分鐘觸發(fā)某個(gè)操作。要是實(shí)現(xiàn)這樣的需求,很多人第一想法就是弄一個(gè)定時(shí)器。但是1000個(gè)任務(wù)就是1000個(gè)定時(shí)器,一個(gè)定時(shí)器是一個(gè)線程。為了解決這個(gè)問題,就出現(xiàn)了時(shí)間輪算法。本篇文章將為您講述什么是時(shí)間輪算法,以及在Java中怎么用代碼實(shí)現(xiàn)時(shí)間輪算法。

時(shí)間輪

時(shí)間輪簡介:時(shí)間輪方案將現(xiàn)實(shí)生活中的時(shí)鐘概念引入到軟件設(shè)計(jì)中,主要思路是定義一個(gè)時(shí)鐘周期(比如時(shí)鐘的12小時(shí))和步長(比如時(shí)鐘的一秒走一次),當(dāng)指針每走一步的時(shí)候,會(huì)獲取當(dāng)前時(shí)鐘刻度上掛載的任務(wù)并執(zhí)行。

核心思想

  • 一個(gè)環(huán)形數(shù)組存儲(chǔ)時(shí)間輪的所有槽(看你的手表),每個(gè)槽對(duì)應(yīng)當(dāng)前時(shí)間輪的最小精度
  • 超過當(dāng)前時(shí)間輪最大表示范圍的會(huì)被丟到上層時(shí)間輪,上層時(shí)間輪的最小精度即為下層時(shí)間輪能表達(dá)的最大時(shí)間(時(shí)分秒概念)
  • 每個(gè)槽對(duì)應(yīng)一個(gè)環(huán)形鏈表存儲(chǔ)該時(shí)間應(yīng)該被執(zhí)行的任務(wù)
  • 需要一個(gè)線程去驅(qū)動(dòng)指針運(yùn)轉(zhuǎn),獲取到期任務(wù)

以下給出java 簡單手寫版本實(shí)現(xiàn)

代碼實(shí)現(xiàn)

時(shí)間輪主數(shù)據(jù)結(jié)構(gòu)

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:31
 */
@Slf4j
public class TimeWheel {
 /**
  * 一個(gè)槽的時(shí)間間隔(時(shí)間輪最小刻度)
  */
 private long tickMs;

 /**
  * 時(shí)間輪大小(槽的個(gè)數(shù))
  */
 private int wheelSize;

 /**
  * 一輪的時(shí)間跨度
  */
 private long interval;

 private long currentTime;

 /**
  * 槽
  */
 private TimerTaskList[] buckets;

 /**
  * 上層時(shí)間輪
  */
 private volatile TimeWheel overflowWheel;

 /**
  * 一個(gè)timer只有一個(gè)delayqueue
  */
 private DelayQueue<TimerTaskList> delayQueue;

 public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
  this.currentTime = currentTime;
  this.tickMs = tickMs;
  this.wheelSize = wheelSize;
  this.interval = tickMs * wheelSize;
  this.buckets = new TimerTaskList[wheelSize];
  this.currentTime = currentTime - (currentTime % tickMs);
  this.delayQueue = delayQueue;
  for (int i = 0; i < wheelSize; i++) {
   buckets[i] = new TimerTaskList();
  }
 }

 public boolean add(TimerTaskEntry entry) {
  long expiration = entry.getExpireMs();
  if (expiration < tickMs + currentTime) {
   //到期了
   return false;
  } else if (expiration < currentTime + interval) {
   //扔進(jìn)當(dāng)前時(shí)間輪的某個(gè)槽里,只有時(shí)間大于某個(gè)槽,才會(huì)放進(jìn)去
   long virtualId = (expiration / tickMs);
   int index = (int) (virtualId % wheelSize);
   TimerTaskList bucket = buckets[index];
   bucket.addTask(entry);
   //設(shè)置bucket 過期時(shí)間
   if (bucket.setExpiration(virtualId * tickMs)) {
    //設(shè)好過期時(shí)間的bucket需要入隊(duì)
    delayQueue.offer(bucket);
    return true;
   }
  } else {
   //當(dāng)前輪不能滿足,需要扔到上一輪
   TimeWheel timeWheel = getOverflowWheel();
   return timeWheel.add(entry);
  }
  return false;
 }


 private TimeWheel getOverflowWheel() {
  if (overflowWheel == null) {
   synchronized (this) {
    if (overflowWheel == null) {
     overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
    }
   }
  }
  return overflowWheel;
 }

 /**
  * 推進(jìn)指針
  *
  * @param timestamp
  */
 public void advanceLock(long timestamp) {
  if (timestamp > currentTime + tickMs) {
   currentTime = timestamp - (timestamp % tickMs);
   if (overflowWheel != null) {
    this.getOverflowWheel().advanceLock(timestamp);
   }
  }
 }
}

定時(shí)器接口

/**
 * 定時(shí)器
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:30
 */
public interface Timer {

 /**
  * 添加一個(gè)新任務(wù)
  *
  * @param timerTask
  */
 void add(TimerTask timerTask);


 /**
  * 推動(dòng)指針
  *
  * @param timeout
  */
 void advanceClock(long timeout);

 /**
  * 等待執(zhí)行的任務(wù)
  *
  * @return
  */
 int size();

 /**
  * 關(guān)閉服務(wù),剩下的無法被執(zhí)行
  */
 void shutdown();
}

定時(shí)器實(shí)現(xiàn)

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:33
 */
@Slf4j
public class SystemTimer implements Timer {
 /**
  * 底層時(shí)間輪
  */
 private TimeWheel timeWheel;
 /**
  * 一個(gè)Timer只有一個(gè)延時(shí)隊(duì)列
  */
 private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
 /**
  * 過期任務(wù)執(zhí)行線程
  */
 private ExecutorService workerThreadPool;
 /**
  * 輪詢delayQueue獲取過期任務(wù)線程
  */
 private ExecutorService bossThreadPool;


 public SystemTimer() {
  this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
  this.workerThreadPool = Executors.newFixedThreadPool(100);
  this.bossThreadPool = Executors.newFixedThreadPool(1);
  //20ms推動(dòng)一次時(shí)間輪運(yùn)轉(zhuǎn)
  this.bossThreadPool.submit(() -> {
   for (; ; ) {
    this.advanceClock(20);
   }
  });
 }


 public void addTimerTaskEntry(TimerTaskEntry entry) {
  if (!timeWheel.add(entry)) {
   //已經(jīng)過期了
   TimerTask timerTask = entry.getTimerTask();
   log.info("=====任務(wù):{} 已到期,準(zhǔn)備執(zhí)行============",timerTask.getDesc());
   workerThreadPool.submit(timerTask);
  }
 }

 @Override
 public void add(TimerTask timerTask) {
  log.info("=======添加任務(wù)開始====task:{}", timerTask.getDesc());
  TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());
  timerTask.setTimerTaskEntry(entry);
  addTimerTaskEntry(entry);
 }

 /**
  * 推動(dòng)指針運(yùn)轉(zhuǎn)獲取過期任務(wù)
  *
  * @param timeout 時(shí)間間隔
  * @return
  */
 @Override
 public synchronized void advanceClock(long timeout) {
  try {
   TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
   if (bucket != null) {
    //推進(jìn)時(shí)間
    timeWheel.advanceLock(bucket.getExpiration());
    //執(zhí)行過期任務(wù)(包含降級(jí))
    bucket.clear(this::addTimerTaskEntry);
   }
  } catch (InterruptedException e) {
   log.error("advanceClock error");
  }
 }

 @Override
 public int size() {
  //todo
  return 0;
 }

 @Override
 public void shutdown() {
  this.bossThreadPool.shutdown();
  this.workerThreadPool.shutdown();
  this.timeWheel = null;
 }
}

存儲(chǔ)任務(wù)的環(huán)形鏈表

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
@Slf4j
class TimerTaskList implements Delayed {
 /**
  * TimerTaskList 環(huán)形鏈表使用一個(gè)虛擬根節(jié)點(diǎn)root
  */
 private TimerTaskEntry root = new TimerTaskEntry(null, -1);

 {
  root.next = root;
  root.prev = root;
 }

 /**
  * bucket的過期時(shí)間
  */
 private AtomicLong expiration = new AtomicLong(-1L);

 public long getExpiration() {
  return expiration.get();
 }

 /**
  * 設(shè)置bucket的過期時(shí)間,設(shè)置成功返回true
  *
  * @param expirationMs
  * @return
  */
 boolean setExpiration(long expirationMs) {
  return expiration.getAndSet(expirationMs) != expirationMs;
 }

 public boolean addTask(TimerTaskEntry entry) {
  boolean done = false;
  while (!done) {
   //如果TimerTaskEntry已經(jīng)在別的list中就先移除,同步代碼塊外面移除,避免死鎖,一直到成功為止
   entry.remove();
   synchronized (this) {
    if (entry.timedTaskList == null) {
     //加到鏈表的末尾
     entry.timedTaskList = this;
     TimerTaskEntry tail = root.prev;
     entry.prev = tail;
     entry.next = root;
     tail.next = entry;
     root.prev = entry;
     done = true;
    }
   }
  }
  return true;
 }

 /**
  * 從 TimedTaskList 移除指定的 timerTaskEntry
  *
  * @param entry
  */
 public void remove(TimerTaskEntry entry) {
  synchronized (this) {
   if (entry.getTimedTaskList().equals(this)) {
    entry.next.prev = entry.prev;
    entry.prev.next = entry.next;
    entry.next = null;
    entry.prev = null;
    entry.timedTaskList = null;
   }
  }
 }

 /**
  * 移除所有
  */
 public synchronized void clear(Consumer<TimerTaskEntry> entry) {
  TimerTaskEntry head = root.next;
  while (!head.equals(root)) {
   remove(head);
   entry.accept(head);
   head = root.next;
  }
  expiration.set(-1L);
 }

 @Override
 public long getDelay(TimeUnit unit) {
  return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
 }

 @Override
 public int compareTo(Delayed o) {
  if (o instanceof TimerTaskList) {
   return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
  }
  return 0;
 }
}

存儲(chǔ)任務(wù)的容器entry


/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
class TimerTaskEntry implements Comparable<TimerTaskEntry> {
 private TimerTask timerTask;
 private long expireMs;
 volatile TimerTaskList timedTaskList;
 TimerTaskEntry next;
 TimerTaskEntry prev;

 public TimerTaskEntry(TimerTask timedTask, long expireMs) {
  this.timerTask = timedTask;
  this.expireMs = expireMs;
  this.next = null;
  this.prev = null;
 }

 void remove() {
  TimerTaskList currentList = timedTaskList;
  while (currentList != null) {
   currentList.remove(this);
   currentList = timedTaskList;
  }
 }

 @Override
 public int compareTo(TimerTaskEntry o) {
  return ((int) (this.expireMs - o.expireMs));
 }
}

任務(wù)包裝類(這里也可以將工作任務(wù)以線程變量的方式去傳入)

@Data
@Slf4j
class TimerTask implements Runnable {
 /**
  * 延時(shí)時(shí)間
  */
 private long delayMs;
 /**
  * 任務(wù)所在的entry
  */
 private TimerTaskEntry timerTaskEntry;

 private String desc;

 public TimerTask(String desc, long delayMs) {
  this.desc = desc;
  this.delayMs = delayMs;
  this.timerTaskEntry = null;
 }

 public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
  // 如果這個(gè)timetask已經(jīng)被一個(gè)已存在的TimerTaskEntry持有,先移除一個(gè)
  if (timerTaskEntry != null && timerTaskEntry != entry) {
   timerTaskEntry.remove();
  }
  timerTaskEntry = entry;
 }

 public TimerTaskEntry getTimerTaskEntry() {
  return timerTaskEntry;
 }

 @Override
 public void run() {
  log.info("============={}任務(wù)執(zhí)行", desc);
 }
}

以上就是關(guān)于手寫Java實(shí)現(xiàn)時(shí)間輪算法的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持W3Cschool。


0 人點(diǎn)贊