RabbitMQ學習之hello world模型
阿新 • • 發佈:2020-12-21
不使用交換機,生產者和消費者直接通過佇列進行訊息的傳遞,一個佇列只有一個消費者。
原生書寫MQ:
生產者程式碼:
//配置連線工廠引數
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setHost("192.168.216.200" );
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("my");
//通過連線工廠建立連線
Connection connection = connectionFactory.newConnection();
//通過連線建立一個管道
Channel channel = connection.createChannel();
//管道繫結一個佇列
channel.queueDeclare ("queue1",true,false,false,null);
/*
queue:佇列名稱,durable:是否持久化,exclusive:是否獨佔channel,autoDelete:自動刪除,arguments:建立佇列攜帶的引數
*/
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String body="你好" ;
//通過管道傳送訊息
channel.basicPublish("","queue1",null,body.getBytes());
/*
exchange:交換機名稱,這裡沒有使用交換機,名稱為""
routingKey:使用交換機才會用到,這裡就是佇列名稱
props:設定訊息的屬性
body:訊息內容
*/
channel.close();
connection.close();
消費者程式碼:
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setHost("192.168.216.200");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("my");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//處理監聽到的訊息
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(new String(body));
}
};
//監聽佇列中的訊息,並用重寫的consumer進行資訊消費
channel.basicConsume("queue1",true, consumer);
//queue1:堅挺的佇列名稱
//true:自動確認
//comsumer:重寫的訊息處理物件
//消費者因為要一直監聽,所以不需要釋放資源
//channel.close();
//connection.close();
總結
不管是生產者還是消費者,都是通過管道與佇列繫結的。
生產者:通過管直接向佇列傳送訊息時,交換機的名稱為"",routingKey就是佇列名稱,傳送完訊息後釋放資源
消費者:也是通過管道進行佇列的監聽,需要傳入一個處理訊息的consumer物件,消費者需要監聽佇列,不需要釋放資源
spring整合MQ
rabbitMQ配置檔案
rabbitmq.host=192.168.216.200
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=my
消費者
在xml檔案中配置:
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定義connectionFactory-->
<!--publisher-returns="true"開啟訊息返回
publisher-confirms="true"開啟訊息確認-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
password="${rabbitmq.password}"
username="${rabbitmq.username}"
port="${rabbitmq.port}"
virtual-host="${rabbitmq.virtual-host}"
publisher-returns="true"
publisher-confirms="true"
/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義訊息操作物件:rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
在xml檔案中宣告一個佇列
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
java程式碼中
@Autowired
RabbitTemplate rabbitTemplate;//訊息操作物件
@Autowired
RabbitAdmin rabbitAdmin;//管理物件
//兩個物件在xml檔案中宣告
@Test
public void testHelloWorld(){
rabbitTemplate.convertAndSend("","spring_queue","來自spring_queue的訊息");
//直接向佇列傳送訊息
//交換機名稱:""
//佇列名稱:"spring_queue"
//訊息內容
}
消費者
消費者需要實現MessageListener介面並實現方法對訊息進行處理:
public class Sping_mq_listener implements MessageListener {
public void onMessage(Message message) {
System.out.println("onMessage");
System.out.println(new String(message.getBody()));
}
}
xml檔案
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定義connectionFactory-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
password="${rabbitmq.password}"
username="${rabbitmq.username}"
port="${rabbitmq.port}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="sping_mq_listener" class="MyMessageListener.Sping_mq_listener"/>
<!--監聽容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2">
<!--像容器中放入定義的監聽器。queue-names:監聽的佇列名-->
<rabbit:listener ref="sping_mq_listener" queue-names="spring_queue"/>
</rabbit:listener-container>
<!--包掃描:實現的消費者監聽物件-->
<context:component-scan base-package="MyMessageListener"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>