您的当前位置:首页拆轮子:OkHttp 的源码解析(三):任务分发器(Dispat

拆轮子:OkHttp 的源码解析(三):任务分发器(Dispat

2024-12-13 来源:哗拓教育

从上篇文章我们可以看到 OkHttp 的同步和异步都使用了 Dispatcher ,它的主要作用就是一个任务队列。

我们都听过 OkHttp 的一个高效之处在于在内部维护了一个线程池,方便高效地执行异步请求,这个线程池就在 Dispatcher 类里面。

Dispatcher 类去掉注解只有一百多行,建议自己看下并不是很难,我们来分析:

1、Dispatcher 的成员变量

public final class Dispatcher {
  /** 最大并发请求数为64 */
  private int maxRequests = 64;
  /** 每个主机最大请求数为5 */
  private int maxRequestsPerHost = 5;

  /** 线程池 */
  private ExecutorService executorService;

  /** 准备执行的异步请求 */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** 正在执行的异步请求,包含已经取消但未执行完的请求 */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** 正在执行的同步请求,包含已经取消单未执行完的请求 */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  ······
}

问:为什么要使用2个异步请求队列呢?

一个是正准备执行的:readyAsyncCalls ,用来做一个缓冲使用;另外一个是正在执行的:runningAsyncCalls 。这里其实是一个生产者-消费者模型,如下图所示:

生产者-消费者模型.png
  • Dispatcher: 生产者(默认在主线程)
  • AsyncCall: 队列中需要处理的Runnable(包装了异步回调接口)
  • ExecutorService:消费者池(也就是线程池)
  • Deque<readyAsyncCalls>:缓存(用数组实现,可自动扩容,无大小限制)
  • Deque<runningAsyncCalls>:正在运行的任务,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存

2、Dispatcher 的线程池

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

根据上面的源码,OkHttp 使用的是单例的线程池,有些朋友对线程池不太了解,解释下几个参数的意思:

  • 0(corePoolSize):核心线程池的数量为 0,空闲一段时间后所有线程将全部被销毁。
  • Integer.MAX_VALUE(maximumPoolSize): 最大线程数,当任务进来时可以扩充的线程最大值,相当于无限大。
  • 60(keepAliveTime): 当线程数大于corePoolSize时,多余的空闲线程的最大存活时间。
  • TimeUnit.SECONDS:存活时间的单位是秒。
  • new SynchronousQueue<Runnable>():工作队列,先进先出。
  • Util.threadFactory("OkHttp Dispatcher", false):单个线程的工厂 。

也就是说,在实际运行中,当收到10个并发请求时,线程池会创建十个线程,当工作完成后,线程池会在60s后相继关闭所有线程。

3、同步调用

  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }

dispatcher().executed(this) 方法的源码很简单:

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

这里主要有4点:

  • 检查这个 call 是否已经被执行了,每个 call 只能被执行一次,如果想要一个完全一样的 call,可以利用 call#clone 方法进行克隆。
  • 利用 client.dispatcher().executed(this) 来进行实际执行,将请求的 call 插入到同步队列中。
  • 调用 getResponseWithInterceptorChain() 获取 HTTP网络请求返回结果,抛出给最上层的。从方法名可以看出,这一步还会进行一系列“拦截”操作,这个方法很重要待会细说。
  • 当任务执行完成后,无论成功与否都会调用 dispatcher.finished 方法,通知分发器相关任务已结束,finished 方法的源码就不去看了。

4、异步调用

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

dispatcher.enqueue 方法的源码也很简单:

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

从上述源码分析,如果当前满足

(runningRequests<64 && runningRequestsPerHost<5)

则把异步请求加入 runningAsyncCalls ,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。否则加入 readyAsyncCalls 缓冲排队。

问:异步调用为什么返回 void,那我们请求网络的数据在哪?
我们在异步调用时使用的是接口回调的方式:

call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
    }

    @Override
    public void onResponse(Call call, Response response) {
    }
});

这就涉及到一个新的类: AsyncCalll(它实现了Runnable接口),AsyncCall的excute方法最终将会被执行,它是 RealCall 的内部类:

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    ······

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

在它的 execute() 方法中,主要是2点:

  • 我们又看到了同步调用中熟悉的:

Response response = getResponseWithInterceptorChain();

所以不管是同步还是异步,都会使用 getResponseWithInterceptorChain() 获取网络请求的返回值。

  • 不管请求成功还是失败,通知任务分发器 (client.dispatcher().finished(this)) 该任务已结束,将其销毁。

5、getResponseWithInterceptorChain() 分析

这个方法实在太复杂了,在新的博文中来分析。

6、Dispatcher线程池总结

1)调度线程池Disptcher实现了高并发,低阻塞的实现
2)采用Deque作为缓存,先进先出的顺序执行
3)任务在try/finally中调用了finished函数,控制任务队列的执行顺序,而不是采用锁,减少了编码复杂性提高性能。

7、Dispatcher 全部源码

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private Runnable idleCallback;

  private ExecutorService executorService;

  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

  public synchronized void setMaxRequests(int maxRequests) {
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    this.maxRequests = maxRequests;
    promoteCalls();
  }

  public synchronized int getMaxRequests() {
    return maxRequests;
  }

  public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    this.maxRequestsPerHost = maxRequestsPerHost;
    promoteCalls();
  }

  public synchronized int getMaxRequestsPerHost() {
    return maxRequestsPerHost;
  }

  public synchronized void setIdleCallback(Runnable idleCallback) {
    this.idleCallback = idleCallback;
  }

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

  public synchronized void cancelAll() {
    for (AsyncCall call : readyAsyncCalls) {
      call.get().cancel();
    }

    for (AsyncCall call : runningAsyncCalls) {
      call.get().cancel();
    }

    for (RealCall call : runningSyncCalls) {
      call.cancel();
    }
  }

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  public synchronized List<Call> queuedCalls() {
    List<Call> result = new ArrayList<>();
    for (AsyncCall asyncCall : readyAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

  public synchronized List<Call> runningCalls() {
    List<Call> result = new ArrayList<>();
    result.addAll(runningSyncCalls);
    for (AsyncCall asyncCall : runningAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

  public synchronized int queuedCallsCount() {
    return readyAsyncCalls.size();
  }

  public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
  }
}
显示全文