1. 程式人生 > >使用 ActiveMQ 中介軟體 實現訊息傳遞可以實現QQ聊天

使用 ActiveMQ 中介軟體 實現訊息傳遞可以實現QQ聊天

1.這是使用ActiveMQ 中介軟體的jar包 activemq-all-5.4.2.jar 來實現的訊息傳送與接受的Demo(完全原創的哈..):
2.請大家多多指教:
3.
4./**
5. * 啟動ActiveMQ內建服務
6. *
7. * @author WLei 2011-4-12
8. */
9.public class ActiveMQConfigBean {
10.
11. /**
12. * 啟動ActiveMQ服務
13. *
14. * @return
15. * @author WLei 2011-4-12
16. */
17. public BrokerService getBrokerService() {
18. try {
19. BrokerService brokerService = new BrokerService();
20. brokerService.addConnector("tcp://localhost:0");
21. brokerService.start();
22. return brokerService;
23. } catch (Exception e) {
24. e.printStackTrace();
25. }
26. return null;
27. }
28.
29. /**
30. * 建立ActiveMQ連線工廠
31. *
32. * @return
33. * @author WLei 2011-4-12
34. */
35. public ActiveMQConnectionFactory createActiveMQConnectionFactory() {
36. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
37. return activeMQConnectionFactory;
38. }
39.
40.}
41.
42.
43.
44.
45./**
46. * 監聽生成類(監聽可以寫成多執行緒)
47. *
48. * @author WLei Apr 20, 2011
49. */
50.public class GenerateListener {
51.
52. /**
53. * 生成主題型別的監聽
54. *
55. * @param activeMQConnectionFactory
56. * @param topicName
57. * @author WLei Apr 20, 2011
58. */
59. public void getTopicListener(
60. ActiveMQConnectionFactory activeMQConnectionFactory,
61. String topicName) {
62.
63.
64. try {
65.
66. //生成一個連線
67. TopicConnection topicConnection = activeMQConnectionFactory
68. .createTopicConnection();
69. //建立一個會話
70. TopicSession topicSession = topicConnection.createTopicSession(
71. false, TopicSession.AUTO_ACKNOWLEDGE);
72. //建立一個主題
73. Topic topic = topicSession.createTopic(topicName);
74.
75. //建立主題訂閱者
76. TopicSubscriber topicSubscriber = topicSession
77. .createSubscriber(topic);
78.
79. //新建一個訂閱者的監聽
80. topicSubscriber.setMessageListener(new MessageListener() {
81.
82. //實現MessageListener的onMessage 方法
83. @Override
84. public void onMessage(Message msg) {
85.
86. //判定獲取到的訊息型別
87. if (msg instanceof TextMessage) {
88. TextMessage textMessage = (TextMessage) msg;
89. try {
90. //列印收到的訊息
91. System.out.println("主題模式監聽到:"
92. + textMessage.getText());
93. } catch (JMSException e1) {
94. e1.printStackTrace();
95. }
96. try {
97. msg.acknowledge();
98. } catch (JMSException e) {
99. e.printStackTrace();
100. }
101. }
102. }
103. });
104.
105. // 啟動監聽..(這其實是個執行緒)
106. topicConnection.start();
107. } catch (JMSException e) {
108.
109. e.printStackTrace();
110. }
111. }
112.
113. /**
114. * 生成佇列形式的監聽
115. *
116. * @param activeMQConnectionFactory
117. * @param queueName
118. * @author WLei Apr 20, 2011
119. */
120. public void getQueueListener(
121. ActiveMQConnectionFactory activeMQConnectionFactory,
122. String queueName) {
123. try {
124. // 建立一個連線
125. QueueConnection connection = activeMQConnectionFactory
126. .createQueueConnection();
127.
128. //建立一個佇列訊息會話
129. QueueSession queueSession = connection.createQueueSession(false,
130. QueueSession.AUTO_ACKNOWLEDGE);
131.
132. //建立一個佇列
133. Queue queue = queueSession.createQueue(queueName);
134.
135. //建立一個接受者
136. QueueReceiver receiver = queueSession.createReceiver(queue);
137.
138. //訊息監聽
139. receiver.setMessageListener(new MessageListener() {
140. @Override
141. public void onMessage(Message msg) {
142. try {
143. if (msg instanceof TextMessage) {
144. TextMessage textMessage = (TextMessage) msg;
145. System.out.println("佇列模式監聽到:"
146. + textMessage.getText());
147. }
148. msg.acknowledge();
149. } catch (JMSException e) {
150. e.printStackTrace();
151. }
152. }
153. });
154. connection.start();
155. } catch (JMSException e) {
156. e.printStackTrace();
157. }
158. }
159.
160.}
161.
162.
163.
164.
165.
166.
167./**
168. * 傳送生成類
169. *
170. * @author WLei Apr 20, 2011
171. */
172.public class GenerateSender {
173.
174. /**
175. * 建立一個傳送主題訊息的方法(是個簡單的例子沒有考慮效能而進行拆分)
176. *
177. * @param activeMQConnectionFactory
178. * @param msg
179. * @param topicName
180. * @author WLei Apr 20, 2011
181. */
182. public void topicSender(
183. ActiveMQConnectionFactory activeMQConnectionFactory, String msg,
184. String topicName) {
185. try {
186. // 建立一個主題訊息的連線
187. TopicConnection topicConnection = activeMQConnectionFactory
188. .createTopicConnection();
189. // 建立一個主題會話
190. TopicSession topicSession = topicConnection.createTopicSession(
191. false, TopicSession.AUTO_ACKNOWLEDGE);
192. // 建立一個主題
193. Topic topic = topicSession.createTopic(topicName);
194. // 建立一個該主題釋出者
195. TopicPublisher topicPublisher = topicSession.createPublisher(topic);
196.
197. // publisher.setDeliveryMode(DeliveryMode.PERSISTENT);//設定訊息模式,有持久與非持久的
198. // publisher.setTimeToLive(3*24*60*60*1000);//生存時間
199.
200. // 建立一個文字訊息
201. TextMessage textMessage = topicSession.createTextMessage();
202. // 設定文字訊息內容
203. textMessage.setText(msg);
204. // 釋出訊息
205. topicPublisher.publish(textMessage);
206. // 關閉連線
207. topicConnection.close();
208. } catch (JMSException e) {
209. e.printStackTrace();
210. }
211. }
212.
213. /**
214. * 建立一個傳送佇列訊息的方法
215. *
216. * @param activeMQConnectionFactory
217. * @param msg
218. * @param queueName
219. * @author WLei Apr 20, 2011
220. */
221. public void queueSender(
222. ActiveMQConnectionFactory activeMQConnectionFactory, String msg,
223. String queueName) {
224. try {
225. // 建立一個連線
226. QueueConnection queueConnection = activeMQConnectionFactory
227. .createQueueConnection();
228. // 建立一個會話
229. QueueSession queueSession = queueConnection.createQueueSession(
230. false, QueueSession.AUTO_ACKNOWLEDGE);
231. // 建立一個佇列
232. Queue queue = queueSession.createQueue(queueName);
233. // 建立一個該佇列的傳送者
234. QueueSender queueSender = queueSession.createSender(queue);
235. // 使用queueSession建立一個文字訊息
236. TextMessage textMessage = queueSession.createTextMessage();
237. // 設定該文字訊息的文字內容
238. textMessage.setText(msg);
239. // 傳送該文字訊息
240. queueSender.send(textMessage);
241. // 關閉連線
242. queueConnection.close();
243. } catch (JMSException e) {
244. e.printStackTrace();
245. }
246. }
247.
248.}
249.
250.
251.
252./**
253. * 傳送資訊
254. *
255. * @author WLei 2011-4-13
256. */
257.public class Test {
258. public static void main(String[] args) {
259.
260. String topicMsg = "this is a message of topic..";
261. String queueMsg = "this is a message of queue..";
262.
263. // 例項化一個配置bean
264. ActiveMQConfigBean activeMQConfig = new ActiveMQConfigBean();
265.
266. // 啟動中介軟體服務
267. activeMQConfig.getBrokerService();
268.
269. // 獲取一個連線工廠
270. ActiveMQConnectionFactory ac = activeMQConfig
271. .createActiveMQConnectionFactory();
272.
273. // 生成一個監聽
274. GenerateListener generateListener = new GenerateListener();
275.
276. String topicChat = "topic";
277. String queueChat = "queue";
278.
279. // 啟動主題監聽
280. generateListener.getTopicListener(ac, topicChat);
281. generateListener.getQueueListener(ac, queueChat);
282.
283. // 建立一個傳送類
284. GenerateSender generateSender = new GenerateSender();
285. // 傳送主題訊息
286. generateSender.topicSender(ac, topicMsg, topicChat);
287. // 傳送佇列訊息
288. generateSender.queueSender(ac, queueMsg, queueChat);
289.
290. }
291.}