TaskRunner概况.
- TaskRunner 是线程创建和执行的,是执行Task的执行者。
TaskRunner的创建和获取
companion object {
@JvmField
val INSTANCE = TaskRunner(RealBackend(threadFactory("$okHttpName TaskRunner", daemon = true)))
val logger: Logger = Logger.getLogger(TaskRunner::class.java.name)
}
}
TaskRunner的成员变量
private var nextQueueName = 10000
private var coordinatorWaiting = false
private var coordinatorWakeUpAt = 0L
/** Queues with tasks that are currently executing their [TaskQueue.activeTask]. */
/** 存 正在执行的TaskQueue **/
private val busyQueues = mutableListOf<TaskQueue>()
/** Queues not in [busyQueues] that have non-empty [TaskQueue.futureTasks]. */
/** 存 准备执行的TaskQueue **/
private val readyQueues = mutableListOf<TaskQueue>()
private val runnable: Runnable = object : Runnable { //用来执行Task
override fun run() {
while (true) {
//1.readyQueue为空-> 结束循环,线程执行结束
//2.未找到task, 当前是非协同者线程 ->唤醒“协同者线程” ,结束循环,线程执行结束 ;
//3.未找到task, 当前无协同者线程 -> 成为协同者线程,休眠,下次唤醒或休眠结束再找;
//4.找到了task -> 执行它
val task = synchronized(this@TaskRunner) {
awaitTaskToRun() //如果有可以执行的Task
} ?: return
logElapsed(task, task.queue!!) {
var completedNormally = false
try {
runTask(task)
completedNormally = true
} finally {
// If the task is crashing start another thread to service the queues.
if (!completedNormally) { //有异常,开另个线程执行
backend.execute(this)
}
}
}
}
}
}
TaskRunnner.awaitTaskToRun()
TaskRunnner.awaitTaskToRun() 遍历readyQueue ,找到可执行的Task ;
有三种情况:
1.readyQueue 为空,结束循环,return null
2.找到【立刻需执行】的task, return task ; 若遍历发现还有大于1个Task可以执行,就去在开个线程执行;Task;backend.execute(runnable)
3.没【立刻需执行】task,coordinatorWaiting ==true : 唤醒**协同者线程**,并返回 null (当前线程也执行完成),结束循环
3.没【立刻需执行】task,coordinatorWaiting ==false :当前线程成为**协同者线程** ,让当前线程wait一定时间(由最近那个任务的执行时间决定),再进入下次循环寻找任务。
fun awaitTaskToRun(): Task? {
this.assertThreadHoldsLock()
while (true) {
if (readyQueues.isEmpty()) {
return null // Nothing to do.
}
val now = backend.nanoTime()
var minDelayNanos = Long.MAX_VALUE
var readyTask: Task? = null
var multipleReadyTasks = false
//循环做以下事情
// * 当前线程怎么做,执行任务或者睡眠
// * 如果多个任务,就再开个线程;
eachQueue@ for (queue in readyQueues) {
val candidate = queue.futureTasks[0]
val candidateDelay = maxOf(0L, candidate.nextExecuteNanoTime - now)
when {
// 计算最快可执行时间,休眠最短时间
candidateDelay > 0L -> {
minDelayNanos = minOf(candidateDelay, minDelayNanos)
continue@eachQueue
}
// 如果已经找到一个,又找到第二个,跳出循环,停止查找
readyTask != null -> {
multipleReadyTasks = true
break@eachQueue
}
// 找到需立刻执行任务
else -> {
readyTask = candidate
}
}
}
// Implement the decision.
when {
// 我们有任务需要执行,返回任务
readyTask != null -> {
beforeRun(readyTask)
// 再开个线程,去执行另个任务(多个任务是可执行的)。
if (multipleReadyTasks || !coordinatorWaiting && readyQueues.isNotEmpty()) {
backend.execute(runnable)
}
return readyTask
}
// 唤醒调度器,当任务快开始了。并跳出while循环
coordinatorWaiting -> {
if (minDelayNanos < coordinatorWakeUpAt - now) {
backend.coordinatorNotify(this@TaskRunner)
}
return null
}
// 没有任务,且没有其他线程是调度者,我就是调度者
else -> {
coordinatorWaiting = true
coordinatorWakeUpAt = now + minDelayNanos
try {
backend.coordinatorWait(this@TaskRunner, minDelayNanos)
} catch (_: InterruptedException) {
// Will cause all tasks to exit unless more are scheduled!
cancelAll()
} finally {
coordinatorWaiting = false
}
}
}
}
}
找到可执行Task后
1.beforeRun
private fun beforeRun(task: Task) {
this.assertThreadHoldsLock()
task.nextExecuteNanoTime = -1L
val queue = task.queue!!
queue.futureTasks.remove(task) //将task从所属的taskQueue.futureTasks数组 移除
readyQueues.remove(queue) //task 所属的taskQueue 从readyQueues中移到busyQueues
queue.activeTask = task
busyQueues.add(queue)
}
2.runTask
private fun runTask(task: Task) {
this.assertThreadDoesntHoldLock()
val currentThread = Thread.currentThread()
val oldName = currentThread.name
currentThread.name = task.name
var delayNanos = -1L
try {
delayNanos = task.runOnce() //执行,返回下次执行的时间
} finally {
synchronized(this) {
afterRun(task, delayNanos)
}
currentThread.name = oldName
}
}
3.afterRun
//@delayNanos 下次执行的时间
private fun afterRun(task: Task, delayNanos: Long) {
this.assertThreadHoldsLock()
val queue = task.queue!!
check(queue.activeTask === task)
val cancelActiveTask = queue.cancelActiveTask
queue.cancelActiveTask = false
queue.activeTask = null //执行完成,置空
busyQueues.remove(queue)/task 所属的taskQueue 从busyQueues中移到readyQueues
//需要下次执行 & 任务没被取消 & queue 未关闭
if (delayNanos != -1L && !cancelActiveTask && !queue.shutdown) {
queue.scheduleAndDecide(task, delayNanos, recurrence = true) //安排下次执行热舞
}
if (queue.futureTasks.isNotEmpty()) {
readyQueues.add(queue)
}
}
TaskRunner的内部类 RealBackend
内部是线程池,执行TaskRunnner的runner
interface Backend {
fun beforeTask(taskRunner: TaskRunner)
fun nanoTime(): Long
fun coordinatorNotify(taskRunner: TaskRunner)
fun coordinatorWait(taskRunner: TaskRunner, nanos: Long)
fun execute(runnable: Runnable)
}
class RealBackend(threadFactory: ThreadFactory) : Backend {
private val executor = ThreadPoolExecutor(
0, // corePoolSize.
Int.MAX_VALUE, // maximumPoolSize.
60L, TimeUnit.SECONDS, // keepAliveTime.
SynchronousQueue(), //没有容量大小的对列;放进去不能够立马返回的,需等待别人把数据消费掉了,才能够返回
threadFactory
)
override fun beforeTask(taskRunner: TaskRunner) {
}
override fun nanoTime() = System.nanoTime()
override fun coordinatorNotify(taskRunner: TaskRunner) {//唤醒
taskRunner.notify()
}
/**
* Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as
* "don't wait" instead of "wait forever".
*/
@Throws(InterruptedException::class)
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) { //等待
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
if (ms > 0L || nanos > 0) {
(taskRunner as Object).wait(ms, ns.toInt())
}
}
override fun execute(runnable: Runnable) {//执行
executor.execute(runnable)
}
fun shutdown() {
executor.shutdown()
}
}
介绍TaskQueue
队列中的工作不是并发的。这相当于每个队列都有一个专用线程为其工作;在实践中,一组队列可以共享一组线程以节省资源。
internal var shutdown = false
/** 当前正在执行的. */
internal var activeTask: Task? = null
/** 按照执行顺序排列的 Scheduled tasks ordered by [Task.nextExecuteNanoTime]. */
internal val futureTasks = mutableListOf<Task>()
/** 当activeTask要被取消试,标识为true */
internal var cancelActiveTask = false
schedule()
任务将在delayNanos后执行。一个任务只执行一次;若任务已在队列中,且时间更早,就舍弃当前。若不在,与futureTasks 中的任务的执行时间比较,按执行时间加入futureTasks 。
fun schedule(task: Task, delayNanos: Long = 0L) {
synchronized(taskRunner) {
if (shutdown) {
if (task.cancelable) {
taskLog(task, this) { "schedule canceled (queue is shutdown)" }
return
}
taskLog(task, this) { "schedule failed (queue is shutdown)" }
throw RejectedExecutionException()
}
if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
taskRunner.kickCoordinator(this) ////加入且是在futureTasks的第一个,让taskRunner执行TaskQueue
}
}
}
scheduleAndDecide()
1.futureTasks看看是否存在task .
1.1若存在,且比task参数更早执行,return false ;
1.2若存在,且比task参数更完执行,删除存在的task ; 按1.3走
1.3不存在,加入;
按个比较futureTasks中任务的时间。按时间早晚找到它的index .放进去
internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
task.initQueue(this)
val now = taskRunner.backend.nanoTime()
val executeNanoTime = now + delayNanos
// If the task is already scheduled, take the earlier of the two times.
val existingIndex = futureTasks.indexOf(task)
if (existingIndex != -1) {
if (task.nextExecuteNanoTime <= executeNanoTime) {
taskLog(task, this) { "already scheduled" }
return false
}
futureTasks.removeAt(existingIndex) // Already scheduled later: reschedule below!
}
task.nextExecuteNanoTime = executeNanoTime
taskLog(task, this) {
if (recurrence) "run again after ${formatDuration(executeNanoTime - now)}"
else "scheduled after ${formatDuration(executeNanoTime - now)}"
}
// Insert in chronological order. Always compare deltas because nanoTime() is permitted to wrap.
// 插入按时间顺去编排。比较执行时间,找到第一个比他晚的
var insertAt = futureTasks.indexOfFirst { it.nextExecuteNanoTime - now > delayNanos }
if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)
// 如果我们放在第一个,要去影响TaskRunner
return insertAt == 0
}
cancelAll()
关于cancelAll()方法,与shutdown()相似
fun cancelAll() {
this.assertThreadDoesntHoldLock()
synchronized(taskRunner) {
if (cancelAllAndDecide()) { //feturnTasks 数组是有变动的,被移除Task元素
taskRunner.kickCoordinator(this) //TaskRunner 执行
}
}
}
internal fun cancelAllAndDecide(): Boolean {
if (activeTask != null && activeTask!!.cancelable) { //activeTask 能被取消
cancelActiveTask = true //标识
}
//遍历数组,task 能被取消,就将从futureTasks它移除
var tasksCanceled = false
for (i in futureTasks.size - 1 downTo 0) {
if (futureTasks[i].cancelable) {
taskLog(futureTasks[i], this) { "canceled" }
tasksCanceled = true
futureTasks.removeAt(i)
}
}
return tasksCanceled //只要futureTasks有被删除,就返回true
}