websocket-client的使用
阿新 • • 發佈:2018-11-11
1. 代理訪問:
import websocket
ws = websocket.WebSocket()
# websocket的代理使用
ws.connect("ws://example.com/websocket", http_proxy_host="proxy_host_name", http_proxy_port=3128)
2. Long-lived connection
import websocket try: import thread except ImportError: import _thread as thread import time def on_message(ws, message): print(message) def on_error(ws, error): print(error) def on_close(ws): print("### closed ###") def on_open(ws): def run(*args): for i in range(3): time.sleep(1) ws.send("Hello %d" % i) time.sleep(1) ws.close() print("thread terminating...") thread.start_new_thread(run, ()) if __name__ == "__main__": websocket.enableTrace(True) ws = websocket.WebSocketApp("ws://echo.websocket.org/", on_message = on_message, on_error = on_error, on_close = on_close) ws.on_open = on_open ws.run_forever()
3. Short-lived one-off send-receive
from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()
If you want to customize socket options, set sockopt.
sockopt example
from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/",
sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY),))
4. 建立自定義的websocket類:
import socket from websocket import create_connection, WebSocket class MyWebSocket(WebSocket): def recv_frame(self): frame = super().recv_frame() print('yay! I got this frame: ', frame) return frame ws = create_connection("ws://echo.websocket.org/", sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),), class_=MyWebSocket)
5. SSL連線問題: disable ssl cert verification
WebSocketApp sample
ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
create_connection sample
ws = websocket.create_connection("wss://echo.websocket.org",
sslopt={"cert_reqs": ssl.CERT_NONE})
WebSocket sample
ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
6. disable hostname verification
WebSocketApp sample
ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"check_hostname": False})
create_connection sample
ws = websocket.create_connection("wss://echo.websocket.org",
sslopt={"check_hostname": False})
WebSocket sample
ws = websocket.WebSocket(sslopt={"check_hostname": False})
ws.connect("wss://echo.websocket.org")
7. SNI support
自動支援Python 2.7.9+ and 3.2+
8. 支援子協議:
ws = websocket.create_connection("ws://example.com/websocket", subprotocols=["binary", "base64"])