dubbo服務端之釋出
一概述
最近由於公司業務減少,開始慢慢裁員了。程式猿們都已經清閒個把月了,不知道是不是過慣了忙碌的日子一下特別不舒服。被裁員可以拿到N+1的賠償也是不錯的哈!不知道下一個會不會是我,嘿嘿!上一節說到了如何自定義標籤,今天說說dubbo的服務是如何釋出出來的,研究了幾天略有眉目。如果客觀想看看,需要提前瞭解以下幾個知識點:1.netty的使用,版本是3.x幾的, 2. SPI技術的瞭解, 3.裝飾模式(可能還涉及其它一些常見的模式就不提了),4java的反射機制,5.動態代理。
二服務釋出
dubbo自定義標籤 裡面說過啟動服務時會首先載入XML檔案中的標籤,解析出來的標籤資料會裝配到對應的實體類中,dubbo框架裡面也是這樣的。我們首先看看com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler類的實現
public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } }
這裡面的邏輯主要就是解析dubbo的相關標籤,並把屬性值裝配到對應的實體類中。研究服務釋出我們重點看service標籤的解析也就是15行的程式碼。因為ServiceBean實現了spring的ApplicationListener介面,所以也是一個監聽器。在spring容器載入完成後觸發contextrefreshedevent事件,這個事件會被實現了ApplicationListener介面的類監聽到,執行對應的onApplicationEvent函式。我們看看ServiceBean類的程式碼
public void onApplicationEvent(ApplicationEvent event) { if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) { if (isDelay() && ! isExported() && ! isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } } }
這個監聽方法最終呼叫了export方法來實現服務的釋出處理。因為ServiceBean繼承了ServiceConfig類,所以最終還是呼叫了ServiceConfig中的export方法:
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();//是否暴露
}
if (delay == null) {
delay = provider.getDelay();//是否延遲暴露
}
}
if (export != null && ! export.booleanValue()) {
return;
}
//是否延遲釋出服務介面
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);//延遲睡眠後呼叫doExport釋出介面
} catch (Throwable e) {
}
doExport();//直接釋出介面
}
});
thread.setDaemon(true);//設定為守護執行緒
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();//直接釋出介面
}
}
裡面做了很多邏輯判斷,但最後還是呼叫了doExport方法,doExport做了很多配置上檢查程式碼比較長也非重點就不貼出來,大家可以去檢視相關原始碼。裡面呼叫doExportUrls方法,程式碼如下:
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
這裡我斷點了下loadRegistries方法返回的就是註冊地址,也就是服務在釋出時需要到zookeeper中註冊下。我這裡配置兩臺zookeeper伺服器,所以loadRegistries方法返回的是兩個URL如下:
registry://192.168.14.46:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=524®istry=zookeeper×tamp=1498806201853
registry://192.168.14.47:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=524®istry=zookeeper×tamp=1498806201986
從URL可以看出,dubbo將我們設定的註冊標籤資料全部轉換URL的形式,所有的配置引數均跟著URL後面。而集合protocols屬性裡面儲存了我們配置的協議資料。協議可以配置多個種類。也就是說這裡主要是使用這個迴圈將當前這個服務以某種協議在多個註冊機上進行釋出。但是我沒看懂集合protocols物件是從哪裡注入的資料,細節的我就懶得管啦!繼續往doExportUrlsFor1Protocol方法中檢視。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
String host = protocolConfig.getHost();
if (provider != null && (host == null || host.length() == 0)) {
host = provider.getHost();
}
boolean anyhost = false;
if (NetUtils.isInvalidLocalHost(host)) {
anyhost = true;
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
if (NetUtils.isInvalidLocalHost(host)) {
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
try {
Socket socket = new Socket();
try {
SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
socket.connect(addr, 1000);
host = socket.getLocalAddress().getHostAddress();
break;
} finally {
try {
socket.close();
} catch (Throwable e) {}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
if (NetUtils.isInvalidLocalHost(host)) {
host = NetUtils.getLocalHost();
}
}
}
Integer port = protocolConfig.getPort();
if (provider != null && (port == null || port == 0)) {
port = provider.getPort();
}
//獲取協議預設的埠號
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (port == null || port == 0) {
port = defaultPort;
}
if (port == null || port <= 0) {
port = getRandomPort(name);//隨機生成埠
if (port == null || port < 0) {
port = NetUtils.getAvailablePort(defaultPort);
putRandomPort(name, port);
}
logger.warn("Use random available port(" + port + ") for protocol " + name);
}
Map<String, String> map = new HashMap<String, String>();
if (anyhost) {
map.put(Constants.ANYHOST_KEY, "true");
}
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && arguments.size() > 0) {
for (ArgumentConfig argument : arguments) {
//型別自動轉換.
if(argument.getType() != null && argument.getType().length() >0){
Method[] methods = interfaceClass.getMethods();
//遍歷所有方法
if(methods != null && methods.length > 0){
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
//匹配方法名稱,獲取方法簽名.
if(methodName.equals(method.getName())){
Class<?>[] argtypes = methods[i].getParameterTypes();
//一個方法中單個callback
if (argument.getIndex() != -1 ){
if (argtypes[argument.getIndex()].getName().equals(argument.getType())){
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
}else {
throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
}
} else {
//一個方法中多個callback
for (int j = 0 ;j<argtypes.length ;j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())){
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j){
throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
}else if(argument.getIndex() != -1){
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
}else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put("generic", generic);
map.put("methods", Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
//根據服務實現的介面獲取相關服務方法
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (! ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put("token", UUID.randomUUID().toString());
} else {
map.put("token", token);
}
}
if ("injvm".equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// 匯出服務
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
//根據Map中儲存的屬性組裝URL地址
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
//配置為none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情況下做本地暴露 (配置為remote,則表示只暴露遠端服務)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local則暴露為遠端服務.(配置為local,則表示只暴露遠端服務)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//通過proxyFactory物件生成介面實現類代理物件Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//將Invoker物件封裝到protocol協議物件中,同時開啟socket服務監聽埠,這裡socket通訊是使用netty框架來處理的
Exporter<?> exporter = protocol.export(invoker);
//新增物件到集合
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
doExportUrlsFor1Protocol方法中主要做了幾件事
1.根據xml中dubbo標籤配置組裝dubbo協議URL,如果沒有配置任何協議,預設使用了dubbo協議。比如這裡組裝後端URL
dubbo://192.168.14.46:20880/com.service.interfaces.DubboService?anyhost=true&application=demo-provider&dubbo=2.5.3&interface=com.service.interfaces.DubboService&methods=sayName,sayHello&pid=6844&revision=1.0-SNAPSHOT&side=provider×tamp=1502851260474
細看這裡組裝的URL裡面攜帶了釋出一個介面服務所有相關的引數,包括介面全限定名,介面相關方法名稱,版本號,模組名稱
2.通訊協議未指明埠時,隨機生成埠號
3.通過proxyFactory物件生成介面實現類的代理物件invoker
4.通過protocol物件將invoker封裝成Exporter物件,同時開啟了socket服務監聽埠。這裡socket通訊使用的是netty框架,
不知道netty的可以先看看相關資料,對理解後面原始碼會有幫助
這裡proxyFactory和protocol物件是通過以下程式碼生成的,關於這兩個物件是如何生成的,我再通過其它章節仔細描述。這裡我們就預設生成的Protocol 物件是DubboProtocol,ProxyFactory物件是JdkProxyFactory
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();//全域性的協議物件
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();//全域性的代理工廠物件
三 疑問
程式碼分析到這裡,服務已經正常啟動了通過客戶端可以遠端呼叫介面了。你是不是會感到好奇1.伺服器是怎麼開啟socket監聽
2.客戶端是怎樣請求到遠端的服務,伺服器是怎麼響應客戶端
3.伺服器是怎麼做非同步響應,超時怎麼處理
由於篇幅太長,針對以上3個問題我通過下面一個章節來分析。