ActiveMQ C# Series: Advanced Examples (Part 3)

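Preface

The earlier posts covered the basic consumer and producer. This one looks at a few more of the APIs they offer.

Main text

Setting a receive timeout on the consumer

The producer has already sent 5 messages.

Now change the consumer as follows.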

static void Main(string[] args)
{

	Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
	IConnectionFactory factory = new ConnectionFactory(connecturl);
  
	using (IConnection connection = factory.CreateConnection())
	{
		using (ISession session = connection.CreateSession())
		{
			IDestination destination = SessionUtil.GetDestination(session, "queue://test");
			using (IMessageConsumer consumer=session.CreateConsumer(destination))
			{
				connection.Start();
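				// Alternative: subscribe asynchronously instead of polling.
				//consumer.Listener += new MessageListener(onMessage);
				while (true)
				{
					// Wait up to 4 seconds; Receive returns null on timeout.
					// "as" avoids an InvalidCastException for non-text messages.
					var message = consumer.Receive(TimeSpan.FromSeconds(4)) as ITextMessage;
					if (message != null)
					{
						Console.WriteLine(message.NMSMessageId);
						Console.WriteLine(message.Text);
					}
				}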
			}
		}
	}
}

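Receive(TimeSpan.FromSeconds(4)) waits up to 4 seconds for a message. If none arrives in that time, it stops waiting and returns null. Stepping through with a breakpoint in the debugger shows this quickly.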
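The timeout behaviour can be tried without a broker. The following is a minimal sketch, not NMS code: it uses an in-memory BlockingCollection as a stand-in for the queue, and the names TimedReceiveSketch, ReceiveOrNull and DrainUntilIdle are hypothetical helpers invented for this illustration. It shows the usual pattern of treating a null result as "nothing arrived in time" and leaving the loop instead of spinning forever.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class TimedReceiveSketch
{
    // Hypothetical stand-in for consumer.Receive(timeout): returns the next
    // message, or null when nothing arrives before the timeout elapses.
    public static string ReceiveOrNull(BlockingCollection<string> queue, TimeSpan timeout)
    {
        return queue.TryTake(out string message, timeout) ? message : null;
    }

    // Keeps receiving until one timed receive comes back empty.
    public static List<string> DrainUntilIdle(BlockingCollection<string> queue, TimeSpan timeout)
    {
        var received = new List<string>();
        while (true)
        {
            string message = ReceiveOrNull(queue, timeout);
            if (message == null)
            {
                break; // timed out: treat the queue as idle and stop
            }
            received.Add(message);
        }
        return received;
    }

    public static void Main()
    {
        // Mirror the article's setup: 5 messages are already waiting.
        var queue = new BlockingCollection<string>();
        for (int i = 1; i <= 5; i++)
        {
            queue.Add("message " + i);
        }

        List<string> received = DrainUntilIdle(queue, TimeSpan.FromMilliseconds(200));
        Console.WriteLine("Received " + received.Count + " messages before the timeout.");
    }
}
```

With a real consumer the same shape applies: replace ReceiveOrNull with the call to consumer.Receive and decide whether a timeout should end the loop or simply trigger another wait.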

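Multiple consumers

We know that in a queue each produced message can be consumed only once. I won't test that here, since it is the basic principle of a queue.

So if I start two consumers, how are the messages distributed between them?

Producer: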

class Program
{
	static void Main(string[] args)
	{
		Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
		IConnectionFactory factory = new ConnectionFactory(connecturl);
		using (IConnection connection = factory.CreateConnection())
		{
			using (ISession session = connection.CreateSession())
			{
				IDestination destination = SessionUtil.GetDestination(session, "queue://test");
				using (IMessageProducer producer = session.CreateProducer(destination))
				{
					//producer.DeliveryMode = MsgDeliveryMode.Persistent;
					//producer.RequestTimeout = TimeSpan.FromSeconds(2);
					for (int i = 1; i < 7; i++)
					{
						ITextMessage request = session.CreateTextMessage("oh,my friend"+i);
						producer.Send(request);
					}
				}
			}
		}
	}
}

The two consumers:

class Program
{
	protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
	static void Main(string[] args)
	{

		Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
		IConnectionFactory factory = new ConnectionFactory(connecturl);

		using (IConnection connection = factory.CreateConnection())
		{
			using (ISession session = connection.CreateSession())
			{
				IDestination destination = SessionUtil.GetDestination(session, "queue://test");
				using (IMessageConsumer consumer = session.CreateConsumer(destination))
				{
					connection.Start();
					consumer.Listener += new MessageListener(onMessage);
					Console.Read();
					//while (true)
					//{
					//    var message = (ITextMessage)consumer.Receive(TimeSpan.FromSeconds(4));
					//    if (message != null)
					//    {
					//        Console.WriteLine(message.NMSMessageId);
					//        Console.WriteLine(message.Text);
					//    }
					//}
				}
			}
		}
	}

	protected static void onMessage(IMessage receivedMsg)
	{
		ITextMessage message = receivedMsg as ITextMessage;
		if (message != null)
		{
			// Print the text of the received message
			Console.WriteLine(message.Text);
		}
	}
}

Consumption result:

The messages are distributed evenly between the two consumers.
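One way to picture an even split is a round-robin hand-out, where the broker alternates between the connected consumers. The sketch below is a simplified in-memory model under that assumption, not ActiveMQ's actual dispatch code; RoundRobinSketch and Distribute are hypothetical names. On a real broker the split can also depend on prefetch settings and on when each consumer connects.

```csharp
using System;
using System.Collections.Generic;

public static class RoundRobinSketch
{
    // Simplified model: hand each message to the next consumer in turn.
    public static List<string>[] Distribute(IEnumerable<string> messages, int consumerCount)
    {
        var perConsumer = new List<string>[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            perConsumer[c] = new List<string>();
        }

        int next = 0;
        foreach (string message in messages)
        {
            perConsumer[next].Add(message);
            next = (next + 1) % consumerCount; // wrap around to the first consumer
        }
        return perConsumer;
    }

    public static void Main()
    {
        // Same six message bodies the producer above sends.
        var messages = new List<string>();
        for (int i = 1; i < 7; i++)
        {
            messages.Add("oh,my friend" + i);
        }

        List<string>[] result = Distribute(messages, 2);
        for (int c = 0; c < result.Length; c++)
        {
            Console.WriteLine("consumer " + (c + 1) + ": " + string.Join(", ", result[c]));
        }
    }
}
```

In this model each of the two consumers ends up with three of the six messages, which matches the even distribution observed above.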