Tornado 方便的非阻塞套接字包裝器

2022-03-10 11:09 更新

用于寫入和讀取非阻塞文件和套接字的實(shí)用程序類。

主要有:

?BaseIOStream?:用于讀寫的通用接口。

?IOStream?:使用非阻塞套接字實(shí)現(xiàn) BaseIOStream。

?SSLIOStream?:支持 SSL 的 IOStream 版本。

?PipeIOStream?:基于管道的 IOStream 實(shí)現(xiàn)。

基本內(nèi)容

class tornado.iostream.BaseIOStream(max_buffer_size: Optional[int] = None, read_chunk_size: Optional[int] = None, max_write_buffer_size: Optional[int] = None)

用于寫入和讀取非阻塞文件或套接字的實(shí)用程序類。

我們支持非阻塞 ?write()? 和一系列 ?read_*()? 方法。操作完成后,?Awaitable將解析為讀取的數(shù)據(jù)(或 ?None用于 ?write()?)。當(dāng)流關(guān)閉時(shí),所有未完成的 ?Awaitables將使用 ?StreamClosedError解決;BaseIOStream.set_close_callback也可用于通知關(guān)閉的流。

當(dāng)流因錯(cuò)誤而關(guān)閉時(shí),?IOStream的錯(cuò)誤屬性包含異常對象。

子類必須實(shí)現(xiàn) ?fileno?、?close_fd?、?write_to_fd?、?read_from_fd和可選的 ?get_fd_error

?BaseIOStream構(gòu)造函數(shù)。

參數(shù):

?max_buffer_size– 要緩沖的最大傳入數(shù)據(jù)量;默認(rèn)為 100MB。

?read_chunk_size– 一次從底層傳輸讀取的數(shù)據(jù)量;默認(rèn)為 64KB。

?max_write_buffer_size– 輸出到緩沖區(qū)的數(shù)據(jù)量;默認(rèn)為無限制。

在 4.0 版更改: 添加 ?max_write_buffer_size參數(shù)。將默認(rèn) ?read_chunk_size更改為 64KB。

在 5.0 版中更改: ?io_loop參數(shù)(自 4.1 版以來已棄用)已被刪除。

主界面

BaseIOStream.write(data: Union[bytes, memoryview]) → Future[None]

將給定數(shù)據(jù)異步寫入此流。

此方法返回一個(gè) ?Future?,它在寫入完成時(shí)解析(結(jié)果為 ?None?)。

?data?參數(shù)可以是?bytes?或?memoryview?類型。

在 4.0 版更改: 如果沒有給出回調(diào),現(xiàn)在返回 ?Future?。

在 4.5 版更改: 添加了對 ?memoryview參數(shù)的支持。

在 6.0 版更改: 回調(diào)參數(shù)已刪除。 改用返回的 ?Future?。

BaseIOStream.read_bytes(num_bytes: int, partial: bool = False) → Awaitable[bytes]

異步讀取多個(gè)字節(jié)。

如果 ?partial為真,只要我們有任何要返回的字節(jié),就會(huì)返回?cái)?shù)據(jù)(但永遠(yuǎn)不會(huì)超過 ?num_bytes?)

在 4.0 版更改: 添加了部分參數(shù)。 回調(diào)參數(shù)現(xiàn)在是可選的,如果省略,將返回 ?Future?。

在 6.0 版更改: ?callback和 ?streaming_callback參數(shù)已被刪除。 改用返回的 ?Future?(對于 ?streaming_callback?,?partial=True?)。

BaseIOStream.read_into(buf: bytearray, partial: bool = False) → Awaitable[int]

異步讀取多個(gè)字節(jié)。

?buf必須是讀取數(shù)據(jù)的可寫緩沖區(qū)。

如果 ?partial為真,則在讀取任何字節(jié)后立即運(yùn)行回調(diào)。 否則,當(dāng) ?buf完全被讀取數(shù)據(jù)填充時(shí)運(yùn)行。

5.0 版中的新功能。

在 6.0 版更改: 回調(diào)參數(shù)已刪除。 改用返回的Future?。

BaseIOStream.read_until(delimiter: bytes, max_bytes: Optional[int] = None) → Awaitable[bytes]

異步讀取,直到我們找到給定的分隔符。

結(jié)果包括讀取的所有數(shù)據(jù),包括分隔符。

如果 ?max_bytes不是 ?None?,如果讀取的字節(jié)數(shù)超過 ?max_bytes并且找不到分隔符,則連接將被關(guān)閉。

在 4.0 版更改: 添加了 ?max_bytes參數(shù)。 回調(diào)參數(shù)現(xiàn)在是可選的,如果省略,將返回 ?Future

在 6.0 版更改: 回調(diào)參數(shù)已刪除。 改用返回的 ?Future。

BaseIOStream.read_until_regex(regex: bytes, max_bytes: Optional[int] = None) → Awaitable[bytes]

異步讀取,直到我們匹配給定的正則表達(dá)式。

結(jié)果包括與正則表達(dá)式匹配的數(shù)據(jù)以及它之前的任何內(nèi)容。

如果 ?max_bytes不是 ?None?,如果讀取的字節(jié)數(shù)超過 max_bytes 并且不滿足正則表達(dá)式,則連接將被關(guān)閉。

在 4.0 版更改: 添加了 max_bytes 參數(shù)。 回調(diào)參數(shù)現(xiàn)在是可選的,如果省略,將返回 Future。

在 6.0 版更改: 回調(diào)參數(shù)已刪除。 改用返回的 Future。

BaseIOStream.read_until_close() → Awaitable[bytes]

從套接字異步讀取所有數(shù)據(jù),直到它關(guān)閉。

這將緩沖所有可用數(shù)據(jù),直到達(dá)到 ?max_buffer_size?。 如果需要流量控制或取消,請使用帶有 ?read_bytes(partial=True)? 的循環(huán)。

在 4.0 版更改: 回調(diào)參數(shù)現(xiàn)在是可選的,如果省略,將返回 ?Future?。

在 6.0 版更改: ?callback和 ?streaming_callback參數(shù)已被刪除。 改為使用返回的 ?Future?(以及對于 ?streaming_callback的 ?read_bytes和 ?partial=True?)。

BaseIOStream.close(exc_info: Union[None, bool, BaseException, Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[traceback]]] = False) → None

關(guān)閉這個(gè)Stream

如果 ?exc_info為真,則將錯(cuò)誤屬性設(shè)置為 ?sys.exc_info? 中的當(dāng)前異常(或者如果 ?exc_info是一個(gè)元組,則使用它而不是 ?sys.exc_info?)。

BaseIOStream.set_close_callback(callback: Optional[Callable[[], None]]) → None

當(dāng)Stream關(guān)閉時(shí)調(diào)用給定的回調(diào)。

這對于使用 Future 接口的應(yīng)用程序來說是不必要的; 當(dāng)Stream關(guān)閉時(shí),所有未完成的 Future 都將使用 ?StreamClosedError解決。 但是,它仍然是一種有用的方式來表示流已關(guān)閉,而沒有其他讀取或?qū)懭胝谶M(jìn)行。

與其他基于回調(diào)的接口不同,?set_close_callback在 Tornado 6.0 中沒有被移除。

BaseIOStream.closed() → bool

如果Stream已關(guān)閉,則返回 ?True?

BaseIOStream.reading() → bool

如果我們當(dāng)前正在從Stream中讀取,則返回 ?True?

BaseIOStream.writing() → bool

如果我們當(dāng)前正在寫入Stream,則返回 ?True?。

BaseIOStream.set_nodelay(value: bool) → None

為此Stream設(shè)置無延遲標(biāo)志。

默認(rèn)情況下,寫入 TCP Stream的數(shù)據(jù)可能會(huì)保留一段時(shí)間,以最有效地利用帶寬(根據(jù) Nagle 算法)。 無延遲標(biāo)志要求盡快寫入數(shù)據(jù),即使這樣做會(huì)消耗額外的帶寬。

此標(biāo)志當(dāng)前僅針對基于 TCP 的 ?IOStream ?定義。

子類的方法

BaseIOStream.fileno() → Union[int, tornado.ioloop._Selectable]

返回此Stream的文件描述符。

BaseIOStream.close_fd() → None

關(guān)閉此Stream的基礎(chǔ)文件。

?close_fd ?由 ?BaseIOStream ?調(diào)用,不應(yīng)在其他地方調(diào)用; 其他用戶應(yīng)改為調(diào)用 ?close?。

BaseIOStream.write_to_fd(data: memoryview) → int

嘗試將數(shù)據(jù)寫入基礎(chǔ)文件。

返回寫入的字節(jié)數(shù)。

BaseIOStream.read_from_fd(buf: Union[bytearray, memoryview]) → Optional[int]

嘗試從基礎(chǔ)文件中讀取。

最多讀取 ?len(buf)? 個(gè)字節(jié),并將它們存儲(chǔ)在緩沖區(qū)中。 返回讀取的字節(jié)數(shù)。 如果沒有要讀取的內(nèi)容(套接字返回 ?EWOULDBLOCK? 或等效項(xiàng)),則返回 ?None?,在 ?EOF上返回零。

在 5.0 版更改: 重新設(shè)計(jì)接口以獲取緩沖區(qū)并返回多個(gè)字節(jié)而不是新分配的對象。

BaseIOStream.get_fd_error() → Optional[Exception]

返回有關(guān)基礎(chǔ)文件上的任何錯(cuò)誤的信息。

此方法在 ?IOLoop發(fā)出文件描述符上的錯(cuò)誤信號(hào)后調(diào)用,并且應(yīng)返回異常(例如帶有附加信息的 ?socket.error?,如果沒有此類信息可用,則返回 ?None?。

實(shí)現(xiàn)

class tornado.iostream.IOStream(socket: socket.socket, *args, **kwargs)

基于套接字的 ?IOStream ?實(shí)現(xiàn)。

此類支持來自 ?BaseIOStream ?的讀取和寫入方法以及連接方法。

套接字參數(shù)可以是已連接的或未連接的。 對于服務(wù)器操作,套接字是調(diào)用 ?socket.accept? 的結(jié)果。 對于客戶端操作,套接字是使用 ?socket.socket? 創(chuàng)建的,并且可以在將其傳遞給 ?IOStream之前進(jìn)行連接,也可以使用 ?IOStream.connect? 進(jìn)行連接。

一個(gè)使用此類的非常簡單(且損壞)的 HTTP 客戶端:

import tornado.ioloop
import tornado.iostream
import socket

async def main():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    stream = tornado.iostream.IOStream(s)
    await stream.connect(("friendfeed.com", 80))
    await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
    header_data = await stream.read_until(b"\r\n\r\n")
    headers = {}
    for line in header_data.split(b"\r\n"):
        parts = line.split(b":")
        if len(parts) == 2:
            headers[parts[0].strip()] = parts[1].strip()
    body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
    print(body_data)
    stream.close()

if __name__ == '__main__':
    tornado.ioloop.IOLoop.current().run_sync(main)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    stream = tornado.iostream.IOStream(s)
    stream.connect(("friendfeed.com", 80), send_request)
    tornado.ioloop.IOLoop.current().start()

connect(address: Any, server_hostname: Optional[str] = None) → Future[_IOStreamType]

將套接字連接到遠(yuǎn)程地址而不阻塞。

僅當(dāng)傳遞給構(gòu)造函數(shù)的套接字先前未連接時(shí)才可以調(diào)用。 ?address參數(shù)的格式與 ?socket.connect? 的格式相同,用于傳遞給 ?IOStream構(gòu)造函數(shù)的套接字類型,例如 一個(gè)?(ip,port)?元組。 此處接受主機(jī)名,但將同步解析并阻止 ?IOLoop?。 如果您有主機(jī)名而不是 IP 地址,則建議使用 ?TCPClient類而不是直接調(diào)用此方法。 ?TCPClient將進(jìn)行異步 DNS 解析并同時(shí)處理 IPv4 和 IPv6。

如果指定了回調(diào),則連接完成時(shí)將不帶參數(shù)調(diào)用; 如果不是,則此方法返回一個(gè) Future(成功連接后的結(jié)果將是Stream本身)。

在 SSL 模式下,?server_hostname參數(shù)將用于證書驗(yàn)證(除非在 ?ssl_options中禁用)和 ?SNI(如果支持;需要 Python 2.7.9+)。

請注意,在連接掛起時(shí)調(diào)用 ?IOStream.write? 是安全的,在這種情況下,一旦連接準(zhǔn)備好,數(shù)據(jù)就會(huì)被寫入。 在連接套接字之前調(diào)用 ?IOStream讀取方法適用于某些平臺(tái),但不可移植。

在 4.0 版更改: 如果沒有給出回調(diào),則返回 ?Future?。

在 4.2 版更改:默認(rèn)情況下驗(yàn)證 SSL 證書; 將 ?ssl_options=dict(cert_reqs=ssl.CERT_NONE)或適當(dāng)配置的 ?ssl.SSLContext? 傳遞給 ?SSLIOStream? 構(gòu)造函數(shù)以禁用。

在 6.0 版更改: 回調(diào)參數(shù)已刪除。 改用返回的 ?Future?。

start_tls(server_side: bool, ssl_options: Union[Dict[str, Any], ssl.SSLContext, None] = None, server_hostname: Optional[str] = None) → Awaitable[tornado.iostream.SSLIOStream]

將此?IOStream?轉(zhuǎn)換為?SSLIOStream?。

這使得能夠在清除文本模式下以清晰文本模式開頭的協(xié)議,并在一些初始協(xié)商后切換到SSL(例如?STARTTLS?擴(kuò)展到?SMTP?和?IMAP?)。

如果在?Stream?上有未完成的讀取或?qū)懭?,或者如果?IOStream?的緩沖區(qū)中存在任何數(shù)據(jù)(允許操作系統(tǒng)的套接字緩沖區(qū)中的數(shù)據(jù)),則無法使用此方法。這意味著它通常必須在閱讀或?qū)懭胱詈笠粋€(gè)清晰文本數(shù)據(jù)之后立即使用。在任何讀取或?qū)懭胫埃部梢粤⒓词褂盟?/p>

?ssl_options?參數(shù)可以是?ssl.slcontext?對象或?ssl.wrap_socket?函數(shù)的關(guān)鍵字參數(shù)字典。除非在?SSL_Options?中禁用,否則?Server_Hostname?參數(shù)將用于證書驗(yàn)證。

此方法返回一個(gè)?future?,其結(jié)果是新的?SSLIOStream?。在調(diào)用此方法后,原始?Stream?上的任何其他操作都未定義。

如果在此?Stream?上定義了關(guān)閉回調(diào),則將傳輸?shù)叫?Stream?。

4.0版中的新增功能。

在4.2版中更改:默認(rèn)情況下驗(yàn)證SSL證書; ?pass ssl_options = dict(cert_reqs = ssl.cert_none)?或適當(dāng)配置的?ssl.slcontext?禁用。

class tornado.iostream.SSLIOStream(*args, **kwargs)

用于寫入和讀取非阻塞 SSL 套接字的實(shí)用程序類。

如果傳遞給構(gòu)造函數(shù)的套接字已經(jīng)連接,則應(yīng)使用以下內(nèi)容進(jìn)行包裝:

ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

在構(gòu)造 ?SSLIOStream之前。 當(dāng) ?IOStream.connect?完成時(shí),未連接的套接字將被包裝。

?ssl_options關(guān)鍵字參數(shù)可以是 ?ssl.SSLContext? 對象或 ?ssl.wrap_socket? 的關(guān)鍵字參數(shù)字典

wait_for_handshake() → Future[SSLIOStream]

等待初始 SSL 握手完成。

如果給定了回調(diào),則在握手完成后將不帶參數(shù)地調(diào)用它; 否則,此方法返回一個(gè) ?Future?,它將在握手完成后解析為?Stream?本身。

握手完成后,可以在 ?self.socket? 上訪問對等方的證書和 ?NPN/ALPN? 選擇等信息。

此方法旨在用于服務(wù)器端?Stream?或使用 ?IOStream.start_tls? 之后; 它不應(yīng)該與 ?IOStream.connect? 一起使用(它已經(jīng)在等待握手完成)。 每個(gè)流只能調(diào)用一次。

4.2 版中的新功能。

在 6.0 版更改: 回調(diào)參數(shù)已刪除。 改用返回的 ?Future?。

class tornado.iostream.PipeIOStream(fd: int, *args, **kwargs)

基于管道的 ?IOStream實(shí)現(xiàn)。

構(gòu)造函數(shù)采用整數(shù)文件描述符(例如 ?os.pipe? 返回的描述符)而不是打開的文件對象。 管道通常是單向的,因此 ?PipeIOStream可用于讀取或?qū)懭?,但不能同時(shí)用于兩者。

?PipeIOStream僅在基于Unix的平臺(tái)上可用。

例外

exception tornado.iostream.StreamBufferFullError

緩沖區(qū)已滿時(shí) ?IOStream方法引發(fā)的異常。

exception tornado.iostream.StreamClosedError(real_error: Optional[BaseException] = None)

關(guān)閉 ?Stream ?時(shí) ?IOStream ?方法引發(fā)的異常。

請注意,關(guān)閉回調(diào)計(jì)劃在?Stream?上的其他回調(diào)之后運(yùn)行(以允許處理緩沖數(shù)據(jù)),因此您可能會(huì)在看到關(guān)閉回調(diào)之前看到此錯(cuò)誤。

?real_error屬性包含導(dǎo)致?Stream?關(guān)閉的基礎(chǔ)錯(cuò)誤(如果有)。

在 4.3 版更改: 添加了 ?real_error屬性。

exception tornado.iostream.UnsatisfiableReadError

無法滿足讀取時(shí)引發(fā)異常。

由帶有 ?max_bytes?參數(shù)的 ?read_until和 ?read_until_regex引發(fā)。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)