1 | <dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" timeout="60000" version="10.11.11" group="guodong"/> |
入口
DubboNamespaceHandler配置了自定义dubbo标签的解析。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
DubboBeanDefinitionParser会对标签进行解析,并把相关参数设置到RootBeanDefinition,并返回它。RootBeanDefinition有个很关键的方法setBeanClass,这里设置的beanClass为ReferenceBean。
ReferenceBean实现了FactoryBean,其返回的Bean对象不是指定类的一个实例,而是该FactoryBean的getObject方法所返回的对象。
ReferenceBean的getObject方法会调用ReferenceConfig的get()方法。
1 | public class ReferenceConfig<T> extends AbstractReferenceConfig{ |
核心就是init(),它会对配置进行检测并创建代理 createProxy(map);
创建代理
1 | public class ReferenceConfig<T> extends AbstractReferenceConfig{ |
引用服务
使用zk作为注册中心1
2
3
4
5
6
7
8
9
10
11
12public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
...
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//extName=registry
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);//返回RegistryProtocol包装后的对象ProtocolListenerWrapper
return extension.refer(arg0, arg1);
}
}
先ProtocolListenerWrapper,再进入ProtocolFilterWrapper,由于使用registry协议,这2个Wrapper类不做处理。最后到达RegistryProtocol
1 | public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { |
获取注册中心
1 | public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory { |
ZookeeperRegistryFactory继承AbstractRegistryFactory,会调用AbstractRegistryFactory的getRegistry1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public Registry getRegistry(URL url) {
//zookeeper://54.249.216.148:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=16620×tamp=1527326703926
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
//zookeeper://54.249.216.148:2181/com.alibaba.dubbo.registry.RegistryService
String key = url.toServiceString();
// 锁定注册中心获取过程,保证注册中心单一实例
LOCK.lock();
try {
//先从缓存获取
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//ZookeeperRegistryFactory的createRegistry
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
//加到缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
// 释放锁
LOCK.unlock();
}
}
ZookeeperRegistryFactory的createRegistry方法:1
2
3
4
5
6
7
8
9
10
11
12
13public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
//ExtensionLoader获取扩展时会对set方法进行自动注入,这里会注入ZookeeperTransporter$Adaptive
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
ExtensionLoader自动注入如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
//objectFactory=SpringExtensionFactory或者SpiExtensionFactory,objectFactory.getExtension最终调用:SpiExtensionFactory调用的是getAdaptiveExtension,SpringExtensionFactory调用getExtension(Class<T> type, String name),SPI扩展点objectFactory为SpiExtensionFactory,property在SpringExtensionFactory时才起作用
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
1 | public class ZookeeperTransporter$Adaptive implements com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter { |
ZookeeperRegistryFactory的new ZookeeperRegistry(url, zookeeperTransporter):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
//会调用AbstractRegistry和FailbackRegistry
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
//连接到zk
zkClient = zookeeperTransporter.connect(url);
//添加重连监听
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
zookeeperTransporter.connect(url)调用过程如下:
ZookeeperTransporter$Adaptive(自适应扩展点,代码在上面已经说明)->ZkclientZookeeperTransporter(new ZkclientZookeeperClient(url))->ZkclientZookeeperClient
1 | public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> { |
1 | public AbstractRegistry(URL url) { |
获取注册中心时订阅通知
由于还没订阅,getSubscribed() size为0 什么都不做
1 | public abstract class AbstractRegistry implements Registry { |
AbstractRegistry构造执行完成接下去执行是FailbackRegistry1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public FailbackRegistry(URL url) {
super(url);
//默认5s 重试一次
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
// 检测并连接注册中心
try {
retry();
} catch (Throwable t) { // 防御性容错
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
注册失败的url会放到Set
引用远程服务
1 | public class RegistryProtocol implements Protocol { |
官方集群容错图:
各节点关系:
- 这里的
Invoker
是Provider
的一个可调用Service
的抽象,Invoker
封装了Provider
地址及Service
接口信息 Directory
代表多个Invoker
,可以把它看成List<Invoker>
,但与List
不同的是,它的值可能是动态变化的,比如注册中心推送变更Cluster
将Directory
中的多个Invoker
伪装成一个Invoker
,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个Router
负责从多个Invoker
中按路由规则选出子集,比如读写分离,应用隔离等LoadBalance
负责从多个Invoker
中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选
注册服务到注册中心
FailbackRegistry(register())->AbstractRegistry (register())->FailbackRegistry->ZookeeperRegistry (doRegister())1
2
3
4
5
6
7
8
9
10
11
12
13public abstract class AbstractRegistry implements Registry {
public void register(URL url) {
//consumer://192.168.213.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&group=guodong&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,gude&pid=5772&revision=10.11.11&side=consumer&timeout=60000×tamp=1527330877579&version=10.11.11
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()){
logger.info("Register: " + url);
}
//加到set
registered.add(url);
}
}
1 |
|
1 | public class ZookeeperRegistry extends FailbackRegistry |
最终创建节点1
2
3
4
5
6
7
8
9
10
11
12
13public void create(String path, boolean ephemeral) { //path=/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F192.168.213.1%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26group%3Dguodong%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%2Cgude%26pid%3D5772%26revision%3D10.11.11%26side%3Dconsumer%26timeout%3D60000%26timestamp%3D1527330877579%26version%3D10.11.11
int i = path.lastIndexOf('/');
if (i > 0) {
//递归创建父节点为持久节点
create(path.substring(0, i), false);
}
//最后创建临时节点 /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F192.168.213.1%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26group%3Dguodong%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%2Cgude%26pid%3D5772%26revision%3D10.11.11%26side%3Dconsumer%26timeout%3D60000%26timestamp%3D1527330877579%26version%3D10.11.11
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
zk创建的节点如下:
订阅服务提供者
RegistryDirectory( subscribe())->FailbackRegistry(subscribe())->AbstractRegistry (subscribe())->FailbackRegistry->ZookeeperRegistry (doRegister())
1 | public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener{ |
AbstractRegistry:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public void subscribe(URL url, NotifyListener listener) {
//listener为RegistryDirectory ,RegistryDirectory实现了NotifyListener
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()){
logger.info("Subscribe: " + url);
}
//获取当前url是否已经存在监听集合,不存在则创建,并放到缓存
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
//监听器放到当前url为key的缓存集合中
listeners.add(listener);
}
FailbackRegistry:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void subscribe(URL url, NotifyListener listener) {
//调用AbstractRegistry.subscribe
super.subscribe(url, listener);
//failedUnsubscribed和failedNotified中移除当前监听器
removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && urls.size() > 0) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// 将失败的订阅请求记录到失败列表,定时重试
addFailedSubscribed(url, listener);
}
}
ZookeeperRegistry:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
//Constants.ANY_VALUE=* 这里走else逻辑
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (! anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
//toCategoriesPath(url)->/dubbo/com.alibaba.dubbo.demo.DemoService/providers、/dubbo/com.alibaba.dubbo.demo.DemoService/configurators、/dubbo/com.alibaba.dubbo.demo.DemoService/routers
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
//主要目的就是将zkClient的事件IZkChildListener转换到ZookeeperRegistry事件NotifyListener,当zkClient子目录发生改变时,会调用下面的ZookeeperRegistry.this.notify
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//创建3个节点/dubbo/com.alibaba.dubbo.demo.DemoService/providers、/dubbo/com.alibaba.dubbo.demo.DemoService/configurators、/dubbo/com.alibaba.dubbo.demo.DemoService/routers
zkClient.create(path, false);
//zkclient将会监听路径下的变化 最终会调用ZkclientZookeeperClient下的client.subscribeChildChanges(path, listener),这个会回调上面的ZookeeperRegistry.this.notify();
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//第一次订阅通知 ZookeeperRegistry.this.notify()
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
List
children = zkClient.addChildListener(path, zkListener)非常重要,/dubbo/com.alibaba.dubbo.demo.DemoService/providers、/dubbo/com.alibaba.dubbo.demo.DemoService/configurators、/dubbo/com.alibaba.dubbo.demo.DemoService/routers配置改变时,都会收到通知,ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));会对通知进行处理,比如重新刷新Invoker
ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
NotifyListener作为key,key就是RegistryDirectory
服务订阅完成后通知
核心:providers、routers、configurators目录下发生改变会调用这个通知
1 | public abstract class FailbackRegistry extends AbstractRegistry{ |
1 | public abstract class AbstractRegistry implements Registry { |
最终目的:刷新urlInvokerMap 和methodInvokerMap对象
1 | public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { |
protocol.refer(serviceType, url)->Protocol$Adpative(获取DubboProtocol的包装)-> ProtocolListenerWrapper->ListenerInvokerWrapper->ProtocolFilterWrapper(创建Invoker Filter链,DubboInvoker在链条最后)->DubboProtocol(创建DubboInvoker)
创建客户端连接
创建客户端连接并返回DubboInvoker
1 | public class DubboProtocol extends AbstractProtocol { |
建立连接:Exchangers->HeaderExchanger->Transporters
1 | public class Exchangers { |
1 | public class HeaderExchanger implements Exchanger { |
1 | public class Transporters { |
1 | public class NettyTransporter implements Transporter { |
最终获取到ReferenceCountExchangeClient(有缓存直接从缓存中获取,没有新建一个客户端,这里要注意是否一个提供者所有接口都共享一个客户端,默认情况是共享一个客户端),然后创建DubboInvoker
加入集群路由
默认集群策略:failover
Cluster$Adpative->MockClusterWrapper->MockClusterInvoker->FailoverCluster
1 | public class FailoverCluster implements Cluster { |
最终返回包装后的FailoverClusterInvoker
创建invoker代理
根据 返回的FailoverClusterInvoker创建代理,最终会创建DemoService接口的代理类.
proxyFactory.getProxy(invoker)->ProxyFactory$Adpative->StubProxyFactoryWrapper->AbstractProxyFactory->JavassistProxyFactory
1 | public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory { |
1 | public abstract class AbstractProxyFactory implements ProxyFactory { |
1 | public class JavassistProxyFactory extends AbstractProxyFactory { |
主要核心是Proxy.getProxy(interfaces) 产生代理类。
原理
Proxy类的newInstance(InvocationHandler handler)方法是一个抽象方法,所有Proxy.getProxy(interfaces) 要先产生Proxy的代理类,实现newInstance(InvocationHandler handler)方法。
Proxy例子
1 | public interface Gude { |
Proxy会根据interfaces实现抽象方法产生代理类,产生的类名 序号会递增 从0开始 Proxy0 1 2 3… 和proxy0 1 2…对应
- Gude接口生成的Proxy代理类
1 | public class Proxy0 extends Proxy { |
- Gude接口实现类
1 | public class proxy0 implements Gude { |
所以JavassistProxyFactory:
1 | public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { |
最终会产生DemoService、EchoService的实现类proxy0(ps:不一定是序号0根据顺序递增)并返回。这个代理类会作为bean注册到spring容器中。在业务中调用此接口的时候,会走proxy0的方法,这里会走集群容错、路由等去远程调用(调用的是返回的MockClusterInvoker->FailoverClusterInvoker….)。
总结
过程:获取注册中心->注册consumer到注册中心->订阅providers服务提供者->连接到提供者->创建接口代理。(ps:都是基于单个接口,每个接口都会经过这个流程,只不过有些可以直接去缓存获取,比如共享客户端已经创建过连接了)