1. 程式人生 > >NetMQ 發布訂閱模式 Publisher-Subscriber

NetMQ 發布訂閱模式 Publisher-Subscriber

target send 打開 col 請求響應模型 wid mat on() kobject

第一部分引用於:點擊打開

1:簡單介紹

PUB-SUB模式一般處理的都不是系統的關鍵數據。發布者不關註訂閱者是否收到發布的消息,訂閱者也不知道自己是否收到了發布者發出的所有消息。你也不知道訂閱者何時開始收到消息。類似於廣播,收音機。因此邏輯上,它都不是可靠的。這個可以通過與請求響應模型組合來解決。

技術分享
圖1:簡單的發布訂閱模式

技術分享
圖2:與請求響應模式組合的發布訂閱模式

2:案例

定義IPublishser接口

namespace NetMQDemoPublisher
{
    public interface IPublisher:IDisposable
    {
        /// <summary>
/// 發布消息 /// </summary> /// <param name="topicName">主題</param> /// <param name="data">內容</param> void Publish(string topicName, string data); } }

技術分享技術分享

Publisher實現類

namespace NetMQDemoPublisher
{
    public class Publisher:IPublisher
    {
        
private object _lockObject = new object(); private PublisherSocket _publisherSocket; public Publisher(string endPoint) { _publisherSocket = new PublisherSocket(); _publisherSocket.Options.SendHighWatermark = 1000; _publisherSocket.Bind(endPoint); }
#region Implementation of IDisposable /// <summary> /// 執行與釋放或重置非托管資源相關的應用程序定義的任務。 /// </summary> public void Dispose() { lock (_lockObject) { _publisherSocket.Close(); _publisherSocket.Dispose(); } } /// <summary> /// 發布消息 /// </summary> /// <param name="topicName">主題</param> /// <param name="data">內容</param> public void Publish(string topicName, string data) { lock (_lockObject) { _publisherSocket.SendMoreFrame(topicName).SendFrame(data); } } #endregion } }

Publisher窗口界面

技術分享

界面中實現的功能代碼

namespace NetMQDemoPublisher
{
    public partial class PublisherForm : Form
    {
        private IPublisher publisher;
        public PublisherForm()
        {
            InitializeComponent();
            publisher = new Publisher("tcp://127.0.0.1:8888");
        }

        private void button1_Click(object sender, EventArgs e)
        {
            string strContent = textBox1.Text;
            ListViewItem item = new ListViewItem(string.Format("topic:NetMQ,Data:{0}",  strContent));
            listView1.Items.Add(item);
            publisher.Publish("NetMQ", strContent);
        }
    }
}

定義ISubscriber接口

namespace NetMQDemoSubscriber
{
    public interface ISubscriber:IDisposable
    {
        /// <summary>
        /// 事件
        /// </summary>
        event Action<string, string> Nofity;

        /// <summary>
        /// 註冊訂閱主題
        /// </summary>
        /// <param name="topics"></param>
        void RegisterSubscriber(List<string> topics);

        /// <summary>
        /// 註冊訂閱
        /// </summary>
        void RegisterSbuscriberAll();

        /// <summary>
        /// 移除所有訂閱消息,並關閉
        /// </summary>
        void RemoveSbuscriberAll();
    }
}

Subscriber實現類

namespace NetMQDemoSubscriber
{
    public class Subscriber:ISubscriber
    {
        private SubscriberSocket _subscriberSocket = null;
        private string _endpoint = @"tcp://127.0.0.1:9876";

        public Subscriber(string endPoint)
        {
            _subscriberSocket = new SubscriberSocket();
            _endpoint = endPoint;
        }
        #region Implementation of IDisposable

        /// <summary>
        /// 執行與釋放或重置非托管資源相關的應用程序定義的任務。
        /// </summary>
        public void Dispose()
        {
            throw new NotImplementedException();
        }

        #endregion

        #region Implementation of ISubscriber

        public event Action<string, string> Nofity = delegate { };

        /// <summary>
        /// 註冊訂閱主題
        /// </summary>
        /// <param name="topics"></param>
        public void RegisterSubscriber(List<string> topics)
        {
            InnerRegisterSubscriber(topics);
        }

        /// <summary>
        /// 註冊訂閱
        /// </summary>
        public void RegisterSbuscriberAll()
        {
            InnerRegisterSubscriber();
        }

        /// <summary>
        /// 移除所有訂閱消息,並關閉
        /// </summary>
        public void RemoveSbuscriberAll()
        {
            InnerStop();
        }

        #endregion

        #region 內部實現

        /// <summary>
        /// 註冊訂閱消息
        /// </summary>
        /// <param name="topics">訂閱的主題</param>
        private void InnerRegisterSubscriber(List<string> topics = null)
        {
            InnerStop();
            _subscriberSocket = new SubscriberSocket();
            _subscriberSocket.Options.ReceiveHighWatermark = 1000;
            _subscriberSocket.Connect(_endpoint);
            if (null == topics)
            {
                _subscriberSocket.SubscribeToAnyTopic();
            }
            else
            {
                topics.ForEach(item => _subscriberSocket.Subscribe(item));
            }
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    string messageTopicReceived = _subscriberSocket.ReceiveFrameString();
                    string messageReceived = _subscriberSocket.ReceiveFrameString();
                    Nofity(messageTopicReceived, messageReceived);
                }
            });
        }

        /// <summary>
        /// 關閉訂閱
        /// </summary>
        private void InnerStop()
        {
            _subscriberSocket.Close();
        }

        #endregion
    }
}

Subscriber窗口界面

技術分享

窗體功能代碼

namespace NetMQDemoSubscriber
{
    public partial class SubscriberForm : Form
    {
        private ISubscriber subscriber;
        public SubscriberForm()
        {
            InitializeComponent();
        }

        private void SubscriberForm_Load(object sender, EventArgs e)
        {
            subscriber = new Subscriber("tcp://127.0.0.1:8888");
            subscriber.RegisterSbuscriberAll();
            subscriber.Nofity+= delegate(string s, string s1)
            {
                ListViewItem item = new ListViewItem(string.Format("topic:{0},Data:{1}", s, s1));
                listView1.Items.Add(item);
            };
        }
    }
}

運行後,Publiser開啟一個,Subscirber開啟三個,進行測試如圖

技術分享

源碼下載

如果覺得文章好,記得關註一下公眾號喲!

技術分享

NetMQ 發布訂閱模式 Publisher-Subscriber