1. 程式人生 > >ActiveMQ 入門helloworld

ActiveMQ 入門helloworld

data ext pom tty reat 分享 消息 over 用戶

1.下載安裝ActiveMQ

官網下載地址:http://activemq.apache.org/download.html

ActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,我選擇了Windows 版本下進行開發。

技術分享

下載完壓縮包後,直接解壓:

技術分享

目錄:

bin存放的是腳本文件

conf存放的是基本配置文件

data存放的是日誌文件

docs存放的是說明文檔

examples存放的是簡單的實例

lib存放的是activemq所需jar包

webapps用於存放項目的目錄

2.啟動ActiveMQ

可以在命令行中用命令啟動,我這裏就直接鼠標點擊運行腳本啟動了

技術分享

ActiveMQ默認啟動時,啟動了內置的jetty服務器,提供一個用於監控ActiveMQ的admin應用。

admin:http://127.0.0.1:8161/admin/

我們在瀏覽器打開鏈接之後輸入賬號密碼(默認賬號密碼都是admin),類似於訪問tomcat。

技術分享

登陸成功後頁面顯示:

技術分享

ActiveMQ啟動就完成了,關閉直接點擊關閉腳本或用命令管就可以了。

3.不多bb,直接上helloworld代碼

創建一個maven項目,最好是用jdk1.8,我用jdk1.7會報錯,pom.xml依賴如下:

<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
</dependencies>

3.1創建生產者類:

package com.iflytek;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @author cjf
* 生產者
*/
public class Producter {

//ActiveMq 的默認用戶名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默認登錄密碼
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的鏈接地址
private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

AtomicInteger count = new AtomicInteger(0);
//鏈接工廠
ConnectionFactory connectionFactory;
//鏈接對象
Connection connection;
//事務管理
Session session;
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

public void init(){
try {
//創建一個鏈接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
//從工廠中創建一個鏈接
connection = connectionFactory.createConnection();
//開啟鏈接
connection.start();
//創建一個事務(這裏通過參數可以設置事務的級別)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
} catch (JMSException e) {
e.printStackTrace();
}
}

public void sendMessage(String disname){
try {
//創建一個消息隊列
Queue queue = session.createQueue(disname);
//消息生產者
MessageProducer messageProducer = null;
if(threadLocal.get()!=null){
messageProducer = threadLocal.get();
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while(true){
Thread.sleep(1000);
int num = count.getAndIncrement();
//創建一條消息
TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
"productor:我是大帥哥,我現在正在生產東西!,count:"+num);
System.out.println(Thread.currentThread().getName()+
"productor:我是大帥哥,我現在正在生產東西!,count:"+num);
//發送消息
messageProducer.send(msg);
//提交事務
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

3.2,創建消費者類:

package com.iflytek;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @author cjf
* 消費者
*/
public class Comsumer {

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

ConnectionFactory connectionFactory;

Connection connection;

Session session;

ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();

public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}


public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;

if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
TextMessage msg = (TextMessage) consumer.receive();
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

4.運行ActiveMQ項目,進行測試

4.1,生產者生產消息:

package com.iflytek;
/**
*
* @author cjf
* 測試生產者生產消息
*/
public class TestProducter {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
TestProducter testMq = new TestProducter();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}

private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}

@Override
public void run() {
while(true){
try {
producter.sendMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

運行結果:

技術分享

4.2,測試消費者消費消息:

package com.iflytek;
/**
*
* @author cjf
* activeMQ測試消費者消費消息
*/
public class TestConsumer {
public static void main(String[] args){
Comsumer comsumer = new Comsumer();
comsumer.init();
TestConsumer testConsumer = new TestConsumer();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
}

private class ConsumerMq implements Runnable{
Comsumer comsumer;
public ConsumerMq(Comsumer comsumer){
this.comsumer = comsumer;
}

@Override
public void run() {
while(true){
try {
comsumer.getMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

運行結果:

技術分享

5.ActiveMQ的helloworld就寫完了,更多MQ其他的知識自己再好好學,我也是個菜雞剛看了2天這東西,求關註啊,可以交流一下。

ActiveMQ 入門helloworld