1. 程式人生 > >[2]AMQP(高階訊息佇列協議) ----QPID不得不說的事

[2]AMQP(高階訊息佇列協議) ----QPID不得不說的事

如果說到AMQP協議,則不得不提的就是QPID。QPID的論壇現在十分的活躍,基本上白天提的問題,晚上馬上就能得到回覆。由此可見QPID的活躍程度。

大家可以到http://qpid.2158936.n2.nabble.com/網站上面註冊一個賬號,真的非常的好,吐血推薦給大家。但是要求大家的因為一定要好。下面的這個例子就是我們當時提的http://qpid.2158936.n2.nabble.com/connect-Qpid-broker-using-the-latest-amqp-1-0-qpid-jms-client-0-9-ssl-td7644406.html

另外,QPID的JIRA系統上面也能報Bug,這個bug就是我們當時報的: https://issues.apache.org/jira/browse/QPIDJMS-183,回覆這麼迅速,作為一個開源專案還是挺給力的。

關於具體的QPID的介紹,請大家參考我上一篇文章,而且安裝方式網路上也有很多,大家可以自己去找。其官方網站為http://qpid.apache.org/ 文件的地址為
http://qpid.apache.org/documentation.html

因為QPID的伺服器支援AMQP-1-0協議,也支援AQMP0-9等協議。所以其最新的客戶端的API的地址為:

(1)qpid-java-6.0.4 支援AMQP 0-10, 0-9-1, 0-9, 0-8

https://qpid.apache.org/releases/qpid-java-6.0.4/

 (2) qpid-jms-0.9 支援AMQP 1-0 協議

初學者很容易混淆上面兩種最新的API的區別.

下面給一個簡單的基於qpid-jms-0.9 客戶端Java API 的傳送和接受訊息的例子。下面的例子我是基於QPID本身提供的Sample的一個改造。借花獻佛,希望對大家有所幫助。

下圖是我專案程式碼的基本結構


(1) hello.properties的檔案配置為

java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory
connectionfactory.myFactoryLookup =  amqp://192.168.45.111:5710?jms.username=admin&jms.password=admin&transport.connectTimeout=30000
queue.queue=queue_henry
queue.myQueueLookup=queue_henry

(2) Sender.java的程式碼
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.jms.example.success;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Sender {
	private static final String USER = "guest";
	private static final String PASSWORD = "guest";
	private static final int DEFAULT_COUNT = 10;
	private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;

	private Context getContext() {
		InitialContext context = null;
		try {
			InputStream resourceAsStream = this.getClass().getResourceAsStream("hello.properties");
			Properties properties = new Properties();
			properties.load(resourceAsStream);
			context = new InitialContext(properties);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return context;

	}

	public void publish(int count) {
		try {
			// The configuration for the Qpid InitialContextFactory has been
			// supplied in
			// a jndi.properties file in the classpath, which results in it
			// being picked
			// up automatically by the InitialContext constructor.
			Context context = this.getContext();

			ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
			Destination queue = (Destination) context.lookup("myQueueLookup");

			Connection connection = factory.createConnection(USER, PASSWORD);
			connection.setExceptionListener(new MyExceptionListener());
			connection.start();

			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			MessageProducer messageProducer = session.createProducer(queue);

			long start = System.currentTimeMillis();
			for (int i = 1; i <= count; i++) {
				TextMessage message = session.createTextMessage("Text!");
				messageProducer.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

				if (i % 100 == 0) {
					System.out.println("Sent message " + i);
				}
			}

			long finish = System.currentTimeMillis();
			long taken = finish - start;
			System.out.println("Sent " + count + " messages in " + taken + "ms");

			connection.close();
		} catch (Exception exp) {
			System.out.println("Caught exception, exiting.");
			exp.printStackTrace(System.out);
			System.exit(1);
		}
	}

	public static void main(String[] args) throws Exception {
		int count = DEFAULT_COUNT;
		if (args.length == 0) {
			System.out.println("Sending up to " + count + " messages.");
			System.out
					.println("Specify a message count as the program argument if you wish to send a different amount.");
		} else {
			count = Integer.parseInt(args[0]);
			System.out.println("Sending up to " + count + " messages.");
		}
		Sender sender=new Sender();
		sender.publish(count);

	}

	private static class MyExceptionListener implements ExceptionListener {
		@Override
		public void onException(JMSException exception) {
			System.out.println("Connection ExceptionListener fired, exiting.");
			exception.printStackTrace(System.out);
			System.exit(1);
		}
	}
}

(3) Receiver.java的程式碼

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.jms.example.success;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Receiver {
	private static final String USER = "guest";
	private static final String PASSWORD = "guest";
	private static final int DEFAULT_COUNT = 10;

	private Context getContext() {
		InitialContext context = null;
		try {
			InputStream resourceAsStream = this.getClass().getResourceAsStream("hello.properties");
			Properties properties = new Properties();
			properties.load(resourceAsStream);
			context = new InitialContext(properties);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return context;

	}

	public void receiveMessage(int count) {
		try {
			// The configuration for the Qpid InitialContextFactory has been
			// supplied in
			// a jndi.properties file in the classpath, which results in it
			// being picked
			// up automatically by the InitialContext constructor.

			Context context = this.getContext();//new InitialContext();

			ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
			Destination queue = (Destination) context.lookup("myQueueLookup");

			Connection connection = factory.createConnection(USER, PASSWORD);
			connection.setExceptionListener(new MyExceptionListener());
			connection.start();

			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			MessageConsumer messageConsumer = session.createConsumer(queue);

			long start = System.currentTimeMillis();

			int actualCount = 0;
			boolean deductTimeout = false;
			int timeout = 1000;
			for (int i = 1; i <= count; i++, actualCount++) {
				Message message = messageConsumer.receive(timeout);
				if (message == null) {
					System.out.println("Message " + i + " not received within timeout, stopping.");
					deductTimeout = true;
					break;
				}
				if (i % 100 == 0) {
					System.out.println("Got message " + i);
				}
				System.out.println("The message:"+message.getJMSDestination().toString());
			}

			long finish = System.currentTimeMillis();
			long taken = finish - start;
			if (deductTimeout) {
				taken -= timeout;
			}
			System.out.println("Received " + actualCount + " messages in " + taken + "ms");

			connection.close();
		} catch (Exception exp) {
			System.out.println("Caught exception, exiting.");
			exp.printStackTrace(System.out);
			System.exit(1);
		}
	}

	public static void main(String[] args) throws Exception {
		int count = DEFAULT_COUNT;
		if (args.length == 0) {
			System.out.println("Consuming up to " + count + " messages.");
			System.out.println(
					"Specify a message count as the program argument if you wish to consume a different amount.");
		} else {
			count = Integer.parseInt(args[0]);
			System.out.println("Consuming up to " + count + " messages.");
		}
		Receiver rec=new Receiver();
		rec.receiveMessage(count);
	}

	private static class MyExceptionListener implements ExceptionListener {
		@Override
		public void onException(JMSException exception) {
			System.out.println("Connection ExceptionListener fired, exiting.");
			exception.printStackTrace(System.out);
			System.exit(1);
		}
	}
}