ActiveMQ C# Series: Advanced Examples (Part 3)
阿新 • Published: 2020-07-27
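Preface
The previous posts introduced the basic consumer and producer. This one looks at some of the other APIs available to them.
Main Text
Setting a receive timeout on the consumer
The producer has already produced 5 messages.
Change the consumer as follows.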
static void Main(string[] args)
{
    Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
    IConnectionFactory factory = new ConnectionFactory(connecturl);
    using (IConnection connection = factory.CreateConnection())
    {
        using (ISession session = connection.CreateSession())
        {
            IDestination destination = SessionUtil.GetDestination(session, "queue://test");
            using (IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                connection.Start();
                //consumer.Listener += new MessageListener(onMessage);
                while (true)
                {
                    // Wait at most 4 seconds; Receive returns null if nothing arrives in time.
                    var message = (ITextMessage)consumer.Receive(TimeSpan.FromSeconds(4));
                    if (message != null)
                    {
                        Console.WriteLine(message.NMSMessageId);
                        Console.WriteLine(message.Text);
                    }
                }
            }
        }
    }
}
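Receive(TimeSpan.FromSeconds(4)) waits at most 4 seconds: if no message arrives in that time, it stops waiting and returns null. Stepping through with a breakpoint in the debugger shows this quickly.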
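Since a timed-out Receive returns null, that null can also serve as a stop condition, whereas the while (true) loop above simply polls again. Below is a minimal sketch of that idea. To keep it runnable without a broker, an in-memory queue stands in for ActiveMQ; the names TimedReceiveDemo, DrainUntilTimeout and the receive delegate are hypothetical and not part of NMS. With a real consumer, the delegate would presumably wrap consumer.Receive(timeout) and return the message text.

```csharp
using System;
using System.Collections.Generic;

static class TimedReceiveDemo
{
    // Keeps calling `receive` until it returns null, which is how a timed-out
    // Receive(TimeSpan) reports "no message arrived in time".
    // `receive` is a stand-in (assumption) for something like:
    //   t => (consumer.Receive(t) as ITextMessage)?.Text
    public static List<string> DrainUntilTimeout(Func<TimeSpan, string> receive, TimeSpan timeout)
    {
        var received = new List<string>();
        while (true)
        {
            string text = receive(timeout);
            if (text == null)
            {
                break; // timeout: treat the queue as drained and stop
            }
            received.Add(text);
        }
        return received;
    }

    public static void Main()
    {
        // Fake broker: five queued messages, then "timeouts" (null) forever.
        // The fake returns immediately instead of actually waiting.
        var pending = new Queue<string>();
        for (int i = 1; i <= 5; i++)
        {
            pending.Enqueue("message " + i);
        }

        List<string> got = DrainUntilTimeout(
            timeout => pending.Count > 0 ? pending.Dequeue() : null,
            TimeSpan.FromSeconds(4));

        foreach (string text in got)
        {
            Console.WriteLine(text);
        }
        Console.WriteLine("No message within the timeout; stopping.");
    }
}
```

Whether stopping on the first timeout is appropriate depends on the application; a long-running consumer would keep looping, as the original code does.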
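Multiple consumers
We know that each message produced to a queue can be consumed only once. I won't test that here, since it is the basic principle of a queue.
So if I start two consumers, how are the messages divided between them?
Producer: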
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;

class Program
{
    static void Main(string[] args)
    {
        Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
        IConnectionFactory factory = new ConnectionFactory(connecturl);
        using (IConnection connection = factory.CreateConnection())
        {
            using (ISession session = connection.CreateSession())
            {
                IDestination destination = SessionUtil.GetDestination(session, "queue://test");
                using (IMessageProducer producer = session.CreateProducer(destination))
                {
                    //producer.DeliveryMode = MsgDeliveryMode.Persistent;
                    //producer.RequestTimeout = TimeSpan.FromSeconds(2);
                    // Send six text messages, numbered 1 through 6.
                    for (int i = 1; i < 7; i++)
                    {
                        ITextMessage request = session.CreateTextMessage("oh,my friend" + i);
                        producer.Send(request);
                    }
                }
            }
        }
    }
}
The two consumers (both run this code):
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;

class Program
{
    protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);

    static void Main(string[] args)
    {
        Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
        IConnectionFactory factory = new ConnectionFactory(connecturl);
        using (IConnection connection = factory.CreateConnection())
        {
            using (ISession session = connection.CreateSession())
            {
                IDestination destination = SessionUtil.GetDestination(session, "queue://test");
                using (IMessageConsumer consumer = session.CreateConsumer(destination))
                {
                    connection.Start();
                    // Receive asynchronously: onMessage is called for each delivered message.
                    consumer.Listener += new MessageListener(onMessage);
                    // Keep the process alive until a key is pressed.
                    Console.Read();
                    //while (true)
                    //{
                    //    var message = (ITextMessage)consumer.Receive(TimeSpan.FromSeconds(4));
                    //    if (message != null)
                    //    {
                    //        Console.WriteLine(message.NMSMessageId);
                    //        Console.WriteLine(message.Text);
                    //    }
                    //}
                }
            }
        }
    }

    protected static void onMessage(IMessage receivedMsg)
    {
        ITextMessage message = receivedMsg as ITextMessage;
        if (message != null)
        {
            // Print the message text.
            Console.WriteLine(message.Text);
        }
    }
}
Consumption result:
The messages are distributed evenly between the two consumers.
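The even split reported above is what a round-robin hand-out would produce, which, as far as I know, is how ActiveMQ dispatches queue messages to consumers by default. The following is only a simulation under that assumption, not output captured from the broker: RoundRobinDemo and Distribute are hypothetical names, and real delivery can also be affected by settings such as consumer prefetch and by when each consumer connects.

```csharp
using System;
using System.Collections.Generic;

static class RoundRobinDemo
{
    // Hypothetical helper: hands message i to consumer (i mod consumerCount),
    // i.e. each consumer takes a turn in fixed order.
    public static List<string>[] Distribute(IReadOnlyList<string> messages, int consumerCount)
    {
        var perConsumer = new List<string>[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            perConsumer[c] = new List<string>();
        }
        for (int i = 0; i < messages.Count; i++)
        {
            perConsumer[i % consumerCount].Add(messages[i]);
        }
        return perConsumer;
    }

    public static void Main()
    {
        // Same six payloads the producer sends.
        var messages = new List<string>();
        for (int i = 1; i < 7; i++)
        {
            messages.Add("oh,my friend" + i);
        }

        List<string>[] result = Distribute(messages, 2);
        for (int c = 0; c < result.Length; c++)
        {
            Console.WriteLine("consumer " + (c + 1) + ": " + string.Join(" | ", result[c]));
        }
    }
}
```

In this simulation each of the two consumers ends up with three of the six messages, and no message appears in both lists, matching the rule that a queue message is consumed only once.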