aiohttp初識(請求&響應)
aiohttp客戶端使用
用於asyncio和Python的非同步HTTP客戶端/伺服器:Asynchronous HTTP Client/Server for asyncio and Python.
發起請求
讓我們從匯入aiohttp模組開始:
import aiohttp
好啦,我們來嘗試獲取一個web頁面。比如我們來獲取下GitHub的時間軸。
async with aiohttp.ClientSession() as session: async with session.get('https://api.github.com/events') as resp: print(resp.status) print(await resp.text())
我們現在有了一個會話(session)
物件,由ClientSession物件賦值而來,還有一個變數resp
,它其實是ClientResponse物件。我們可以從這個響應物件中獲取我們任何想要的資訊。協程方法ClientSession.get()
的主要引數接受一個HTTP URL。
發起HTTP POST請求我們可以使用協程方法ClientSession.post():
session.post('http://httpbin.org/post', data=b'data')
其他的HTTP方法也同樣支援:
session.put('http://httpbin.org/put', data=b'data')session.delete('http://httpbin.org/delete')session.head('http://httpbin.org/get')session.options('http://httpbin.org/get')session.patch('http://httpbin.org/patch', data=b'data')
注意:
不要為每個請求都建立一個會話。大多數情況下每個應用程式只需要一個會話就可以執行所有的請求。
每個會話物件都包含一個連線池,可複用的連線和持久連線狀態(keep-alives,這兩個是預設的)可提升總體的執行效率。
發起JSON請求:
每個會話的請求方法都可接受json引數。
async with aiohttp.ClientSession() as session: async with session.post(json={'test': 'object'})
預設情況下會話(session)使用Python標準庫裡的json模組解析json資訊。但還可使用其他的json解析器。可以給ClientSession指定json_serialize
import ujsonasync with aiohttp.ClientSession(json_serialize=ujson.dumps) as session: async with session.post(json={'test': 'object'})
傳遞URL中的引數:
你可能經常想在URL中傳送一系列的查詢資訊。如果你手動構建他們,這些資訊會以鍵值對的形式出現在?後面,比如: httpbin.org/get?key=val
。請求物件允許你使用dict(字典,python中的資料型別)傳送它們,使用params
引數即可。例如: 如果你要把 key1=value1,key2=value2
放到httpbin.org/get
後面,你可以用下面的方式:
params = {'key1': 'value1', 'key2': 'value2'}async with session.get('http://httpbin.org/get', params=params) as resp: assert str(resp.url) == 'http://httpbin.org/get?key2=value2&key1=value1'
看,URL已經被正確的編碼啦。
同鍵不同值的並聯字典(MultiDict) 也同樣支援。
可使用帶有兩個tuples(元組,python中的資料型別)的list(列表,python中的資料型別)來構建:
params = [('key', 'value1'), ('key', 'value2')]async with session.get('http://httpbin.org/get', params=params) as r: assert str(r.url) == 'http://httpbin.org/get?key=value2&key=value1'
同樣也允許你傳遞str(字串)給params,但要小心一些不能被編碼的字元。+
就是一個不能被編碼的字元:
async with session.get('http://httpbin.org/get', params='key=value+1') as r: assert str(r.url) == 'http://httpbin.org/get?key=value+1'
注意:
aiohttp會在傳送請求前標準化URL。
域名部分會用IDNA 編碼,路徑和查詢條件會重新編譯(requoting)。
比如:URL('http://example.com/путь%30?a=%31')
會被轉化為URL('http://example.com/%D0%BF%D1%83%D1%82%D1%8C/0?a=1')
如果伺服器需要接受準確的表示並不要求編譯URL,那標準化過程應是禁止的。
禁止標準化可以使用encoded=True
:await session.get(URL('http://example.com/%30', encoded=True))
警告:
傳遞params時不要用
encode=True
,這倆引數不能同時使用。
獲取響應內容
我們可以讀取伺服器的響應內容。想想我們獲取GitHub時間軸的例子:
async with session.get('https://api.github.com/events') as resp: print(await resp.text())
這樣會打印出類似於下面的資訊:
'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
aiohttp
將會自動解碼內容。你可以為text()方法指定編碼(使用encoding引數):
await resp.text(encoding='windows-1251')
獲取二進位制響應內容
你也可以以位元組形式獲取響應,這樣得到的就不是文字了:
print(await resp.read())b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
gzip
和defalte
傳輸編碼會自動解碼。
你也可以使其支援brotli
傳輸編碼的解碼,只需安裝brotlipy即可。
獲取JSON響應內容
以防你需要處理JSON資料,內建了一個JSON解碼器:
async with session.get('https://api.github.com/events') as resp: print(await resp.json())
如果JSON解碼失敗,json()方法將會丟擲一個異常。你還可以在呼叫json()時指定編碼器和解碼器函式。
注意:
這些方法會讀出記憶體中所有響應的內容。如果你要讀非常多的資料,考慮使用流式響應方法進行讀取。請看之後的文件。
獲取流式響應內容
read(), json(), text()等方法使用起來很方便,但也要注意謹慎地使用。上述方法會將所有的響應內容載入到記憶體。舉個例子,如果你要下載幾個G的檔案,這些方法還是會將所有內容都載入到記憶體,記憶體會表示”臣妾做不到啊~”(如果記憶體不夠的話)。作為代替你可以用content屬性。content其實是 aiohttp.StreamReader類的例項。gzip
和deflate
傳輸編碼同樣會自動解碼。
async with session.get('https://api.github.com/events') as resp: await resp.content.read(10)
一般情況下你可以使用下列模式將內容儲存在一個檔案中:
with open(filename, 'wb') as fd: while True: chunk = await resp.content.read(chunk_size) if not chunk: break fd.write(chunk)
在使用content讀了資料後,就不要在用read(), json(), text()了。
獲取請求資訊
ClientResponse(客戶端響應)物件含有request_info(請求資訊),主要是url和headers資訊。 raise_for_status結構體上的資訊會被複制給ClientResponseError例項。
自定義Headers
如果你需要給某個請求新增HTTP頭,可以使用headers引數,傳遞一個dict物件即可。
比如,如果你想給之前的例子指定 content-type可以這樣:
import jsonurl = 'https://api.github.com/some/endpoint'payload = {'some': 'data'}headers = {'content-type': 'application/json'}await session.post(url, data=json.dumps(payload), headers=headers)
自定義Cookies
傳送你自己的cookies給伺服器,你可以為ClientSession物件指定cookies引數:
url = 'http://httpbin.org/cookies'cookies = {'cookies_are': 'working'}async with ClientSession(cookies=cookies) as session: async with session.get(url) as resp: assert await resp.json() == { "cookies": {"cookies_are": "working"}}
注意:
訪問
httpbin.org/cookies
會看到以JSON形式返回的cookies。查閱會話中的cookies請看ClientSession.cookie_jar。
發起更復雜的POST請求
一般來說,如果你想以表單形式傳送一些資料 - 就像HTML表單。那麼只需要簡單的將一個dict通過data引數傳遞就可以。傳遞的dict資料會自動編碼:
payload = {'key1': 'value1', 'key2': 'value2'}async with session.post('http://httpbin.org/post', data=payload) as resp: print(await resp.text()){ ... "form": { "key2": "value2", "key1": "value1" }, ...}
如果你想傳送非表單形式的資料你可用str(字串)
代替dict(字典)
。這些資料會直接傳送出去。
例如,GitHub API v3 接受JSON編碼POST/PATCH資料:
import jsonurl = 'https://api.github.com/some/endpoint'payload = {'some': 'data'}async with session.post(url, data=json.dumps(payload)) as resp: ...
傳送多部分編碼檔案(Multipart-Encoded)
上傳多部分編碼檔案:
url = 'http://httpbin.org/post'files = {'file': open('report.xls', 'rb')}await session.post(url, data=files)
你也可以顯式地設定檔名,檔案型別:
url = 'http://httpbin.org/post'data = FormData()data.add_field('file', open('report.xls', 'rb'), filename='report.xls', content_type='application/vnd.ms-excel')await session.post(url, data=data)
如果你把一個檔案物件傳遞給data引數,aiohttp會自動將其以流的形式上傳。檢視StreamReader以獲取支援的格式資訊。
參見:
流式上傳
aiohttp 支援多種形式的流式上傳,允許你直接傳送大檔案而不必讀到記憶體。
下面是個簡單的例子,提供類檔案物件即可:
with open('massive-body', 'rb') as f: await session.post('http://httpbin.org/post', data=f)
或者你也可以使用aiohttp.streamer物件:
@aiohttp.streamerdef file_sender(writer, file_name=None): with open(file_name, 'rb') as f: chunk = f.read(2**16) while chunk: yield from writer.write(chunk) chunk = f.read(2**16)# 之後你可以使用’file_sender‘傳遞給data:async with session.post('http://httpbin.org/post', data=file_sender(file_name='huge_file')) as resp: print(await resp.text())
同樣可以使用StreamReader物件.
我們來看下如何把來自於另一個請求的內容作為檔案上傳並計算其SHA1值:
async def feed_stream(resp, stream): h = hashlib.sha256() while True: chunk = await resp.content.readany() if not chunk: break h.update(chunk) stream.feed_data(chunk) return h.hexdigest()resp = session.get('http://httpbin.org/post')stream = StreamReader()loop.create_task(session.post('http://httpbin.org/post', data=stream))file_hash = await feed_stream(resp, stream)
因為響應物件的content屬性是一個StreamReader
例項,所以你可以將get和post請求連在一起用:
r = await session.get('http://python.org')await session.post('http://httpbin.org/post', data=r.content)
上傳預壓縮過的資料
上傳一個已經壓縮過的資料,需要為Headers中的Content-Encoding
指定演算法名(通常是deflate或者是zlib).
async def my_coroutine(session, headers, my_data): data = zlib.compress(my_data) headers = {'Content-Encoding': 'deflate'} async with session.post('http://httpbin.org/post', data=data, headers=headers) pass
持久連線(keep-alive), 連線池和cookies共享
ClientSession可以在多個請求之間共享cookies:
async with aiohttp.ClientSession() as session: await session.get( 'http://httpbin.org/cookies/set?my_cookie=my_value') filtered = session.cookie_jar.filter_cookies('http://httpbin.org') assert filtered['my_cookie'].value == 'my_value' async with session.get('http://httpbin.org/cookies') as r: json_body = await r.json() assert json_body['cookies']['my_cookie'] == 'my_value'
你也可以為所有的會話請求設定headers:
async with aiohttp.ClientSession( headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session: async with session.get("http://httpbin.org/headers") as r: json_body = await r.json() assert json_body['headers']['Authorization'] == \ 'Basic bG9naW46cGFzcw=='
ClientSession支援持久連線和連線池,可直接使用,不需要額外操作。
安全cookies
ClientSession中的預設的aiohttp.CookiesJar使用的是嚴苛模式,RFC 2109明確禁止使用ip地址形式的URL攜帶cookies資訊。比如: http://127.0.0.1:80/cookie
這樣很好,不過有些時候我們測試時需要允許攜帶cookies。在aiohttp.CookiesJar中傳遞unsafe=True來實現這一效果:
jar = aiohttp.CookieJar(unsafe=True)session = aiohttp.ClientSession(cookie_jar=jar)
使用虛假Cookie Jar
有時不想處理cookie。這時可以在會話中使用aiohttp.DummyCookieJar來達到目的。
jar = aiohttp.DummyCookieJar()session = aiohttp.ClientSession(cookie_jar=jar)
使用聯結器
想要調整請求的傳輸層你可以為ClientSession及其同類元件傳遞自定義的聯結器。例如:
conn = aiohttp.TCPConnector()session = aiohttp.ClientSession(connector=conn)
註解:
不要給多個會話物件使用同一個聯結器,某一會話物件擁有其所有權。
參見:
檢視聯結器部分了解更多不同的聯結器型別和配置選項資訊。
限制連線池的容量
限制同一時間開啟的連線數可以傳遞limit引數:
conn = aiohttp.TCPConnector(limit=30)
這樣就將總數限制在30.
預設情況下是100.
如果你不想有限制,傳遞0即可:
conn = aiohttp.TCPConnector(limit=0)
限制同一時間在同一個端點((host
, port
, is_ssl
) 3者都一樣的情況)開啟的連線數可指定limit_per_host引數:
conn = aiohttp.TCPConnector(limit_per_host=30)
這樣會限制在30.
預設情況下是0(也就是不做限制)。
使用自定義域名伺服器
底層需要aiodns支援:
from aiohttp.resolver import AsyncResolverresolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])conn = aiohttp.TCPConnector(resolver=resolver)
為TCP sockets新增SSL控制:
預設情況下aiohttp總會對使用了HTTPS協議(的URL請求)查驗其身份。但也可將verify_ssl設定為False
讓其不檢查:
r = await session.get('https://example.com', verify_ssl=False)
如果你需要設定自定義SSL資訊(比如使用自己的證書檔案)你可以建立一個ssl.SSLContext例項並傳遞到ClientSession中:
sslcontext = ssl.create_default_context( cafile='/path/to/ca-bundle.crt')r = await session.get('https://example.com', ssl_context=sslcontext)
如果你要驗證自簽名的證書,你也可以用之前的例子做同樣的事,但是用的是load_cert_chain():
sslcontext = ssl.create_default_context( cafile='/path/to/ca-bundle.crt')sslcontext.load_cert_chain('/path/to/client/public/device.pem', '/path/to/client/private/device.jey')r = await session.get('https://example.com', ssl_context=sslcontext)
SSL驗證失敗時丟擲的錯誤:
aiohttp.ClientConnectorSSLError:
try: await session.get('https://expired.badssl.com/')except aiohttp.ClientConnectorSSLError as e: assert isinstance(e, ssl.SSLError)
aiohttp.ClientConnectorCertificateError:
try: await session.get('https://wrong.host.badssl.com/')except aiohttp.ClientConnectorCertificateError as e: assert isinstance(e, ssl.CertificateError)
如果你需要忽略所有SSL的錯誤:
aiohttp.ClientSSLError:
try: await session.get('https://expired.badssl.com/')except aiohttp.ClientSSLError as e: assert isinstance(e, ssl.SSLError)try: await session.get('https://wrong.host.badssl.com/')except aiohttp.ClientSSLError as e: assert isinstance(e, ssl.CertificateError)
你還可以通過SHA256指紋驗證證書:
# Attempt to connect to https://www.python.org# with a pin to a bogus certificate:bad_fingerprint = b'0'*64exc = Nonetry: r = await session.get('https://www.python.org', fingerprint=bad_fingerprint)except aiohttp.FingerprintMismatch as e: exc = eassert exc is not Noneassert exc.expected == bad_fingerprint# www.python.org cert's actual fingerprintassert exc.got == b'...'
注意這是以DER編碼的證書的指紋。如果你的證書是PEM編碼,你需要轉換成DER格式:
openssl x509 -in crt.pem -inform PEM -outform DER > crt.der
註解:
提示: 從16進位制數字轉換成二進位制位元組碼,你可以用binascii.unhexlify().
TCPConnector中設定的verify_ssl, fingerprint和ssl_context都會被當做預設的verify_ssl, fingerprint和ssl_context,ClientSession或其他同類元件中的設定會覆蓋預設值。
警告:
verify_ssl 和 ssl_context是互斥的。
MD5和SHA1指紋雖不贊成使用但是是支援的 - 這倆是非常不安全的雜湊函式。
Unix 域套接字
如果你的伺服器使用UNIX域套接字你可以用UnixConnector:
conn = aiohttp.UnixConnector(path='/path/to/socket')session = aiohttp.ClientSession(connector=conn)
代理支援
aiohttp 支援 HTTP/HTTPS形式的代理。你需要使用proxy引數:
async with aiohttp.ClientSession() as session: async with session.get("http://python.org", proxy="http://some.proxy.com") as resp: print(resp.status)
同時支援認證代理:
async with aiohttp.ClientSession() as session: proxy_auth = aiohttp.BasicAuth('user', 'pass') async with session.get("http://python.org", proxy="http://some.proxy.com", proxy_auth=proxy_auth) as resp: print(resp.status)
也可將代理的驗證資訊放在url中:
session.get("http://python.org", proxy="http://user:[email protected]")
與requests(另一個廣受歡迎的http包)
不同,aiohttp預設不會讀取環境變數中的代理值。但你可以通過傳遞trust_env=True
來讓aiohttp.ClientSession讀取HTTP_PROXY或HTTPS_PROXY環境變數中的代理資訊(不區分大小寫)。
async with aiohttp.ClientSession() as session: async with session.get("http://python.org", trust_env=True) as resp: print(resp.status)
檢視響應狀態碼
我們可以查詢響應狀態碼:
async with session.get('http://httpbin.org/get') as resp: assert resp.status == 200
獲取響應頭資訊
我們可以檢視伺服器的響應資訊, ClientResponse.headers使用的資料型別是CIMultiDcitProxy:
>>> resp.headers{'ACCESS-CONTROL-ALLOW-ORIGIN': '*', 'CONTENT-TYPE': 'application/json', 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT', 'SERVER': 'gunicorn/18.0', 'CONTENT-LENGTH': '331', 'CONNECTION': 'keep-alive'}
這是一個特別的字典,它只為HTTP頭資訊而生。根據 RFC 7230,HTTP頭資訊中的名字是不分割槽大小寫的。同時也支援多個不同的值對應同一個鍵。
所以我們可以通過任意形式訪問它:
>>> resp.headers['Content-Type']'application/json'>>> resp.headers.get('content-type')'application/json'
所有的header資訊都是由二進位制資料轉換而來,使用帶有surrogateescape
選項的UTF-8編碼方式(surrogateescape是一種錯誤處理方式,詳情看))。大部分時候都可以很好的工作,但如果伺服器使用的不是標準編碼就不能正常解碼了。從 RFC 7230的角度來看這樣的headers並不是合理的格式,你可以用ClientReponse.resp.raw_headers來檢視原形:
>>> resp.raw_headers((b'SERVER', b'nginx'), (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'), (b'CONTENT-TYPE', b'text/html; charset=utf-8'), (b'CONTENT-LENGTH', b'12150'), (b'CONNECTION', b'keep-alive'))
獲取響應cookies:
如果某響應包含一些Cookies,你可以很容易地訪問他們:
url = 'http://example.com/some/cookie/setting/url'async with session.get(url) as resp: print(resp.cookies['example_cookie_name'])
注意:
響應中的cookies只包含重定向鏈中最後一個請求中的
Set-Cookies
頭資訊設定的值。如果每一次重定向請求都收集一次cookies請使用 aiohttp.ClientSession物件.
獲取響應歷史
如果一個請求被重定向了,你可以用history屬性檢視其之前的響應:
>>> resp = await session.get('http://example.com/some/redirect/')>>> resp<ClientResponse(http://example.com/some/other/url/) [200]>>>> resp.history(<ClientResponse(http://example.com/some/redirect/) [301]>,)
如果沒有重定向或allow_redirects
設定為False
,history會被設定為空。
使用WebSockets
aiohttp提供開箱即用的客戶端websocket。
你需要使用aiohttp.ClientSession.ws_connect()協程物件。它的第一個引數接受URL,返回值是ClientWebSocketResponse,這樣你就可以用響應的方法與websocket伺服器進行通訊。
session = aiohttp.ClientSession()async with session.ws_connect('http://example.org/websocket') as ws: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: if msg.data == 'close cmd': await ws.close() break else: await ws.send_str(msg.data + '/answer') elif msg.type == aiohttp.WSMsgType.CLOSED: break elif msg.type == aiohttp.WSMsgType.ERROR: break
你只能使用一種讀取方式(例如await ws.receive()
或者 async for msg in ws:
)和寫入方法,但可以有多個寫入任務,寫入任務也是非同步完成的(ws.send_str('data')
)。
設定超時
預設情況下每個IO操作有5分鐘超時時間。可以通過給ClientSession.get()及其同類元件傳遞timeout
來覆蓋原超時時間:
async with session.get('https://github.com', timeout=60) as r: ...
None
或者0
則表示不檢測超時。
還可通過呼叫async_timeout.timeout上下文管理器來為連線和解析響應內容新增一個總超時時間:
import async_timeoutwith async_timeout.timeout(0.001): async with session.get('https://github.com') as r: await r.text()
注意:
超時時間是累計的,包含如傳送情況,重定向,響應解析,處理響應等所有操作在內…
愉快地結束:
當一個包含ClientSession
的async with
程式碼塊的末尾行結束時(或直接呼叫了.close()
),因為asyncio內部的一些原因底層的連線其實沒有關閉。在實際使用中,底層連線需要有一個緩衝時間來關閉。然而,如果事件迴圈在底層連線關閉之前就結束了,那麼會丟擲一個 資源警告: 存在未關閉的傳輸(通道)(ResourceWarning: unclosed transport
),如果警告可用的話。
為了避免這種情況,在關閉事件迴圈前加入一小段延遲讓底層連線得到關閉的緩衝時間。
對於非SSL的ClientSession
, 使用0即可(await asyncio.sleep(0)
):
async def read_website(): async with aiohttp.ClientSession() as session: async with session.get('http://example.org/') as response: await response.read()loop = asyncio.get_event_loop()loop.run_until_complete(read_website())# Zero-sleep to allow underlying connections to closeloop.run_until_complete(asyncio.sleep(0))loop.close()
對於使用了SSL的ClientSession
, 需要設定一小段合適的時間:
...# Wait 250 ms for the underlying SSL connections to closeloop.run_until_complete(asyncio.sleep(0.250))loop.close()
合適的時間因應用程式而異。
當asyncio內部的執行機制改變時就可以讓aiohttp去等待底層連線關閉在退出啦,上面這種額外的方法總會廢棄啦。你也可以跟進問題#1925來參與改進。