1. 程式人生 > 實用技巧 >Tornado 應用筆記06 WebSocket與長輪詢

Tornado 應用筆記06 WebSocket與長輪詢

目錄

WebSocket

以下內容摘自維基百科(原連結)

WebSocket一種在單個 TCP 連線上進行全雙工通訊的協議。WebSocket通訊協議於2011年被IETF定為標準RFC 6455,並被RFC7936所補充規範。WebSocket API也被W3C定為標準。

WebSocket 使得客戶端和伺服器之間的資料交換變得更加簡單,允許服務端主動向客戶端推送資料。在 WebSocket API 中,瀏覽器和伺服器只需要完成一次握手,兩者之間就直接可以建立永續性的連線,並進行雙向資料傳輸。

Websocket使用ws或wss的統一資源標誌符,類似於HTTPS,其中wss表示在TLS之上的Websocket。如:

ws://example.com/wsapi
wss://secure.example.com/

一個典型的Websocket握手請求如下:

客戶端請求
GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13
伺服器迴應
HTTP/1.1 101 
Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/
欄位說明
  • Connection必須設定Upgrade,表示客戶端希望連線升級。
  • Upgrade欄位必須設定Websocket,表示希望升級到Websocket協議。
  • Sec-WebSocket-Key是隨機的字串,伺服器端會用這些資料來構造出一個SHA-1的資訊摘要。把“Sec-WebSocket-Key”加上一個特殊字串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,然後計算SHA-1摘要,之後進行BASE-64編碼,將結果做為“Sec-WebSocket-Accept”頭的值,返回給客戶端。如此操作,可以儘量避免普通HTTP請求被誤認為Websocket協議。
  • Sec-WebSocket-Version 表示支援的Websocket版本。RFC6455要求使用的版本是13,之前草案的版本均應當被棄用。
  • Origin欄位是可選的,通常用來表示在瀏覽器中發起此Websocket連線所在的頁面,類似於Referer。但是,於Referer不同的是,Origin只包含了協議和主機名稱。
  • 其他一些定義在HTTP協議中的欄位,如Cookie等,也可以在Websocket中使用。

下面的WebSocket的例子採用了一篇博文(原地址)上的程式碼, 我對此進行了部分修改放在這裡.

# -*- coding: utf-8 -*-
# file: websocket_chat.py
 
import json
import os
from uuid import uuid4
import tornado.websocket
import tornado.web
import tornado.httpserver
import tornado.ioloop
from tornado import options
 
 
class ChatRoom(object):
    """ 處理伺服器與客戶端的互動資訊 """
 
    # 聊天室容器, 儲存聊天室和其對應的`websocket`連線
    chat_room_container = {}
    # 訊息快取, 不過這裡沒有呈現到網頁上
    cache = []
    cache_size = 200
 
    def register(self, ws_handler):
        """ 註冊聊天室使用者 """
 
        room = str(ws_handler.get_argument('n'))  # 獲取所在聊天室
        session = str(ws_handler.get_argument('u'))
        ws_handler.session = session
 
        if room in self.chat_room_container:
            self.chat_room_container[room].append(ws_handler)
        else:
            self.chat_room_container[room] = [ws_handler, ]
 
        self.new_msg_trigger(ws_handler, is_new_user=True)
 
    def unregister(self, ws_handler):
        """ 離開聊天室, 登出使用者 """
 
        room = str(ws_handler.get_argument('n'))
 
        self.chat_room_container[room].remove(ws_handler)
 
        self.new_msg_trigger(ws_handler, is_leave_user=True)
 
    def message_maker(self, session, message=None, is_leave=False, is_new=False,
                      self_new=False):
        """ 訊息生成器 """
 
        _from = 'sys'
        if message:
            _from = session
            msg = message
        elif is_leave:
            msg = '(%s)離開了聊天室' % session
        elif is_new:
            msg = '歡迎(%s)加入聊天室' % session
        elif self_new:
            msg = '歡迎你加入聊天室'
        else:
            raise Exception('WTF?')
 
        msg = {
            'from': _from,
            'message': msg,
        }
        self.update_msg_cache(msg)
        return json.dumps(msg)
 
    def update_msg_cache(self, message):
        """ 訊息快取更新 """
        self.cache.append(message)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]
 
    def send_room_message(self, ws_handler, message, except_self=False):
        """ 傳送聊天室資訊, except_self為True則該訊息不推送給訊息的生產者 """
 
        room = str(ws_handler.get_argument('n'))  # 獲取所在聊天室
 
        if except_self:
            session = str(ws_handler.get_argument('u'))
            for ws_handler in self.chat_room_container[room]:
                if ws_handler.session != session:
                    ws_handler.write_message(message)
        else:
            for ws_handler in self.chat_room_container[room]:
                ws_handler.write_message(message)
 
    def send_left_msg(self, ws_handler):
        """ 傳送離開資訊 """
 
        session = str(ws_handler.get_argument('u'))
 
        msg = self.message_maker(session, is_leave=True)
        self.send_room_message(ws_handler, msg, except_self=True)
 
    def send_welcome_msg(self, ws_handler):
        """ 傳送歡迎資訊 """
 
        session = str(ws_handler.get_argument('u'))
 
        msg = self.message_maker(session, self_new=True)
        ws_handler.write_message(msg)
 
        msg = self.message_maker(session, is_new=True)
        self.send_room_message(ws_handler, msg, except_self=True)
 
    def send_chat_msg(self, ws_handler, message):
        """ 傳送聊天資訊 """
 
        session = str(ws_handler.get_argument('u'))
 
        msg = self.message_maker(session, message)
        self.send_room_message(ws_handler, msg)
 
    def new_msg_trigger(self, ws_handler, message=None, is_new_user=False,
                        is_leave_user=False):
        """ 訊息觸發器,將最新訊息返回給對應聊天室的所有成員 """
 
        if message:
            self.send_chat_msg(ws_handler, message)
        elif is_new_user:
            self.send_welcome_msg(ws_handler)
        elif is_leave_user:
            self.send_left_msg(ws_handler)
        else:
            raise Exception('WTF?')
 
 
class ChatRoomIndexPage(tornado.web.RequestHandler):
    """ 首頁, 聊天室選擇頁 """
 
    def get(self, *args, **kwargs):
        # 生成隨機標識碼, 取代使用者名稱
        session = uuid4()
        self.render('basic.html', session=session)
 
 
class ChatRoomInnerPage(tornado.web.RequestHandler):
    """ 聊天室內頁 """
 
    def get(self, *args, **kwargs):
        # n=聊天室, u=使用者
        n = self.get_argument('n')
        u = self.get_argument('u')
        self.render('room.html', n=n, u=u)
 
 
class NewChat(tornado.websocket.WebSocketHandler):
    """ WebSocket服務, 訊息處理中轉 """
 
    @property
    def chatroom(self):
        return self.application.chatroom
 
    def open(self):
        """ 新的WebSocket連線開啟 """
 
        self.chatroom.register(self)
 
    def on_close(self):
        """ WebSocket連線斷開 """
 
        self.chatroom.unregister(self)
 
    def on_message(self, message):
        """ WebSocket服務端接收到訊息 """
 
        self.chatroom.new_msg_trigger(self, message)
 
        # 心跳包, 如果客戶端接收到的話, 會返回一樣的資料
        self.ping('answer me')
 
    def on_pong(self, data):
        """ 心跳包響應, data是`.ping`發出的資料 """
 
        print 'into on_pong the data is |%s|' % data
 
 
class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r'/', ChatRoomIndexPage),
            (r'/room', ChatRoomInnerPage),
            (r'/new_chat', NewChat),
        ]
 
        tornado_settings = dict(
            template_path=os.path.join(os.path.dirname(__file__), '../template'),
        )
 
        super(Application, self).__init__(handlers, **tornado_settings)
 
        self.chatroom = ChatRoom()
 
 
if __name__ == '__main__':
    options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(8888)
    tornado.ioloop.IOLoop.current().start()
 

<!-- basic.html -->
<body>
    <h1>你好 !{{ session }} <br> 歡迎來到聊天室!</h1>
    <a href="/room?n=1&u={{ session }}"> 聊天室一 </a>  
    <a href="/room?n=2&u={{ session }}"> 聊天室二 </a>
</body>

<!-- room.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title></title>
    <script src="http://libs.baidu.com/jquery/1.10.2/jquery.min.js"></script>
    <script>
        $(function(){
            n = $("#n").val()
            u = $("#u").val()
 
            $("#btn").click(function(){
                sendText()
            })
            function requestText(){
                host = "ws://localhost:8888/new_chat?n=" + n + "&u=" +u
                websocket = new WebSocket(host)
 
                websocket.onopen = function(evt){}      // 建立連線
                websocket.onmessage = function(evt){    // 獲取伺服器返回的資訊
                    data = $.parseJSON(evt.data)
                    if(data['from']=='sys'){
                        $('#chatinfo').append("<p style='width: 100%; text-align:center; font-size: 16px; color: green'>" + data['message'] + "</p>");
                    }else if(data['from']==u){
                        $('#chatinfo').append("<p style='width: 100%; text-align:right; font-size:15px'>" + u + ": <br>" +"<span style='color: blue'>" + data['message'] + "</span>" + "</p>");
                    }else{
                        $('#chatinfo').append("<p style='width: 100%; text-align:left; font-size:15px'>" + data['from'] + ": <br>" +"<span style='color: red'>" + data['message'] + "</span>" + "</p>");
                    }
 
                }
                websocket.onerror = function(evt){}
            }
 
            requestText()   // 開始 websocket
 
            function sendText(){    // 向伺服器傳送資訊
                websocket.send($("#chat_text").val())
            }
        })
 
    </script>
</head>
<body>
<div align="center">
    <div style="width: 70%">
        <h1>聊天室({{ n }})!</h1>
        <input type="hidden" value="{{ n }}" id="n">
        <input type="hidden" value="{{ u }}" id="u">
 
        <div id="chatinfo" style="padding:10px;border: 1px solid #888">
            <!-- 聊天內容 -->
        </div>
 
        <div style="clear: both; text-align:right; margin-top: 20px">
            <input type="text" name="chat_text" id="chat_text">
            <button id="btn">傳送</button>
        </div>
    </div>
</div>
</body>
</html>

長輪詢

這個例子來自Tornado原始碼附帶的demo中(原連結)

長輪詢在前端的程式碼比較複雜, 這裡就不貼出了, 有興趣的可以到原連結看. 這個聊天室工作原理就是利用gen.coroutine非阻塞等待(實際上是等待一個future完成, 這個future代表的就是新訊息)實現長輪詢, 客戶端在接收到一個新訊息後, 接著又發起一個新的長連線等待新訊息, 迴圈往復. 這個方案實現起來沒有Websocket直觀和方便, 不過看懂這個demo對理解協成和非同步有幫助.

import logging
import tornado.escape
import tornado.ioloop
import tornado.web
import os.path
import uuid
 
from tornado.concurrent import Future
from tornado import gen
from tornado.options import define, options, parse_command_line
 
define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")
 
 
class MessageBuffer(object):
    # 這個類實現了訊息快取和輔助聊天訊息推送及連線管理
 
    def __init__(self):
        # 訊息快取區
        self.waiters = set()
        self.cache = []
        self.cache_size = 200
 
    def wait_for_messages(self, cursor=None):
        # 新增等待推送的使用者(future_waiter), 配合`gen.coroutine`實現非阻塞等待
        result_future = Future()
        if cursor:
            new_count = 0
            for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                result_future.set_result(self.cache[-new_count:])
                return result_future
        self.waiters.add(result_future)
        return result_future
 
    def cancel_wait(self, future):
        # 登出等待推送的使用者, 並使用`future.set_result`讓阻塞的函式恢復
        self.waiters.remove(future)
        future.set_result([])
 
    def new_messages(self, messages):
        # 新訊息進來, 給等待推送的使用者(future_waiter)設定訊息(set_result)
        logging.info("Sending new message to %r listeners", len(self.waiters))
        for future in self.waiters:
            future.set_result(messages)
        self.waiters = set()
        self.cache.extend(messages)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]
 
 
global_message_buffer = MessageBuffer()
 
 
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html", messages=global_message_buffer.cache)
 
 
class MessageNewHandler(tornado.web.RequestHandler):
    """ 新訊息處理 """
    def post(self):
        message = {
            "id": str(uuid.uuid4()),
            "body": self.get_argument("body"),
        }
 
        message["html"] = tornado.escape.to_basestring(
            self.render_string("message.html", message=message))
        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            self.write(message)
 
        # 給等待推送的使用者設定推送訊息
        global_message_buffer.new_messages([message])
 
 
class MessageUpdatesHandler(tornado.web.RequestHandler):
    """ 輪詢的長連線 """
    @gen.coroutine
    def post(self):
        # 通過使用者已經接收的訊息位置, 等待剩餘需要推送的訊息
        cursor = self.get_argument("cursor", None)
        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
        # yield 非阻塞等待新訊息的到來
        messages = yield self.future
        if self.request.connection.stream.closed():
            return
        self.write(dict(messages=messages))
 
    def on_connection_close(self):
        # 處理連線斷開的情況
        global_message_buffer.cancel_wait(self.future)
 
 
def main():
    parse_command_line()
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/a/message/new", MessageNewHandler),
            (r"/a/message/updates", MessageUpdatesHandler),
            ],
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
        xsrf_cookies=True,
        debug=options.debug,
        )
    app.listen(options.port)
    tornado.ioloop.IOLoop.current().start()
 
 
if __name__ == "__main__":
    main()

本節內容就是這些, 同時這個筆記也暫告一段落了, 往後有機會還會在此基礎上繼續更新, 可能擴充套件的點會在非同步客戶端, IOLoop, 非同步服務端, 網路層和Tornado架構分析等更加深入的內容上面, 不過這麼做的話, 可能跟"應用筆記"這個命題不符了. 那就再說吧...

這也算是第一次對一個框架有比較深入的分析和了解, 嘗試了從原始碼理解Tornado的一些功能實現原理, 而不是僅僅停留在熟練使用上, 這對實際開發還是很有幫助的, 能讓你在開發中更加自信, 而不是像在操作一個黑盒.

標籤:Tornado Python