亲宝软件园·资讯

展开

【一起学源码-微服务】Feign 源码三:Feign结合Ribbon实现负载均衡的原理分析

一枝花算不算浪漫 人气:1

前言

前情回顾

上一讲我们已经知道了Feign的工作原理其实是在项目启动的时候,通过JDK动态代理为每个FeignClinent生成一个动态代理。

动态代理的数据结构是:ReflectiveFeign.FeignInvocationHandler。其中包含target(里面是serviceName等信息)和dispatcher(map数据结构,key是请求的方法名,方法参数等,value是SynchronousMethodHandler)。

如下图所示:

本讲目录

这一讲主要是Feign与Ribbon结合实现负载均衡的原理分析。

说明

原创不易,如若转载 请标明来源!

博客地址:一枝花算不算浪漫
微信公众号:壹枝花算不算浪漫

源码分析

Feign结合Ribbon实现负载均衡原理

通过前面的分析,我们可以直接来看下SynchronousMethodHandler中的代码:

final class SynchronousMethodHandler implements MethodHandler {

    @Override
    public Object invoke(Object[] argv) throws Throwable {
        // 生成请求类似于:GET /sayHello/wangmeng HTTP/1.1
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
            try {
                return executeAndDecode(template);
            } catch (RetryableException e) {
                retryer.continueOrPropagate(e);
                if (logLevel != Logger.Level.NONE) {
                    logger.logRetry(metadata.configKey(), logLevel);
                }
                continue;
            }
        }
    }

    Object executeAndDecode(RequestTemplate template) throws Throwable {
        // 构建request对象:GET http://serviceA/sayHello/wangmeng HTTP/1.1
        Request request = targetRequest(template);

        if (logLevel != Logger.Level.NONE) {
            logger.logRequest(metadata.configKey(), logLevel, request);
        }

        Response response;
        long start = System.nanoTime();
        try {
            // 这个client就是之前构建的LoadBalancerFeignClient,options是超时时间
            response = client.execute(request, options);
            // ensure the request is set. TODO: remove in Feign 10
            response.toBuilder().request(request).build();
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
            }
            throw errorExecuting(request, e);
        }
        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // 下面逻辑都是构建返回值response
        boolean shouldClose = true;
        try {
            if (logLevel != Logger.Level.NONE) {
                response =
                        logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
                // ensure the request is set. TODO: remove in Feign 10
                response.toBuilder().request(request).build();
            }
            if (Response.class == metadata.returnType()) {
                if (response.body() == null) {
                    return response;
                }
                if (response.body().length() == null ||
                        response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
                    shouldClose = false;
                    return response;
                }
                // Ensure the response body is disconnected
                byte[] bodyData = Util.toByteArray(response.body().asInputStream());
                return response.toBuilder().body(bodyData).build();
            }
            if (response.status() >= 200 && response.status() < 300) {
                if (void.class == metadata.returnType()) {
                    return null;
                } else {
                    return decode(response);
                }
            } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
                return decode(response);
            } else {
                throw errorDecoder.decode(metadata.configKey(), response);
            }
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
            }
            throw errorReading(request, response, e);
        } finally {
            if (shouldClose) {
                ensureClosed(response.body());
            }
        }
    }
}

这里主要是构建request数据,然后通过request和options去通过LoadBalancerFeignClient.execute()方法去获得返回值。我们可以接着看client端的调用:

public class LoadBalancerFeignClient implements Client {

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            // asUri: http://serviceA/sayHello/wangmeng
            URI asUri = URI.create(request.url());

            // clientName:serviceA
            String clientName = asUri.getHost();

            // uriWithoutHost: http://sayHello/wangmeng
            URI uriWithoutHost = cleanUrl(request.url(), clientName);

            // 这里ribbonRequest:GET http:///sayHello/wangmeng HTTP/1.1  
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);

            // 这里面config只有两个超时时间,一个是connectTimeout:5000,一个是readTimeout:5000
            IClientConfig requestConfig = getClientConfig(options, clientName);

            // 真正执行负载均衡的地方
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }
}

接着我们看下lbClient()executeWithLoadBalancer()

public class LoadBalancerFeignClient implements Client {

    private FeignLoadBalancer lbClient(String clientName) {
        return this.lbClientFactory.create(clientName);
    }
}

public class CachingSpringLoadBalancerFactory {
    public FeignLoadBalancer create(String clientName) {
        if (this.cache.containsKey(clientName)) {
            return this.cache.get(clientName);
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        // 获取Ribbon ILoadBalancer信息
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }
}

这里是获取了ILoadBalancer数据,里面包含了Ribbon获取的serviceA所有服务节点信息。

这里已经获取到ILoadBalancer,里面包含serviceA服务器所有节点请求host信息。接下来就是从中负载均衡选择一个节点信息host出来。

public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }
}

public class LoadBalancerCommand<T> {
    
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                }

        // 省略代码...

    // selectServer是真正执行负载均衡的逻辑
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // loadBalancerURI是http:///sayHello/wangmeng, loadBalancerKey为null
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
}

public class LoadBalancerContext implements IClientConfigAware {

    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // 获取到ILoadBalancer,这里面有IRule的信息及服务节点所有信息
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                // 这里就执行真正的chooseServer的逻辑了。默认的rule为ZoneAvoidanceZule
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            }

            // 省略代码
        }
    }
}

上面代码已经很清晰了,这里就是真正的通过ribbon的 rule.chooseServer()负载均衡地选择了一个服务节点调用,debug如下:

到了这里feign与ribbon的分析也就结束了,返回请求url信息,然后得到response结果:

总结

上面已经分析了Feign与Ribbon的整合,最终还是落到Ribbon中的ILoadBalancer中,使用最后使用IRule去选择对应的server数据。

下一讲 会画一个很大的图,包含Feign、Ribbon、Eureka关联的图,里面会画出每个组件的细节及依赖关系。也算是学习至今的一个总复习了。

申明

本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

加载全部内容

相关教程
猜你喜欢
用户评论