介绍
ConectionPool 是管理HTTP和HTTP/2连接的重用,以减少网络延迟。相同[地址]的HTTP请求可以共享[连接]。此类实现了保持打开以备将来使用的连接的策略。
ConnectionPool
以下可见,内部持有delegate(RealConnectionPool ),这是代理设计模式。创建了RealConnectionPool :最大空闲连接为5,最大空闲时长为5分钟时间
class ConnectionPool internal constructor(
internal val delegate: RealConnectionPool //实际的连接
) {
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit
))
//创建ConnectionPool :最大空闲连接为5,最大空闲时长为5分钟时间
constructor() : this(5, 5, TimeUnit.MINUTES)
/** 返回空闲连接的总量 */
fun idleConnectionCount(): Int = delegate.idleConnectionCount()
/** 返回pool中的总数量 */
fun connectionCount(): Int = delegate.connectionCount()
/** 关闭和移除所有空闲的链接 */
fun evictAll() {
delegate.evictAll()
}
}
RealConnectionPool
RealConnectionPool 的成员变量
maxIdleConnections:Int
keepAliveDurationNs:Long
cleanupQueue:TaskQueue
cleanupTask:Task
connections:ConcurrentLinkedQueue<RealConnection>(java7.是分段锁)
class RealConnectionPool(
taskRunner: TaskRunner,
/** 1.最大空闲连接数量【每个地址对应自己的连接】*/
private val maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) {
//2.空闲时长(纳秒)
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
//3.清理任务栈
private val cleanupQueue: TaskQueue = taskRunner.newQueue()
//5.清理任务
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}
/**
* 在删除或添加时操作时需要持有锁
* 6.存放连接栈
*/
private val connections = ConcurrentLinkedQueue<RealConnection>()
RealConnectionPool 关键方法:
callAcquirePooledConnection
方法目地:寻找连接池中是否有可复用的连接。
可复用的判断条件:该连接是isMultiplexed==true),且在分配限制内,noNewExchanges = false ,必须是http/2 ,共享IP地址,证书能覆盖该地址。
如果有call(RealCall),关联关系(call.connection = connection ,connection.calls.put(CallReference(this, callStackTrace))),并返回true;反之,return false 。
/**寻找连接池中是否有可复用的连接 */
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
//可多路复用 且 连接是可多路复用(http2)
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
//connection 是否可分配
if (!connection.isEligible(address, routes)) return@synchronized
call.acquireConnectionNoEvents(connection) //关联关系
return true //找到了
}
}
return false
}
connectionBecameIdle(connection: RealConnection): Boolean
通知连接池【该连接】变空闲。当返回true,该连接被移除并关闭
fun connectionBecameIdle(connection: RealConnection): Boolean {
connection.assertThreadHoldsLock()
//连接不在使用 或 连接池的最大空闲数量 == 0 ,就将【连接】移除
return if (connection.noNewExchanges || maxIdleConnections == 0) {
connection.noNewExchanges = true
connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
true
} else { //执行清理任务
cleanupQueue.schedule(cleanupTask)
false
}
}
put(connection: RealConnection)
加入任务栈,并执行清理任务
fun put(connection: RealConnection) {
connection.assertThreadHoldsLock()
connections.add(connection)
cleanupQueue.schedule(cleanupTask)
}
evictAll()
逐出所有连接。
遍历连接栈。每个连接,当它的calls 为空就移除,并关闭连接的socket。
遍历结束,发现连接栈为空,就取消清理任务栈中的所有任务
fun evictAll() {
val i = connections.iterator()
while (i.hasNext()) {
val connection = i.next()
val socketToClose = synchronized(connection) {
if (connection.calls.isEmpty()) {
i.remove()
connection.noNewExchanges = true
return@synchronized connection.socket()
} else {
return@synchronized null
}
}
socketToClose?.closeQuietly()
}
if (connections.isEmpty()) cleanupQueue.cancelAll()
}
cleanup()
清除空闲过久的连接。
遍历连接栈(concurrentLinkedQueue)。计算inUseConnectionCount(使用中的连接数),idleConnectionCount(空闲连接连接数),longestIdleConnection(最久空闲的连接),longestIdleDurationNs (最久空闲连接的空闲时长)
遍历结束,四种情况
1.结束遍历后,如果空闲连接数 超过限制的(5个) 或 最久空闲连接的时长 超过限制(5分钟),就移除。并返回0(马上执行下次清理任务)
2.空闲连接数>0 ,返回下次被移除连接时间
3.使用中的连接数>0,空闲连接数==0 ,返回下次被移除连接时间
4.栈中无连接,返回-1,不执行
fun cleanup(now: Long): Long {
var inUseConnectionCount = 0
var idleConnectionCount = 0
var longestIdleConnection: RealConnection? = null
var longestIdleDurationNs = Long.MIN_VALUE
// Find either a connection to evict, or the time that the next eviction is due.
for (connection in connections) {
synchronized(connection) {
// 如果连接还在使用继续搜索
if (pruneAndGetAllocationCount(connection, now) > 0) {//返回存活calls的数量
inUseConnectionCount++
} else {
idleConnectionCount++
// If the connection is ready to be evicted, we're done.
val idleDurationNs = now - connection.idleAtNs
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
} else {
Unit
}
}
}
}
when {
//1.空闲连接数 超过限制的(5个) 或 最久空闲连接的时长 超过限制(5分钟)
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
//选择连接驱逐(evict)。再次确认是否被驱逐
val connection = longestIdleConnection!!
synchronized(connection) {
if (connection.calls.isNotEmpty()) return 0L // No longer idle.
if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // 不再是最久的
connection.noNewExchanges = true
connections.remove(longestIdleConnection)
}
connection.socket().closeQuietly() //socket 关闭
if (connections.isEmpty()) cleanupQueue.cancelAll()
// Clean up again immediately. 清理任务再次马上执行
return 0L
}
//2.空闲连接 > 0
idleConnectionCount > 0 -> {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs
}
//3.正在使用的连接 > 0
inUseConnectionCount > 0 -> {
// All connections are in use. It'll be at least the keep alive duration 'til we run
// again.
return keepAliveDurationNs //成员变量,保持alive时长
}
//4.没有连接,不执行
else -> {
// No connections, idle or in use.
return -1
}
}
}
pruneAndGetAllocationCount
cleanup()方法调用该方法
删除任何可能泄露的calls,并返回存活calls的数量。
如果连接正在跟踪调用,但应用程序代码已放弃,则调用会泄漏。依赖垃圾回收是不准确的,容易内存泄露
private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
connection.assertThreadHoldsLock()
val references = connection.calls // MutableList<Reference<RealCall>> , CallReference 继承 WeakReference
var i = 0
while (i < references.size) {
val reference = references[i]
if (reference.get() != null) {
i++
continue
}
// 发现泄露call。这是application bug
val callReference = reference as CallReference
val message = "A connection to ${connection.route().address.url} was leaked. " +
"Did you forget to close a response body?"
Platform.get().logCloseableLeak(message, callReference.callStackTrace)
references.removeAt(i) //数组里面移除它,这里i不自增,继续循环
connection.noNewExchanges = true //设为true, 将无新的交流,且要被移除pool
//这是最后一个call, 且移除。这个connection将可被驱逐
if (references.isEmpty()) {
connection.idleAtNs = now - keepAliveDurationNs
return 0
}
}
return references.size
}