1. 程式人生 > 實用技巧 >Flink實戰(九十八):flink-sql使用(十六)雙流join(一)雙流 join 場景應用

Flink實戰(九十八):flink-sql使用(十六)雙流join(一)雙流 join 場景應用

安裝匯入第三方模組Redis

pip3 install redis

import redis

操作String型別

"""
redis 基本命令 String
set(name, value, ex=None, px=None, nx=False, xx=False)
在 Redis 中設定值,預設,不存在則建立,存在則修改。
引數:
    ex - 過期時間(秒)
    px - 過期時間(毫秒)
    nx - 如果設定為True,則只有name不存在時,當前set操作才執行
    xx - 如果設定為True,則只有name存在時,當前set操作才執行
redis 取出的結果預設是位元組,我們可以設定 decode_responses=True 改成字串。
"""
redis = redis.Redis(host="116.62.13.104", port=6379, decode_responses=True)
#  python-k1 代表key; hello 代表 value;  ex代表seconds;px代表ms
redis.set("python-k1","hello",ex=120)
# 獲取值的第一種方式   使用  redis.get("key")
print(redis.get("python-k1"),type(redis.get("python-k1")))
# 獲取值的第二種方式  直接使用  redis['key']
print(redis['python-k1'],type(redis['python-k1']))

連線池

redis-py 使用 connection pool 來管理對一個 redis server 的所有連線,避免每次建立、釋放連線的開銷。

"""
連線池
redis-py 使用 connection pool 來管理對一個 redis server 的所有連線,避免每次建立、釋放連線的開銷。
預設,每個Redis例項都會維護一個自己的連線池。可以直接建立一個連線池,然後作為引數 Redis,這樣就可以實現多個 Redis 例項共享一個連線池。
"""
pool = redis.ConnectionPool(host='116.62.13.104', port=6379, decode_responses=True)
redis = redis.Redis(connection_pool=pool)

完整程式碼

# author: LiuShihao
# data: 2020/12/3 2:59 下午
# youknow: 各位老鐵,我的這套程式碼曾經有人出價三個億我沒有賣,如今拿出來和大家分享,不求別的,只求大家免費的小紅心幫忙點一點,這裡謝過了。
# desc: Python操作Redis
"""
    通過 get()、set() 操作redis字元型資料;
    通過 hset()、hget() 操作redis雜湊型別資料
    通過 json.dumps() 和 json.loads() 可以實現python中的字典資料的序列化和反序列化;
"""
import redis
import traceback

# redis = redis.Redis(host="116.62.13.104", port=6379, decode_responses=True)
"""
連線池
redis-py 使用 connection pool 來管理對一個 redis server 的所有連線,避免每次建立、釋放連線的開銷。
預設,每個Redis例項都會維護一個自己的連線池。可以直接建立一個連線池,然後作為引數 Redis,這樣就可以實現多個 Redis 例項共享一個連線池。
"""
pool = redis.ConnectionPool(host='116.62.13.104', port=6379, decode_responses=True)
redis = redis.Redis(connection_pool=pool)

"""
redis 基本命令 String
set(name, value, ex=None, px=None, nx=False, xx=False)
在 Redis 中設定值,預設,不存在則建立,存在則修改。
引數:
    ex - 過期時間(秒)
    px - 過期時間(毫秒)
    nx - 如果設定為True,則只有name不存在時,當前set操作才執行
    xx - 如果設定為True,則只有name存在時,當前set操作才執行
redis 取出的結果預設是位元組,我們可以設定 decode_responses=True 改成字串。

    # 獲取值的第一種方式   使用  redis.get("key")
    print(redis.get("python-k1"),type(redis.get("python-k1")))
    # 獲取值的第二種方式  直接使用  redis['key']
    print(redis['python-k1'],type(redis['python-k1']))
"""

# 獲取所有的key名
def findAllKeys():
    keyList = redis.keys()
    for item in keyList:
        print(item)
# 獲取所有的鍵值對
def findAllKeyAndValue():
    # keyAndValue = []
    keyList = redis.keys()
    for item in keyList:
        print(item)
        value = redis[item]
        # d=dict(item=value)
        d= {item:value}
        print(d)
        # keyAndValue.append(d)


if __name__ == '__main__':
    # redis.set('PK2','PV2')
    # value = redis.get('PK2')
    # print('值:',value)
    findAllKeyAndValue()
    # print("鍵值對:",keyAndValue)

Bug redis.exceptions.ResponseError


報錯資訊:

redis.exceptions.ResponseError: MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.

報錯原因:
強制把redis快照關閉了導致不能持久化的問題。
解決方法:
在linux下通過redis-cli連線redis進行資料庫操作:

redis-cli


config set stop-writes-on-bgsave-error no