As we saw in the previous article, both OkHttp's synchronous and asynchronous requests go through the Dispatcher, which essentially serves as a task queue.
One often-cited reason OkHttp is efficient is that it maintains an internal thread pool for running asynchronous requests, and that thread pool lives inside the Dispatcher class.
With comments stripped, Dispatcher is only a little over a hundred lines; it is worth reading on your own since it is not difficult. Let's walk through it:
1. Dispatcher's member fields
public final class Dispatcher {
/** Maximum number of concurrent requests: 64 */
private int maxRequests = 64;
/** Maximum number of requests per host: 5 */
private int maxRequestsPerHost = 5;
/** The thread pool */
private ExecutorService executorService;
/** Asynchronous calls waiting to run */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls, including cancelled calls that haven't finished yet */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls, including cancelled calls that haven't finished yet */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
······
}
Q: Why are there two queues for asynchronous requests?
One holds calls that are waiting to run, readyAsyncCalls, which acts as a buffer; the other holds calls that are currently running, runningAsyncCalls. This is really a producer-consumer model, as illustrated below (a minimal code sketch follows this list):
(Figure: the producer-consumer model)
- Dispatcher: the producer (by default on the main thread)
- AsyncCall: the Runnable placed on the queue (it wraps the asynchronous callback)
- ExecutorService: the consumer pool (i.e. the thread pool)
- Deque<AsyncCall> readyAsyncCalls: the buffer (an ArrayDeque: array-backed, grows automatically, no fixed size limit)
- Deque<AsyncCall> runningAsyncCalls: the running tasks; it only holds references to in-flight calls so the dispatcher can track concurrency. Note that it is not the consumer's buffer.
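To make the model concrete, here is a minimal, self-contained sketch of the ready/running pattern. It is not OkHttp's code; MiniDispatcher, maxRunning, ready and running are names invented for illustration, and the limit of 2 stands in for maxRequests:
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
final class MiniDispatcher {
  private final int maxRunning = 2; // stands in for maxRequests
  private final Deque<Runnable> ready = new ArrayDeque<>(); // producer-side buffer
  private final Deque<Runnable> running = new ArrayDeque<>(); // tasks handed to the pool
  private final ExecutorService pool = Executors.newCachedThreadPool();
  synchronized void enqueue(Runnable task) {
    if (running.size() < maxRunning) {
      running.add(task);
      // Hand the task to the consumer pool; when it finishes, promote a buffered task.
      pool.execute(() -> {
        try {
          task.run();
        } finally {
          finished(task);
        }
      });
    } else {
      ready.add(task); // over the concurrency limit: buffer it instead
    }
  }
  private synchronized void finished(Runnable task) {
    running.remove(task);
    Runnable next = ready.poll(); // FIFO promotion from the buffer
    if (next != null) enqueue(next);
  }
}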
2. Dispatcher's thread pool
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
From the source above, the thread pool is created lazily and then reused for the lifetime of the Dispatcher (effectively a singleton). For anyone less familiar with thread pools, here is what each parameter means:
- 0 (corePoolSize): no core threads are kept alive; once idle long enough, every thread is torn down.
- Integer.MAX_VALUE (maximumPoolSize): the maximum number of threads the pool may grow to when tasks arrive, effectively unbounded.
- 60 (keepAliveTime): when there are more threads than corePoolSize, an idle thread survives at most this long.
- TimeUnit.SECONDS: the keep-alive time is measured in seconds.
- new SynchronousQueue<Runnable>(): a work queue with no capacity; every submitted task must be handed directly to a thread, so a new thread is created whenever no idle one is available.
- Util.threadFactory("OkHttp Dispatcher", false): a thread factory that names the threads "OkHttp Dispatcher" and marks them non-daemon.
In practice this behaves like a cached thread pool: if 10 concurrent requests arrive, up to 10 threads are created (idle ones are reused first), and once the work is done, idle threads are shut down one by one after 60 seconds.
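A rough standalone demo of that behavior (my own demo, not OkHttp code; the task count and sleep durations are arbitrary):
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class PoolDemo {
  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    for (int i = 0; i < 10; i++) {
      pool.execute(() -> {
        try { Thread.sleep(500); } catch (InterruptedException ignored) { }
      });
    }
    Thread.sleep(100);
    // All 10 tasks are still running, so roughly 10 threads exist.
    System.out.println("threads while busy: " + pool.getPoolSize());
    Thread.sleep(1000);
    // The tasks are done; idle threads linger until the 60 s keep-alive expires.
    System.out.println("threads when idle: " + pool.getPoolSize());
    pool.shutdown();
  }
}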
3. Synchronous calls
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
The source of dispatcher().executed(this) is very simple:
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
There are 4 main points here (a minimal usage sketch follows the list):
- Check whether this call has already been executed; each call can only be executed once. If you want an identical call, clone it with call#clone.
- client.dispatcher().executed(this) does the bookkeeping, adding the call to the synchronous-request queue.
- getResponseWithInterceptorChain() performs the HTTP request and hands the Response back to the caller. As the name suggests, this step also runs the request through a chain of interceptors; the method is important and will be discussed in detail shortly.
- When the task completes, successfully or not, dispatcher.finished is called to tell the dispatcher the call has ended (we will not dig into finished here; its full source appears in section 7).
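For reference, a minimal synchronous usage sketch (https://example.com is just a placeholder URL):
import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
public class SyncDemo {
  public static void main(String[] args) throws IOException {
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
        .url("https://example.com") // placeholder URL
        .build();
    // execute() blocks the calling thread; on Android never call it on the main thread.
    try (Response response = client.newCall(request).execute()) {
      System.out.println(response.body().string());
    }
  }
}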
4. Asynchronous calls
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
The source of dispatcher.enqueue is also quite simple:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
From the source above, if the current state satisfies
(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)
that is, fewer than 64 running calls overall and fewer than 5 to the same host, the asynchronous call is added to runningAsyncCalls and executed on the thread pool (which creates, destroys and caches threads according to the current load); otherwise it is added to readyAsyncCalls to wait in the buffer.
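Both thresholds are configurable through the client's dispatcher; a quick sketch (the values 32 and 8 are arbitrary examples):
import okhttp3.OkHttpClient;
public class DispatcherConfig {
  public static void main(String[] args) {
    OkHttpClient client = new OkHttpClient();
    // Raise or lower the concurrency caps as needed.
    client.dispatcher().setMaxRequests(32);
    client.dispatcher().setMaxRequestsPerHost(8);
  }
}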
Q: Why does the asynchronous call return void, and where does the response data end up?
With asynchronous calls we receive the result through an interface callback:
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) {
}
});
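Note that onFailure and onResponse are invoked on the dispatcher's worker thread, not the main thread. On Android, UI updates must be posted back to the main thread; a rough sketch extending the snippet above (the Handler usage is just one common approach):
call.enqueue(new Callback() {
  @Override public void onFailure(Call call, IOException e) {
    // runs on a background thread
  }
  @Override public void onResponse(Call call, Response response) throws IOException {
    final String body = response.body().string(); // read the body off the main thread
    new Handler(Looper.getMainLooper()).post(() -> {
      // safe to touch UI here, e.g. textView.setText(body)
    });
  }
});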
This brings in a new class: AsyncCall (it implements Runnable, via NamedRunnable); its execute() method is what ultimately runs on the pool. AsyncCall is an inner class of RealCall:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
······
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
Its execute() method comes down to 2 points (a sketch of how execute() gets invoked follows this list):
- We meet the line we already saw in the synchronous path:
Response response = getResponseWithInterceptorChain();
So whether the call is synchronous or asynchronous, getResponseWithInterceptorChain() is what produces the response.
- Whether the request succeeds or fails, the dispatcher is notified through client.dispatcher().finished(this) that the task has ended and can be removed.
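The reason execute() runs at all is that AsyncCall extends NamedRunnable, whose run() renames the worker thread and delegates to execute(). A trimmed-down sketch of NamedRunnable (an approximation, not a verbatim copy of the source):
public abstract class NamedRunnable implements Runnable {
  protected final String name;
  protected NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }
  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name); // e.g. "OkHttp " + the request URL
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }
  protected abstract void execute();
}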
5. getResponseWithInterceptorChain() analysis
This method is too complex to cover here; it will be analyzed in a separate post.
6. Dispatcher thread pool summary
1) The dispatching thread pool in Dispatcher achieves high concurrency with low blocking.
2) A Deque (ArrayDeque) serves as the buffer, and buffered calls are promoted in FIFO order.
3) Every task calls finished() inside try/finally, and finishing tasks promote the waiting ones; the queues are driven this way rather than by blocking worker threads on locks, which keeps the code simple and performs well.
7. Dispatcher full source
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private Runnable idleCallback;
private ExecutorService executorService;
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
public synchronized void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
public synchronized int getMaxRequests() {
return maxRequests;
}
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
public synchronized int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
public synchronized void setIdleCallback(Runnable idleCallback) {
this.idleCallback = idleCallback;
}
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
public synchronized void cancelAll() {
for (AsyncCall call : readyAsyncCalls) {
call.get().cancel();
}
for (AsyncCall call : runningAsyncCalls) {
call.get().cancel();
}
for (RealCall call : runningSyncCalls) {
call.cancel();
}
}
/** Moves eligible calls from readyAsyncCalls into runningAsyncCalls and submits them to the pool. */
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
/** Removes the call from its deque, optionally promotes waiting calls, and fires idleCallback when nothing is running. */
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
public synchronized List<Call> queuedCalls() {
List<Call> result = new ArrayList<>();
for (AsyncCall asyncCall : readyAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
public synchronized List<Call> runningCalls() {
List<Call> result = new ArrayList<>();
result.addAll(runningSyncCalls);
for (AsyncCall asyncCall : runningAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
public synchronized int queuedCallsCount() {
return readyAsyncCalls.size();
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
}