亲宝软件园·资讯

展开

Java OkHttp框架

niuyongzhi 人气:0

1.OkHttp发起网络请求

可以通过OkHttpClient发起一个网络请求

//创建一个Client,相当于打开一个浏览器
 OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
 //创建一个请求。
        Request request = new Request.Builder()
                .url("http://www.baidu.com")
                .method("GET",null)
                .build();
    //调用Client 创建一个Call。
        Call call = okHttpClient.newCall(request);
        //Call传入一个回调函数,并加入到请求队列。
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
            }
        });
}

通过Retrofit发起一个OkHttp请求

 Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://www.baidu.com/")
                .build();
        NetInterface netInterface = retrofit.create(NetInterface.class);
        Call<Person> call = netInterface.getPerson();
        call.enqueue(new Callback<Person>() {
            @Override
            public void onResponse(Call<Person> call, Response<Person> response) {
            }
            @Override
            public void onFailure(Call<Person> call, Throwable t) {
            }
 });

以上两种方式都是通过call.enqueue() 把网络请求加入到请求队列的。

这个call是RealCall的一个对象。

 public void enqueue(Callback responseCallback) {
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

这里有两个判断条件

runningAsyncCalls.size() < maxRequests如果运行队列数量大于最大数量,

runningCallsForHost(call) < maxRequestsPerHost并且访问同一台服务器的请求数量大于最大数量,请求会放入等待队列,否则加入运行队列,直接执行。

//等待队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//运行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//运行队列数量最大值
private int maxRequests = 64;
//访问不同主机的最大数量
private int maxRequestsPerHost = 5;
dispatcher.java
 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

接下来看这行代码executorService().execute(call);

executorService()拿到一个线程池实例,

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;

execute(call)执行任务,发起网络请求。

 //AsyncCall.java
 @Override protected void execute() {
       try {
       //这个方法去请求网络,会返回Respose
        Response response = getResponseWithInterceptorChain();
        //请求成功,回调接口
         responseCallback.onResponse(RealCall.this, response);
       }catch(Exceptrion e){
            //失败回调
         responseCallback.onFailure(RealCall.this, e);
       }finally {
          //从当前运行队列中删除这个请求
          client.dispatcher().finished(this);
      }
 }

getResponseWithInterceptorChain()

这行代码,使用了设计模式中的责任链模式。

 //这个方法命名:通过拦截器链,获取Response
  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
     // 这个我们自己定义的拦截器。
    interceptors.addAll(client.interceptors());
    //重试和重定向拦截器
    interceptors.add(retryAndFollowUpInterceptor);
    //请求头拦截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接拦截器
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //访问拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //拦截器责任链
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    //执行拦截器集合中的拦截器
    return chain.proceed(originalRequest);
  }

责任链模式中,链条的上游持有下游对象的引用。这样能够保证在链条上的每一个对象,都能对其符合条件的任务进行处理。

但是在上面的拦截器构成责任链中,是把拦截器,放在了一个集合中。

第一个参数interceptors 是一个拦截器的集合。

第五个参数0是集合的index,RealInterceptorChain就是根据这个索引值+1,

对chain.proceed方法循环调用,进行集合遍历,并执行拦截器中定义的方法的。

这个责任链模式,并没有明确的指定下游对象是什么,而是通过集合index值的变化,动态的指定的。

 Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0......)
   chain.proceed(originalRequest);
   public Response proceed(Request request,...){
    //构建一个index+1的拦截器链
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
                connection, index + 1,....);
        //拿到当前的拦截器
        Interceptor interceptor = interceptors.get(index);
        //调用拦截器intercept(next)方法,
        //在这个方法中继续调用realChain.proceed(),从而进行循环调用,index索引值再加1.
        Response response = interceptor.intercept(next);
 }

2.OkHttp的连接器

1)RetryAndFollowUpInterceptor:重试和重定向拦截器

public Response intercept(Chain chain){
      while (true) {
        Response response;
          try {
          //创建StreamAllocation对象,这个对象会在连接拦截器中用到
            StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
                  createAddress(request.url()), call, eventListener, callStackTrace);
              this.streamAllocation = streamAllocation;
             调用责任链下游拦截器
             response = realChain.proceed(request, streamAllocation, null, null);
            } catch (RouteException e) {
                 // The attempt to connect via a route failed. The request will not have been sent.
                 路由异常,请求还没发出去。
                 这样这个recover(),如果返回的是false,则抛出异常,不再重试
                 如果返回的是true,则执行下面的continue,进行下一次while循环,进行重试,重新发起网络请求。
                 if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
                   throw e.getFirstConnectException();
                 }
                 releaseConnection = false;
                continue;
             } catch (IOException e) {
                 // An attempt to communicate with a server failed. The request may have been sent.
                 请求已经发出去了,但是和服务器连接失败了。
                 这个recover()返回值的处理逻辑和上面异常一样。
                 boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
                 if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
                 releaseConnection = false;
                 continue;
               }
             } finally {//finally是必定会执行到的,不管上面的catch中执行的是continue还是thow
                // We're throwing an unchecked exception. Release any resources.
                if (releaseConnection) {
                  streamAllocation.streamFailed(null);
                  streamAllocation.release();
                }
              }
             在这个重试拦截器中,okhttp的做法很巧妙。先是在外面有一个while循环,如果发生异常,
             会在recover方法中对异常类型进行判断,如果不符合属于重试,则返回false,并thow e,结束while循环。
             如果符合重试的条件,则返回true,在上面的catch代码块中执行continue方法,进入下一个while循环。
            //如果请求正常,并且返回了response,则会进行重定向的逻辑判断
            followUpRequest在这个方法中会根据ResponseCode,状态码进行重定向的判断,
            Request followUp;
                 try {
                   followUp = followUpRequest(response, streamAllocation.route());
                 } catch (IOException e) {
                   streamAllocation.release();
                   throw e;
                 }
                 如果flolowUp 为null,则不需要重定向,直接返回response
                 if (followUp == null) {
                   if (!forWebSocket) {
                     streamAllocation.release();
                   }
                   return response;
                 }
                  如果flolowUp 不为null,则进行重定向了请求
               如果重定向次数超过MAX_FOLLOW_UPS=20次,则抛出异常,结束while循环
              if (++followUpCount > MAX_FOLLOW_UPS) {
                     streamAllocation.release();
                     throw new ProtocolException("Too many follow-up requests: " + followUpCount);
                   }
                   if (followUp.body() instanceof UnrepeatableRequestBody) {
                     streamAllocation.release();
                     throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
                   }
                   if (!sameConnection(response, followUp.url())) {
                     streamAllocation.release();
                     //从重定向请求中拿到url,封装一个新的streamAllocation对象,
                     streamAllocation = new StreamAllocation(client.connectionPool(),
                         createAddress(followUp.url()), call, eventListener, callStackTrace);
                     this.streamAllocation = streamAllocation;
                   } else if (streamAllocation.codec() != null) {
                     throw new IllegalStateException("Closing the body of " + response
                         + " didn't close its backing stream. Bad interceptor?");
                   }
                   //将重定向请求赋值给request 进入下一个重定向的请求的while循环,继续走上面的while循环代码
                   request = followUp;
                   priorResponse = response;
                 }
 }
   //只有这个方法返回值为false都不进行重试。
   private boolean recover(IOException e, StreamAllocation streamAllocation,
       boolean requestSendStarted, Request userRequest) {
     streamAllocation.streamFailed(e);
     // The application layer has forbidden retries.
     应用层禁止重试。可以通过OkHttpClient进行配置(默认是允许的)
     if (!client.retryOnConnectionFailure()) return false;
     // We can't send the request body again.
     if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
     // This exception is fatal. 致命的异常
     判断是否属于重试的异常
     if (!isRecoverable(e, requestSendStarted)) return false;
     // No more routes to attempt.
     没有更多可以连接的路由线路
     if (!streamAllocation.hasMoreRoutes()) return false;
     // For failure recovery, use the same route selector with a new connection.
     return true;
   }
  只有这个方法返回false,都不进行重试。
 private boolean isRecoverable(IOException e, boolean requestSendStarted) {
   // If there was a protocol problem, don't recover.
   出现了协议异常,不再重试
   if (e instanceof ProtocolException) {
     return false;
   }
   // If there was an interruption don't recover, but if there was a timeout connecting to a route
   // we should try the next route (if there is one).
   requestSendStarted为false时,并且异常类型为Scoket超时异常,将会进行下一次重试
   if (e instanceof InterruptedIOException) {
     return e instanceof SocketTimeoutException && !requestSendStarted;
   }
   // Look for known client-side or negotiation errors that are unlikely to be fixed by trying
   // again with a different route.
   如果是一个握手异常,并且证书出现问题,则不能重试
   if (e instanceof SSLHandshakeException) {
     // If the problem was a CertificateException from the X509TrustManager,
     // do not retry.
     if (e.getCause() instanceof CertificateException) {
       return false;
     }
   }

2)BridgeInterceptor 桥拦截器:连接服务器的桥梁,主要是在请求头中设置一些参数配置

如:请求内容长度,编码,gzip压缩等。

public Response intercept(Chain chain) throws IOException {
     Request userRequest = chain.request();
     Request.Builder requestBuilder = userRequest.newBuilder();
     RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }
      ..................
    }
    在请求头中添加gizp,是否压缩
  boolean transparentGzip = false;
     if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
       transparentGzip = true;
       requestBuilder.header("Accept-Encoding", "gzip");
     }
    //cookies
     List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
     if (!cookies.isEmpty()) {
       requestBuilder.header("Cookie", cookieHeader(cookies));
     }
     调用责任链中下一个拦截器的方法,网络请求得到的数据封装到networkResponse中
     Response networkResponse = chain.proceed(requestBuilder.build());
    对cookie进行处理
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    如果设置了gzip,则会对networkResponse进行解压缩。
     if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          String contentType = networkResponse.header("Content-Type");
          responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }
    return responseBuilder.build();
}

3)CacheInterceptor缓存拦截器

public Response intercept(Chain chain){
   //  this.cache = DiskLruCache.create(fileSystem, directory, 201105, 2, maxSize);
    这个缓存在底层使用的是DiskLruCache
    //以request为key从缓存中拿到response。
     Response cacheCandidate = cache != null
            ? cache.get(chain.request()): null;
     long now = System.currentTimeMillis();
     //缓存策略
     CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
     Request networkRequest = strategy.networkRequest;
     Response cacheResponse = strategy.cacheResponse;
   // If we're forbidden from using the network and the cache is insufficient, fail.
   //如果请求和响应都为null,直接返回504
   if (networkRequest == null && cacheResponse == null) {
     return new Response.Builder()
         .request(chain.request())
         .protocol(Protocol.HTTP_1_1)
         .code(504)
         .message("Unsatisfiable Request (only-if-cached)")
         .body(Util.EMPTY_RESPONSE)
         .sentRequestAtMillis(-1L)
         .receivedResponseAtMillis(System.currentTimeMillis())
         .build();
   }
   // If we don't need the network, we're done.
   //如果请求为null,缓存不为null,则直接使用缓存。
       if (networkRequest == null) {
         return cacheResponse.newBuilder()
             .cacheResponse(stripBody(cacheResponse))
             .build();
       }
     Response networkResponse = null;
        try {
          //调用责任链下一个拦截器
          networkResponse = chain.proceed(networkRequest);
        } finally {
        }
      Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
     // Offer this request to the cache.
     //将响应存入缓存。
      CacheRequest cacheRequest = cache.put(response);
}

4)ConnectInterceptor 连接拦截器。当一个请求发出,需要建立连接,然后再通过流进行读写。

public Response intercept(Chain chain) throws IOException {
     RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    //在重定向拦截器中创建,
    StreamAllocation streamAllocation = realChain.streamAllocation();
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //从连接池中,找到一个可以复用的连接,
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
   // RealConnection 中封装了一个Socket和一个Socket连接池
    RealConnection connection = streamAllocation.connection();
    //调用下一个拦截器
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
//遍历连接池
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }
  public boolean isEligible(Address address, @Nullable Route route) {
    // If this connection is not accepting new streams, we're done.
    if (allocations.size() >= allocationLimit || noNewStreams) return false;
    // If the non-host fields of the address don't overlap, we're done.
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
    // If the host exactly matches, we're done: this connection can carry the address.
    从连接池中找到一个连接参数一致且并未占用的连接
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
  }

5)CallServerInterceptor 请求服务器拦截器

/** This is the last interceptor in the chain. It makes a network call to the server. */
这是责任链中最后一个拦截器,这个会去请求服务器。
 public Response intercept(Chain chain) throws IOException {
      RealInterceptorChain realChain = (RealInterceptorChain) chain;
      HttpCodec httpCodec = realChain.httpStream();
      StreamAllocation streamAllocation = realChain.streamAllocation();
      RealConnection connection = (RealConnection) realChain.connection();
      Request request = realChain.request();
      //将请求头写入缓存
      httpCodec.writeRequestHeaders(request);
      return response;

加载全部内容

相关教程
猜你喜欢
用户评论