1. 程式人生 > 其它 >SignalR循序漸進(三)簡易的叢集通訊元件

SignalR循序漸進(三)簡易的叢集通訊元件

上一篇演示了泛型Hub的實現,微軟於6月17日更新了SignalR 2.1.0,然後自帶了泛型Hub,於是就不需要自己去實現了…(微軟你為啥不早一個月自帶啊…)。不過沒關係,SignalR出彩之處不在泛型Hub,本篇為各位觀眾帶來了基於SignalR的簡易叢集通訊元件Demo,可用於分散式定時任務。

說到叢集,自然想到了NLB啊Cluster啊HPC啊等等。NLB受制於成員數量,Cluster用數量堆高可用性,HPC太複雜。本著SignalR的雙向非同步通訊的特點,其實是可以用來玩彈性計算的。初始狀態由一臺計算任務分發節點,一臺監控以及一臺計算節點構成。隨著任務分發佇列中的任務數越來越多,一臺執行節點無法及時消耗待執行任務,達到某個閾值的時候,動態的加入一個計算節點來增加計算吞吐量。同樣的,當佇列中的任務基本處於很低的數量的時候,自動移除一個計算節點來減少資源消耗。當然,如果是大型的計算量之下,分發節點,佇列都應該是叢集的,還要考慮各種計算節點故障之類的問題,這不在本篇考慮的範疇內,本篇以初始狀態模型來一步步實現簡易叢集通訊元件。

好,廢話不說了,正篇開始。

任務分發節點

任務分發節點只有一個公開的行為,就是接受計算節點任務執行完成的訊息。

下面是實現。

/// <summary>
    /// 叢集交換器
    /// </summary>
    public class ClusterHub : Hub<IClusterClient>
    {
        /// <summary>
        /// 
        /// </summary>
        static ClusterHub()
        {
            aliveDictionary = new ConcurrentDictionary<string, Guid>();
        }
        
        /// <summary>
        /// 
        /// </summary>
        /// <param name="dispatcher"></param>
        public ClusterHub(IDispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
            db = OdbFactory.Open(localDbFileName);
        }

        /// <summary>
        /// 本地資料庫檔名
        /// </summary>
        const string localDbFileName = "ClusterStorage.dll";

        /// <summary>
        /// 監視器連線Id
        /// </summary>
        static string monitorConnectionId;

        /// <summary>
        /// 排程器
        /// </summary>
        IDispatcher dispatcher;

        /// <summary>
        /// 線上詞典
        /// </summary>
        static ConcurrentDictionary<string, Guid> aliveDictionary;

        /// <summary>
        /// 
        /// </summary>
        static IOdb db;

        /// <summary>
        /// 完成任務
        /// </summary>
        /// <param name="jobResult"></param>
        public void Finished(Contracts.Messages.JobResultDto jobResult)
        {
            lock (db)
            {
                var members = db.AsQueryable<MemberDo>();
                var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id));
                if (member != null)
                {
                    member.UpdateStatisticsInfo(jobResult.ProcessedTime);
                    db.Store(member);
                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                    {
                        Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime });
                    }
                }
            }
            Clients.Caller.RunJob(dispatcher.GetJobId());
        }

        /// <summary>
        /// 加入
        /// </summary>
        void Join()
        {
            object ip = string.Empty;
            var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor";
            Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip);
            lock (db)
            {
                var members = db.AsQueryable<MemberDo>();
                var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor);
                if (member != null)
                {
                    member.MemberStatusType = MemberStatusTypeEnum.Connectioned;
                }
                else
                {
                    member = new MemberDo(ip.ToString(), isMonitor);
                    if (isMonitor)
                    {
                        monitorConnectionId = Context.ConnectionId;
                    }
                }
                db.Store(member);

                aliveDictionary.TryAdd(Context.ConnectionId, member.Id);
                if (!isMonitor)
                {
                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                    {
                        Clients.Client(monitorConnectionId).MemberJoin(member.Id);
                    }
                    Clients.Caller.GetId(member.Id.ToString());
                    Clients.Caller.RunJob(dispatcher.GetJobId());
                }
            }
        }

        /// <summary>
        /// 離開
        /// </summary>
        void Leave()
        {
            var id = Guid.Empty;
            aliveDictionary.TryRemove(Context.ConnectionId, out id);
            lock (db)
            {
                var members = db.AsQueryable<MemberDo>();
                var member = members.SingleOrDefault(m => m.Id == id);
                if (member != null)
                {
                    member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned;
                    db.Store(member);
                    if (member.IsMonitor)
                    {
                        monitorConnectionId = string.Empty;
                    }
                    else if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                    {
                        Clients.Client(monitorConnectionId).MemberLeave(id);
                    }
                }
            }
        }

        public override Task OnConnected()
        {
            Console.WriteLine(Context.ConnectionId+":Connected");
            Join();
            return base.OnConnected();
        }

        public override Task OnDisconnected()
        {
            Console.WriteLine(Context.ConnectionId + ":Disconnected");
            Leave();
            return base.OnDisconnected();
        }

        public override Task OnReconnected()
        {
            Console.WriteLine(Context.ConnectionId + ":Reconnected");
            return base.OnReconnected();
        }
    }

ClusterHub承載著2種客戶端角色的互動,計算節點和監控。

這邊採用了一個輕量級的基於C#開發的無引擎物件資料庫來儲存客戶端資訊。

先說過載的部分:

OnConnected - 當有客戶端連線的時候,執行Join方法。

OnDisconnected - 當有客戶端離線的時候,執行Leave方法。

然後是私有方法:

Join - 根據QueryString來區分客戶端型別是計算節點還是監視器,如果是計算節點,就直接通知監視器有成員加入,然後通過IDispatcher來獲取任務Id,通知計算節點開始執行任務。

Leave -  計算節點離線的時候通知監視器。

公開方法:

Finished - 計算節點完成任務後就呼叫該方法,Hub將計算的一些統計資訊更新到本地儲存,同時通知監視器更新計算結果。

私有變數:

IDispatcher– 任務排程器介面,由外部元件來負責具體的實現。

計算節點

計算節點有兩個行為:

GetId - 獲取節點身份。

RunJob - 執行任務。

/// <summary>
    /// 叢集客戶端
    /// </summary>
    public class ClusterClient
    {
        /// <summary>
        /// 
        /// </summary>
        /// <param name="jobProvider"></param>
        public ClusterClient(IJobProvider jobProvider)
        {
            this.jobProvider = jobProvider;
            url = ConfigurationManager.AppSettings["HubAddress"];
            var queryStrings = new Dictionary<string, string>();
            queryStrings.Add("ClientRole", "Normal");
            connection = new HubConnection(url, queryStrings);
            hubProxy = connection.CreateHubProxy(typeof(IClusterHub).GetCustomAttributes(typeof(DescriptionAttribute), false).OfType<DescriptionAttribute>().First().Description);
            InitClientEvents();
            connection.Start().Wait();
        }

        string url;

        HubConnection connection;

        IHubProxy hubProxy;

        IJobProvider jobProvider;

        string id;

        /// <summary>
        /// 
        /// </summary>
        void InitClientEvents()
        {
            hubProxy.On("GetId", (id) => GetId(id));
            hubProxy.On("RunJob", (jobId) => RunJob(jobId));
        }

        /// <summary>
        /// 執行任務
        /// </summary>
        /// <param name="id"></param>
        void GetId(string id)
        {
            this.id = id;
        }

        /// <summary>
        /// 執行任務
        /// </summary>
        /// <param name="jobId"></param>
        void RunJob(string jobId)
        {
            var startTime = DateTime.Now;
            jobProvider.Invoke(jobId);
            var stopTime = DateTime.Now;
            hubProxy.Invoke("Finished", new JobResultDto() { Id = id, JobId = jobId, ProcessedTime = (stopTime - startTime).TotalMilliseconds });
        }
    }

客戶端的實現很簡單,核心就是通過建構函式注入任務提供介面,由介面通過任務Id來執行任務。

 

監視器

監視器具有三個公開行為:

MemberJoin - 計算節點加入

MemberLeave - 計算節點離線

UpdateMemberStatisticsInfo - 更新節點統計資訊

/// <reference path="jquery-2.1.1.js" />
/// <reference path="jquery.signalR-2.1.0.js" />
(function ($) {

    var members = [];

    var methods = {
        reloadList: function () {
            var list = "";
            $.each(members, function (i, n) {
                list += "<li id='member_" + n.Id + "'>[" + n.Id + "]:AverageProcessedTime " + n.AverageProcessedTime + " Milliseconds</li>";
            });
            $('#members').html(list);
        }
    }

    var hubs = {
        clusterHub: $.connection.clusterHub,
        init: function () {
            $.connection.hub.logging = true;
            $.connection.hub.url = 'http://192.168.1.124:10086/signalr';
            $.connection.hub.qs = { "ClientRole": "Monitor" }
            $.connection.hub.start().done(function () { });
        }
    }

    var cluster = {
        on: {
            updateMemberStatisticsInfo: function (data) {
                $.each(members, function (i, n) {
                    if (n.Id == data.Id) {
                        n.AverageProcessedTime = data.AverageProcessedTime;
                        return;
                    }
                });
                methods.reloadList();
            },
            memberJoin: function (id) {
                members.push({ "Id": id, "AverageProcessedTime": 0 });
                methods.reloadList();
            },
            memberLeave: function (id) {
                members = $.grep(members, function (n) { return n.Id != id });
                methods.reloadList();
            }
        }
    }

    $(function () {
        hubs.clusterHub.client.UpdateMemberStatisticsInfo = cluster.on.updateMemberStatisticsInfo;
        hubs.clusterHub.client.MemberJoin = cluster.on.memberJoin;
        hubs.clusterHub.client.MemberLeave = cluster.on.memberLeave;
        hubs.init();
    });
})(jQuery);
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <title>叢集監視器</title>
</head>
<body>
    <div>
        <ul id="members"></ul>
    </div>
    <script src="scripts/jquery-2.1.1.min.js"></script>
    <script src="scripts/jquery.signalR-2.1.0.min.js"></script>
    <script src="http://192.168.1.124:10086/signalr/hubs"></script>
    <script src="scripts/core.js"></script>
</body>
</html>

監視器用real-time的Web平臺實現,一共註冊三個方法的實現。

最終效果

 

Hub端啟動後,先啟動監視器,然後在不同的機器上啟動計算端,圖上是2個計算節點,監視器上也顯示著2個節點,每個節點執行一個JobId後,監視器上就會重新整理結果。

進一步思考和擴充套件

簡易叢集元件就到這兒了,本篇演示的是一個思路,可以在這個基礎上深度擴充套件成文章開頭所描述的那樣,高效能高可用的基於SignalR的叢集元件。歡迎各位有興趣的同學進行討論和拍磚。