1. 程式人生 > >RabbitMQ--02--RabbitMQ傳遞物件

RabbitMQ--02--RabbitMQ傳遞物件

方案一:物件序列化
其實和hello world類似,只不過增加了一個物件序列化:
序列化就是將一個物件的狀態(各個屬性量)儲存起來,然後在適當的時候再獲得。序列化分為兩大部分:序列化和反序列化。序列化是這個過程的第一部分,將資料分解成位元組流,以便儲存在檔案中或在網路上傳輸。反序列化就是開啟位元組流並重構物件。物件序列化不僅要將基本資料型別轉換成位元組表示,有時還要恢復資料。恢復資料要求有恢復資料的物件例項。

傳送端:

public class Send {
    /**
     * mq通訊的名稱
     */
    private final static String QUEUE_NAME="hello"
; public static void main(String[] args) throws IOException, TimeoutException{ ConnectionFactory connFactory=new ConnectionFactory(); //設定伺服器位置 connFactory.setHost("localhost"); //設定伺服器埠號 //connFactory.setPort(5672); //建立連線 Connection con=connFactory.newConnection(); //建立channel
Channel channel=con.createChannel(); //設定佇列的屬性第一個引數為佇列名。第二個引數為是否建立一個持久佇列,第三個是否建立一個專用的佇列, //第四個引數為是否自動刪除佇列,第五個引數為其他屬性(結構引數) channel.queueDeclare(QUEUE_NAME, false, false, false, null); //String message="hello world"; //建立一個物件 User user=new User(); user.setId(1
); user.setName("dema"); user.setPassword("123"); //將建立的物件序列化後傳遞 //第一個引數為,第二個引數為佇列名。第三個引數為其他屬性。第四個引數為訊息體 channel.basicPublish("",QUEUE_NAME,null,SerializationUtils.serialize(user)); System.out.println("正在傳送訊息:"+user.getId()); //關閉連線 channel.close(); con.close(); } }

接收端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class Receive {

    /**
     * 定義rm通訊的名稱
     */
    private final static String QUEUE_NAME="hello";


    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        // TODO Auto-generated method stub

        ConnectionFactory connFactory=new ConnectionFactory();

        //設定伺服器位置
        connFactory.setHost("localhost");
        //設定埠號
        //connFactory.setPort(15672);
        //連線登入使用者名稱
        //connFactory.setPassword("guest");
        //連線登入密碼
        //connFactory.setUsername("guest");

        //建立連線
        Connection con=connFactory.newConnection();

        //建立channel
        Channel channel=con.createChannel();

        //設定佇列的屬性第一個引數為佇列名。第二個引數為是否建立一個持久佇列,第三個是否建立一個專用的佇列,
                //第四個引數為是否自動刪除佇列,第五個引數為其他屬性(結構引數)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer=new QueueingConsumer(channel);
        //第一個引數為佇列名,第二個引數是否考慮已傳送的訊息,第三個引數為消費物件的介面
        channel.basicConsume(QUEUE_NAME, true, consumer);

        System.out.println("Receiv類正在等待Send類傳送訊息");
        while(true){
            Delivery delivery=consumer.nextDelivery();
            //String message=new String(delivery.getBody()); 
            //將傳遞過來的物件反序列化
            @SuppressWarnings("deprecation")
            User user=(User)SerializationUtils.deserialize(delivery.getBody());
            //System.out.println("Receive類接收到Send類傳送的資訊:"+message);
            System.out.println(user.getName());
        }

        //關閉連線
    }
}

方案二:通過ObjectMapper將物件轉換成JSON資料:
傳送端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.SerializationUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Send {
    /**
     * mq通訊的名稱
     */
    private final static String QUEUE_NAME="hello";

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connFactory=new ConnectionFactory();
        //設定伺服器位置
        connFactory.setHost("localhost");
        //設定伺服器埠號
        //connFactory.setPort(5672);
        //建立連線
        Connection con=connFactory.newConnection();
        //建立channel
        Channel channel=con.createChannel();
        //設定佇列的屬性第一個引數為佇列名。第二個引數為是否建立一個持久佇列,第三個是否建立一個專用的佇列,
        //第四個引數為是否自動刪除佇列,第五個引數為其他屬性(結構引數)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //String message="hello world";

        //建立一個物件
        User user=new User();
        user.setId(1);
        user.setName("dema");
        user.setPassword("123");

        //將Java物件匹配JSON結構
        ObjectMapper mapper=new ObjectMapper();
        String message=mapper.writeValueAsString(user);
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("正在傳送訊息:"+user.getId());     
        //關閉連線
        channel.close();
        con.close();
        //SimpleMessageConverter


    }
}

接收端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.SerializationUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;


public class Receive {

    /**
     * 定義rm通訊的名稱
     */
    private final static String QUEUE_NAME="hello";


    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        // TODO Auto-generated method stub

        ConnectionFactory connFactory=new ConnectionFactory();

        //設定伺服器位置
        connFactory.setHost("localhost");
        //設定埠號
        //connFactory.setPort(15672);
        //連線登入使用者名稱
        //connFactory.setPassword("guest");
        //連線登入密碼
        //connFactory.setUsername("guest");

        //建立連線
        Connection con=connFactory.newConnection();

        //建立channel
        Channel channel=con.createChannel();

        //設定佇列的屬性第一個引數為佇列名。第二個引數為是否建立一個持久佇列,第三個是否建立一個專用的佇列,
                //第四個引數為是否自動刪除佇列,第五個引數為其他屬性(結構引數)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer=new QueueingConsumer(channel);
        //第一個引數為佇列名,第二個引數是否考慮已傳送的訊息,第三個引數為消費物件的介面
        channel.basicConsume(QUEUE_NAME, true, consumer);

        System.out.println("Receiv類正在等待Send類傳送訊息");
        while(true){

            //將json資料轉成物件
            ObjectMapper mapper=new ObjectMapper();
            Delivery delivery=consumer.nextDelivery();
            String message=new String(delivery.getBody());
            User user=mapper.readValue(message.getBytes("utf-8"),User.class);
            System.out.println(user.getName());
        }

        //關閉連線
    }

}

說明都在註釋中。
附帶遇到的問題及解決辦法:

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/ILoggerFactory
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:791)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:186)
..........

Failed to instantiate SLF4J LoggerFactory
Reported exception:
java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:412)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at com.rabbitmq.client.impl.AMQConnection.<clinit>(AMQConnection.java:49)
    at com.rabbitmq.client.ConnectionFactory.<init>(ConnectionFactory.java:91)
    at Receive.main(Receive.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 8 more
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:412)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at com.rabbitmq.client.impl.AMQConnection.<clinit>(AMQConnection.java:49)
    at com.rabbitmq.client.ConnectionFactory.<init>(ConnectionFactory.java:91)
    at Receive.main(Receive.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 8 more

解決方案:缺少日誌架包
新增三個架包:
log4j-1.2.17.jar
slf4j-api-1.6.4-sources.jar
slf4j-api-1.6.4.jar
slf4j-log4j12-1.7.0-sources.jar
slf4j-log4j12-1.7.0.jar
參考網站:
http://outofmemory.cn/code-snippet/6776/java.lang.NoClassDefFoundError-org-slf4j-LoggerFactory

Exception in thread "main" java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:60)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:900)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:859)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:817)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:954)
    at Receive.main(Receive.java:30)

安裝後在瀏覽器中通過url無法訪問
解決方案:
給path變數新增內容,在其後面增加:;%RABBITMQ_SERVER%\sbin (注意前面的分號),然後確定即可
新增環境變數:RABBITMQ_SERVER
環境變數RABBITMQ_SERVER 的值為:D:\My-Softwar-Installed\RabbitMQ Server\rabbitmq_server-
配置環境變數後cmd中輸入
rabbitmq-plugins enable rabbitmq_management
然後執行下面的命令來安裝:
rabbitmq-service stop
rabbitmq-service install
rabbitmq-service start

Exception in thread "main" log4j:WARN No appenders could be found for logger (com.rabbitmq.client.impl.ForgivingExceptionHandler).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:372)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:297)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:900)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:859)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:817)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:954)
    at Receive.main(Receive.java:31)

解決方案:
將log4j.properties新增到classpath下
http://blog.csdn.net/huoyin/article/details/41593013

rabbitmq的web管理介面無法使用guest使用者登入
解決方案:由chrome換成360瀏覽器
附demo下載連結:http://download.csdn.net/detail/btwangzhi/9752777