1. 程式人生 > >rabbitmq演示代碼

rabbitmq演示代碼

cat stack local tst mes sta lse bytes eat

簡單使用:

package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Producer代碼 服務端-生產者
* @author cxsz-hp16
* @Title: Sender
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2315:02
*/
public class Sender {
//消息名
private final static String QUEUE_NAME = "MyQueue";

public static void main(String[] args) {
send();
}
//發送消息
public static void send(){
//創建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//設置參數
factory.setHost("localhost");
//創建連接
connection = factory.newConnection();
//創建管道
channel = connection.createChannel();
/**
* 隊列聲明
* 參數:queue:隊列名、durable:是否持久化、exclusive:是否排外、arguments:設置隊列消息什麽時候被刪除
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//設置消息內容
String message = "my first message";
/**
*
* 參數:
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("已發消息:"+message);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //關閉資源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* 客戶端-消費者
* @author cxsz-hp16
* @Title: Receiver
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2315:31
*/
public class Receiver {
private final static String QUEUE_NAME = "MyQueue";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//創建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//回調消費消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //關閉資源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
* @author cxsz-hp16
* @Title: NewTask
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:30
*/
public class NewTask {
//消息名
private final static String QUEUE_NAME = "newTask";

public static void main(String[] args) {
send();
}
//發送消息
public static void send(){
//創建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//設置參數
factory.setHost("localhost");
//創建連接
connection = factory.newConnection();
//創建管道
channel = connection.createChannel();

boolean durable = true;
channel.queueDeclare(QUEUE_NAME,durable,true,false,null);
//設置消息內容
String message = "2.";
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x] Sent ‘" + message + "‘");
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //關閉資源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;

/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker {
private final static String QUEUE_NAME = "newTask";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//創建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//回調消費消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//設置一個任務
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //關閉資源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}

/**
* 任務
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//將字符串轉換為字符數組
for (char ch: task.toCharArray()) {
//當值為.時,阻塞線程來達到耗時的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}


}



package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
* @author cxsz-hp16
* @Title: NewTask
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:30
*/
public class NewTask23 {
//消息名
private final static String QUEUE_NAME = "task_queue";

public static void main(String[] args) {
send();
}
//發送消息
public static void send(){
//創建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//設置參數
factory.setHost("localhost");
//創建連接
connection = factory.newConnection();
//創建管道
channel = connection.createChannel();

boolean durable = true;
channel.queueDeclare(QUEUE_NAME,true,true,false,null);
//設置消息內容
String message = "2.";
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN
,message.getBytes("UTF-8"));
System.out.println(" [x] Sent ‘" + message + "‘");
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //關閉資源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker3 {
private final static String QUEUE_NAME = "task_queue";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//創建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);

channel.basicQos(1);
//回調消費消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//設置一個任務
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //關閉資源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}

/**
* 任務
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//將字符串轉換為字符數組
for (char ch: task.toCharArray()) {
//當值為.時,阻塞線程來達到耗時的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}


}





rabbitmq演示代碼