在同步Rust中,流核心抽象是Iterator。它提供了一種在序列中產生項的方法,並在它們之間進行阻塞。通過將迭代器傳遞給其他迭代器的構造函數來完成組合,這使得我們可以毫不費力地將事物連接在一起。
在非同步Rust中與同步Rust中Read和Write對應的是AsyncRead和AsyncWrite。這些特質表示未解析的位元組,通常直接來自IO層(例如來自套接字或文件)。
Rust流具有其他語言的一些最佳功能。例如:他們通過利用Rust的特質系統來迴避Node.js的Duplex流中看到的遺留問題。也同時實施背壓和惰性迭代,這提高了效率。最重要的是,Rust流允許使用相同類型的非同步迭代。
在這裡有很多東西需要拆開分析。我們來挖掘吧!
Duplex
Duplex
始終使用AsyncRead + AsyncWrite
實現。這與其他語言不同。一個關鍵的區別是,使用Rust的特質系統,我們可以避免困擾其他語言的多個遺留問題.duplex
流的示例包括套接字和文件。
through
through
流使用任一AsyncRead
或Stream
實現。通過將另一個流的through傳遞到他的構造函數中使得數據流從一個流流向另一個流。
在Rust中,source
和through
唯一的區別在Traits
是如何使用的,而不是在Traits
定義本身。一個例子:
let s = b"hello planet" ; // source (AsyncRead)
let s = gzip ::compress ( s ). await ? ; // through (AsyncRead)
let s = my_protocol ::parse ( f ). await ? ; // through (Stream)
asyncread vs stream
AsyncRead
和Stream
另一個有趣的區別是。兩種流都可以對byte
進行操作。但不同之處在於AsyncRead
是一個對借來的數據進行操作的位元組流。 而Stream
是一個對擁有數據進行操作的對象流。這就是說Stream
可以對任何類型的數據進行操作,而不僅僅是位元組。
雖然AsyncRead
和Stream
都可以對位元組進行操作,但AsyncRead
會生成未解析的數據,而Stream
會生成已解析的數據。 不同之處在於,使用Stream
,每個產生的項通常可以單獨轉換為有效的消息。 使用AsyncRead
時,我們可能需要請求更多數據。
AsyncRead
的示例包括文件,套接字和HTTP正文。 Stream
的示例包括ndjsonlines和protobuf
消息。
AsyncRead
和Stream
之間的關係等同於標準庫的Read
和Iterator
特質之間的關係。 在下面的示例中,我們使用split將任意數量的位元組轉換為單獨的位元組行。 我們用Trait
和yield
類型標記了每一行:
use std ::io ;
let f = io ::File ::open ( "foo.txt" ) ? ; // Read<[u8]>
let f = io ::BufReader ::new ( f ); // Read<[u8]>
for buf in f . split ( b
) { // Iterator<[u8]>
println ! ( "{}" , buf );
}
相同的數據類型。不同的特質。
不幸的是,AsyncRead.split做了一些根本不同的事情,所以這個例子不能直接複製(下面有更多關於split
的內容)。所以不要嘗試在async Rust
中寫這個。
sinks
流的工作方式是 在管道流的末尾,有一個sink
或迭代器從流中請求項。這意味著管道流只會在請求時生成數據。這通常被稱為延遲迭代
或具有背壓的流
。
目前,沒有專門的語法來循環流。相反,建議使用while let Some
循環:
let stream = my_protocol ::parse ( f ). await ? ;
while let Some ( item ) in stream . next (). await {
println ! ( "{:?}" , item );
}
現在我們對Rust的流概念的特性有了更好的了解,我們已經準備好了解如何創建管道流。
管道
基於流的編程的主要內容之一是能夠將流組合在一起。在shell中,您可以使用在一起管道程序|,在Node.js中,您可以使用相同的方法.pipe。典型的shell示例如下所示:
cat foo.txt | gzip > foo.txt.gz
上面的示例從foo.txt
中讀取數據,管道然後gzip
以壓縮數據,並將結果寫回新文件。
Rust流有一個非常相似的模型。事實上,我們可以想像在Rust
中編寫相同的代碼:
use runtime ::fs ::File ;
File ::open ( "foo.txt" )
. and_then ( | s | gzip ::compress ( s ))
. and_then ( | s | word_count ::bytes ( s ))
. and_then ( | s | s . copy_into ( File ::create ( "foo.txt.gz" )))
. await ? ;
此代碼示例今天不會運行,因為有些包還未存在。但它很好地說明了Rust
的流在實踐中如何運作。我們可以抽象地表達管道如下:
┌───────────┐ ┌───────────┐ ┌────────────┐
│ AsyncRead │──>│ AsyncRead │──>│ AsyncWrite │
└───────────┘ └───────────┘ └────────────┘
數據從源文件,通過壓縮器進入目標文件。不同的管道將使用不同的組合AsyncRead
和 Sink
。但是在所有模式中,將最後一個流傳遞給下一個構造函數是常見的,直到我們到達接收器。
管道雙工流
當涉及雙工流時,流模型變得有點棘手。讓我們假裝我們打開一個實現AsyncRead
+ AsyncWrite
的套接字:
let mut sock = Socket ::new ( "localhost:3000" );
dbg ! ( sock ) // implements AsyncRead + AsyncWrite
我們希望從套接字讀取數據,對每個值進行操作,並將數據寫回套接字。在Rust中,這會讓我們陷入困境,因為我們無法在兩個地方保持對相同值的可變引用。所以雙工流有一個方便的split
方法將套接字分成Read/Write
(讀寫)各半:
let mut sock = Socket ::new ( "localhost:3000" );
let ( reader , writer ) = & mut sock . split ();
將AsyncRead
管道化為AsyncWrite
在上面的示例中,Socket
雙工是源和接收器。這些方法都沒有包裝另一個流。有時我們只對流的讀取或寫入一半感興趣。這就是為什麼Duplex
流在其構造函數中採用其他流的原因並不常見。
那麼我們如何寫數據呢?
嗯,Rust
方便地有一個copy_into組合器用於這個目的。它從AsyncRead
獲取數據,並將其寫入AsyncWrite
:
let mut sock = Socket ::new ( "localhost:3000" );
let ( reader , writer ) = & mut sock . split ();
reader . copy_into ( writer ). await ? ;
將流管道化為AsyncWrite
如果我們想從Stream
到AsyncWrite
寫數據,事情就變得很有點棘手。首先Stream
應該輸出位元組(&[u8]
或Vec<u8>
),因為IO設備只能讀取位元組。
但更重要的是:目前沒有可用的copy_into
組合器!但我們可以通過轉換Stream
為AsyncRead
,然後調用copy_into
來解決這個問題:
stream
. map ( io ::Result ::Ok ) // convert each `Vec<u8>` to `Result<Vec<u8>>`
. into_async_read () // convert the stream to `AsyncRead`
. copy_into ( writer ) // copy the data to the sink
. await ? ; // start the pipeline
目前這個代碼確實遭受了雙緩衝錯誤,這使得它的效率低於實際。但是,如果 copy_into
能使Stream工作可能會具有最好效果:
stream . copy_into ( writer ). await ? ;
處理錯誤
Node.js
在引入流時犯的最大錯誤之一就是pipe
不會轉發錯誤。幸運的是,在Rust
流中,解決了這個問題,在於如何在構造函數中包裝流。這意味著流自動轉發錯誤,管道處理它們。
錯誤處理的唯一困難是錯誤類型需要排隊。在創建管道包含io::Error
錯誤以外的錯誤的時,這可能特別棘手。但是生態系統仍處於年輕狀態,模式仍在不斷湧現,所以並非所有內容都是流模式的,這一點也不足為奇。
編寫編解碼器
解析器協議通常被分成編碼器和解碼器。編碼器將結構轉換為位元組序列。解碼器將位元組轉換為結構。這可以很容易地在Rust
中建模:
/// The type were converting to and from.
pub struct MyFrame ;
/// Convert frames to bytes.
pub struct Encode ;
impl Encode {
/// Take a stream of frames, and return a stream of bytes.
pub fn new ( stream : impl Stream < Item = MyFrame > ) -> Self ;
}
impl Stream for Encode {
type Item = Result < Vec < u8 > , Error > ;
}
/// Convert bytes to frames.
pub struct Decode ;
impl Decode {
/// Take a stream of bytes, and return a stream of frames.
pub fn new ( reader : impl AsyncRead ) -> Self ;
}
impl Stream for Decode {
type Item = Result < MyFrame , Error > ;
}
存在專門的包,旨在幫助創建編解碼器。但實際上編解碼器主要是一種設計模式,編寫它們的最簡單方法是直接使用標準流特質。
注意:根據您的使用情況,您可能需要在編寫解碼器時執行一些內部緩衝。但所有需要的是一個好的(環)緩衝區抽象,crates.io
上有各種各樣的。
使用組合器的AD-HOC流
有時您希望快速操作流的輸出。它是否過濾掉您不感興趣的結果,連接項目或快速計算。Streams
組合器允許您以很少的開銷執行這些任務。
假設我們想從文件中讀取數據,並按換行分割。該lines 組合程序規定:
let mut sock = Socket ::new ( "localhost:3000" );
let ( reader , _ ) = & mut sock . split ();
// This is returns a stream of `String`
let lines = reader . lines (). await ? ;
現在如果我們想要解析這些行serde怎麼辦?提示map 組合器:
let mut sock = Socket ::new ( "localhost:3000" );
let ( reader , _ ) = & mut sock . split ();
#[derive(Deserialize)]
struct Pet {
name : String ,
}
// This returns a stream of `Result<Pet>`
let pet_stream = reader
. lines ()
. map ( | line | serde_json ::parse ::< Pet > ( line ));
另一個有趣的事實是,Vec<u8>
實現AsyncRead
和AsyncWrite
,這意味著如果你想連接一個流的所有值,就可以直接使用緩衝區。
可能會添加更多的組合器,以及需要探索的模式。但Rust
流的核心部件感覺非常穩固,隨著生態系統的發展,可以添加更多的組合器。
為什麼我們不談論接收器(Sink)特質
驚喜!還有另一個你應該知道的特質。它的名字是Sink,而且它是很奇怪的一個。大聲說出來會令人困惑(我們在談論Sync還是Sink?),但特質本身就在那裡。看看定義:
pub trait Sink < Item > {
type SinkError ;
fn poll_ready (
self : Pin <& mut Self > ,
cx : & mut Contex
) -> Poll < Result < (), Self ::SinkError >> ;
fn start_send (
self : Pin <& mut Self > ,
item : Item
) -> Result < (), Self ::SinkError > ;
fn poll_flush (
self : Pin <& mut Self > ,
cx : & mut Context
) -> Poll < Result < (), Self ::SinkError >> ;
fn poll_close (
self : Pin <& mut Self > ,
cx : & mut Context
) -> Poll < Result < (), Self ::SinkError >> ;
}
無論何時實現Sink
您都需要實現4個方法,1個關聯類型和1個范型參數。哦,還有一個強制性的內部緩衝區。因為特質定義中的所有這些方法都是特定生命周期的鉤子。通過該周期移動數據的唯一方法是在內部臨時存儲數據,並在稍後再次產生它。
也許你已經理解了,但Sink
並不簡單。它的存在理由是成為一個AsyncWrite
類型的對應物。它通常將編寫器包裝在其構造函數中,然後將類型序列化到其中。
在表面上這可能聽起來很吸引人。但實際上沒有人敢在沒有crates.io
的嚴格幫助的情況下寫下這個怪物。或許這種複雜性實際上是值得的?,這就引出了一個問題。答案越來越多似乎是一個響亮的「不」。
Sink
不會帶來任何使用3個標準流特質無法更優雅或更少儀式地解決問題的東西。所以省去一些麻煩,不要打擾Sink
。
下一步是什麼
非同步迭代語法
目前可以對流進行非同步迭代,但使用它並不一定很好。大多數用戶領域迭代流使用while let Some
循環完成
let mut listener = TcpListener ::bind ( "127.0.0.1:8081" ) ? ;
let incoming = listener . incoming ();
while let Some ( conn ) in incoming . await {
let conn = conn ? ;
/* handle connection */
}
如果我們可以將其寫為for await
循環,那就更好了:
let mut listener = TcpListener ::bind ( "127.0.0.1:8081" ) ? ;
for conn . await ? in listener . incoming () {
/* handle connection */
}
目前還不清楚何時會發生這種情況。但這絕對值得期待!
非同步特質流
說到改進,流特質本身可以做一些工作。目前這些特質與Future
特質非常相似:
pub trait AsyncRead {
fn poll_read (
self : Pin <& mut Self > ,
cx : & mut Context ,
buf : & mut [ u8 ]
) -> Poll < io ::Result < usize >> ;
}
是什麼讓這個特別棘手的是:定義self: Pin<&mut Self>
。這意味著這種方法只對Self
被 固定的實例實現。我不想讓你厭倦為什麼這很棘手,但我想提一下,最近我一直聽到關於這些特質可能簡化的對話。
原則上,流特質沒有任何關於它們的非同步。他們非同步的唯一原因是因為他們返回Future
,可能需要在內部等待其他Future
。這很重要,因為trait
一旦直接允許async
,似乎可以顯著簡化特質。
pub trait AsyncRead {
async fn read ( & mut self , buf : & mut [ u8 ]) -> io ::Result < usize > ;
}
這將是特別好的,因為這將意味著AsyncRead
,AsyncWrite
和Stream
將被定義和STD的Read
,Write
和Iterator
以相同的方式,具有唯一的區別是在方法前async
關鍵字。
pub trait Read {
fn read ( & mut self , buf : & mut [ u8 ]) -> io ::Result < usize > ;
}
但是,沒有什麼可以確定的。但我對這裡的可能性持謹慎樂觀態度。
使用yield的匿名流
談到我們如何定義流的改進,已經討論過的另一件事是為生成器添加語法。生成器可能會使用yield
關鍵字,我們可以想像一個流本質上是一個Future
的生成器。就像async/await
允許我們跳過圍繞構建Future
的樣板,yield
也一樣對於流:
async fn keep_squaring ( mut val : u64 ) -> yield u64 {
loop {
val *= 2 ;
yield val ;
}
}
for val . await in keep_squaring ( 4 ) {
dbg ! ( val );
}
這個可能會更進一步,但似乎它有可能提供一些受歡迎的工作流程改進。
零拷貝讀寫
一個很好的特性是AsyncRead
,AsyncWrite
通過poll_read_vectored和poll_write_vectored支持向量IO
。這樣可以優化特定應用程序的性能。
在將來添加可能有用的類似方法是 (poll_read_vec
和poll_write_vec
在名稱不太混淆的情況下)。這些方法允許將緩衝區直接傳遞給方法,並使用mem::swap
技巧,防止memcpy
在每個操作上執行一個額外的操作。允許我們顯著提高某些API的性能,而無需根據需要修改最終用戶API。
這在包裝同步API時尤為重要(目前這意味著:幾乎每個文件系統操作)。但更重要的是:與直接使用OS API相比,它將允許我們刪除Rust目前基於Future
IO的額外開銷。
結論
在這篇文章中,我們討論了Rust的各種非同步流,討論了常見的模式和陷阱,並展望了流的未來。
Rust流的未來非常令人興奮!如果我們能夠將管道流的人體工程學結合在一起,憑藉Rust的可靠性保證,我們將更接近使Rust成為腳本語言傳統空間的絕佳選擇。
我們希望您喜歡閱讀 Rust流! - 祝你有個美好的一周!
感謝Irina Shestak,Nemo157,David Barsky,Stjepan Glavina和Hugh Kennedy閱讀並提供有關此帖子的多次迭代的反饋,想法和意見!
推薦閱讀: