NetMQ 發布訂閱模式 Publisher-Subscriber
阿新 • • 發佈:2017-10-14
target send 打開 col 請求響應模型 wid mat on() kobject
第一部分引用於:點擊打開
1:簡單介紹
PUB-SUB模式一般處理的都不是系統的關鍵數據。發布者不關註訂閱者是否收到發布的消息,訂閱者也不知道自己是否收到了發布者發出的所有消息。你也不知道訂閱者何時開始收到消息。類似於廣播,收音機。因此邏輯上,它都不是可靠的。這個可以通過與請求響應模型組合來解決。
圖1:簡單的發布訂閱模式
圖2:與請求響應模式組合的發布訂閱模式
2:案例
定義IPublishser接口
namespace NetMQDemoPublisher { public interface IPublisher:IDisposable { /// <summary>/// 發布消息 /// </summary> /// <param name="topicName">主題</param> /// <param name="data">內容</param> void Publish(string topicName, string data); } }
Publisher實現類
namespace NetMQDemoPublisher { public class Publisher:IPublisher {private object _lockObject = new object(); private PublisherSocket _publisherSocket; public Publisher(string endPoint) { _publisherSocket = new PublisherSocket(); _publisherSocket.Options.SendHighWatermark = 1000; _publisherSocket.Bind(endPoint); }#region Implementation of IDisposable /// <summary> /// 執行與釋放或重置非托管資源相關的應用程序定義的任務。 /// </summary> public void Dispose() { lock (_lockObject) { _publisherSocket.Close(); _publisherSocket.Dispose(); } } /// <summary> /// 發布消息 /// </summary> /// <param name="topicName">主題</param> /// <param name="data">內容</param> public void Publish(string topicName, string data) { lock (_lockObject) { _publisherSocket.SendMoreFrame(topicName).SendFrame(data); } } #endregion } }
Publisher窗口界面
界面中實現的功能代碼
namespace NetMQDemoPublisher { public partial class PublisherForm : Form { private IPublisher publisher; public PublisherForm() { InitializeComponent(); publisher = new Publisher("tcp://127.0.0.1:8888"); } private void button1_Click(object sender, EventArgs e) { string strContent = textBox1.Text; ListViewItem item = new ListViewItem(string.Format("topic:NetMQ,Data:{0}", strContent)); listView1.Items.Add(item); publisher.Publish("NetMQ", strContent); } } }
定義ISubscriber接口
namespace NetMQDemoSubscriber { public interface ISubscriber:IDisposable { /// <summary> /// 事件 /// </summary> event Action<string, string> Nofity; /// <summary> /// 註冊訂閱主題 /// </summary> /// <param name="topics"></param> void RegisterSubscriber(List<string> topics); /// <summary> /// 註冊訂閱 /// </summary> void RegisterSbuscriberAll(); /// <summary> /// 移除所有訂閱消息,並關閉 /// </summary> void RemoveSbuscriberAll(); } }
Subscriber實現類
namespace NetMQDemoSubscriber { public class Subscriber:ISubscriber { private SubscriberSocket _subscriberSocket = null; private string _endpoint = @"tcp://127.0.0.1:9876"; public Subscriber(string endPoint) { _subscriberSocket = new SubscriberSocket(); _endpoint = endPoint; } #region Implementation of IDisposable /// <summary> /// 執行與釋放或重置非托管資源相關的應用程序定義的任務。 /// </summary> public void Dispose() { throw new NotImplementedException(); } #endregion #region Implementation of ISubscriber public event Action<string, string> Nofity = delegate { }; /// <summary> /// 註冊訂閱主題 /// </summary> /// <param name="topics"></param> public void RegisterSubscriber(List<string> topics) { InnerRegisterSubscriber(topics); } /// <summary> /// 註冊訂閱 /// </summary> public void RegisterSbuscriberAll() { InnerRegisterSubscriber(); } /// <summary> /// 移除所有訂閱消息,並關閉 /// </summary> public void RemoveSbuscriberAll() { InnerStop(); } #endregion #region 內部實現 /// <summary> /// 註冊訂閱消息 /// </summary> /// <param name="topics">訂閱的主題</param> private void InnerRegisterSubscriber(List<string> topics = null) { InnerStop(); _subscriberSocket = new SubscriberSocket(); _subscriberSocket.Options.ReceiveHighWatermark = 1000; _subscriberSocket.Connect(_endpoint); if (null == topics) { _subscriberSocket.SubscribeToAnyTopic(); } else { topics.ForEach(item => _subscriberSocket.Subscribe(item)); } Task.Factory.StartNew(() => { while (true) { string messageTopicReceived = _subscriberSocket.ReceiveFrameString(); string messageReceived = _subscriberSocket.ReceiveFrameString(); Nofity(messageTopicReceived, messageReceived); } }); } /// <summary> /// 關閉訂閱 /// </summary> private void InnerStop() { _subscriberSocket.Close(); } #endregion } }
Subscriber窗口界面
窗體功能代碼
namespace NetMQDemoSubscriber { public partial class SubscriberForm : Form { private ISubscriber subscriber; public SubscriberForm() { InitializeComponent(); } private void SubscriberForm_Load(object sender, EventArgs e) { subscriber = new Subscriber("tcp://127.0.0.1:8888"); subscriber.RegisterSbuscriberAll(); subscriber.Nofity+= delegate(string s, string s1) { ListViewItem item = new ListViewItem(string.Format("topic:{0},Data:{1}", s, s1)); listView1.Items.Add(item); }; } } }
運行後,Publiser開啟一個,Subscirber開啟三個,進行測試如圖
源碼下載
如果覺得文章好,記得關註一下公眾號喲!
NetMQ 發布訂閱模式 Publisher-Subscriber