Java OkHttp框架源码深入解析
作者:niuyongzhi 发布时间:2023-11-29 03:22:50
1.OkHttp发起网络请求
可以通过OkHttpClient发起一个网络请求
//创建一个Client,相当于打开一个浏览器
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
//创建一个请求。
Request request = new Request.Builder()
.url("http://www.baidu.com")
.method("GET",null)
.build();
//调用Client 创建一个Call。
Call call = okHttpClient.newCall(request);
//Call传入一个回调函数,并加入到请求队列。
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
}
通过Retrofit发起一个OkHttp请求
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://www.baidu.com/")
.build();
NetInterface netInterface = retrofit.create(NetInterface.class);
Call<Person> call = netInterface.getPerson();
call.enqueue(new Callback<Person>() {
@Override
public void onResponse(Call<Person> call, Response<Person> response) {
}
@Override
public void onFailure(Call<Person> call, Throwable t) {
}
});
以上两种方式都是通过call.enqueue() 把网络请求加入到请求队列的。
这个call是RealCall的一个对象。
public void enqueue(Callback responseCallback) {
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
这里有两个判断条件
runningAsyncCalls.size() < maxRequests如果运行队列数量大于最大数量,
runningCallsForHost(call) < maxRequestsPerHost并且访问同一台服务器的请求数量大于最大数量,请求会放入等待队列,否则加入运行队列,直接执行。
//等待队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//运行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//运行队列数量最大值
private int maxRequests = 64;
//访问不同主机的最大数量
private int maxRequestsPerHost = 5;
dispatcher.java
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
接下来看这行代码executorService().execute(call);
executorService()拿到一个线程池实例,
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
execute(call)执行任务,发起网络请求。
//AsyncCall.java
@Override protected void execute() {
try {
//这个方法去请求网络,会返回Respose
Response response = getResponseWithInterceptorChain();
//请求成功,回调接口
responseCallback.onResponse(RealCall.this, response);
}catch(Exceptrion e){
//失败回调
responseCallback.onFailure(RealCall.this, e);
}finally {
//从当前运行队列中删除这个请求
client.dispatcher().finished(this);
}
}
getResponseWithInterceptorChain()
这行代码,使用了设计模式中的责任链模式。
//这个方法命名:通过 * 链,获取Response
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// 这个我们自己定义的 * 。
interceptors.addAll(client.interceptors());
//重试和重定向 *
interceptors.add(retryAndFollowUpInterceptor);
//请求头 *
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存 *
interceptors.add(new CacheInterceptor(client.internalCache()));
//连接 *
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//访问 *
interceptors.add(new CallServerInterceptor(forWebSocket));
// * 责任链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
//执行 * 集合中的 *
return chain.proceed(originalRequest);
}
责任链模式中,链条的上游持有下游对象的引用。这样能够保证在链条上的每一个对象,都能对其符合条件的任务进行处理。
但是在上面的 * 构成责任链中,是把 * ,放在了一个集合中。
第一个参数interceptors 是一个 * 的集合。
第五个参数0是集合的index,RealInterceptorChain就是根据这个索引值+1,
对chain.proceed方法循环调用,进行集合遍历,并执行 * 中定义的方法的。
这个责任链模式,并没有明确的指定下游对象是什么,而是通过集合index值的变化,动态的指定的。
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0......)
chain.proceed(originalRequest);
public Response proceed(Request request,...){
//构建一个index+1的 * 链
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1,....);
//拿到当前的 *
Interceptor interceptor = interceptors.get(index);
//调用 * intercept(next)方法,
//在这个方法中继续调用realChain.proceed(),从而进行循环调用,index索引值再加1.
Response response = interceptor.intercept(next);
}
2.OkHttp的连接器
1)RetryAndFollowUpInterceptor:重试和重定向 *
public Response intercept(Chain chain){
while (true) {
Response response;
try {
//创建StreamAllocation对象,这个对象会在连接 * 中用到
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
调用责任链下游 *
response = realChain.proceed(request, streamAllocation, null, null);
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
路由异常,请求还没发出去。
这样这个recover(),如果返回的是false,则抛出异常,不再重试
如果返回的是true,则执行下面的continue,进行下一次while循环,进行重试,重新发起网络请求。
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
请求已经发出去了,但是和服务器连接失败了。
这个recover()返回值的处理逻辑和上面异常一样。
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
}
} finally {//finally是必定会执行到的,不管上面的catch中执行的是continue还是thow
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
在这个重试 * 中,okhttp的做法很巧妙。先是在外面有一个while循环,如果发生异常,
会在recover方法中对异常类型进行判断,如果不符合属于重试,则返回false,并thow e,结束while循环。
如果符合重试的条件,则返回true,在上面的catch代码块中执行continue方法,进入下一个while循环。
//如果请求正常,并且返回了response,则会进行重定向的逻辑判断
followUpRequest在这个方法中会根据ResponseCode,状态码进行重定向的判断,
Request followUp;
try {
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}
如果flolowUp 为null,则不需要重定向,直接返回response
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
如果flolowUp 不为null,则进行重定向了请求
如果重定向次数超过MAX_FOLLOW_UPS=20次,则抛出异常,结束while循环
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
//从重定向请求中拿到url,封装一个新的streamAllocation对象,
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
//将重定向请求赋值给request 进入下一个重定向的请求的while循环,继续走上面的while循环代码
request = followUp;
priorResponse = response;
}
}
//只有这个方法返回值为false都不进行重试。
private boolean recover(IOException e, StreamAllocation streamAllocation,
boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);
// The application layer has forbidden retries.
应用层禁止重试。可以通过OkHttpClient进行配置(默认是允许的)
if (!client.retryOnConnectionFailure()) return false;
// We can't send the request body again.
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// This exception is fatal. 致命的异常
判断是否属于重试的异常
if (!isRecoverable(e, requestSendStarted)) return false;
// No more routes to attempt.
没有更多可以连接的路由线路
if (!streamAllocation.hasMoreRoutes()) return false;
// For failure recovery, use the same route selector with a new connection.
return true;
}
只有这个方法返回false,都不进行重试。
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
// If there was a protocol problem, don't recover.
出现了协议异常,不再重试
if (e instanceof ProtocolException) {
return false;
}
// If there was an interruption don't recover, but if there was a timeout connecting to a route
// we should try the next route (if there is one).
requestSendStarted为false时,并且异常类型为Scoket超时异常,将会进行下一次重试
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && !requestSendStarted;
}
// Look for known client-side or negotiation errors that are unlikely to be fixed by trying
// again with a different route.
如果是一个握手异常,并且证书出现问题,则不能重试
if (e instanceof SSLHandshakeException) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.getCause() instanceof CertificateException) {
return false;
}
}
2)BridgeInterceptor 桥 * :连接服务器的桥梁,主要是在请求头中设置一些参数配置
如:请求内容长度,编码,gzip压缩等。
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
..................
}
在请求头中添加gizp,是否压缩
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
//cookies
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
调用责任链中下一个 * 的方法,网络请求得到的数据封装到networkResponse中
Response networkResponse = chain.proceed(requestBuilder.build());
对cookie进行处理
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
如果设置了gzip,则会对networkResponse进行解压缩。
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
3)CacheInterceptor缓存 *
public Response intercept(Chain chain){
// this.cache = DiskLruCache.create(fileSystem, directory, 201105, 2, maxSize);
这个缓存在底层使用的是DiskLruCache
//以request为key从缓存中拿到response。
Response cacheCandidate = cache != null
? cache.get(chain.request()): null;
long now = System.currentTimeMillis();
//缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
// If we're forbidden from using the network and the cache is insufficient, fail.
//如果请求和响应都为null,直接返回504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
//如果请求为null,缓存不为null,则直接使用缓存。
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
//调用责任链下一个 *
networkResponse = chain.proceed(networkRequest);
} finally {
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
// Offer this request to the cache.
//将响应存入缓存。
CacheRequest cacheRequest = cache.put(response);
}
4)ConnectInterceptor 连接 * 。当一个请求发出,需要建立连接,然后再通过流进行读写。
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//在重定向 * 中创建,
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//从连接池中,找到一个可以复用的连接,
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
// RealConnection 中封装了一个Socket和一个Socket连接池
RealConnection connection = streamAllocation.connection();
//调用下一个 *
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
//遍历连接池
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
public boolean isEligible(Address address, @Nullable Route route) {
// If this connection is not accepting new streams, we're done.
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
从连接池中找到一个连接参数一致且并未占用的连接
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
5)CallServerInterceptor 请求服务器 *
/** This is the last interceptor in the chain. It makes a network call to the server. */
这是责任链中最后一个 * ,这个会去请求服务器。
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
//将请求头写入缓存
httpCodec.writeRequestHeaders(request);
return response;
来源:https://blog.csdn.net/niuyongzhi/article/details/126464240


猜你喜欢
- C#过滤DataTable中的空数据和重复数据string sql = "select name,age from user&qu
- Spring-Context的作用spring-context提供应用程序上下文,这是Spring的依赖注入容器,它可能总是在以某种方式使用
- 前言在移动互联网浪潮中,联网APP已经把单机拍死在沙滩上,很多公司都希望自家应用能够有一套帐号系统,可是许多用户却并不一定买账:
- java数据类型与二进制详细介绍在java中Int 类型的变量占 4个字节Long 类型的变量占8个字节一个程序就是一个世界,变量是这个程序
- 本文实例讲述了C#(asp.net)多线程用法。分享给大家供大家参考,具体如下:using System;using System.Coll
- 1.返回ModelAndView对象(.jsp)controller代码:package controller;import java.ut
- 抽象类往往用来表征对问题领域进行分析、设计中得出的抽象概念,是对一系列看上去不同,但是本质上相同的具体概念的抽象。下面我们以水果为例,首先定
- 一、雪花算法datacenterId重复问题华为云的服务器的/etc/hosts中都会生成一条 127.0.1.1 hostname的记录
- 本文实例讲述了C#将HashTable中键列表或值列表复制到一维数组的方法。分享给大家供大家参考。具体如下:下面的示例说明如何将 Hasht
- 本文实例讲述了Java Scanner类用法及nextLine()产生的换行符问题。分享给大家供大家参考,具体如下:分析理解:Scanner
- 有httponly的cookie,在httpwebreqeust请求时,会获取不到,可以采用直接获取head中的set-cookie,再转换
- Eclipse查看开发包jar里源代码的方法前言:最近我打算学习一下谷歌的类库Guava,下载了Guava-r09.jar包及其源码,为了可
- 一. CodeCache简介从字面意思理解就是代码缓存区,它缓存的是JIT(Just in Time)编译器编译的代码,简言之codeCac
- 需求winForm 程序输出类型为 windows 程序(不是命令行程序)在运行时想输入一些信息编译开发调试,如何实现这一功能解答:Allo
- Java在控制台打印本月日历在学习《Java核心技术卷I·基础知识》第10版 的时候里面有一个小例子,就是在控制台上打印日历的一个例子,就想
- 问题描述springcloud项目部署或调试时,占用的内存特别多。当部署到服务器上去后,有可能导致服务器内存占用过多而崩溃。解决方案&nbs
- 持久性底部面板可以用于补充应用主要内容的信息,即使用户与应用程序的其他控件进行互动,也仍然可以看到持久的底部面板。可以使用Scaffold.
- 1.准备工作1、新建一个SpringBoot项目加上web依赖, 所有依赖<dependency><groupId>
- 装饰器模式概述装饰器模式(Decorator Pattern)也称为包装模式(Wrapper Pattern),属于结构型模式。它是指在不改
- 在 Windows 有一些字符是不能作为文件名,尝试重命名一个文件,输入/ 就可以看到windows 提示的不能作为文件名的字符那么具体是包