ActiveMQ 訊息佇列的簡單使用
阿新 • • 發佈:2019-01-24
安裝
程式碼
- 訊息的傳送者
package com.bonade.mall.mqTest;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;
/**
 * Producer that publishes text messages to an ActiveMQ queue.
 *
 * <p>Usage: call {@link #init()} once, then call {@link #sendMessage(String)} from one or
 * more threads. A single connection and a single transacted session are shared by all
 * callers. Because a JMS {@code Session} is not meant to be used by several threads at the
 * same time, every use of the shared session is serialized through a private lock.
 *
 * @author lmf
 */
public class Producter {
    // Default ActiveMQ user name.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // Default ActiveMQ password.
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // Default ActiveMQ broker URL.
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Guards every use of the shared session (create/send/commit).
    private final Object sessionLock = new Object();

    // Sequence number appended to each message; shared by all sending threads.
    AtomicInteger count = new AtomicInteger(0);
    // Connection factory.
    ConnectionFactory connectionFactory;
    // Broker connection.
    Connection connection;
    // Transacted session used for all sends.
    Session session;
    // One producer per calling thread. The cached producer is created without a fixed
    // destination so that it can serve whatever queue name a later call passes in.
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    /**
     * Opens the connection to the broker and creates the transacted session.
     *
     * <p>On failure the stack trace is printed and any half-opened connection is closed, so
     * a failed start-up does not leak a broker connection. In that case {@code session}
     * stays {@code null} and {@link #sendMessage(String)} will refuse to run.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: a sent message only becomes visible after commit().
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
            releaseConnection();
        }
    }

    /**
     * Sends an endless stream of numbered text messages to the named queue, committing
     * after each one. The method only returns when the name is blank or when the broker
     * reports an error (the error is printed).
     *
     * @param disname name of the destination queue; a {@code null} or blank name is ignored
     * @throws IllegalStateException if {@link #init()} has not completed successfully
     */
    public void sendMessage(String disname) {
        // Validate once up front instead of spinning forever inside the send loop.
        if (StringUtils.isBlank(disname)) {
            return;
        }
        final Session activeSession = session;
        if (activeSession == null) {
            throw new IllegalStateException(
                    "init() must complete successfully before sendMessage() is called");
        }
        try {
            final Queue queue;
            final MessageProducer messageProducer;
            synchronized (sessionLock) {
                queue = activeSession.createQueue(disname);
                messageProducer = producerForCurrentThread(activeSession);
            }
            while (true) {
                int num = count.getAndIncrement();
                // Create, send and commit as one unit so that another thread's commit
                // cannot interleave with this thread's half-finished send.
                synchronized (sessionLock) {
                    TextMessage msg = activeSession.createTextMessage(Thread
                            .currentThread().getName()
                            + "productor:這是生產者從這裡傳送訊息!,count:" + num);
                    // The destination is given per send because the cached producer is
                    // not bound to one queue.
                    messageProducer.send(queue, msg);
                    activeSession.commit();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    // Returns this thread's cached producer, creating a destination-less one on first use.
    private MessageProducer producerForCurrentThread(Session activeSession) throws JMSException {
        MessageProducer cached = threadLocal.get();
        if (cached == null) {
            cached = activeSession.createProducer(null);
            threadLocal.set(cached);
        }
        return cached;
    }

    // Closes the connection after a failed init() and clears the connection/session fields.
    private void releaseConnection() {
        Connection opened = connection;
        connection = null;
        session = null;
        if (opened == null) {
            return;
        }
        try {
            opened.close();
        } catch (JMSException closeError) {
            closeError.printStackTrace();
        }
    }
}
- 訊息的消費者
package com.bonade.mall.mqTest;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消費者
* @author lmf
*
*/
public class Comsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();
public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
TextMessage msg = (TextMessage) consumer.receive();
if(msg!=null) {
msg.acknowledge();
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 測試程式碼
/**
 * Demo launcher for the producer side: initializes one shared {@code Producter} and starts
 * several threads that all send to the same queue.
 */
public class TestMq {
    // Number of sender threads to start.
    private static final int SENDER_THREADS = 5;
    // Queue every sender writes to.
    private static final String QUEUE_NAME = "TEST-MQ";

    public static void main(String[] args) {
        Producter producter = new Producter();
        producter.init();
        try {
            // Short pause after init() before the senders start.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        // Threads keep their default names because the thread name is part of each message.
        for (int i = 0; i < SENDER_THREADS; i++) {
            new Thread(new ProductorMq(producter)).start();
        }
    }

    /**
     * Worker that repeatedly asks the shared producer to send, pausing ten seconds between
     * calls. The loop stops once the thread is interrupted.
     */
    private static class ProductorMq implements Runnable {
        private final Producter producter;

        public ProductorMq(Producter producter) {
            this.producter = producter;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    producter.sendMessage(QUEUE_NAME);
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the worker instead of
                    // swallowing the interrupt and running forever.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }
}
/**
 * Demo launcher for the consumer side: initializes one shared {@code Comsumer} and starts
 * several threads that all read from the same queue.
 */
public class TestConsumer {
    // Number of receiver threads to start.
    private static final int RECEIVER_THREADS = 4;
    // Queue every receiver reads from.
    private static final String QUEUE_NAME = "TEST-MQ";

    public static void main(String[] args) {
        Comsumer comsumer = new Comsumer();
        comsumer.init();
        for (int i = 0; i < RECEIVER_THREADS; i++) {
            new Thread(new ConsumerMq(comsumer)).start();
        }
    }

    /**
     * Worker that repeatedly asks the shared consumer to receive, pausing ten seconds
     * between calls. The loop stops once the thread is interrupted.
     */
    private static class ConsumerMq implements Runnable {
        private final Comsumer comsumer;

        public ConsumerMq(Comsumer comsumer) {
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    comsumer.getMessage(QUEUE_NAME);
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the worker instead of
                    // swallowing the interrupt and running forever.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }
}
需要注意:請先啟動訊息傳送者 TestMq,然後再啟動 TestConsumer。