qiuyadong's Homepage

Distributed Service Governance with Dubbo: Source Code Analysis of Service Publishing and Consumption


Dubbo can publish and consume services in two ways: through its API or through Spring.

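Publishing a service with Dubbo through Spring

How Dubbo integrates with Spring:

1 Write an XML schema (XSD) file that constrains the configuration; Dubbo uses dubbo.xsd.

2 Write a spring.schemas file under resources/META-INF/: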

  http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd

3 Write a spring.handlers file under resources/META-INF/:

  http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

4 When Spring loads an XML file that uses this namespace, it parses the elements with the registered DubboNamespaceHandler:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans        
       http://www.springframework.org/schema/beans/spring-beans.xsd        
       http://code.alibabatech.com/schema/dubbo        
       http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

</beans>
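The spring.schemas and spring.handlers entries above are ordinary Java properties files, which is why the colon in each key is escaped as `\:`. The following is a minimal sketch, using only java.util.Properties, of how such a file maps a namespace URI to a handler class name. The class HandlerMappingSketch and the method resolveHandler are made up for illustration; they are not Spring API, and Spring performs the real lookup internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: mimics reading a spring.handlers file as a properties file.
class HandlerMappingSketch {

    // Same content as the META-INF/spring.handlers line shown above.
    static final String SPRING_HANDLERS =
        "http\\://code.alibabatech.com/schema/dubbo="
        + "com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler\n";

    // Returns the handler class name registered for a namespace URI, or null.
    static String resolveHandler(String namespaceUri) {
        Properties mappings = new Properties();
        try {
            mappings.load(new StringReader(SPRING_HANDLERS));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The "\:" escape in the file becomes a plain ":" in the loaded key.
        return mappings.getProperty(namespaceUri);
    }

    public static void main(String[] args) {
        System.out.println(resolveHandler("http://code.alibabatech.com/schema/dubbo"));
    }
}
```

The lookup key is the xmlns:dubbo value from the XML file, so the namespace declared in the configuration and the key in spring.handlers have to match exactly.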

Each tag is parsed into its corresponding class:


  public class DubboNamespaceHandler extends NamespaceHandlerSupport {

	static {
		Version.checkDuplicate(DubboNamespaceHandler.class);
	}

	public void init() {
	    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); // parser for the service provider bean
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); // parser for the service consumer (reference) bean
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }

}

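Note: because Dubbo publishes services through Spring configuration, it defines its own set of tags on top of Spring's extension mechanism. So that the provider publishes and registers its services when Spring starts, and so that the client automatically subscribes to and discovers services at startup, two beans are added: ServiceBean and ReferenceBean, which extend ServiceConfig and ReferenceConfig respectively.

They also implement Spring callback interfaces; ServiceBean implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, and BeanNameAware.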

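InitializingBean: gives a bean an initialization hook. It declares a single method, afterPropertiesSet, which Spring calls on every implementing bean while initializing it.

DisposableBean: when the bean is destroyed, the Spring container automatically calls its destroy method, for example to release resources.

ApplicationContextAware: when the container initializes a bean that implements this interface, it automatically injects the ApplicationContext.

ApplicationListener: listens for ApplicationEvent events; the Spring container publishes an event once it has started.

BeanNameAware: lets the bean obtain its own bean id when it is initialized.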

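If delayed export is configured: while initializing the bean (initializeBean), Spring calls back classes that implement InitializingBean through afterPropertiesSet(), and Dubbo publishes the service from that method.

If no delay is configured, or the delay is -1: after Spring has instantiated the beans, the last step of refreshing the container publishes a ContextRefreshedEvent, which calls onApplicationEvent on classes that implement ApplicationListener. Dubbo publishes the service from that method.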
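The two cases above can be summarized as a small decision function. This is a sketch under the rule just described (a null or -1 delay means "wait for the context refresh"); the names ExportTimingSketch, Trigger and triggerFor are hypothetical and do not appear in Dubbo.

```java
// Illustrative model of which Spring callback triggers export(); names are hypothetical.
class ExportTimingSketch {

    enum Trigger { AFTER_PROPERTIES_SET, CONTEXT_REFRESHED }

    // A null or -1 delay means "wait until the Spring context has been refreshed".
    static Trigger triggerFor(Integer delayMillis) {
        boolean waitForContext = (delayMillis == null || delayMillis == -1);
        return waitForContext ? Trigger.CONTEXT_REFRESHED : Trigger.AFTER_PROPERTIES_SET;
    }

    public static void main(String[] args) {
        System.out.println(triggerFor(null));  // CONTEXT_REFRESHED
        System.out.println(triggerFor(-1));    // CONTEXT_REFRESHED
        System.out.println(triggerFor(5000));  // AFTER_PROPERTIES_SET
    }
}
```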

5 Once the XML has been loaded, ServiceBean, which implements ApplicationListener, presumably publishes the service from onApplicationEvent:

public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}

6 Next, look closely at ServiceBean's onApplicationEvent method:


    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName()) && this.isDelay() && !this.isExported() && !this.isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + this.getInterface());
            }

            this.export();
        }

    }

    // Inside export(): if a delay is configured, schedule doExport after the delay; otherwise call doExport directly

    if (this.export == null || this.export) {
                if (this.delay != null && this.delay > 0) {
                    delayExportExecutor.schedule(new Runnable() {
                        public void run() {
                            ServiceConfig.this.doExport();
                        }
                    }, (long)this.delay, TimeUnit.MILLISECONDS);
                } else {
                    this.doExport();
                }

            }
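The delay branch above is plain java.util.concurrent scheduling. Below is a self-contained sketch of the same pattern, assuming a single daemon scheduler thread; DelayedExportSketch and exportedWithin are illustrative names, and the Runnable stands in for doExport.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: the "schedule if delayed, else run now" pattern.
class DelayedExportSketch {

    // Daemon thread so the scheduler never keeps the JVM alive.
    private static final ScheduledExecutorService delayExportExecutor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "delay-export-sketch");
            t.setDaemon(true);
            return t;
        });

    // Runs doExport immediately unless a positive delay is set, in which case it is scheduled.
    static void export(Integer delayMillis, Runnable doExport) {
        if (delayMillis != null && delayMillis > 0) {
            delayExportExecutor.schedule(doExport, delayMillis.longValue(), TimeUnit.MILLISECONDS);
        } else {
            doExport.run();
        }
    }

    // Demo helper: true if doExport ran within waitMillis of calling export.
    static boolean exportedWithin(Integer delayMillis, long waitMillis) {
        CountDownLatch done = new CountDownLatch(1);
        export(delayMillis, done::countDown);
        try {
            return done.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(exportedWithin(null, 0));    // no delay: runs synchronously
        System.out.println(exportedWithin(200, 2000));  // delayed: runs on the scheduler thread
    }
}
```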

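7 ServiceConfig is the parent class of ServiceBean, so this calls the parent's synchronized void export(). Because the method is synchronized, only one thread at a time can run it on a given instance.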

In the initialization flow, performance tuning should have a fairly low priority, but safety should have a fairly high one!
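To make the safety point concrete, here is a small sketch of a synchronized, run-once export. It assumes an exported flag guards the publishing work; SynchronizedExportSketch and its members are hypothetical names, and the counter stands in for the real publishing logic.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a synchronized export that publishes at most once per instance.
class SynchronizedExportSketch {
    private boolean exported;
    final AtomicInteger publishCount = new AtomicInteger();

    synchronized void export() {
        if (exported) {
            return;                      // a second caller finds the work already done
        }
        exported = true;
        publishCount.incrementAndGet();  // stands in for the real publishing work
    }

    // Calls export() from several threads and reports how often the work actually ran.
    static int publishCountAfterConcurrentExports(int threads) {
        SynchronizedExportSketch config = new SynchronizedExportSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(config::export);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return config.publishCount.get();
    }

    public static void main(String[] args) {
        System.out.println(publishCountAfterConcurrentExports(8)); // 1
    }
}
```

Startup happens once, so the cost of taking the lock is negligible compared with the cost of publishing a service twice.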

After a long series of checks, the publishing logic is called:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

      if (!"none".toString().equalsIgnoreCase(scope)) {
          if (!"remote".toString().equalsIgnoreCase(scope)) {
              this.exportLocal(url);
          }

          if (!"local".toString().equalsIgnoreCase(scope)) {
              if (logger.isInfoEnabled()) {
                  logger.info("Export dubbo service " + this.interfaceClass.getName() + " to url " + url);
              }

              if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) {
                  Iterator i$ = registryURLs.iterator();

                  while(i$.hasNext()) {
                      URL registryURL = (URL)i$.next();
                      url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                      URL monitorUrl = this.loadMonitor(registryURL);
                      if (monitorUrl != null) {
                          url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString());
                      }

                      if (logger.isInfoEnabled()) {
                          logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                      }

                      Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
                      Exporter<?> exporter = protocol.export(invoker);
                      this.exporters.add(exporter);
                  }
              } else {
                  Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, url); // obtain the Invoker object through proxyFactory
                  Exporter<?> exporter = protocol.export(invoker); // export (register) the service
                  this.exporters.add(exporter);
              }
          }
      }

      this.urls.add(url);
  }
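The nested scope checks in doExportUrlsFor1Protocol boil down to a small truth table. A sketch of that table follows; ExportScopeSketch, Target and targetsFor are illustrative names, not Dubbo identifiers.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative only: which exports the scope attribute enables.
class ExportScopeSketch {

    enum Target { LOCAL, REMOTE }

    static Set<Target> targetsFor(String scope) {
        Set<Target> targets = EnumSet.noneOf(Target.class);
        if ("none".equalsIgnoreCase(scope)) {
            return targets;                 // export nothing
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            targets.add(Target.LOCAL);      // in-JVM export
        }
        if (!"local".equalsIgnoreCase(scope)) {
            targets.add(Target.REMOTE);     // export over the configured protocol
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(targetsFor(null));      // [LOCAL, REMOTE]
        System.out.println(targetsFor("local"));   // [LOCAL]
        System.out.println(targetsFor("remote"));  // [REMOTE]
        System.out.println(targetsFor("none"));    // []
    }
}
```

An unset scope therefore exports both locally and remotely.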


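This is a good place for a short summary:

1 Invoker: performs the actual remote call (covered separately later)

2 Protocol: publishes and subscribes to service addresses

3 Exporter: exposes a service or cancels its exposure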
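The relationship between the three roles can be sketched with a toy in-memory model. The interfaces below are deliberately simplified stand-ins, not Dubbo's real signatures, and ExportModelSketch, InMemoryProtocol and greetingInvoker are names invented for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a toy model of how Invoker, Protocol and Exporter relate.
class ExportModelSketch {

    interface Invoker {
        String key();
        Object invoke(String method, Object... args);
    }

    interface Exporter {
        Invoker getInvoker();
        void unexport();
    }

    interface Protocol {
        Exporter export(Invoker invoker);
    }

    // Keeps exported services in a map; unexport removes the entry again.
    static class InMemoryProtocol implements Protocol {
        final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();

        public Exporter export(final Invoker invoker) {
            Exporter exporter = new Exporter() {
                public Invoker getInvoker() { return invoker; }
                public void unexport() { exporterMap.remove(invoker.key()); }
            };
            exporterMap.put(invoker.key(), exporter);
            return exporter;
        }
    }

    // A hand-written invoker for a hypothetical sayHello(String) method.
    static Invoker greetingInvoker() {
        return new Invoker() {
            public String key() { return "demo.HelloService"; }
            public Object invoke(String method, Object... args) {
                if ("sayHello".equals(method) && args.length == 1) {
                    return "hello " + args[0];
                }
                throw new IllegalArgumentException("Not found method " + method);
            }
        };
    }

    public static void main(String[] args) {
        InMemoryProtocol protocol = new InMemoryProtocol();
        Exporter exporter = protocol.export(greetingInvoker());
        System.out.println(exporter.getInvoker().invoke("sayHello", "dubbo"));
        exporter.unexport();
        System.out.println(protocol.exporterMap.isEmpty());
    }
}
```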

Next, look at how proxyFactory.getInvoker(this.ref, this.interfaceClass, url) obtains the Invoker object:


// private static final ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

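//Because this is ProxyFactory's adaptive extension, and its methods carry @Adaptive({"proxy"}), the call returns a generated adapter class, ProxyFactory$Adaptive.
//The adapter uses the extension name taken from the URL (the "proxy" parameter) to pick the concrete proxy factory; presumably the javassist ProxyFactory is used, and it returns the invoker.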

@SPI("javassist")
public interface ProxyFactory {
    @Adaptive({"proxy"})
    <T> T getProxy(Invoker<T> var1) throws RpcException;

    @Adaptive({"proxy"})
    <T> Invoker<T> getInvoker(T var1, Class<T> var2, URL var3) throws RpcException;
}
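What the adaptive class does with @Adaptive({"proxy"}) and @SPI("javassist") can be sketched as a parameter lookup with a default. The URL is modeled here as a plain parameter map; AdaptiveSelectionSketch and selectProxyFactoryName are hypothetical names, and the real generated class goes on to load the named extension.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: choosing an extension name from a URL parameter with an SPI default.
class AdaptiveSelectionSketch {

    static final String DEFAULT_EXTENSION = "javassist"; // the value of @SPI("javassist")

    // Reads the "proxy" parameter named by @Adaptive and falls back to the default.
    static String selectProxyFactoryName(Map<String, String> urlParameters) {
        String name = urlParameters.get("proxy");
        return (name == null || name.isEmpty()) ? DEFAULT_EXTENSION : name;
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new HashMap<>();
        System.out.println(selectProxyFactoryName(parameters)); // javassist
        parameters.put("proxy", "jdk");
        System.out.println(selectProxyFactoryName(parameters)); // jdk
    }
}
```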

Presumably the call first goes through the stub wrapper:


public class StubProxyFactoryWrapper implements ProxyFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(StubProxyFactoryWrapper.class);
    private final ProxyFactory proxyFactory;
    private Protocol protocol;
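    // The class has a constructor that takes a ProxyFactory, so it is treated as a wrapper and the call definitely passes through StubProxyFactoryWrapper
    public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }
    // Set through setter injection (IoC)
    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }
    // This just delegates to this.proxyFactory, i.e. the javassist ProxyFactory, so getInvoker is effectively not wrapped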
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return this.proxyFactory.getInvoker(proxy, type, url);
    }
}
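The "has a constructor that takes the extension interface" rule can be checked with plain reflection. The sketch below uses toy classes rather than Dubbo's; WrapperDetectionSketch, JavassistLikeFactory, StubLikeWrapper and isWrapperClass are names chosen for this example.

```java
// Illustrative only: detecting a wrapper by its single-argument constructor.
class WrapperDetectionSketch {

    interface ProxyFactory {
        String name();
    }

    static class JavassistLikeFactory implements ProxyFactory {
        public JavassistLikeFactory() {}
        public String name() { return "javassist"; }
    }

    static class StubLikeWrapper implements ProxyFactory {
        private final ProxyFactory delegate;
        public StubLikeWrapper(ProxyFactory delegate) { this.delegate = delegate; }
        public String name() { return "stub(" + delegate.name() + ")"; }
    }

    // A class counts as a wrapper if it has a public constructor taking the extension type.
    static boolean isWrapperClass(Class<?> candidate, Class<?> extensionType) {
        try {
            candidate.getConstructor(extensionType);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isWrapperClass(StubLikeWrapper.class, ProxyFactory.class));      // true
        System.out.println(isWrapperClass(JavassistLikeFactory.class, ProxyFactory.class)); // false
        System.out.println(new StubLikeWrapper(new JavassistLikeFactory()).name());         // stub(javassist)
    }
}
```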

That in turn calls getInvoker on JavassistProxyFactory:


public class JavassistProxyFactory extends AbstractProxyFactory {
    public JavassistProxyFactory() {
    }

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}


Let's look at the logic behind Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type):


public static Wrapper getWrapper(Class<?> c) {
    // A dynamically generated class cannot be wrapped, so walk up to its superclass.
    while (ClassGenerator.isDynamicClass(c)) {
        c = c.getSuperclass();
    }
    // Object itself uses a shared wrapper.
    if (c == Object.class) {
        return OBJECT_WRAPPER;
    }
    // Look in the Wrapper cache first.
    Wrapper ret = WRAPPER_MAP.get(c);
    if (ret == null) {
        // Not cached: generate the Wrapper class and put it in the cache.
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c, ret);
    }
    return ret;
}
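The get-then-put caching in getWrapper can also be written with ConcurrentHashMap.computeIfAbsent. This is a sketch of the caching idea only, not Dubbo's implementation: WrapperCacheSketch is a made-up class, and its makeWrapper returns a placeholder string where the real method would generate a class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a per-class cache so that each wrapper is generated once.
class WrapperCacheSketch {

    static final Map<Class<?>, Object> WRAPPER_MAP = new ConcurrentHashMap<>();
    static final AtomicInteger generated = new AtomicInteger();

    // Placeholder for the expensive code generation step.
    static Object makeWrapper(Class<?> c) {
        generated.incrementAndGet();
        return "wrapper for " + c.getName();
    }

    // Returns the cached wrapper, generating it on first use.
    static Object getWrapper(Class<?> c) {
        return WRAPPER_MAP.computeIfAbsent(c, WrapperCacheSketch::makeWrapper);
    }

    public static void main(String[] args) {
        Object first = getWrapper(String.class);
        Object second = getWrapper(String.class);
        System.out.println(first == second); // true: the second call hits the cache
    }
}
```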


The makeWrapper code is too long to list here. It generates a class that extends Wrapper; the result looks roughly like this:


public class Wrapper1 extends Wrapper {
    public static String[] pns;
    public static Map pts;
    public static String[] mns; // all method name array.
    public static String[] dmns;
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String n) {
        return pts.containsKey($1);
    }

    public Class getPropertyType(String n) {
        return (Class) pts.get($1);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object o, String n, Object v) {
        dubbo.provider.hello.service.impl.HelloServiceImpl w;
        try {
            w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
    }

    public Object getPropertyValue(Object o, String n) {
        dubbo.provider.hello.service.impl.HelloServiceImpl w;
        try {
            w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
    }

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        dubbo.provider.hello.service.impl.HelloServiceImpl w;
        try {
            w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("sayHello".equals($2) && $3.length == 0) {
                w.sayHello();
                return null;
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
    }
}

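Once the Wrapper has been generated, an AbstractProxyInvoker instance is returned, and that completes the creation of the Invoker. When the Invoker executes a method, it calls the Wrapper's invokeMethod, which contains the code that calls the real method on the real implementation class.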
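A hand-written equivalent of what the generated invokeMethod does is dispatch by method name instead of reflection. The sketch below uses a hypothetical HelloServiceImpl with a one-argument sayHello; it is not the generated code, and it throws IllegalArgumentException where the generated class uses Dubbo's own exception type.

```java
// Illustrative only: name-based dispatch in the style of a generated Wrapper.
class WrapperDispatchSketch {

    static class HelloServiceImpl {
        String sayHello(String name) {
            return "hello " + name;
        }
    }

    // Casts the target once, then picks the method by comparing its name and arity.
    static Object invokeMethod(Object target, String methodName, Object[] args) {
        HelloServiceImpl w = (HelloServiceImpl) target;
        if ("sayHello".equals(methodName) && args.length == 1) {
            return w.sayHello((String) args[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + methodName + "\"");
    }

    public static void main(String[] args) {
        Object result = invokeMethod(new HelloServiceImpl(), "sayHello", new Object[] {"dubbo"});
        System.out.println(result); // hello dubbo
    }
}
```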


public class JavassistProxyFactory extends AbstractProxyFactory {
    public JavassistProxyFactory() {
    }

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    // getInvoker returns an AbstractProxyInvoker. When doInvoke is called on that invoker,
    // it calls invokeMethod on the generated wrapper, which contains:
    //  if ("sayHello".equals($2) && $3.length == 0) {
    //      w.sayHello();
    //      return null;
    //  }
}

Continuing with exportLocal:


private void exportLocal(URL url) {
        if (!"injvm".equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString()).setProtocol("injvm").setHost("127.0.0.1").setPort(0);
            // As shown above, proxyFactory.getInvoker(this.ref, this.interfaceClass, local) returns an AbstractProxyInvoker
            Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(this.ref, this.interfaceClass, local));

            this.exporters.add(exporter);
            logger.info("Export dubbo service " + this.interfaceClass.getName() + " to local registry");
        }

    }

Now look at how protocol.export(AbstractProxyInvoker) executes:


//private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

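//Presumably an adapter class, Protocol$Adaptive, is generated. If wrappers exist, the call goes through them first and finally reaches the export of the concrete protocol (DubboProtocol for a dubbo:// URL)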
@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> var1) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException;

    void destroy();
}

So the call goes through:


public class ProtocolListenerWrapper implements Protocol {
    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        } else {
            this.protocol = protocol;
        }
    }

    public int getDefaultPort() {
        return this.protocol.getDefaultPort();
    }

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return (Exporter)("registry".equals(invoker.getUrl().getProtocol()) ? this.protocol.export(invoker) : new ListenerExporterWrapper(this.protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), "exporter.listener"))));
    }
  }
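The shape of this wrapper is a decorator: export through the wrapped protocol, then add listener handling unless the URL uses the registry protocol. A stripped-down sketch of that control flow follows, with strings in place of URLs and exporters; ListenerWrapperSketch and its nested types are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a decorator that adds listener handling around export.
class ListenerWrapperSketch {

    interface Protocol {
        String export(String urlProtocol, String service);
    }

    static class BaseProtocol implements Protocol {
        public String export(String urlProtocol, String service) {
            return urlProtocol + "://" + service;
        }
    }

    static class ListenerWrapper implements Protocol {
        private final Protocol protocol;
        final List<String> notified = new ArrayList<>();

        ListenerWrapper(Protocol protocol) {
            if (protocol == null) {
                throw new IllegalArgumentException("protocol == null");
            }
            this.protocol = protocol;
        }

        public String export(String urlProtocol, String service) {
            String exported = protocol.export(urlProtocol, service);
            // Registry URLs pass straight through; everything else gets listener handling.
            if (!"registry".equals(urlProtocol)) {
                notified.add(exported);
            }
            return exported;
        }
    }

    public static void main(String[] args) {
        ListenerWrapper wrapper = new ListenerWrapper(new BaseProtocol());
        wrapper.export("registry", "demo.HelloService");
        wrapper.export("dubbo", "demo.HelloService");
        System.out.println(wrapper.notified); // [dubbo://demo.HelloService]
    }
}
```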

Any further Protocol wrapper in the chain is entered the same way: it receives the wrapped Protocol through its constructor and delegates to this.protocol.export(invoker).

For a remote export over the dubbo protocol, the chain ends in DubboProtocol's export:


public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
     URL url = invoker.getUrl();
     String key = serviceKey(url);
     DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, this.exporterMap); // create the DubboExporter
     this.exporterMap.put(key, exporter);
     Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
     Boolean isCallbackservice = url.getParameter("is_callback_service", false);
     if (isStubSupportEvent && !isCallbackservice) {
         String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
         if (stubServiceMethods != null && stubServiceMethods.length() != 0) {
             this.stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
         } else if (this.logger.isWarnEnabled()) {
             this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
         }
     }

     this.openServer(url);
     return exporter;
 }
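The exporterMap above is keyed by a service key derived from the URL. The sketch below assumes the key has the form group/interface:version:port, with an empty group or version left out; treat that format as an assumption about serviceKey(url), and ServiceKeySketch as a made-up helper rather than Dubbo's own method.

```java
// Illustrative only: building a lookup key, assuming the form group/interface:version:port.
class ServiceKeySketch {

    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(20880, "demo.HelloService", "1.0.0", "g1")); // g1/demo.HelloService:1.0.0:20880
        System.out.println(serviceKey(20880, "demo.HelloService", null, null));    // demo.HelloService:20880
    }
}
```

Because the port is part of the key, the same interface exported on two ports yields two separate exporterMap entries.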


Look up the server and create it if it does not exist yet:


private void openServer(URL url) {
        String key = url.getAddress();
        boolean isServer = url.getParameter("isserver", true);
        if (isServer) {
            ExchangeServer server = (ExchangeServer)this.serverMap.get(key); // check the local serverMap cache for a server already bound to this address, e.g. ip:20880
            if (server == null) {
                this.serverMap.put(key, this.createServer(url));
            } else {
                server.reset(url);
            }
        }

    }


    private ExchangeServer createServer(URL url) {
        url = url.addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString());
        url = url.addParameterIfAbsent("heartbeat", String.valueOf(60000));
        String str = url.getParameter("server", "netty");
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        } else {
            url = url.addParameter("codec", Version.isCompatibleVersion() ? "dubbo1compatible" : "dubbo");

            ExchangeServer server;
            try {
                server = Exchangers.bind(url, this.requestHandler);
            } catch (RemotingException var5) {
                throw new RpcException("Fail to start server(url: " + url + ") " + var5.getMessage(), var5);
            }

            str = url.getParameter("client");
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }

            return server;
        }
    }
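The effect of openServer is that every service exported on the same address shares one server: the first export creates it and later exports only reset it. A sketch of that create-or-reset pattern, with a fake server object in place of ExchangeServer (ServerCacheSketch and FakeServer are hypothetical names):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: one server per address, created on first use and reset afterwards.
class ServerCacheSketch {

    static class FakeServer {
        final String address;
        int resets;

        FakeServer(String address) {
            this.address = address;
        }

        void reset() {
            resets++;
        }
    }

    final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();

    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            server = new FakeServer(address);  // stands in for createServer(url)
            serverMap.put(address, server);
        } else {
            server.reset();                    // stands in for server.reset(url)
        }
        return server;
    }

    public static void main(String[] args) {
        ServerCacheSketch cache = new ServerCacheSketch();
        FakeServer first = cache.openServer("192.168.0.1:20880");
        FakeServer second = cache.openServer("192.168.0.1:20880");
        System.out.println(first == second);  // true
        System.out.println(first.resets);     // 1
    }
}
```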


