1. 程式人生 > 其它 >RabbitMQ學習之hello world模型

RabbitMQ學習之hello world模型

在這裡插入圖片描述
不使用交換機,生產者和消費者直接通過佇列進行訊息的傳遞,一個佇列只有一個消費者。
原生書寫MQ:
生產者程式碼:

//配置連線工廠引數
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setHost("192.168.216.200"
); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("my"); //通過連線工廠建立連線 Connection connection = connectionFactory.newConnection(); //通過連線建立一個管道 Channel channel = connection.createChannel(); //管道繫結一個佇列 channel.queueDeclare
("queue1",true,false,false,null); /* queue:佇列名稱,durable:是否持久化,exclusive:是否獨佔channel,autoDelete:自動刪除,arguments:建立佇列攜帶的引數 */ /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) */ String body="你好"
; //通過管道傳送訊息 channel.basicPublish("","queue1",null,body.getBytes()); /* exchange:交換機名稱,這裡沒有使用交換機,名稱為"" routingKey:使用交換機才會用到,這裡就是佇列名稱 props:設定訊息的屬性 body:訊息內容 */ channel.close(); connection.close();

消費者程式碼:

ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setHost("192.168.216.200");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("my");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //處理監聽到的訊息
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println(new String(body));
            }
        };
        //監聽佇列中的訊息,並用重寫的consumer進行資訊消費
        channel.basicConsume("queue1",true, consumer);
        //queue1:堅挺的佇列名稱
        //true:自動確認
        //comsumer:重寫的訊息處理物件
        //消費者因為要一直監聽,所以不需要釋放資源
        //channel.close();
        //connection.close();

總結
不管是生產者還是消費者,都是通過管道與佇列繫結的。
生產者:通過管直接向佇列傳送訊息時,交換機的名稱為"",routingKey就是佇列名稱,傳送完訊息後釋放資源
消費者:也是通過管道進行佇列的監聽,需要傳入一個處理訊息的consumer物件,消費者需要監聽佇列,不需要釋放資源

spring整合MQ
rabbitMQ配置檔案

rabbitmq.host=192.168.216.200
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=my

消費者
在xml檔案中配置:

<context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!--定義connectionFactory-->
    <!--publisher-returns="true"開啟訊息返回
    publisher-confirms="true"開啟訊息確認-->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               password="${rabbitmq.password}"
                               username="${rabbitmq.username}"
                               port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-returns="true"
                               publisher-confirms="true"
    />

    <!--定義管理交換機、佇列-->
    <rabbit:admin connection-factory="connectionFactory"/>
     <!--定義訊息操作物件:rabbitTemplate-->
	<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

在xml檔案中宣告一個佇列

<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

java程式碼中

	@Autowired
    RabbitTemplate rabbitTemplate;//訊息操作物件

    @Autowired
    RabbitAdmin rabbitAdmin;//管理物件
    //兩個物件在xml檔案中宣告
    @Test
    public void testHelloWorld(){
        rabbitTemplate.convertAndSend("","spring_queue","來自spring_queue的訊息");
        //直接向佇列傳送訊息
        //交換機名稱:""
        //佇列名稱:"spring_queue"
        //訊息內容
    }

消費者
消費者需要實現MessageListener介面並實現方法對訊息進行處理:

public class Sping_mq_listener implements MessageListener {
    public void onMessage(Message message) {
        System.out.println("onMessage");
        System.out.println(new String(message.getBody()));
    }
}

xml檔案

<context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!--定義connectionFactory-->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               password="${rabbitmq.password}"
                               username="${rabbitmq.username}"
                               port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <bean id="sping_mq_listener" class="MyMessageListener.Sping_mq_listener"/>
	<!--監聽容器-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2">
    	<!--像容器中放入定義的監聽器。queue-names:監聽的佇列名-->
        <rabbit:listener ref="sping_mq_listener" queue-names="spring_queue"/>
        
    </rabbit:listener-container>
    <!--包掃描:實現的消費者監聽物件-->
    <context:component-scan base-package="MyMessageListener"/>
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>