1. 程式人生 > >PSQueue隊列操作

PSQueue隊列操作

支持 程序 queue隊列 HR 列名 用戶 thread 超過 客戶端

  隊列是一種特殊的線性表,特殊之處在於它只允許在表的前端(front)進行刪除操作,而在表的後端(rear)進行插入操作,和棧(FILO,First In Last Out,先進後出)屬於線性表一樣,隊列也是一種操作受限制的線性表。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭,即FIFO(First In First Out,先進先出)。

  高並發(High Concurrency)是互聯網分布式系統架構設計中必須考慮的因素之一,它通常是指,通過設計保證系統能夠同時並行處理很多請求。  

  在移動互聯網高速發展的時代,各種電商平臺的搶購業務變得越來越火爆,搶購業務所帶來的高並發問題值得我們去探索,主要涉及的方面包括處理和響應速度、數據的一致性等。搶購開放的一瞬間,可能有成千上萬的下訂單請求發送到服務器去處理,如果只是簡單的請求處理響應方式,不做任何處理,導致的結果很可能是很多客戶很長時間得不到響應,根本不知道自己是否下訂單成功,或者下訂單的數量已經超過了商品的數量,這就導致了超發的問題。

  

  高並發有諸多的解決方案,引入隊列是其中一種。其主要用來減輕服務器壓力負載,提高程序運行的穩定性。

  隊列(Queue)又稱先進先出表(First In First Out),即先進入隊列的元素,先從隊列中取出。加入元素的一頭叫隊頭,取出元素的一頭叫隊尾。利用消息隊列可以很好地異步處理數據傳送和存儲,當你頻繁地向數據庫中插入數據、頻繁地向搜索引擎提交數據,就可采取消息隊列來異步插入。另外,還可以將較慢的處理邏輯、有並發數量限制的處理邏輯,通過消息隊列放在後臺處理,例如FLV視頻轉換、發送手機短信、發送電子郵件等。

  如要支持多臺服務器,則需要更強的隊列機制,如ActiveMQ,kafka等。在此,我們詳解一種純

java實現的隊列機制PSQueue。

  PSQueue Server 在使用中具有以下特征:

 1.非常簡單,基於 HTTP GET/POST 協議。

  PHPJavaPerlShellPythonRuby等支持HTTP協議的編程語言均可調用。

 2.完善的JMX管理接口,所有方法全部可以由JMX來管理。為了安全管理,使用PSQueue需要口令權限。

 3.每個隊列支持任意多消費者。

 4.非常快速,入隊列、出隊列速度超過40000/秒。

 5.高並發,支持5K以上的並發連接。

 6.支持多隊列。

 7.隊列個數無限制,只要系統的磁盤空間夠用(

缺省單個隊列占用磁盤空間是2G)

 8.低內存消耗,海量數據存儲,存儲幾十GB的數據只需不到200MB的物理內存緩沖區。

 9.可以實時查看指定隊列狀態(未讀隊列數量)。

 10.可以查看指定隊列、指定消費者的內容,包括未出、已出的隊列內容。

 11.查看隊列內容時,支持多字符集編碼。

  12.創新設計,比如,消費者可以故意倒回到老的偏移量再次消費數據。這雖然違反了隊列的常見約定,但被證明是許多消費者的基本特征。

  接下來,我們詳細看下PSQueue的具體使用。

  首先,我們要在電腦上安裝與開啟PSQueue服務器。

  切換到PSQueue的bin文件夾下,在DOS窗口中執行下述命令:

#安裝psqueue-server
psqueue.bat install

#開啟psqueue服務
psqueue.bat start

  其次,我們先通過操作字符串數據來查看PSQueue的基本操作:

package com.itszt.test1;
import wjw.psqueue.client.PSQueueClient;
import wjw.psqueue.msg.*;
/**
 * PSQueue的基本用法
* 在使用前,需要先導入EasyFastJson-2.7.2.jar和PSQueueClient-1.0.4.jar兩個jar包,並添加依賴 */ public class Test { //定義隊列名稱,訂閱者名稱,以及要連接的隊列服務器的用戶名和密碼 static String queue_name = "szt_queue"; static String sub_name = "szt_sub"; static String username = "admin"; static String pwd = "123456"; public static void main(String[] args) { //定義操作隊列服務器的客戶端 PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 1818, "UTF-8", 60 * 1000, 60 * 1000); //判斷指定隊列是否創建 ResQueueStatus status = queueClient.status(queue_name); System.out.println(status.getQueueName()+"--"+status); // System.out.println(status.getStatus().code==0); // System.out.println(ResultCode.SUCCESS.code); // System.out.println(ResultCode.QUEUE_IS_EXIST.code); if (status.getStatus().code==0) { System.out.println("您已創建隊列!---->" + queue_name + "\n請您操作隊列"); } else { System.out.println("您尚未創建隊列---->" + queue_name + "\n接下來為您創建隊列"); //1.創建一個隊列 ResultCode rest = queueClient.createQueue(queue_name, 10000000L, username, pwd); System.out.println(rest.code); if (rest.code == ResultCode.SUCCESS.code) { System.out.println("隊列創建成功!--->" + rest.toString()); } else { System.out.println("隊列創建失敗!--->" + rest.toString()); } } //2.向隊列中放數據 /*ResAdd resAdd = queueClient.add(queue_name, "Hello,Queue!" + Math.random() * 1000); if (resAdd.status.code == ResultCode.SUCCESS.code) { System.out.println("插入數據成功!--->" + resAdd); } else { System.out.println("插入數據失敗!--->" + resAdd); }*/ //3.先判斷一個指定的訂閱者是否存在,若不存在則創建,然後通過該訂閱者來消費隊列中的數據 ResSubStatus statusForSub = queueClient.statusForSub(queue_name, sub_name); System.out.println(statusForSub); // System.out.println(ResultCode.SUB_IS_EXIST.code+"---"+ResultCode.SUB_NOT_EXIST.code); if (statusForSub.getStatus().code==0) { System.out.println("您指定的用戶--" + sub_name + "--已存在\n請通過該訂閱者消費隊列中的數據"); } else { System.out.println("================"); System.out.println("您指定的用戶--" + sub_name + "--不存在\n請創建該用戶"); ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd); if (resultCode.code == ResultCode.SUCCESS.code) { System.out.println("用戶" + sub_name + "創建成功!\n請通過該用戶消費隊列中的數據"); } else { System.out.println("用戶創建失敗,請聯系管理員!"); } System.out.println("=================="); } //通過該訂閱者消費隊列中的數據 /*ResData resData = queueClient.poll(queue_name, sub_name); if (resData.status.code == ResultCode.SUCCESS.code) { System.out.println("消費成功!--->" + resData.toString()); } else if (resData.status.code == ResultCode.ALL_MESSAGE_CONSUMED.code) { System.out.println("隊列中的數據已經全部消費完,請在生產後繼續消費!"); } else { System.out.println("resData = " + resData); System.out.println("消費數據失敗,請聯系管理員!"); }*/ //4.移除某個隊列 /*ResultCode remQueue = queueClient.removeQueue(queue_name, username, pwd); if(remQueue.code==ResultCode.SUCCESS.code){ System.out.println("隊列移除成功!"); }else{ System.out.println("隊列移除失敗,請聯系管理員!"); }*/ //5.移除訂閱者 /*ResultCode remSub = queueClient.removeSub(queue_name, sub_name, username, pwd); if(remSub.code==ResultCode.SUCCESS.code){ System.out.println("訂閱者移除成功!"); }else{ System.out.println("訂閱者移除失敗,請聯系管理員!"); }*/ //6.查看隊列中某個位置的數據 ResData resData = queueClient.view(queue_name, 0); if(resData.status.code==ResultCode.SUCCESS.code){ System.out.println("成功:" + resData.toString()); }else{ System.out.println("失敗:" + resData.toString()); } //7.列出全部隊列 ResList resList = queueClient.queueNames(); System.out.println("全部隊列:"+resList.data); //8.列出全部訂閱者 ResList subNames = queueClient.subNames(queue_name); System.out.println(queue_name+"隊列的全部用戶:" + subNames.data); //9.重置隊列 /*ResultCode resultCode = queueClient.resetQueue(queue_name, username, pwd); if(resultCode.code==ResultCode.SUCCESS.code){ System.out.println(queue_name+"隊列重置成功!"); }else{ System.out.println(queue_name+"隊列重置失敗!"); }*/ } }

  接下來,我們再操作自定義類型的對象數據:

package com.itszt.test2;

import com.google.gson.Gson;
import wjw.psqueue.client.PSQueueClient;
import wjw.psqueue.msg.*;

import java.util.Date;

/**
 * 測試訂單操作
 * 在原先導入EasyFastJson-2.7.2.jar和PSQueueClient-1.0.4.jar兩個jar包後,
 * 再導入gson-2.7.jar包,並均對其添加依賴
 */
public class Test {
    static String queue_name = "szt_queue";
    static String sub_name = "szt_sub";
    static String username = "admin";
    static String pwd = "123456";

    static PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 1818, "UTF-8", 60 * 1000, 60 * 1000);

    static Gson gson=new Gson();

    public Test(){
        //查詢指定隊列是否存在,不存在則創建
        ResQueueStatus rqs = queueClient.status(queue_name);
        if(rqs.status.code== ResultCode.SUCCESS.code){
            System.out.println(queue_name+"對列已存在");
        }else{
            System.out.println(queue_name+"對列不存在,將創建隊列");
            ResultCode resultCode = queueClient.createQueue(queue_name, 10000000L, username, pwd);
            if(resultCode.code==ResultCode.SUCCESS.code){
                System.out.println(queue_name+"隊列已創建成功!");
            }else{
                System.out.println(queue_name+"隊列創建失敗!請聯系管理員");
            }
        }
        //查詢指定用戶是否存在,不存在則創建
        ResSubStatus resSubStatus = queueClient.statusForSub(queue_name, sub_name);
        if(resSubStatus.getStatus().code==0){
            System.out.println(sub_name+"用戶已存在!");
        }else{
            System.out.println(sub_name+"用戶不存在!請創建用戶!");
            ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd);
            if (resultCode.code== ResultCode.SUCCESS.code) {
                System.out.println(sub_name+"用戶創建成功!");
            }else{
                System.out.println(sub_name+"用戶創建失敗!");
            }
        }
    }

    public static void main(String[] args) {
//        queueClient.resetQueue(queue_name,username,pwd);
        new AddThread().start();
        new AddThread().start();
        new PollThread().start();
        new PollThread().start();
        new PollThread().start();
        new PollThread().start();
        new AddThread().start();
        new AddThread().start();
    }

    //生產者線程
    static class AddThread extends Thread{
        private static int id=1;
        @Override
        public void run() {
            //生產1000條數據
            for(int i=0;i<1000;i++){
                OrderRequest order=new OrderRequest(++id, "張三的訂單" + id, new Date().toString());
                ResAdd resAdd = queueClient.add(queue_name, gson.toJson(order));
                if(resAdd.status.code==ResultCode.SUCCESS.code){
                    System.out.println("訂單"+order.getOrderName()+"放入隊列成功!");
                }else{
                    System.out.println("訂單"+order.getOrderName()+"放入隊列失敗!");
                }
            }
        }
    }

    //消費者線程
    static class PollThread extends Thread{
        @Override
        public void run() {
            while (true){
                ResData resData = queueClient.poll(queue_name, sub_name);
                if(resData.status.code==ResultCode.QUEUE_POLL_ERROR.code){
                    System.out.println("隊列中的數據取完了!");
                    break;
                }else{
                    OrderRequest orderFromJson = gson.fromJson(resData.getData(), OrderRequest.class);
                    if(orderFromJson==null){
                        System.out.println("隊列中的數據取完了!");
                        break;
                    }
                    System.out.println("取出一個數據:"+orderFromJson);
                }
            }
        }
    }
}

  到此,PSQueue的演示操作基本完畢,希望對您有所幫助!

PSQueue隊列操作