1. 程式人生 > >Dubbo、Spring、Zookeeper、整合基礎案例(引數回撥)

Dubbo、Spring、Zookeeper、整合基礎案例(引數回撥)

摘要:最近抽時間系統地學習了Dubbo的一些內容,趁有時間整理一下,順便記錄下來,以便日後回顧。前面我們學習了Dubbo的xml、註解方式,本次我們來學習Dubbo的引數回撥。

一:執行環境

1>:JDK 1.8

2>:IDEA 2018.1

3>:Zookeeper 3.x

4>:Maven 3.2

5>:Dubbo 2.8.4

二:專案結構

三:服務提供者(這裡的pom.xml和Provider.java與上一篇的一樣,就不貼上來了)

CallBackListener.java
package com.micai.dubbo.provider;

/**
 * Listener used for Dubbo parameter callbacks: an implementation is handed to a
 * service as a method argument and is notified through {@link #changed(String)}.
 *
 * @author zhaoxinguo
 * Date: 2018/9/11 16:30
 */
public interface CallBackListener {

    /**
     * Receives a change notification.
     *
     * @param msg text of the notification
     */
    void changed(String msg);
}
CallBackService.java
package com.micai.dubbo.provider;

/**
 * Parameter-callback service: lets a caller register a {@link CallBackListener}.
 *
 * @author zhaoxinguo
 * Date: 2018/9/11 16:29
 */
public interface CallBackService {

    /**
     * Service method exposed to consumers; registers a listener under a key.
     *
     * @param key identifier the listener is registered under
     * @param callBackListener listener to register
     */
    void addListener(String key, CallBackListener callBackListener);
}
CallBackServiceImpl.java
package com.micai.dubbo.provider;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Auther: zhaoxinguo
 * @Date: 2018/9/11 16:31
 * @Description: 介面的實現類
 */
public class CallBackServiceImpl implements CallBackService {

    // 監聽器集合
    private final Map<String, CallBackListener> listeners = new ConcurrentHashMap<String, CallBackListener>();

    public CallBackServiceImpl() {
        Thread t = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        for (Map.Entry<String, CallBackListener> entry : listeners.entrySet()) {
                            try {
                                entry.getValue().changed(getChanged(entry.getKey()));
                            } catch (Throwable t) {
                                listeners.remove(entry.getKey());
                            }
                        }
                        Thread.sleep(5000); // timely trigger change event
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // 新增監聽器、傳送一個事件,觸發監聽器
    public void addListener(String key, CallBackListener listener) {
        listeners.put(key, listener);
        listener.changed(getChanged(key)); // send notification for change
    }

    private String getChanged(String key) {
        return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }
}

applicationContext.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
       http://code.alibabatech.com/schema/dubbo
       http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- Provider application info, used to compute dependencies -->
    <dubbo:application name="hello-world-app"/>

    <!-- Use the ZooKeeper registry to expose the service address -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <!-- Expose services over the dubbo protocol on port 20880 -->
    <dubbo:protocol name="dubbo" port="20880"/>

    <!--&lt;!&ndash; Declare the service interface to expose &ndash;&gt;
    <dubbo:service interface="com.micai.dubbo.provider.DemoService" ref="demoService"/>
    &lt;!&ndash; Implement the service like a local bean &ndash;&gt;
    <bean id="demoService" class="com.micai.dubbo.provider.DemoServiceImpl"/>-->

    <!-- Parameter callback: argument index 1 of addListener (the CallBackListener parameter) is marked as a callback -->
    <bean id="callBackService" class="com.micai.dubbo.provider.CallBackServiceImpl"/>
    <dubbo:service interface="com.micai.dubbo.provider.CallBackService" ref="callBackService" connections="1" callbacks="1000">
        <dubbo:method name="addListener">
            <dubbo:argument index="1" callback="true"/>
        </dubbo:method>
    </dubbo:service>

    <!-- Package path to scan for annotations; separate multiple packages with commas. If package is omitted, all classes in the current ApplicationContext are scanned -->
    <dubbo:annotation package="com.micai.dubbo.provider"/>

</beans>

四:服務消費者(這裡的pom.xml與上一篇的一樣,就不貼上來了)

CallBackAction.java
package com.micai.dubbo.consumer;

import com.alibaba.dubbo.config.annotation.Reference;
import com.micai.dubbo.provider.CallBackListener;
import com.micai.dubbo.provider.CallBackService;
import org.springframework.stereotype.Component;

/**
 * Consumer-side component that registers a callback listener with the remote
 * {@link CallBackService}.
 *
 * @author zhaoxinguo
 * Date: 2018/9/11 17:46
 */
@Component
public class CallBackAction {

    /** Handle to the callback service, injected through Dubbo's {@code @Reference}. */
    @Reference
    private CallBackService callBackService;

    /**
     * Registers a listener under the key {@code "zhaoxinguo"}. The listener prints
     * every message it receives to standard output, prefixed with {@code "callback: "}.
     */
    public void getChanges() {
        // Build the listener first, then hand it to the service.
        final CallBackListener printingListener = new CallBackListener() {
            @Override
            public void changed(String msg) {
                System.out.println("callback: " + msg);
            }
        };
        callBackService.addListener("zhaoxinguo", printingListener);
    }
}
Consumer.java
package com.micai.dubbo;

import com.micai.dubbo.consumer.CallBackAction;
import com.micai.dubbo.consumer.DemoAction;
import com.micai.dubbo.consumer.UserAction;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Consumer entry point: loads {@code applicationContext.xml}, starts the Spring
 * context and runs the parameter-callback demo. The other invocation styles are
 * kept below as commented-out reference snippets.
 *
 * @author zhaoxinguo
 * Date: 2018/9/11 14:36
 */
public class Consumer {

    public static void main(String[] args) {
        final ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("applicationContext.xml");
        context.start();

        // Synchronous call
        /*DemoAction demoAction = (DemoAction) context.getBean("demoAction");
        String str = demoAction.sayHello();
        System.out.println("同步呼叫結果: " + str);*/

        // Asynchronous call style used before version 2.6.x
        /*UserAction userAction = (UserAction) context.getBean("userAction");
        // Obtain the Future through RpcContext
        userAction.getUserName();
        // Get Dubbo's built-in ResponseFuture and set a callback on it
        userAction.getUserName2();*/

        // Parameter callback: look the action bean up by name and type, then register the listener
        final CallBackAction action = context.getBean("callBackAction", CallBackAction.class);
        action.getChanges();

        // -------------------- Fully asynchronous Dubbo service call chain -------------------- //

        // Asynchronous call through an interface returning CompletableFuture
        /*AsyncAction asyncAction = (AsyncAction) context.getBean("asyncAction");
        asyncAction.asyncSayHello();*/

        // Asynchronous call on a synchronous interface via the Annotation Processor
        /*GreetingsAction greetingsAction = (GreetingsAction) context.getBean("greetingsAction");
        greetingsAction.sayHi();*/

    }
}

五:執行結果

六:下載原始碼請掃描加群獲取

QQ交流群