PSQueue隊列操作
隊列是一種特殊的線性表,特殊之處在於它只允許在表的前端(front)進行刪除操作,而在表的後端(rear)進行插入操作,和棧(FILO,First In Last Out,先進後出)屬於線性表一樣,隊列也是一種操作受限制的線性表。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭,即FIFO(First In First Out,先進先出)。
高並發(High Concurrency)是互聯網分布式系統架構設計中必須考慮的因素之一,它通常是指,通過設計保證系統能夠同時並行處理很多請求。
在移動互聯網高速發展的時代,各種電商平臺的搶購業務變得越來越火爆,搶購業務所帶來的高並發問題值得我們去探索,主要涉及的方面包括處理和響應速度、數據的一致性等。搶購開放的一瞬間,可能有成千上萬的下訂單請求發送到服務器去處理,如果只是簡單的請求處理響應方式,不做任何處理,導致的結果很可能是很多客戶很長時間得不到響應,根本不知道自己是否下訂單成功,或者下訂單的數量已經超過了商品的數量,這就導致了超發的問題。
高並發有諸多的解決方案,引入隊列是其中一種。其主要用來減輕服務器壓力負載,提高程序運行的穩定性。
隊列(Queue)又稱先進先出表(First In First Out),即先進入隊列的元素,先從隊列中取出。加入元素的一頭叫“隊頭”,取出元素的一頭叫“隊尾”。利用消息隊列可以很好地異步處理數據傳送和存儲,當你頻繁地向數據庫中插入數據、頻繁地向搜索引擎提交數據,就可采取消息隊列來異步插入。另外,還可以將較慢的處理邏輯、有並發數量限制的處理邏輯,通過消息隊列放在後臺處理,例如FLV視頻轉換、發送手機短信、發送電子郵件等。
如要支持多臺服務器,則需要更強的隊列機制,如ActiveMQ,kafka等。在此,我們詳解一種純
PSQueue Server 在使用中具有以下特征:
1.非常簡單,基於 HTTP GET/POST 協議。
PHP、Java、Perl、Shell、Python、Ruby等支持HTTP協議的編程語言均可調用。
2.完善的JMX管理接口,所有方法全部可以由JMX來管理。為了安全管理,使用PSQueue需要口令權限。
3.每個隊列支持任意多消費者。
4.非常快速,入隊列、出隊列速度超過40000次/秒。
5.高並發,支持5K以上的並發連接。
6.支持多隊列。
7.隊列個數無限制,只要系統的磁盤空間夠用(
8.低內存消耗,海量數據存儲,存儲幾十GB的數據只需不到200MB的物理內存緩沖區。
9.可以實時查看指定隊列狀態(未讀隊列數量)。
10.可以查看指定隊列、指定消費者的內容,包括未出、已出的隊列內容。
11.查看隊列內容時,支持多字符集編碼。
12.創新設計,比如,消費者可以故意倒回到老的偏移量再次消費數據。這雖然違反了隊列的常見約定,但被證明是許多消費者的基本特征。
接下來,我們詳細看下PSQueue的具體使用。
首先,我們要在電腦上安裝與開啟PSQueue服務器。
切換到PSQueue的bin文件夾下,在DOS窗口中執行下述命令:
#安裝psqueue-server psqueue.bat install #開啟psqueue服務 psqueue.bat start
其次,我們先通過操作字符串數據來查看PSQueue的基本操作:
package com.itszt.test1; import wjw.psqueue.client.PSQueueClient; import wjw.psqueue.msg.*; /** * PSQueue的基本用法
* 在使用前,需要先導入EasyFastJson-2.7.2.jar和PSQueueClient-1.0.4.jar兩個jar包,並添加依賴 */ public class Test { //定義隊列名稱,訂閱者名稱,以及要連接的隊列服務器的用戶名和密碼 static String queue_name = "szt_queue"; static String sub_name = "szt_sub"; static String username = "admin"; static String pwd = "123456"; public static void main(String[] args) { //定義操作隊列服務器的客戶端 PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 1818, "UTF-8", 60 * 1000, 60 * 1000); //判斷指定隊列是否創建 ResQueueStatus status = queueClient.status(queue_name); System.out.println(status.getQueueName()+"--"+status); // System.out.println(status.getStatus().code==0); // System.out.println(ResultCode.SUCCESS.code); // System.out.println(ResultCode.QUEUE_IS_EXIST.code); if (status.getStatus().code==0) { System.out.println("您已創建隊列!---->" + queue_name + "\n請您操作隊列"); } else { System.out.println("您尚未創建隊列---->" + queue_name + "\n接下來為您創建隊列"); //1.創建一個隊列 ResultCode rest = queueClient.createQueue(queue_name, 10000000L, username, pwd); System.out.println(rest.code); if (rest.code == ResultCode.SUCCESS.code) { System.out.println("隊列創建成功!--->" + rest.toString()); } else { System.out.println("隊列創建失敗!--->" + rest.toString()); } } //2.向隊列中放數據 /*ResAdd resAdd = queueClient.add(queue_name, "Hello,Queue!" + Math.random() * 1000); if (resAdd.status.code == ResultCode.SUCCESS.code) { System.out.println("插入數據成功!--->" + resAdd); } else { System.out.println("插入數據失敗!--->" + resAdd); }*/ //3.先判斷一個指定的訂閱者是否存在,若不存在則創建,然後通過該訂閱者來消費隊列中的數據 ResSubStatus statusForSub = queueClient.statusForSub(queue_name, sub_name); System.out.println(statusForSub); // System.out.println(ResultCode.SUB_IS_EXIST.code+"---"+ResultCode.SUB_NOT_EXIST.code); if (statusForSub.getStatus().code==0) { System.out.println("您指定的用戶--" + sub_name + "--已存在\n請通過該訂閱者消費隊列中的數據"); } else { System.out.println("================"); System.out.println("您指定的用戶--" + sub_name + "--不存在\n請創建該用戶"); ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd); if (resultCode.code == ResultCode.SUCCESS.code) { System.out.println("用戶" + sub_name + "創建成功!\n請通過該用戶消費隊列中的數據"); } else { System.out.println("用戶創建失敗,請聯系管理員!"); } System.out.println("=================="); } //通過該訂閱者消費隊列中的數據 /*ResData resData = queueClient.poll(queue_name, sub_name); if (resData.status.code == ResultCode.SUCCESS.code) { System.out.println("消費成功!--->" + resData.toString()); } else if (resData.status.code == ResultCode.ALL_MESSAGE_CONSUMED.code) { System.out.println("隊列中的數據已經全部消費完,請在生產後繼續消費!"); } else { System.out.println("resData = " + resData); System.out.println("消費數據失敗,請聯系管理員!"); }*/ //4.移除某個隊列 /*ResultCode remQueue = queueClient.removeQueue(queue_name, username, pwd); if(remQueue.code==ResultCode.SUCCESS.code){ System.out.println("隊列移除成功!"); }else{ System.out.println("隊列移除失敗,請聯系管理員!"); }*/ //5.移除訂閱者 /*ResultCode remSub = queueClient.removeSub(queue_name, sub_name, username, pwd); if(remSub.code==ResultCode.SUCCESS.code){ System.out.println("訂閱者移除成功!"); }else{ System.out.println("訂閱者移除失敗,請聯系管理員!"); }*/ //6.查看隊列中某個位置的數據 ResData resData = queueClient.view(queue_name, 0); if(resData.status.code==ResultCode.SUCCESS.code){ System.out.println("成功:" + resData.toString()); }else{ System.out.println("失敗:" + resData.toString()); } //7.列出全部隊列 ResList resList = queueClient.queueNames(); System.out.println("全部隊列:"+resList.data); //8.列出全部訂閱者 ResList subNames = queueClient.subNames(queue_name); System.out.println(queue_name+"隊列的全部用戶:" + subNames.data); //9.重置隊列 /*ResultCode resultCode = queueClient.resetQueue(queue_name, username, pwd); if(resultCode.code==ResultCode.SUCCESS.code){ System.out.println(queue_name+"隊列重置成功!"); }else{ System.out.println(queue_name+"隊列重置失敗!"); }*/ } }
接下來,我們再操作自定義類型的對象數據:
package com.itszt.test2; import com.google.gson.Gson; import wjw.psqueue.client.PSQueueClient; import wjw.psqueue.msg.*; import java.util.Date; /** * 測試訂單操作 * 在原先導入EasyFastJson-2.7.2.jar和PSQueueClient-1.0.4.jar兩個jar包後, * 再導入gson-2.7.jar包,並均對其添加依賴 */ public class Test { static String queue_name = "szt_queue"; static String sub_name = "szt_sub"; static String username = "admin"; static String pwd = "123456"; static PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 1818, "UTF-8", 60 * 1000, 60 * 1000); static Gson gson=new Gson(); public Test(){ //查詢指定隊列是否存在,不存在則創建 ResQueueStatus rqs = queueClient.status(queue_name); if(rqs.status.code== ResultCode.SUCCESS.code){ System.out.println(queue_name+"對列已存在"); }else{ System.out.println(queue_name+"對列不存在,將創建隊列"); ResultCode resultCode = queueClient.createQueue(queue_name, 10000000L, username, pwd); if(resultCode.code==ResultCode.SUCCESS.code){ System.out.println(queue_name+"隊列已創建成功!"); }else{ System.out.println(queue_name+"隊列創建失敗!請聯系管理員"); } } //查詢指定用戶是否存在,不存在則創建 ResSubStatus resSubStatus = queueClient.statusForSub(queue_name, sub_name); if(resSubStatus.getStatus().code==0){ System.out.println(sub_name+"用戶已存在!"); }else{ System.out.println(sub_name+"用戶不存在!請創建用戶!"); ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd); if (resultCode.code== ResultCode.SUCCESS.code) { System.out.println(sub_name+"用戶創建成功!"); }else{ System.out.println(sub_name+"用戶創建失敗!"); } } } public static void main(String[] args) { // queueClient.resetQueue(queue_name,username,pwd); new AddThread().start(); new AddThread().start(); new PollThread().start(); new PollThread().start(); new PollThread().start(); new PollThread().start(); new AddThread().start(); new AddThread().start(); } //生產者線程 static class AddThread extends Thread{ private static int id=1; @Override public void run() { //生產1000條數據 for(int i=0;i<1000;i++){ OrderRequest order=new OrderRequest(++id, "張三的訂單" + id, new Date().toString()); ResAdd resAdd = queueClient.add(queue_name, gson.toJson(order)); if(resAdd.status.code==ResultCode.SUCCESS.code){ System.out.println("訂單"+order.getOrderName()+"放入隊列成功!"); }else{ System.out.println("訂單"+order.getOrderName()+"放入隊列失敗!"); } } } } //消費者線程 static class PollThread extends Thread{ @Override public void run() { while (true){ ResData resData = queueClient.poll(queue_name, sub_name); if(resData.status.code==ResultCode.QUEUE_POLL_ERROR.code){ System.out.println("隊列中的數據取完了!"); break; }else{ OrderRequest orderFromJson = gson.fromJson(resData.getData(), OrderRequest.class); if(orderFromJson==null){ System.out.println("隊列中的數據取完了!"); break; } System.out.println("取出一個數據:"+orderFromJson); } } } } }
到此,PSQueue的演示操作基本完畢,希望對您有所幫助!
PSQueue隊列操作