Administrator
发布于 2023-11-20 / 21 阅读
0
0

okhttp-原理解析-TaskRunner

TaskRunner概况.

- TaskRunner 是线程创建和执行的,是执行Task的执行者。

TaskRunner的创建和获取

  companion object {
    @JvmField
    val INSTANCE = TaskRunner(RealBackend(threadFactory("$okHttpName TaskRunner", daemon = true)))

    val logger: Logger = Logger.getLogger(TaskRunner::class.java.name)
  }
}

TaskRunner的成员变量

 private var nextQueueName = 10000
  private var coordinatorWaiting = false
  private var coordinatorWakeUpAt = 0L

  /** Queues with tasks that are currently executing their [TaskQueue.activeTask]. */
  /** 存 正在执行的TaskQueue **/
  private val busyQueues = mutableListOf<TaskQueue>()

  /** Queues not in [busyQueues] that have non-empty [TaskQueue.futureTasks]. */
  /** 存 准备执行的TaskQueue **/
  private val readyQueues = mutableListOf<TaskQueue>()

  private val runnable: Runnable = object : Runnable { //用来执行Task
    override fun run() {
      while (true) {
      	//1.readyQueue为空-> 结束循环,线程执行结束
      	//2.未找到task, 当前是非协同者线程 ->唤醒“协同者线程” ,结束循环,线程执行结束 ;
      	//3.未找到task, 当前无协同者线程 -> 成为协同者线程,休眠,下次唤醒或休眠结束再找;
      	//4.找到了task -> 执行它
        val task = synchronized(this@TaskRunner) {
          awaitTaskToRun() //如果有可以执行的Task
        } ?: return

        logElapsed(task, task.queue!!) {
          var completedNormally = false
          try {
            runTask(task)
            completedNormally = true
          } finally {
            // If the task is crashing start another thread to service the queues.
            if (!completedNormally) { //有异常,开另个线程执行
              backend.execute(this)
            }
          }
        }
      }
    }
  }

TaskRunnner.awaitTaskToRun()

TaskRunnner.awaitTaskToRun() 遍历readyQueue ,找到可执行的Task ;

有三种情况:

1.readyQueue 为空,结束循环,return null

2.找到【立刻需执行】的task, return task ; 若遍历发现还有大于1个Task可以执行,就去在开个线程执行;Task;backend.execute(runnable)

3.没【立刻需执行】task,coordinatorWaiting ==true : 唤醒**协同者线程**,并返回 null (当前线程也执行完成),结束循环

3.没【立刻需执行】task,coordinatorWaiting ==false :当前线程成为**协同者线程** ,让当前线程wait一定时间(由最近那个任务的执行时间决定),再进入下次循环寻找任务。

fun awaitTaskToRun(): Task? {
    this.assertThreadHoldsLock()

    while (true) {
      if (readyQueues.isEmpty()) {
        return null // Nothing to do.
      }

      val now = backend.nanoTime()
      var minDelayNanos = Long.MAX_VALUE
      var readyTask: Task? = null
      var multipleReadyTasks = false

      //循环做以下事情
      //  * 当前线程怎么做,执行任务或者睡眠
      //  * 如果多个任务,就再开个线程; 
      eachQueue@ for (queue in readyQueues) {
        val candidate = queue.futureTasks[0]
        val candidateDelay = maxOf(0L, candidate.nextExecuteNanoTime - now)

        when {
          // 计算最快可执行时间,休眠最短时间
          candidateDelay > 0L -> {
            minDelayNanos = minOf(candidateDelay, minDelayNanos)
            continue@eachQueue
          }

          // 如果已经找到一个,又找到第二个,跳出循环,停止查找
          readyTask != null -> {
            multipleReadyTasks = true
            break@eachQueue
          }

          // 找到需立刻执行任务
          else -> {
            readyTask = candidate
          }
        }
      }

      // Implement the decision.
      when {
        // 我们有任务需要执行,返回任务
        readyTask != null -> {
          beforeRun(readyTask)

          // 再开个线程,去执行另个任务(多个任务是可执行的)。
          if (multipleReadyTasks || !coordinatorWaiting && readyQueues.isNotEmpty()) {
            backend.execute(runnable)
          }

          return readyTask
        }

        // 唤醒调度器,当任务快开始了。并跳出while循环
        coordinatorWaiting -> {
          if (minDelayNanos < coordinatorWakeUpAt - now) {
            backend.coordinatorNotify(this@TaskRunner)
          }
          return null
        }

        // 没有任务,且没有其他线程是调度者,我就是调度者
        else -> {
          coordinatorWaiting = true
          coordinatorWakeUpAt = now + minDelayNanos
          try {
            backend.coordinatorWait(this@TaskRunner, minDelayNanos)
          } catch (_: InterruptedException) {
            // Will cause all tasks to exit unless more are scheduled!
            cancelAll()
          } finally {
            coordinatorWaiting = false
          }
        }
      }
    }
  }

找到可执行Task后

1.beforeRun

private fun beforeRun(task: Task) {
       this.assertThreadHoldsLock()
   
       task.nextExecuteNanoTime = -1L
       val queue = task.queue!!
       queue.futureTasks.remove(task) //将task从所属的taskQueue.futureTasks数组 移除
       readyQueues.remove(queue) //task 所属的taskQueue 从readyQueues中移到busyQueues
       queue.activeTask = task
       busyQueues.add(queue)   
}

2.runTask

private fun runTask(task: Task) {
    this.assertThreadDoesntHoldLock()

    val currentThread = Thread.currentThread()
    val oldName = currentThread.name
    currentThread.name = task.name

    var delayNanos = -1L
    try {
      delayNanos = task.runOnce() //执行,返回下次执行的时间
    } finally {
      synchronized(this) {
        afterRun(task, delayNanos)
      }
      currentThread.name = oldName
    }
  }

3.afterRun

//@delayNanos 下次执行的时间
private fun afterRun(task: Task, delayNanos: Long) {
    this.assertThreadHoldsLock()

    val queue = task.queue!!
    check(queue.activeTask === task)

    val cancelActiveTask = queue.cancelActiveTask
    queue.cancelActiveTask = false
    queue.activeTask = null //执行完成,置空
    busyQueues.remove(queue)/task 所属的taskQueue 从busyQueues中移到readyQueues

	//需要下次执行 & 任务没被取消 & queue 未关闭
    if (delayNanos != -1L && !cancelActiveTask && !queue.shutdown) {
      queue.scheduleAndDecide(task, delayNanos, recurrence = true) //安排下次执行热舞
    }

    if (queue.futureTasks.isNotEmpty()) {
      readyQueues.add(queue) 
    }
  }

TaskRunner的内部类 RealBackend

内部是线程池,执行TaskRunnner的runner

interface Backend {
    fun beforeTask(taskRunner: TaskRunner)
    fun nanoTime(): Long
    fun coordinatorNotify(taskRunner: TaskRunner)
    fun coordinatorWait(taskRunner: TaskRunner, nanos: Long)
    fun execute(runnable: Runnable)
  }

  class RealBackend(threadFactory: ThreadFactory) : Backend {
    private val executor = ThreadPoolExecutor(
        0, // corePoolSize.
        Int.MAX_VALUE, // maximumPoolSize.
        60L, TimeUnit.SECONDS, // keepAliveTime.
        SynchronousQueue(), //没有容量大小的对列;放进去不能够立马返回的,需等待别人把数据消费掉了,才能够返回
        threadFactory
    )

    override fun beforeTask(taskRunner: TaskRunner) {
    }

    override fun nanoTime() = System.nanoTime()

    override fun coordinatorNotify(taskRunner: TaskRunner) {//唤醒
      taskRunner.notify()
    }

    /**
     * Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as
     * "don't wait" instead of "wait forever".
     */
    @Throws(InterruptedException::class)
    @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
    override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) { //等待
      val ms = nanos / 1_000_000L
      val ns = nanos - (ms * 1_000_000L)
      if (ms > 0L || nanos > 0) {
        (taskRunner as Object).wait(ms, ns.toInt())
      }
    }

    override fun execute(runnable: Runnable) {//执行
      executor.execute(runnable)
    }

    fun shutdown() {
      executor.shutdown()
    }
  }

介绍TaskQueue

队列中的工作不是并发的。这相当于每个队列都有一个专用线程为其工作;在实践中,一组队列可以共享一组线程以节省资源。

  internal var shutdown = false
  /** 当前正在执行的. */
  internal var activeTask: Task? = null

  /** 按照执行顺序排列的 Scheduled tasks ordered by [Task.nextExecuteNanoTime].  */
  internal val futureTasks = mutableListOf<Task>()

  /** 当activeTask要被取消试,标识为true */
  internal var cancelActiveTask = false

schedule()

任务将在delayNanos后执行。一个任务只执行一次;若任务已在队列中,且时间更早,就舍弃当前。若不在,与futureTasks 中的任务的执行时间比较,按执行时间加入futureTasks 。

fun schedule(task: Task, delayNanos: Long = 0L) {
    synchronized(taskRunner) {
      if (shutdown) {
        if (task.cancelable) {
          taskLog(task, this) { "schedule canceled (queue is shutdown)" }
          return
        }
        taskLog(task, this) { "schedule failed (queue is shutdown)" }
        throw RejectedExecutionException()
      }

      if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
        taskRunner.kickCoordinator(this)  ////加入且是在futureTasks的第一个,让taskRunner执行TaskQueue
      }
    }
  }

scheduleAndDecide()

1.futureTasks看看是否存在task .

1.1若存在,且比task参数更早执行,return false ;

1.2若存在,且比task参数更完执行,删除存在的task ; 按1.3走

1.3不存在,加入;

按个比较futureTasks中任务的时间。按时间早晚找到它的index .放进去

internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
    task.initQueue(this)

    val now = taskRunner.backend.nanoTime()
    val executeNanoTime = now + delayNanos

    // If the task is already scheduled, take the earlier of the two times.
    val existingIndex = futureTasks.indexOf(task)
    if (existingIndex != -1) {
      if (task.nextExecuteNanoTime <= executeNanoTime) {
        taskLog(task, this) { "already scheduled" }
        return false
      }
      futureTasks.removeAt(existingIndex) // Already scheduled later: reschedule below!
    }
    task.nextExecuteNanoTime = executeNanoTime
    taskLog(task, this) {
      if (recurrence) "run again after ${formatDuration(executeNanoTime - now)}"
      else "scheduled after ${formatDuration(executeNanoTime - now)}"
    }

    // Insert in chronological order. Always compare deltas because nanoTime() is permitted to wrap.
    // 插入按时间顺去编排。比较执行时间,找到第一个比他晚的
    var insertAt = futureTasks.indexOfFirst { it.nextExecuteNanoTime - now > delayNanos }
    if (insertAt == -1) insertAt = futureTasks.size 
    futureTasks.add(insertAt, task)

    // 如果我们放在第一个,要去影响TaskRunner
    return insertAt == 0
  }

cancelAll()

关于cancelAll()方法,与shutdown()相似

fun cancelAll() {
    this.assertThreadDoesntHoldLock()

    synchronized(taskRunner) {
      if (cancelAllAndDecide()) { //feturnTasks 数组是有变动的,被移除Task元素
        taskRunner.kickCoordinator(this)  //TaskRunner 执行 
      }
    }
  }
  internal fun cancelAllAndDecide(): Boolean {
    if (activeTask != null && activeTask!!.cancelable) { //activeTask 能被取消
      cancelActiveTask = true //标识
    }
	//遍历数组,task 能被取消,就将从futureTasks它移除
    var tasksCanceled = false
    for (i in futureTasks.size - 1 downTo 0) {
      if (futureTasks[i].cancelable) {
        taskLog(futureTasks[i], this) { "canceled" }
        tasksCanceled = true
        futureTasks.removeAt(i)
      }
    }
    return tasksCanceled //只要futureTasks有被删除,就返回true
  }


评论