1. 程式人生 > 程式設計 >用Python進行websocket介面測試

用Python進行websocket介面測試

我們在做介面測試時,除了常見的http介面,還有一種比較多見,就是socket介面,今天講解下怎麼用Python進行websocket介面測試。

現在大多數用的都是websocket,那我們就先來安裝一下websocket的安裝包。

pip install websocket-client

用Python進行websocket介面測試

安裝完之後,我們就開始我們的websocket之旅了。

我們先來看個炒雞簡單的栗子:

import websocket
ws = websocket.WebSocket()
ws.connect("ws://example.com/websocket",http_proxy_host="proxy_host_name",http_proxy_port=3128)

這個栗子就是建立一個websocket連線,這個模組支援通過http代理訪問websocket。代理伺服器允許使用connect方法連線到websocket埠。預設的squid設定是“只允許連線HTTPS埠”。

在websocket裡,我們有常用的這幾個方法:

on_message方法:

def on_message(ws,message):
  print(message)

on_message是用來接受訊息的,server傳送的所有訊息都可以用on_message這個方法來收取。

on_error方法:

def on_error(ws,error):
  print(error)

這個方法是用來處理錯誤異常的,如果一旦socket的程式出現了通訊的問題,就可以被這個方法捕捉到。

on_open方法:

def on_open(ws):
  def run(*args):
    for i in range(30):
      # send the message,then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()

on_open方法是用來保持連線的,上面這樣的一個例子,就是保持連線的一個過程,每隔一段時間就會來做一件事,他會在30s內一直髮送hello。最後停止。

on_close方法:

def on_close(ws):
  print("### closed ###")

onclose主要就是關閉socket連線的。

如何建立一個websocket應用:

ws = websocket.WebSocketApp("wss://echo.websocket.org")

括號裡面就是你要連線的socket的地址,在WebSocketApp這個例項化的方法裡面還可以有其他引數,這些引數就是我們剛剛介紹的這些方法。

ws = websocket.WebSocketApp("ws://echo.websocket.org/",on_message=on_message,on_error=on_error,on_close=on_close)

指定了這些引數之後就可以直接進行呼叫了,例如:

ws.on_open = on_open

這樣就是呼叫了on_open方法

如果我們想讓我們的socket保持長連線,一直連線著,就可以使用run_forever方法:

ws.run_forever()

完整程式碼:

import websocket
from threading import Thread
import time
import sys

def on_message(ws,message):
  print(message)

def on_error(ws,error):
  print(error)

def on_close(ws):
  print("### closed ###")

def on_open(ws):
  def run(*args):
    for i in range(3):
      # send the message,then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()


if __name__ == "__main__":

  websocket.enableTrace(True)
  host = "ws://echo.websocket.org/"
  ws = websocket.WebSocketApp(host,on_close=on_close)
  ws.on_open = on_open
  ws.run_forever()

如果想要通訊一條短訊息,並在完成後立即斷開連線,我們可以使用短連線:

from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello,World'...")
ws.send("Hello,World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()

關於websocket的介紹就到這兒了。

以上就是用Python進行websocket介面測試的詳細內容,更多關於python 介面測試的資料請關注我們其它相關文章!