1. 程式人生 > 實用技巧 >redis實現釋出訂閱

redis實現釋出訂閱

轉載Python大佬技術部落格


import threading

import redis

conn = redis.Redis()

PROMPT = ">>> "


def send(username):

    global g_running

    while g_running:
        msg = input(PROMPT)

        if msg == "q":
            g_running = False

        conn.publish("room", f"{username}: {msg}")


def recv(username):

    pubsub = conn.pubsub()
    pubsub.subscribe(["room"])


    for item in pubsub.listen():

        msg_type = item["type"]

        if msg_type == "message":

            msg = item["data"].decode()

            sender, _, content = msg.partition(":")

            if sender == username:
                sender = "me"


            print("\b" * 100, end="")
            print(f"{sender}:{content}", f"\n{PROMPT}", end="")

        if not g_running:
            pubsub.unsubscribe()
            break


if __name__ == "__main__":

    username = input("Enter a username: ") 
    g_running = True


    send_thread = threading.Thread(target=send, args=(username,))

    send_thread.start()

    recv(username)