Android okhttp的启动流程及源码解析

前言

这篇文章主要讲解了okhttp的主要工作流程以及源码的解析。

什么是OKhttp

简单来说 OkHttp 就是一个客户端用来发送 HTTP 消息并对服务器的响应做出处理的应用层框架。 那么它有什么优点呢?

  • 易使用、易扩展。
  • 支持 HTTP/2 协议,允许对同一主机的所有请求共用同一个 socket 连接。
  • 如果 HTTP/2 不可用, 使用连接池复用减少请求延迟。
  • 支持 GZIP,减小了下载大小。
  • 支持缓存处理,可以避免重复请求。
  • 如果你的服务有多个 IP 地址,当第一次连接失败,OkHttp 会尝试备用地址。
  • OkHttp 还处理了代理服务器问题和SSL握手失败问题。

OkHttp是如何做网络请求的

1.它是如何使用的?

1.1 通过构造者模式添加 url,method,header,body 等完成一个请求的信息 Request 对象

val request = Request.Builder()

.url("")

.addHeader("","")

.get()

.build()

1.2 同样通过构造者模式创建一个 OkHttpClicent 实例,可以按需配置

val okHttpClient = OkHttpClient.Builder()

.connectTimeout(15, TimeUnit.SECONDS)

.readTimeout(15, TimeUnit.SECONDS)

.addInterceptor()

.build()

1.3 创建 Call 并且发起网络请求

val newCall = okHttpClient.newCall(request)

//异步请求数据

newCall.enqueue(object :Callback{

override fun onFailure(call: Call, e: IOException) {}

override fun onResponse(call: Call, response: Response) {}

})

//同步请求数据

val response = newCall.execute()

整个使用流程很简单,主要的地方在于如何通过 Call 对象发起同/异步请求,后续的源码追踪以方法开始。

2.如何通过 Call 发起请求?

2.1 Call 是什么

/** Prepares the [request] to be executed at some point in the future. */

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

2.2 发起请求-异步请求

//RealCall#enqueue(responseCallback: Callback)

override fun enqueue(responseCallback: Callback) {

synchronized(this) {

//检查这个call是否执行过,每个 call 只能被执行一次

check(!executed) { "Already Executed" }

executed = true

}

//此方法调用了EventListener#callStart(call: Call),

主要是用来监视应用程序的HTTP调用的数量,大小和各个阶段的耗时

callStart()

//创建AsyncCall,实际是个Runnable

client.dispatcher.enqueue(AsyncCall(responseCallback))

}

enqueue 最后一个方法分为两步

  • 第一步将响应的回调放入 AsyncCall 对象中 ,AsyncCall 对象是 RealCall 的一个内部类实现了 Runnable 接口。
  • 第二步通过 Dispatcher 类的 enqueue() 将 AsyncCall 对象传入

//Dispatcher#enqueue(call: AsyncCall)

/** Ready async calls in the order they'll be run. */

private val readyAsyncCalls = ArrayDeque<AsyncCall>()

internal fun enqueue(call: AsyncCall) {

synchronized(this) {

//将call添加到即将运行的异步队列

readyAsyncCalls.add(call)

...

promoteAndExecute()

}

//Dispatcher#promoteAndExecute()

//将[readyAsyncCalls]过渡到[runningAsyncCalls]

private fun promoteAndExecute(): Boolean {

...

for (i in 0 until executableCalls.size) {

val asyncCall = executableCalls[i]

//这里就是通过 ExecutorService 执行 run()

asyncCall.executeOn(executorService)

}

return isRunning

}

//RealCall.kt中的内部类

internal inner class AsyncCall(

private val responseCallback: Callback

) : Runnable {

fun executeOn(executorService: ExecutorService) {

...

//执行Runnable

executorService.execute(this)

...

}

override fun run() {

threadName("OkHttp ${redactedUrl()}") {

...

try {

//兜兜转转 终于调用这个关键方法了

val response = getResponseWithInterceptorChain()

signalledCallback = true

//通过之前传入的接口回调数据

responseCallback.onResponse(this@RealCall, response)

} catch (e: IOException) {

if (signalledCallback) {

Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)

} else {

responseCallback.onFailure(this@RealCall, e)

}

} catch (t: Throwable) {

cancel()

if (!signalledCallback) {

val canceledException = IOException("canceled due to $t")

canceledException.addSuppressed(t)

responseCallback.onFailure(this@RealCall, canceledException)

}

throw t

} finally {

//移除队列

client.dispatcher.finished(this)

}

}

}

}

2.3 同步请求 RealCall#execute()

override fun execute(): Response {

//同样判断是否执行过

synchronized(this) {

check(!executed) { "Already Executed" }

executed = true

}

timeout.enter()

//同样监听

callStart()

try {

//同样执行

client.dispatcher.executed(this)

return getResponseWithInterceptorChain()

} finally {

//同样移除

client.dispatcher.finished(this)

}

}

3.如何通过拦截器处理请求和响应?

无论同异步请求都会调用到 getResponseWithInterceptorChain() ,这个方法主要使用责任链模式将整个请求分为几个拦截器调用 ,简化了各自的责任和逻辑,可以扩展其它拦截器,看懂了拦截器 OkHttp 就了解的差不多了。

@Throws(IOException::class)

internal fun getResponseWithInterceptorChain(): Response {

// 构建完整的拦截器

val interceptors = mutableListOf<Interceptor>()

interceptors += client.interceptors //用户自己拦截器,数据最开始和最后

interceptors += RetryAndFollowUpInterceptor(client) //失败后的重试和重定向

interceptors += BridgeInterceptor(client.cookieJar) //桥接用户的信息和服务器的信息

interceptors += CacheInterceptor(client.cache) //处理缓存相关

interceptors += ConnectInterceptor //负责与服务器连接

if (!forWebSocket) {

interceptors += client.networkInterceptors //配置 OkHttpClient 时设置,数据未经处理

}

interceptors += CallServerInterceptor(forWebSocket) //负责向服务器发送请求数据、从服务器读取响应数据

//创建拦截链

val chain = RealInterceptorChain(

call = this,

interceptors = interceptors,

index = 0,

exchange = null,

request = originalRequest,

connectTimeoutMillis = client.connectTimeoutMillis,

readTimeoutMillis = client.readTimeoutMillis,

writeTimeoutMillis = client.writeTimeoutMillis

)

var calledNoMoreExchanges = false

try {

//拦截链的执行

val response = chain.proceed(originalRequest)

...

} catch (e: IOException) {

...

} finally {

...

}

}

//1.RealInterceptorChain#proceed(request: Request)

@Throws(IOException::class)

override fun proceed(request: Request): Response {

...

// copy出新的拦截链,链中的拦截器集合index+1

val next = copy(index = index + 1, request = request)

val interceptor = interceptors[index]

//调用拦截器的intercept(chain: Chain): Response 返回处理后的数据 交由下一个拦截器处理

@Suppress("USELESS_ELVIS")

val response = interceptor.intercept(next) ?: throw NullPointerException(

"interceptor $interceptor returned null")

...

//返回最终的响应体

return response

}

拦截器开始操作 Request。

3.1 拦截器是怎么拦截的?

拦截器都继承自 Interceptor 类并实现了 fun intercept(chain: Chain): Response 方法。

在 intercept 方法里传入 chain 对象 调用它的 proceed() 然后 proceed() 方法里又 copy 下一个拦截器,然后双调用了 intercept(chain: Chain) 接着叒 chain.proceed(request) 直到最后一个拦截器 return response 然后一层一层向上反馈数据。

3.2 RetryAndFollowUpInterceptor

这个拦截器是用来处理重定向的后续请求和失败重试,也就是说一般第一次发起请求不需要重定向会调用下一个拦截器。

@Throws(IOException::class)

override fun intercept(chain: Interceptor.Chain): Response {

val realChain = chain as RealInterceptorChain

var request = chain.request

val call = realChain.call

var followUpCount = 0

var priorResponse: Response? = null

var newExchangeFinder = true

var recoveredFailures = listOf<IOException>()

while (true) {

...//在调用下一个拦截器前的操作

var response: Response

try {

...

try {

//调用下一个拦截器

response = realChain.proceed(request)

newExchangeFinder = true

} catch (e: RouteException) {

...

continue

} catch (e: IOException) {

...

continue

}

...

//处理上一个拦截器返回的 response

val followUp = followUpRequest(response, exchange)

...

//中间有一些判断是否需要重新请求 不需要则返回 response

//处理之后重新请求 Request

request = followUp

priorResponse = response

} finally {

call.exitNetworkInterceptorExchange(closeActiveExchange)

}

}

}

@Throws(IOException::class)

private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {

val route = exchange?.connection?.route()

val responseCode = userResponse.code

val method = userResponse.request.method

when (responseCode) {

//3xx 重定向

HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {

//这个方法重新 构建了 Request 用于重新请求

return buildRedirectRequest(userResponse, method)

}

... 省略一部分code

else -> return null

}

}

在 followUpRequest(userResponse: Response, exchange: Exchange?): Request? 方法中判断了 response 中的服务器响应码做出了不同的操作。

3.3 BridgeInterceptor

它负责对于 Http 的额外预处理,比如 Content-Length 的计算和添加、 gzip 的⽀持(Accept-Encoding: gzip)、 gzip 压缩数据的解包等,这个类比较简单就不贴代码了,想了解的话可以自行查看。

3.4 CacheInterceptor

这个类负责 Cache 的处理,如果本地有了可⽤的 Cache,⼀个请求可以在没有发⽣实质⽹络交互的情况下就返回缓存结果,实现如下。

@Throws(IOException::class)

override fun intercept(chain: Interceptor.Chain): Response {

//在Cache(DiskLruCache)类中 通过request.url匹配response

val cacheCandidate = cache?.get(chain.request())

//记录当前时间点

val now = System.currentTimeMillis()

//缓存策略 有两种类型

//networkRequest 网络请求

//cacheResponse 缓存的响应

val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()

val networkRequest = strategy.networkRequest

val cacheResponse = strategy.cacheResponse

//计算请求次数和缓存次数

cache?.trackResponse(strategy)

...

// 如果 禁止使用网络 并且 缓存不足,返回504和空body的Response

if (networkRequest == null && cacheResponse == null) {

return Response.Builder()

.request(chain.request())

.protocol(Protocol.HTTP_1_1)

.code(HTTP_GATEWAY_TIMEOUT)

.message("Unsatisfiable Request (only-if-cached)")

.body(EMPTY_RESPONSE)

.sentRequestAtMillis(-1L)

.receivedResponseAtMillis(System.currentTimeMillis())

.build()

}

// 如果策略中不能使用网络,就把缓存中的response封装返回

if (networkRequest == null) {

return cacheResponse!!.newBuilder()

.cacheResponse(stripBody(cacheResponse))

.build()

}

//调用拦截器process从网络获取数据

var networkResponse: Response? = null

try {

networkResponse = chain.proceed(networkRequest)

} finally {

// If we're crashing on I/O or otherwise, don't leak the cache body.

if (networkResponse == null && cacheCandidate != null) {

cacheCandidate.body?.closeQuietly()

}

}

//如果有缓存的Response

if (cacheResponse != null) {

//如果网络请求返回code为304 即说明资源未修改

if (networkResponse?.code == HTTP_NOT_MODIFIED) {

//直接封装封装缓存的Response返回即可

val response = cacheResponse.newBuilder()

.headers(combine(cacheResponse.headers, networkResponse.headers))

.sentRequestAtMillis(networkResponse.sentRequestAtMillis)

.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)

.cacheResponse(stripBody(cacheResponse))

.networkResponse(stripBody(networkResponse))

.build()

networkResponse.body!!.close()

// Update the cache after combining headers but before stripping the

// Content-Encoding header (as performed by initContentStream()).

cache!!.trackConditionalCacheHit()

cache.update(cacheResponse, response)

return response

} else {

cacheResponse.body?.closeQuietly()

}

}

val response = networkResponse!!.newBuilder()

.cacheResponse(stripBody(cacheResponse))

.networkResponse(stripBody(networkResponse))

.build()

if (cache != null) {

//判断是否具有主体 并且 是否可以缓存供后续使用

if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {

// 加入缓存中

val cacheRequest = cache.put(response)

return cacheWritingResponse(cacheRequest, response)

}

//如果请求方法无效 就从缓存中remove掉

if (HttpMethod.invalidatesCache(networkRequest.method)) {

try {

cache.remove(networkRequest)

} catch (_: IOException) {

// The cache cannot be written.

}

}

}

return response

}

3.5 ConnectInterceptor

此类负责建⽴连接。 包含了⽹络请求所需要的 TCP 连接(HTTP),或者 TCP 之前的 TLS 连接(HTTPS),并且会创建出对应的 HttpCodec 对象(⽤于编码解码 HTTP 请求)。

@Throws(IOException::class)

override fun intercept(chain: Interceptor.Chain): Response {

val realChain = chain as RealInterceptorChain

val exchange = realChain.call.initExchange(chain)

val connectedChain = realChain.copy(exchange = exchange)

return connectedChain.proceed(realChain.request)

}

看似短短四行实际工作还是比较多的。

/** Finds a new or pooled connection to carry a forthcoming request and response. */

internal fun initExchange(chain: RealInterceptorChain): Exchange {

...

//codec是对 HTTP 协议操作的抽象,有两个实现:Http1Codec和Http2Codec,对应 HTTP/1.1 和 HTTP/2。

val codec = exchangeFinder.find(client, chain)

val result = Exchange(this, eventListener, exchangeFinder, codec)

...

return result

}

#ExchangeFinder.find

fun find(client: OkHttpClient,chain: RealInterceptorChain):ExchangeCodec {

try {

//寻找一个可用的连接

val resultConnection = findHealthyConnection(

connectTimeout = chain.connectTimeoutMillis,

readTimeout = chain.readTimeoutMillis,

writeTimeout = chain.writeTimeoutMillis,

pingIntervalMillis = client.pingIntervalMillis,

connectionRetryEnabled = client.retryOnConnectionFailure,

doExtensiveHealthChecks = chain.request.method != "GET"

)

return resultConnection.newCodec(client, chain)

} catch (e: RouteException) {

trackFailure(e.lastConnectException)

throw e

} catch (e: IOException) {

trackFailure(e)

throw RouteException(e)

}

}

@Throws(IOException::class)

private fun findHealthyConnection(

connectTimeout: Int,

readTimeout: Int,

writeTimeout: Int,

pingIntervalMillis: Int,

connectionRetryEnabled: Boolean,

doExtensiveHealthChecks: Boolean

): RealConnection {

while (true) {

//寻找连接

val candidate = findConnection(

connectTimeout = connectTimeout,

readTimeout = readTimeout,

writeTimeout = writeTimeout,

pingIntervalMillis = pingIntervalMillis,

connectionRetryEnabled = connectionRetryEnabled

)

//确认找到的连接可用并返回

if (candidate.isHealthy(doExtensiveHealthChecks)) {

return candidate

}

...

throw IOException("exhausted all routes")

}

}

@Throws(IOException::class)

private fun findConnection(

connectTimeout: Int,

readTimeout: Int,

writeTimeout: Int,

pingIntervalMillis: Int,

connectionRetryEnabled: Boolean

): RealConnection {

if (call.isCanceled()) throw IOException("Canceled")

// 1. 尝试重用这个call的连接 比如重定向需要再次请求 那么这里就会重用之前的连接

val callConnection = call.connection

if (callConnection != null) {

var toClose: Socket? = null

synchronized(callConnection) {

if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {

toClose = call.releaseConnectionNoEvents()

}

}

//返回这个连接

if (call.connection != null) {

check(toClose == null)

return callConnection

}

// The call's connection was released.

toClose?.closeQuietly()

eventListener.connectionReleased(call, callConnection)

}

...

// 2. 尝试从连接池中找一个连接 找到就返回连接

if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {

val result = call.connection!!

eventListener.connectionAcquired(call, result)

return result

}

// 3. 如果连接池中没有 计算出下一次要尝试的路由

val routes: List<Route>?

val route: Route

if (nextRouteToTry != null) {

// Use a route from a preceding coalesced connection.

routes = null

route = nextRouteToTry!!

nextRouteToTry = null

} else if (routeSelection != null && routeSelection!!.hasNext()) {

// Use a route from an existing route selection.

routes = null

route = routeSelection!!.next()

} else {

// Compute a new route selection. This is a blocking operation!

var localRouteSelector = routeSelector

if (localRouteSelector == null) {

localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)

this.routeSelector = localRouteSelector

}

val localRouteSelection = localRouteSelector.next()

routeSelection = localRouteSelection

routes = localRouteSelection.routes

if (call.isCanceled()) throw IOException("Canceled")

// Now that we have a set of IP addresses, make another attempt at getting a connection from

// the pool. We have a better chance of matching thanks to connection coalescing.

if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {

val result = call.connection!!

eventListener.connectionAcquired(call, result)

return result

}

route = localRouteSelection.next()

}

// Connect. Tell the call about the connecting call so async cancels work.

// 4.到这里还没有找到可用的连接 但是找到了 route 即路由 进行socket/tls连接

val newConnection = RealConnection(connectionPool, route)

call.connectionToCancel = newConnection

try {

newConnection.connect(

connectTimeout,

readTimeout,

writeTimeout,

pingIntervalMillis,

connectionRetryEnabled,

call,

eventListener

)

} finally {

call.connectionToCancel = null

}

call.client.routeDatabase.connected(newConnection.route())

// If we raced another call connecting to this host, coalesce the connections. This makes for 3

// different lookups in the connection pool!

// 4.查找是否有多路复用(http2)的连接,有就返回

if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {

val result = call.connection!!

nextRouteToTry = route

newConnection.socket().closeQuietly()

eventListener.connectionAcquired(call, result)

return result

}

synchronized(newConnection) {

//放入连接池中

connectionPool.put(newConnection)

call.acquireConnectionNoEvents(newConnection)

}

eventListener.connectionAcquired(call, newConnection)

return newConnection

}

接下来看看是如何建立连接的

fun connect(

connectTimeout: Int,

readTimeout: Int,

writeTimeout: Int,

pingIntervalMillis: Int,

connectionRetryEnabled: Boolean,

call: Call,

eventListener: EventListener

) {

...

while (true) {

try {

if (route.requiresTunnel()) {

//创建tunnel,用于通过http代理访问https

//其中包含connectSocket、createTunnel

connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)

if (rawSocket == null) {

// We were unable to connect the tunnel but properly closed down our resources.

break

}

} else {

//不创建tunnel就创建socket连接 获取到数据流

connectSocket(connectTimeout, readTimeout, call, eventListener)

}

//建立协议连接tsl

establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)

eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)

break

} catch (e: IOException) {

...

}

}

...

}

建立tsl连接

@Throws(IOException::class)

private fun establishProtocol(

connectionSpecSelector: ConnectionSpecSelector,

pingIntervalMillis: Int,

call: Call,

eventListener: EventListener

) {

//ssl为空 即http请求 明文请求

if (route.address.sslSocketFactory == null) {

if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {

socket = rawSocket

protocol = Protocol.H2_PRIOR_KNOWLEDGE

startHttp2(pingIntervalMillis)

return

}

socket = rawSocket

protocol = Protocol.HTTP_1_1

return

}

//否则为https请求 需要连接sslSocket 验证证书是否可被服务器接受 保存tsl返回的信息

eventListener.secureConnectStart(call)

connectTls(connectionSpecSelector)

eventListener.secureConnectEnd(call, handshake)

if (protocol === Protocol.HTTP_2) {

startHttp2(pingIntervalMillis)

}

}

至此,创建好了连接,返回到最开始的 find() 方法返回 ExchangeCodec 对象,再包装为 Exchange 对象用来下一个拦截器操作。

3.6 CallServerInterceptor

这个类负责实质的请求与响应的 I/O 操作,即往 Socket ⾥写⼊请求数据,和从 Socket ⾥读取响应数据。

总结

用一张 @piasy 的图来做总结,图很干练结构也很清晰。

以上就是Android okhttp的启动流程及源码解析的详细内容,更多关于Android okhttp的启动流程的资料请关注其它相关文章!

以上是 Android okhttp的启动流程及源码解析 的全部内容, 来源链接: utcz.com/p/243313.html

回到顶部