java操作RabbitMq
阿新 • • 發佈:2021-11-27
最近學習了一下RabbitMq,記錄一下,主要是java對RabbitMq的一些基本操作,後期會更新springboot整合RabbitMq的文章以及git地址
一、扇形交換機
扇形交換機是最基本的交換機型別,它所能做的事情非常簡單———廣播訊息。扇形交換機會把能接收到的訊息全部發送給繫結在自己身上的佇列。因為廣播不需要“思考”,所以扇形交換機處理訊息的速度也是所有的交換機型別裡面最快的。
1、rabbitmq連結工具類
package com.heyu.rabbitmq.demo.fanout; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author: HEYU * @date: 2020/12/24 11:24 * @Description: */ public class RabbitConnUtil { public static Connection getConn() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("heyu"); connectionFactory.setPassword("heyu123"); connectionFactory.setVirtualHost("heyu"); connectionFactory.setPort(5672); return connectionFactory.newConnection(); } }
2、生產者
package com.heyu.rabbitmq.demo.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author: HEYU * @date: 2020/12/24 14:15 * @Description: 生產者 扇形交換機 交換機不做轉存 只做轉發 */ public class FanoutProducer { private static final String EXCHANGE_NAME = "peng_fanout"; public static void main(String[] args) { try { Connection conn = RabbitConnUtil.getConn(); // 建立通道 Channel channel = conn.createChannel(); // 繫結交換機 扇形交換機 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String MSG = "交換機不做轉存 只做轉發"; channel.basicPublish(EXCHANGE_NAME, "", null, MSG.getBytes(StandardCharsets.UTF_8)); channel.close(); conn.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
3、消費者
package com.heyu.rabbitmq.demo.fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author: HEYU * @date: 2020/12/24 15:21 * @Description: 郵件消費者 */ public class EmailConsumer { private static final String EMAIL_QUEUE_NAME = "peng_email_queue"; private static final String EXCHANGE_NAME = "peng_fanout"; public static void main(String[] args) { try { Connection conn = RabbitConnUtil.getConn(); Channel channel = conn.createChannel(); channel.queueDeclare(EMAIL_QUEUE_NAME,false,false,false,null); // 消費者繫結交換機 主題佇列名稱 交換機名稱 路由鍵 channel.queueBind(EMAIL_QUEUE_NAME,EXCHANGE_NAME,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("郵件消費者讀取訊息:"+msg); long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag,false); } }; channel.basicConsume(EMAIL_QUEUE_NAME,false,defaultConsumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
package com.heyu.rabbitmq.demo.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: HEYU
* @date: 2020/12/24 14:16
* @Description: 簡訊消費者
*/
public class SmsConsumer {
private static final String SMS_QUEUE_NAME = "peng_sms_queue";
private static final String EXCHANGE_NAME = "peng_fanout";
public static void main(String[] args) {
try {
Connection conn = RabbitConnUtil.getConn();
Channel channel = conn.createChannel();
channel.queueDeclare(SMS_QUEUE_NAME,false,false,false,null);
// 消費者繫結交換機
channel.queueBind(SMS_QUEUE_NAME,EXCHANGE_NAME,"");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("簡訊消費者讀取訊息:"+msg);
}
};
channel.basicConsume(SMS_QUEUE_NAME,true,defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
二、主題交換機
主題交換機上的訊息需要攜帶指定規則的routing_key,主題交換機會根據這個規則將資料傳送到對應的(多個)佇列
1、RabbitMq工具連線類
package com.heyu.rabbitmq.demo.fanout;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: HEYU
* @date: 2020/12/24 11:24
* @Description:
*/
public class RabbitConnUtil {
public static Connection getConn() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("heyu");
connectionFactory.setPassword("heyu123");
connectionFactory.setVirtualHost("heyu");
connectionFactory.setPort(5672);
return connectionFactory.newConnection();
}
}
2、生產者
package com.heyu.rabbitmq.demo.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author: HEYU
* @date: 2020/12/24 14:15
* @Description: 釋出訂閱
*/
public class TopicProducer {
private static final String EXCHANGE_NAME = "peng_topic_fanout";
public static void main(String[] args) {
try {
Connection conn = RabbitConnUtil.getConn();
// 建立通道
Channel channel = conn.createChannel();
// 繫結交換機 扇形交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String MSG = "routingKey 訊息轉發 測試";
channel.basicPublish(EXCHANGE_NAME, "log", null, MSG.getBytes(StandardCharsets.UTF_8));
channel.close();
conn.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3、消費者
package com.heyu.rabbitmq.demo.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: Melody
* @date: 2020/12/24 15:21
* @Description: 郵件消費者
*/
public class EmailConsumer {
private static final String EMAIL_QUEUE_NAME = "topic_email_queue";
private static final String EXCHANGE_NAME = "peng_topic_fanout";
public static void main(String[] args) {
try {
Connection conn = RabbitConnUtil.getConn();
Channel channel = conn.createChannel();
channel.queueDeclare(EMAIL_QUEUE_NAME,false,false,false,null);
// 消費者繫結交換機 主題佇列名稱 交換機名稱 路由鍵
channel.queueBind(EMAIL_QUEUE_NAME,EXCHANGE_NAME,"log");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("郵件消費者讀取訊息:"+msg);
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
};
channel.basicConsume(EMAIL_QUEUE_NAME,false,defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
package com.heyu.rabbitmq.demo.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: Melody
* @date: 2020/12/24 14:16
* @Description: 消費者
*/
public class SmsConsumer {
private static final String SMS_QUEUE_NAME = "topic_sms_queue";
private static final String EXCHANGE_NAME = "peng_topic_fanout";
public static void main(String[] args) {
try {
Connection conn = RabbitConnUtil.getConn();
Channel channel = conn.createChannel();
channel.queueDeclare(SMS_QUEUE_NAME,false,false,false,null);
// 消費者繫結交換機
channel.queueBind(SMS_QUEUE_NAME,EXCHANGE_NAME,"log");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("簡訊消費者讀取訊息:"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(SMS_QUEUE_NAME,false,defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}