activemq queue開發(持久化方式)
公司要求activemq通訊,我經過三天的努力,今天算是有所前進。現將我對activemq的認識總結如下:
要使用activemq進行通訊,就必須開啟一個broker, 他可以理解為管理通訊連線的東東。關於broker的啟動,有兩種方式:一是activemq檔案內的bin/activemq.bat檔案。這種啟動方式比較適合多使用者的開發。另一種就是在我們的java程式碼呼叫activemq相關的類來構造並啟動brokerService。
第二個問題就是資訊的持久化問題,這個要達到的最終效果是隻要資訊傳送成功,但沒有被消費掉,不論出現什麼情況(最終導致activemq關閉),當activemq重新啟動時沒有被消費的資訊仍然存在與訊息佇列裡。
要實現這個效果需要兩步:一是activemq在訊息被消費時要存進檔案或資料庫,而不是在記憶體裡。二是傳送到訊息要指明實現持久化。第一步的實現有多個方式(具體o就不說了),但大體上就是存進log檔案和存進本地資料庫兩種。關於activemq檔案配置的實現o就不提了。o要提的是通過java程式碼來實現資料的持久化。
程式碼如下:
BrokerService broker=new BrokerService();
broker.setBrokerName(brokerName); //設定資訊持久化
//定義datasource
DataSource orclds=(DataSource) new BasicDataSource();
((BasicDataSource)orclds).setPoolPreparedStatements(true);
((BasicDataSource)orclds).setMaxActive(200);
((BasicDataSource)orclds).setDriverClassName(" ");//資料庫驅動
((BasicDataSource)orclds).setUrl(" ");//資料庫地址
((BasicDataSource)orclds).setUsername("×××");//資料庫使用者名稱
((BasicDataSource)orclds).setPassword("×××");//資料庫使用者密碼
//定義JDBCPersistenceAdapter
PersistenceAdapter jdbcperAdapter=new JDBCPersistenceAdapter();
((JDBCPersistenceAdapter)jdbcperAdapter).setBrokerService(broker);
((DataSourceSupport)jdbcperAdapter).setDataSource(orclds);
broker.setPersistenceAdapter(jdbcperAdapter);
broker.setPersistent(true);
broker.addConnector(" ");//新增activemq連線
broker.start();
Object lock = new Object();
synchronized (lock) {
lock.wait();
}
如果要使用的是mysql資料庫可能還有幾點細節要注意。
在訊息傳送端,要設定訊息生產者為Persistence的。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//訊息持久化
。
當然,還可以通過把activemq檔案拿到我們本地工程下來配置:
BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/activemq.xml"));
關於activemq獲取連線傳送訊息的方式:一個是工廠模式
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.send(messagep);
另一個是jndi方式:
InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");
ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));
//設定持久方式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
關於訊息的管理activemq提供了三個message類TextMessage,MapMessage,ObjectMessage
extmessage:
Message testMessage = jmsSession.createMessage();
//釋出重新整理文章訊息
testMessage.setStringProperty("RefreshArticleId", "2046");
producer.send(testMessage);
//釋出重新整理帖子訊息
testMessage.clearProperties();
testMessage.setStringProperty("RefreshThreadId", "331");
mapmessage:
MapMessage messagep = session.createMapMessage();<BR> //構造訊息頭 <BR> //messagep.setStringProperty("SENSE", "BTNM 3.0");<BR> //messagep.setStringProperty("CLASS"," ComputerSystem");<BR> //構造訊息體<BR> //messagep.setString("CPULOAD:NO=0, ID=182910293","0.1");
關於object的o還沒有研究。