宁可枝头抱香死,何曾吹落北风中。
Ribbon简单使用
负载均衡,拿到目标服务器地址列表,根据规则选择一个进行调用;
api方式
@Autowired
LoadBalancerClient loadBalancerClient;
@Autowired
RestTemplate restTemplate;
@Bean
public RestTemplate restTemplate(RestTemplateBuilder restTemplateBuilder){
return restTemplateBuilder.build();
}
@GetMapping("/test/{id}")
public String test(@PathVariable("id")int id){
ServiceInstance serviceInstance=loadBalancerClient.choose("xxx-service");
String url=String.format("http://%s:%s",serviceInstance.getHost(),serviceInstance.getPort()+"/xx");
return restTemplate.getForObject(url,String.class);
}
注解方式
@Autowired
RestTemplate restTemplate;
@Bean
@LoadBalanced
public RestTemplate restTemplate(RestTemplateBuilder restTemplateBuilder){
return restTemplateBuilder.build();
}
@GetMapping("/test/{id}")
public String test(@PathVariable("id")int id){
return restTemplate.getForObject("http://xxx-service/xx",String.class);
}
调用代码分析
RestTemplate调用
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
HttpMessageConverterExtractor<T> responseExtractor = new HttpMessageConverterExtractor(responseType, this.getMessageConverters(), this.logger);
return this.execute(url, HttpMethod.GET, requestCallback, responseExtractor, (Object[])uriVariables);
}
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
//具体的发送请求工具
ClientHttpRequest request = this.createRequest(url, method);
response = request.execute();
this.handleResponse(url, method, response);
var14 = responseExtractor != null ? responseExtractor.extractData(response) : null;
}
HttpAccessor生成请求客户端
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = this.getRequestFactory().createRequest(url, method);
this.initialize(request);
return request;
}
具体生成请求工厂
InterceptingHttpAccessor
public ClientHttpRequestFactory getRequestFactory() {
//获取是否restTemplate有拦截器
//具体在LoadBalancerInterceptorConfig对resteplate进行了设置
List<ClientHttpRequestInterceptor> interceptors = this.getInterceptors();
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = (ClientHttpRequestFactory)factory;
}
return (ClientHttpRequestFactory)factory;
} else {
return super.getRequestFactory();
}
}
具体生成的请求客户端
InterceptingClientHttpRequest
//并进行具体调用
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
//调用的请求拦截器LoadBalancerInterceptor
return nextInterceptor.intercept(request, body, this);
}
}
Ribbon原理
初始化操作
LoadBalancerClient方式
//初始化之前,LoadBalancerAutoConfiguration进行自动装配
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
public class RibbonAutoConfiguration {
@Bean
@ConditionalOnMissingBean({LoadBalancerClient.class})
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(this.springClientFactory());
}
}
注解方式
获取所有的需要拦截的restTemplate
public class LoadBalancerAutoConfiguration{
//相当于@Qualifier,注入所有被@LoadBalanced标记的RestTemplate
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
}
对包装之后的RestTemplate进行拦截配置
static class LoadBalancerInterceptorConfig {
//定义拦截器
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//将自己的拦截器添加的拦截器列表中
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
对restTemplate进行重设
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
调用操作
拦截器拦截restTemplate
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
//具体工具类
private LoadBalancerClient loadBalancer;
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
}
调用RibbonLoadBalancerClient具体调用
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
//获取负载均衡器
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
//根据负载均衡器获取server
Server server = this.getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
}
}
具体的LoadBalancer:ZoneAwareLoadBalancer
//存储服务器列表的父类
public class BaseLoadBalancer{
protected volatile List<Server> allServerList;
protected volatile List<Server> upServerList;
}
//初始化服务器列表的子类DynamicServerListLoadBalancer
public class DynamicServerListLoadBalancer{
public DynamicServerListLoadBalancer(){
this.restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
//开启定时任务更新
this.enableAndInitLearnNewServersFeature();
//更新列表
this.updateListOfServers();
}
public void updateListOfServers() {
servers = this.serverListImpl.getUpdatedListOfServers();
}
}
定时更新
public void enableAndInitLearnNewServersFeature() {
this.serverListUpdater.start(this.updateAction);
}
//配置一个定时任务每30s获取一次服务列表
public synchronized void start(final UpdateAction updateAction) {
if (this.isActive.compareAndSet(false, true)) {
Runnable wrapperRunnable = new Runnable() {
public void run() {
if (!PollingServerListUpdater.this.isActive.get()) {
if (PollingServerListUpdater.this.scheduledFuture != null) {
PollingServerListUpdater.this.scheduledFuture.cancel(true);
}
} else {
try {
updateAction.doUpdate();
PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
} catch (Exception var2) {
PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
}
}
}
};
this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
} else {
logger.info("Already active, no-op");
}
}
本地获取
//本地获取服务列表 在配置中配置的
//spring-cloud-order-service.ribbon.listOfServers=\localhost:8080,localhost:8082
public List<Server> getUpdatedListOfServers() {
String listOfServers = (String)this.clientConfig.get(CommonClientConfigKey.ListOfServers);
return this.derive(listOfServers);
}
根据选择器选择服务区
public Server chooseServer(Object key) {
return this.rule.choose(key);
}
选择规则
RoundRobinRule
RandomRule
RetryRule
BestAvailableRule
ResponseTimeWeightedRule
...
默认的轮询算法
public abstract class AbstractServerPredicate{
private int incrementAndGetModulo(int modulo) {
int current;
int next;
do {
current = this.nextIndex.get();
next = (current + 1) % modulo;
} while(!this.nextIndex.compareAndSet(current, next) || current >= modulo)
return current;
}
}
随机算法
public class RandomRule extends AbstractLoadBalancerRule {
protected int chooseRandomInt(int serverCount) {
return ThreadLocalRandom.current().nextInt(serverCount);
}
}
根据响应时间选择的算法
public class WeightedResponseTimeRule{
//收集所有服务器节点的响应时间
public Server choose(ILoadBalancer lb, Object key) {
double randomWeight = this.random.nextDouble() * maxTotalWeight;
int n = 0;
for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
Double d = (Double)var13.next();
if (d >= randomWeight) {
serverIndex = n;
break;
}
}
server = (Server)allList.get(serverIndex);
}
}
具体剔除服务节点
开启定时任务每10s,ping一次
public class BaseLoadBalancer{
void setupPingTask() {
if (!this.canSkipPing()) {
if (this.lbTimer != null) {
this.lbTimer.cancel();
}
this.lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + this.name, true);
//
this.lbTimer.schedule(new BaseLoadBalancer.PingTask(), 0L, (long)(this.pingIntervalSeconds * 1000));
this.forceQuickPing();
}
}
}
具体的ping执行器
class PingTask extends TimerTask {
PingTask() {
}
public void run() {
try {
(BaseLoadBalancer.this.new Pinger(BaseLoadBalancer.this.pingStrategy)).runPinger();
} catch (Exception var2) {
BaseLoadBalancer.logger.error("LoadBalancer [{}]: Error pinging", BaseLoadBalancer.this.name, var2);
}
}
}
- 根据不同的策略选择不同的Pinger
- 默认情况是dumyping,就是不做ping操作
- PingUrl根据http进行心跳检查
自定义扩展
- 负载器ILoadBalancer
- 选择器IRule
- 访问心跳IPing
- 服务列表ServerList
- 服务列表过滤器ServerListFilter