1. 程式人生 > >ActiveMQ系列—使用示例(在ActiveMQ中傳遞Stomp訊息)

ActiveMQ系列—使用示例(在ActiveMQ中傳遞Stomp訊息)

下面我們使用ActiveMQ提供的JAVA 客戶端(實際上就是ActiveMQ對JMS規範的實現),向ActiveMQ中的Queue(示例程式碼中將這個Queue命名為’test’)傳送一條Stomp協議訊息,然後再使用JAVA語言的客戶端,從ActiveMQ上接受這條訊息:

1、使用ActiveMQ的API傳送Stomp協議訊息

import java.net.Socket;
import java.util.Date;

import org.apache.activemq.transport.stomp.StompConnection;

// 訊息生產者
public class
TestProducer {
public static void main(String[] args) { try { // 建立Stomp協議的連線 StompConnection con = new StompConnection(); Socket so = new Socket("192.168.1.168", 61613); con.open(so); // 注意,協議版本可以是1.2,也可以是1.1 con.setVersion("1.2"
); // 使用者名稱和密碼,這個不必多說了 con.connect("admin", "admin"); // 以下發送一條資訊(您也可以使用“事務”方式) con.send("/test", "234543" + new Date().getTime()); } catch (Exception e) { e.printStackTrace(System.out); } } }

這裡寫圖片描述

2、使用ActiveMQ的API接收Stomp協議訊息

import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        // 建立連線
        StompConnection con = new StompConnection();
        Socket so = new Socket("192.168.1.168", 61613);
        con.open(so);
        con.setVersion("1.2");
        con.connect("admin", "admin");

        String ack = "client";
        con.subscribe("/test", "client");
        // 接受訊息(使用迴圈進行)
        for (;;) {
            StompFrame frame = null;
            try {
                // 注意,如果沒有接收到訊息,
                // 這個消費者執行緒會停在這裡,直到本次等待超時
                frame = con.receive();
            } catch (SocketTimeoutException e) {
                continue;
            }

            // 列印本次接收到的訊息
            System.out.println("frame.getAction() = " + frame.getAction());
            Map<String, String> headers = frame.getHeaders();
            String meesage_id = headers.get("message-id");
            System.out.println("frame.getBody() = " + frame.getBody());
            System.out.println("frame.getCommandId() = " + frame.getCommandId());

            // 在ack是client標記的情況下,確認訊息
            if ("client".equals(ack)) {
                con.ack(meesage_id);
            }
        }
    }
}

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

以上分別是使用Activie提供的Stomp協議的訊息生產端和Stomp協議的訊息消費端的程式碼(如果您不清楚Stomp協議的細節,可以參考我另一篇文章:《ActiveMQ系列—訊息協議(Stomp協議)》)。請注意在程式碼片段中,並沒有出現任何一個帶有jms名稱的包或者類——這是因為ActiveMQ為Stomp協議提供的JAVA API在內部進行了JMS規範的封裝。

您可以檢視activemq-stomp中關於協議轉換部分的原始碼:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父級介面:org.apache.activemq.transport.stomp.FrameTranslator來驗證這件事情