RabbitMQ 訂閱模式
阿新 • • 發佈:2019-01-09
1.訂閱模式模型
a) 一個生產者 多個消費者
b) 每個消費者都有自己的佇列
c) 生產者沒有直接把訊息傳送給佇列,而是先發送給交換機exchange
d) 每個佇列都要繫結到交換機上
e) 生產者傳送的訊息是經過交換機的,然後到達佇列,就能實現一個訊息被多個消費者消費
應用場景:
比如 註冊之後需要傳送郵件 同時需要傳送簡訊
a) 一個生產者 多個消費者
b) 每個消費者都有自己的佇列
c) 生產者沒有直接把訊息傳送給佇列,而是先發送給交換機exchange
d) 每個佇列都要繫結到交換機上
e) 生產者傳送的訊息是經過交換機的,然後到達佇列,就能實現一個訊息被多個消費者消費 應用場景: 比如 註冊之後需要傳送郵件 同時需要傳送簡訊
生產者
package com.ithzk.rabbitmq.ps; import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author hzk * @date 2018/3/10 */ public class Send { private final static String EXCHANGER_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 分發 channel.exchangeDeclare(EXCHANGER_NAME,"fanout"); String msg = "hello exchange"; channel.basicPublish(EXCHANGER_NAME,"",null,msg.getBytes()); System.out.println("Send msg"+msg); channel.close(); connection.close(); } }
1訊息丟失了,因為交換機沒有儲存的能力,rabbitMQ中只有佇列有儲存能力,此時還沒有佇列繫結到該交換機上,所以資料丟失了。
消費者1
package com.ithzk.rabbitmq.ps; import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils; import com.rabbitmq.client.*; import javax.sound.midi.Soundbank; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author hzk * @date 2018/3/10 */ public class Recv1 { private static final String QUEUE_NAME="test_queue_fanout_email"; private final static String EXCHANGER_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連線 Connection connection = RabbitMQConnectionUtils.getConnection(); //從連線中獲取頻道 final Channel channel = connection.createChannel(); //宣告佇列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //繫結佇列到交換機 轉發器 channel.queueBind(QUEUE_NAME,EXCHANGER_NAME,""); //保證一次只發一個 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg:" + msg); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,consumer); System.out.println("[Consumer 1 start]"); } }
消費者2
package com.ithzk.rabbitmq.ps;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/10
*/
public class Recv2 {
private static final String QUEUE_NAME="test_queue_fanout_sms";
private final static String EXCHANGER_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連線
Connection connection = RabbitMQConnectionUtils.getConnection();
//從連線中獲取頻道
final Channel channel = connection.createChannel();
//宣告佇列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//繫結佇列到交換機 轉發器
channel.queueBind(QUEUE_NAME,EXCHANGER_NAME,"");
//保證一次只發一個
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
System.out.println("[Consumer 2 start]");
}
}
和交換機繫結的佇列都會收到訊息
轉載至:https://blog.csdn.net/u013985664/article/details/79512747