Redis通訊協議
package redis.clients.jedis; import java.io.IOException; import java.util.ArrayList; import java.util.List; import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisClusterException; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.util.RedisInputStream; import redis.clients.util.RedisOutputStream; import redis.clients.util.SafeEncoder; public final class Protocol { private static final String ASK_RESPONSE = "ASK"; private static final String MOVED_RESPONSE = "MOVED"; private static final String CLUSTERDOWN_RESPONSE = "CLUSTERDOWN"; public static final String DEFAULT_HOST = "localhost"; public static final int DEFAULT_PORT = 6379; public static final int DEFAULT_SENTINEL_PORT = 26379; public static final int DEFAULT_TIMEOUT = 2000; public static final int DEFAULT_DATABASE = 0; public static final String CHARSET = "UTF-8"; public static final byte DOLLAR_BYTE = '$'; public static final byte ASTERISK_BYTE = '*'; public static final byte PLUS_BYTE = '+'; public static final byte MINUS_BYTE = '-'; public static final byte COLON_BYTE = ':'; public static final String SENTINEL_MASTERS = "masters"; public static final String SENTINEL_GET_MASTER_ADDR_BY_NAME = "get-master-addr-by-name"; public static final String SENTINEL_RESET = "reset"; public static final String SENTINEL_SLAVES = "slaves"; public static final String SENTINEL_FAILOVER = "failover"; public static final String SENTINEL_MONITOR = "monitor"; public static final String SENTINEL_REMOVE = "remove"; public static final String SENTINEL_SET = "set"; public static final String CLUSTER_NODES = "nodes"; public static final String CLUSTER_MEET = "meet"; public static final String CLUSTER_RESET = "reset"; public static final String CLUSTER_ADDSLOTS = "addslots"; public static final String CLUSTER_DELSLOTS = "delslots"; public static final String CLUSTER_INFO = "info"; public static final String CLUSTER_GETKEYSINSLOT = "getkeysinslot"; public static final String CLUSTER_SETSLOT = "setslot"; public static final String CLUSTER_SETSLOT_NODE = "node"; public static final String CLUSTER_SETSLOT_MIGRATING = "migrating"; public static final String CLUSTER_SETSLOT_IMPORTING = "importing"; public static final String CLUSTER_SETSLOT_STABLE = "stable"; public static final String CLUSTER_FORGET = "forget"; public static final String CLUSTER_FLUSHSLOT = "flushslots"; public static final String CLUSTER_KEYSLOT = "keyslot"; public static final String CLUSTER_COUNTKEYINSLOT = "countkeysinslot"; public static final String CLUSTER_SAVECONFIG = "saveconfig"; public static final String CLUSTER_REPLICATE = "replicate"; public static final String CLUSTER_SLAVES = "slaves"; public static final String CLUSTER_FAILOVER = "failover"; public static final String CLUSTER_SLOTS = "slots"; public static final String PUBSUB_CHANNELS = "channels"; public static final String PUBSUB_NUMSUB = "numsub"; public static final String PUBSUB_NUM_PAT = "numpat"; public static final byte[] BYTES_TRUE = toByteArray(1); public static final byte[] BYTES_FALSE = toByteArray(0); private Protocol() { // this prevent the class from instantiation } // 傳送二進位制命令 public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command, final byte[]... args) { sendCommand(os, command.getRaw(), args); } /** * <pre> * 傳送命令,遵循redis協議格式 * *<引數數量> CR LF * $<引數 1 的位元組數量> CR LF * <引數 1 的資料> CR LF * ... * $<引數 N 的位元組數量> CR LF * <引數 N 的資料> CR LF * </pre> */ private static void sendCommand(final RedisOutputStream os, final byte[] command, final byte[]... args) { try { os.write(ASTERISK_BYTE); os.writeIntCrLf(args.length + 1); os.write(DOLLAR_BYTE); os.writeIntCrLf(command.length); os.write(command); os.writeCrLf(); for (final byte[] arg : args) { os.write(DOLLAR_BYTE); os.writeIntCrLf(arg.length); os.write(arg); os.writeCrLf(); } } catch (IOException e) { throw new JedisConnectionException(e); } } /** * <pre> * 錯誤回覆只在某些地方出現問題時傳送: 比如說, 當用戶對不正確的資料型別執行命令, 或者執行一個不存在的命令。 * ERR 是一個通用錯誤,而 WRONGTYPE 則是一個更特定的錯誤。 一個客戶端實現可以為不同型別的錯誤產生不同型別的異常, * 或者提供一種通用的方式, 讓呼叫者可以通過提供字串形式的錯誤名來捕捉(trap)不同的錯誤。 * eg: * cli: incr name * server: -ERR value is not an integer or out of range * * 叢集節點不能代理(proxy)命令請求,所以客戶端應該在節點返回 -MOVED 或者 -ASK 轉向(redirection) * 錯誤時,自行將命令請求轉發至其他節點。 * eg: * MOVED 3999 127.0.0.1:6381 * * 當叢集不可用時,所有對叢集的操作做都不可用,收到((error) CLUSTERDOWN The cluster is down)錯誤 * 參考: * http://blog.chinaunix.net/uid-7374279-id-4470290.html * </pre> */ private static void processError(final RedisInputStream is) { String message = is.readLine(); // TODO: I'm not sure if this is the best way to do this. // Maybe Read only first 5 bytes instead? if (message.startsWith(MOVED_RESPONSE)) { String[] movedInfo = parseTargetHostAndSlot(message); throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1], Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0])); } else if (message.startsWith(ASK_RESPONSE)) { String[] askInfo = parseTargetHostAndSlot(message); throw new JedisAskDataException(message, new HostAndPort(askInfo[1], Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0])); } else if (message.startsWith(CLUSTERDOWN_RESPONSE)) { throw new JedisClusterException(message); } throw new JedisDataException(message); } private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) { String[] response = new String[3]; String[] messageInfo = clusterRedirectResponse.split(" "); String[] targetHostAndPort = messageInfo[2].split(":"); response[0] = messageInfo[1]; response[1] = targetHostAndPort[0]; response[2] = targetHostAndPort[1]; return response; } /** * <pre> * "+": 狀態回覆(status reply) PLUS_BYTE * "-": 錯誤回覆(error reply) MINUS_BYTE * ":": 整數回覆(integer reply) COLON_BYTE * "$": 批量回復(bulk reply) DOLLAR_BYTE * "*": 多條批量回復(multi bulk reply) ASTERISK_BYTE * </pre> */ private static Object process(final RedisInputStream is) { final byte b = is.readByte(); if (b == PLUS_BYTE) { return processStatusCodeReply(is); } else if (b == DOLLAR_BYTE) { return processBulkReply(is); } else if (b == ASTERISK_BYTE) { return processMultiBulkReply(is); } else if (b == COLON_BYTE) { return processInteger(is); } else if (b == MINUS_BYTE) { processError(is); return null; } else { throw new JedisConnectionException("Unknown reply: " + (char) b); } } /** * <pre> * 狀態回覆通常由那些不需要返回資料的命令返回,這種回覆不能包含新行。 * eg: * cli: set name zhangsan * server: +OK * </pre> */ private static byte[] processStatusCodeReply(final RedisInputStream is) { return is.readLineBytes(); } /** * <pre> * 伺服器使用批量回復來返回二進位制安全的字串,字串的最大長度為 512 MB。 * eg: * cli: get name * server: $8\r\nzhangsan\r\n * 空批量回復: * 如果被請求的值不存在, 那麼批量回復會將特殊值 -1 用作回覆的長度值。當請求物件不存在時,客戶端應該返回空物件,而不是空字串。 * </pre> */ private static byte[] processBulkReply(final RedisInputStream is) { final int len = is.readIntCrLf(); if (len == -1) { return null; } final byte[] read = new byte[len]; int offset = 0; while (offset < len) { final int size = is.read(read, offset, (len - offset)); if (size == -1) throw new JedisConnectionException( "It seems like server has closed the connection."); offset += size; } // read 2 more bytes for the command delimiter is.readByte(); is.readByte(); return read; } /** * <pre> * 整數回覆就是一個以 ":" 開頭, CRLF 結尾的字串表示的整數。 * eg: * cli: exists name * server: :1 * </pre> */ private static Long processInteger(final RedisInputStream is) { return is.readLongCrLf(); } /** * <pre> * 多條批量回復是由多個回覆組成的陣列, 陣列中的每個元素都可以是任意型別的回覆, 包括多條批量回複本身。 * eg: * cli: lrange mylist 0 3 * server: *4\r\n * :1\r\n * :2\r\n * :3\r\n * $3\r\n * foo\r\n * 多條批量回復也可以是空白的, * eg: * cli: lrange mylist 7 8 * server: *0\r\n * 無內容的多條批量回復(null multi bulk reply)也是存在的, 比如當 BLPOP 命令的阻塞時間超過最大時限時, 它就返回一個無內容的多條批量回復, 這個回覆的計數值為 -1 : * eg: * cli: blpop key 1 * server: *-1\r\n * 多條批量回復中的元素可以將自身的長度設定為 -1 , 從而表示該元素不存在, 並且也不是一個空白字串(empty string)。 * </pre> */ private static List<Object> processMultiBulkReply(final RedisInputStream is) { final int num = is.readIntCrLf(); if (num == -1) { return null; } final List<Object> ret = new ArrayList<Object>(num); for (int i = 0; i < num; i++) { try { ret.add(process(is)); } catch (JedisDataException e) { ret.add(e); } } return ret; } /** * 解析伺服器返回的資料流 */ public static Object read(final RedisInputStream is) { return process(is); } public static final byte[] toByteArray(final boolean value) { return value ? BYTES_TRUE : BYTES_FALSE; } public static final byte[] toByteArray(final int value) { return SafeEncoder.encode(String.valueOf(value)); } public static final byte[] toByteArray(final long value) { return SafeEncoder.encode(String.valueOf(value)); } public static final byte[] toByteArray(final double value) { return SafeEncoder.encode(String.valueOf(value)); } public static enum Command implements ProtocolCommand { PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, ZLEXCOUNT, ZRANGEBYLEX, ZREVRANGEBYLEX, ZREMRANGEBYLEX, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, BITPOS, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, PFADD, PFCOUNT, PFMERGE; private final byte[] raw; Command() { raw = SafeEncoder.encode(this.name()); } @Override public byte[] getRaw() { return raw; } } public static enum Keyword { AGGREGATE, ALPHA, ASC, BY, DESC, GET, LIMIT, MESSAGE, NO, NOSORT, PMESSAGE, PSUBSCRIBE, PUNSUBSCRIBE, OK, ONE, QUEUED, SET, STORE, SUBSCRIBE, UNSUBSCRIBE, WEIGHTS, WITHSCORES, RESETSTAT, RESET, FLUSH, EXISTS, LOAD, KILL, LEN, REFCOUNT, ENCODING, IDLETIME, AND, OR, XOR, NOT, GETNAME, SETNAME, LIST, MATCH, COUNT; public final byte[] raw; Keyword() { raw = SafeEncoder.encode(this.name().toLowerCase()); } } }
Github:xetorthio/jedis
相關推薦
Redis 通訊協議的格式
協議說明 Redis協議在以下幾點之間做出了折衷: 簡單的實現 快速地被計算機解析 簡單得可以能被人工解析 網路層 Redis在TCP埠6379上監聽到來的連線,客戶端連線到來時,Redis伺服器為此建立一個TCP連線。在客戶端與伺服器端之間傳輸的每個Re
redis通訊協議學習
Redis 的作者認為資料庫系統的瓶頸一般不在於網路流量,而是資料庫自身內部邏輯處理上。所以即使 Redis 使用了浪費流量的文字協議,依然可以取得極高的訪問效能。Redis 將所有資料都放在記憶體,用一個單執行緒對外提供服務,單個節點在跑滿一個 CPU 核心的情況下可以達到了 10w/s 的超高 QPS。
Redis-通訊協議
RESP協議: Redis 序列化協議的簡寫。它是一種直觀的文字協議,優勢在於實現異常簡單,解析效能極好。 將傳輸現已分為5種最小單元型別:每個單元型別結束時統一加上回撤換行符號\r\n,協議型別如下: 1、單行字串 以 + 符號開頭。 2、多行字串 以
【Redis詳解基礎篇四(Redis通訊協議)】
前言 Redis通訊協議是什麼? reids通訊協議就是接受處理來自客戶端請求,非阻塞,iO複用的TCP伺服器 Protocol redis協議與TCP協議進行通訊,他們的協議術語叫做Protocol,代表了伺服器於客戶端之間的通訊,對於redis來講這種協
Redis通訊協議
package redis.clients.jedis; import java.io.IOException; import java.util.ArrayList; import java.util.List; import redis.clients.jedis.
Redis原始碼剖析和註釋(二十)--- 網路連線庫剖析(client的建立/釋放、命令接收/回覆、Redis通訊協議分析等)
Redis 網路連線庫剖析 1. Redis網路連線庫介紹 Redis網路連線庫對應的檔案是networking.c。這個檔案主要負責 客戶端的建立與釋放 命令接收與命令回覆 Redis通訊協議分析 CLIENT 命令的實現 我們接下來就這幾塊內
由於各個廠家的通訊協議都不兼容有些還必須獲得
tag 郵件傳輸 源地址 能夠 彩頁 它的 如果 通過 獲得 有時又稱為"數據鏈路測過"或"網絡接口層",通常包括操作系統中的設備驅動程序和計算機中對應的網絡接口卡。它們一起處理與電纜(或其他任何傳輸媒介)的物理接口細節。 網絡層 有時又稱為"網絡互聯層",處理分組在網
Kafka的通訊協議
單位 編碼 ace 處理 lap 部分 nap spa head Kafka的Producer、Broker和Consumer之間采用的是一套自行設計的基於TCP層的協議。Kafka的這套協議完全是為了Kafka自身的業務需求而定制的,而非要實現一套類似於Protocol
C#高性能大容量SOCKET並發(八):通訊協議
pad 英文 透明 優勢 sock ase sha dev lac 協議種類 開發Socket程序有兩種協議類型,一種是用文本描述的,類似HTTP協議,定義字符集,好處是兼容性和調試方便,缺點是解析文本會損耗一些性能;一種是用Code加結構體,定義字節順序,好處是性能高,
MQTT是IBM開發的一個即時通訊協議,構建於TCP/IP協議上,是物聯網IoT的訂閱協議,借助消息推送功能,可以更好地實現遠程控制
集合 cap 消息處理 簡易 遠程控制 mes ogr 設計思想 成本 最近一直做物聯網方面的開發,以下內容關於使用MQTT過程中遇到問題的記錄以及需要掌握的機制原理,主要講解理論。 背景 MQTT是IBM開發的一個即時通訊協議。MQTT構建於TCP/IP協議上
# 2017-2018-1 20155318 《信息安全系統設計基礎》 實驗五 通訊協議設計
應用程序 .cn 代碼 下使用 申請 只讀 genrsa 關閉 accept 2017-2018-1 20155318 《信息安全系統設計基礎》 實驗五 通訊協議設計 Linux下OpenSSL的安裝與測試 任務要求:在Ubuntu中完成 http://www.cnblo
2017-2018-1 20155317《信息安全系統設計基礎》 實驗五 通訊協議設計
pthread set 申請 gac process 在一起 href main fun 2017-2018-1 20155317《信息安全系統設計基礎》 實驗五 通訊協議設計 實驗要求 任務 安裝OpenSSL環境,並編寫測試代碼驗證無誤研究OpenSSL算法,測試
2017-2018-1 20155234 實驗五 通訊協議設計
src 通訊 ubuntu ref 作業 服務器 html clas logs 實驗任務 任務一 在Ubuntu中完成 http://www.cnblogs.com/rocedu/p/5087623.html 中的作業 提交運行結果截圖 截圖如下 任務二 在Ubuntu中
20155325 2017-2018 1 《信息安全系統設計基礎》實驗五 通訊協議設計
inux tps rime 影響 收信 alt 完整 man 輸出 實驗五 通訊協議設計-1 實驗要求 在Ubuntu中完成 http://www.cnblogs.com/rocedu/p/5087623.html 中的作業 提交運行結果截圖 實驗截圖 碼雲鏈接 link
2017-2018-1 20155215 實驗五 通訊協議設計
完成 版本 html onf ron 分享圖片 apache day es2017 1 在Ubuntu中完成 http://www.cnblogs.com/rocedu/p/5087623.html 中的作業 提交運行結果截圖 Linux下OpenSSL的安裝與使用
2017-2018-1 201553334 實驗五 通訊協議設計
down 新增 ubun 防護 工作 通訊 ref 自我 roc 2017-2018-1 201553334 實驗五 通訊協議設計 1、在Ubuntu中完成 http://www.cnblogs.com/rocedu/p/5087623.html 中的作業 提交運行結果截圖
2017-2018-1 20155304 實驗五 通訊協議設計
img tex eof lin 單獨 serve free 密碼安全 echo 2017-2018-1 20155304 實驗五 通訊協議設計 實驗步驟 實驗五 通訊協議設計-1 在Ubuntu中完成 http://www.cnblogs.com/rocedu/p/5087
關於物聯網通信協議(通訊協議)
出了 ems 訂閱 不可 關於 ase -i 目前 點對點 一、區分通訊與通信協議:1、傳統意義上的“通訊”主要指電話、電報、電傳。通訊的“訊”指消息(Message),媒體訊息通過通訊網絡從一端傳遞到另外一端。媒體訊息的內容主要是話音、文字、圖片和視頻圖像。其網絡的構成主
1.2分布式-網絡通訊協議
未收到 正常 緩沖區 使用 什麽是 fab gmp 大小 報文 網絡協議: TCP/IP 和UDP/IP TCP/IP TCP/IP(Transmission Control Protocol/Internet Protocol)是一種可靠的網絡數據傳輸控制協議。定義了主機
[Golang] 從零開始寫Socket Server(2): 自定義通訊協議
在上一章我們做出來一個最基礎的demo後,已經可以初步實現Server和Client之間的資訊交流了~ 這一章我會介紹一下怎麼在Server和Client之間實現一個簡單的通訊協議,從而增強整個資訊交流過程的穩定性。