traitTimerTaskextendsRunnable{ // 定时任务超时时间 val delayMs: Long// timestamp in millisecond // 关联TimerTaskEntry private[this] var timerTaskEntry: TimerTaskEntry = null // 取消定时任务 defcancel(): Unit = { synchronized { if (timerTaskEntry != null) timerTaskEntry.remove() timerTaskEntry = null } } // 关联TimerTaskEntry private[timer] defsetTimerTaskEntry(entry: TimerTaskEntry): Unit = { synchronized { // if this timerTask is already held by an existing timer task entry, // we will remove such an entry first. if (timerTaskEntry != null && timerTaskEntry != entry) timerTaskEntry.remove()
// Pairs a TimerTask with an expiration value (expirationMs — presumably a timestamp in
// milliseconds; confirm against callers). Instances are comparable via Ordered[TimerTaskEntry];
// the compare implementation is not visible in this excerpt.
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
// The linked list (bucket) this entry currently belongs to.
// Volatile because remove() re-reads it while another thread may be moving the entry.
@volatile var list: TimerTaskList = null
// Pointer to the following entry in the list.
var next: TimerTaskEntry = null
// Pointer to the preceding entry in the list.
var prev: TimerTaskEntry = null
// If this timerTask is already held by an existing timer task entry,
// setTimerTaskEntry will remove it. A null timerTask is skipped.
if (timerTask != null) timerTask.setTimerTaskEntry(this)
/**
 * Removes this entry from the linked list it currently belongs to.
 *
 * If remove is called when another thread is moving the entry from one task entry list to
 * another, a single attempt may fail to remove the entry because the value of `list` changes
 * underneath us. Thus, we retry until `list` becomes null.
 * In a rare case, this thread sees null and exits the loop, but the other thread inserts the
 * entry into another list later.
 */
def remove(): Unit = {
  var currentList = list
  while (currentList != null) {
    currentList.remove(this)
    // Re-read the field: it may have been pointed at a different list in the meantime.
    currentList = list
  }
}
// TimerTaskList forms a doubly linked cyclic list using a dummy root entry:
//   root.next points to the head
//   root.prev points to the tail
// The dummy node simplifies boundary conditions; an empty list has root linked to itself.
private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = root
// Expiration time of this TimerTaskList (bucket); starts at -1.
private[this] val expiration = new AtomicLong(-1L)

// Set the bucket's expiration time.
// Returns true if the expiration time is changed, i.e. the previously stored value
// differs from expirationMs; the swap itself is a single atomic getAndSet.
def setExpiration(expirationMs: Long): Boolean = {
  expiration.getAndSet(expirationMs) != expirationMs
}
// Get the bucket's expiration time (the value most recently stored in `expiration`).
def getExpiration(): Long = {
  expiration.get()
}
@nonthreadsafe
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

  // Total time span covered by this wheel: one tick per bucket.
  private[this] val interval = tickMs * wheelSize
  // The time slots: wheelSize buckets, each a TimerTaskList built with the given taskCounter.
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  // Current time of the wheel: the largest multiple of tickMs that does not exceed startMs.
  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
// The upper-level (overflow) timing wheel; null until one is created.
// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
@volatile private[this] var overflowWheel: TimingWheel = null
/**
 * Adds a delayed task entry to this wheel, or delegates it to the overflow wheel.
 *
 * @param timerTaskEntry the entry to schedule
 * @return false if the entry is cancelled or its expiration falls before the end of the
 *         current tick (already expired); true if it was placed in one of this wheel's
 *         buckets; otherwise whatever the overflow wheel's add returns
 */
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs

  if (timerTaskEntry.cancelled) {
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // Already expired
    false
  } else if (expiration < currentTime + interval) {
    // Put in its own bucket: the task fits within this wheel's span.
    val virtualId = expiration / tickMs
    // Locate the bucket that stores the entry.
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)

    // Set the bucket expiration time; it is rounded down to a multiple of tickMs.
    if (bucket.setExpiration(virtualId * tickMs)) {
      // The bucket needs to be enqueued because it was an expired bucket.
      // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
      // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
      // will pass in the same value and hence return false, thus the bucket with the same expiration will not
      // be enqueued multiple times.
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval. Put it into the parent timer, creating it on first use.
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}
/**
 * A background reaper to expire delayed operations that have timed out.
 *
 * The thread name is built from brokerId and purgatoryName; the second constructor
 * argument passed to ShutdownableThread is false.
 */
private class ExpiredOperationReaper extends ShutdownableThread(
  "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
  false) {

  // Each unit of work advances the clock once.
  // NOTE(review): 200L is presumably a timeout in milliseconds — confirm against advanceClock.
  override def doWork(): Unit = {
    advanceClock(200L)
  }
}