本文同步於Rust中文閱讀:Rust流(Streams),源自Rust中文營養計劃 ,時間:2019-06-21, 本文已發布在Rust中文網路點. 歡迎加入Rust中文營養計劃,共建Rust語言中文網路!

  • 本文譯者:krircc
  • 英文原文

隨著Rust的非同步故事不斷發展,Rust的流故事也在不斷發展。在這篇文章中,我們將了解Rust的流模型如何工作,如何有效地使用它,以及未來的發展方向。

Streams特質

在同步Rust中,流核心抽象是Iterator。它提供了一種在序列中產生項的方法,並在它們之間進行阻塞。通過將迭代器傳遞給其他迭代器的構造函數來完成組合,這使得我們可以毫不費力地將事物連接在一起。

在非同步Rust中,流核心抽象是Stream。它的行為非常相似Iterator,但它不是在每個項之間產生阻塞,而是允許其他任務在阻塞等待時運行。

在非同步Rust中與同步Rust中Read和Write對應的是AsyncRead和AsyncWrite。這些特質表示未解析的位元組,通常直接來自IO層(例如來自套接字或文件)。

use futures::prelude::*;
use runtime::fs::File;

let f = file::create("foo.txt").await?; // create a file
f.write_all(b"hello world").await?; // write data to the file (AsyncWrite)

let f = file::open("foo.txt").await?; // open a file
let mut buffer = Vec::new(); // init the buffer to read the data into
f.read_to_end(&mut buffer).await?; // read the whole file (AsyncRead)

Rust流具有其他語言的一些最佳功能。例如:他們通過利用Rust的特質系統來迴避Node.js的Duplex流中看到的遺留問題。也同時實施背壓和惰性迭代,這提高了效率。最重要的是,Rust流允許使用相同類型的非同步迭代。

關於Rust流還有很多值得關注的地方,儘管還有一些問題要解決。

Streams和Roles

讓我們首先列舉可以在典型系統中表達的各種流:

  • source:可以生成數據的流
  • Sink:可以消費數據的流
  • Through:消費數據,對其進行操作然後生成新數據的流
  • Duplex:流可以生成數據,也可以獨立的消費數據

建立通用術語很有用,因為Rust的流特質不會將1:1映射到這些角色。實際上,Rust的每個流特質都可以用來填充許多不同的角色。以下是每個特質可以參與的角色概述:

在這裡有很多東西需要拆開分析。我們來挖掘吧!

Duplex

Duplex始終使用AsyncRead + AsyncWrite實現。這與其他語言不同。一個關鍵的區別是,使用Rust的特質系統,我們可以避免困擾其他語言的多個遺留問題.duplex流的示例包括套接字和文件。

through

through流使用任一AsyncReadStream實現。通過將另一個流的through傳遞到他的構造函數中使得數據流從一個流流向另一個流。

在Rust中,sourcethrough唯一的區別在Traits是如何使用的,而不是在Traits定義本身。一個例子:

let s = b"hello planet"; // source (AsyncRead)
let s = gzip::compress(s).await?; // through (AsyncRead)
let s = my_protocol::parse(f).await?; // through (Stream)

asyncread vs stream

AsyncReadStream另一個有趣的區別是。兩種流都可以對byte進行操作。但不同之處在於AsyncRead是一個對借來的數據進行操作的位元組流。 而Stream是一個對擁有數據進行操作的對象流。這就是說Stream可以對任何類型的數據進行操作,而不僅僅是位元組。

雖然AsyncReadStream都可以對位元組進行操作,但AsyncRead會生成未解析的數據,而Stream會生成已解析的數據。 不同之處在於,使用Stream,每個產生的項通常可以單獨轉換為有效的消息。 使用AsyncRead時,我們可能需要請求更多數據。

AsyncRead的示例包括文件,套接字和HTTP正文。 Stream的示例包括ndjsonlines和protobuf消息。

AsyncReadStream之間的關係等同於標準庫的ReadIterator特質之間的關係。 在下面的示例中,我們使用split將任意數量的位元組轉換為單獨的位元組行。 我們用Traityield類型標記了每一行:

use std::io;

let f = io::File::open("foo.txt")?; // Read<[u8]>
let f = io::BufReader::new(f); // Read<[u8]>
for buf in f.split(b
) { // Iterator<[u8]>
println!("{}", buf);
}

相同的數據類型。不同的特質。

不幸的是,AsyncRead.split做了一些根本不同的事情,所以這個例子不能直接複製(下面有更多關於split的內容)。所以不要嘗試在async Rust中寫這個。

sinks

流的工作方式是 在管道流的末尾,有一個sink或迭代器從流中請求項。這意味著管道流只會在請求時生成數據。這通常被稱為延遲迭代具有背壓的流

目前,沒有專門的語法來循環流。相反,建議使用while let Some循環:

let stream = my_protocol::parse(f).await?;
while let Some(item) in stream.next().await {
println!("{:?}", item);
}

現在我們對Rust的流概念的特性有了更好的了解,我們已經準備好了解如何創建管道流。

管道

基於流的編程的主要內容之一是能夠將流組合在一起。在shell中,您可以使用在一起管道程序|,在Node.js中,您可以使用相同的方法.pipe。典型的shell示例如下所示:

cat foo.txt | gzip > foo.txt.gz

上面的示例從foo.txt中讀取數據,管道然後gzip以壓縮數據,並將結果寫回新文件。

Rust流有一個非常相似的模型。事實上,我們可以想像在Rust中編寫相同的代碼:

use runtime::fs::File;

File::open("foo.txt")
.and_then(|s| gzip::compress(s))
.and_then(|s| word_count::bytes(s))
.and_then(|s| s.copy_into(File::create("foo.txt.gz")))
.await?;

此代碼示例今天不會運行,因為有些包還未存在。但它很好地說明了Rust的流在實踐中如何運作。我們可以抽象地表達管道如下:

┌───────────┐ ┌───────────┐ ┌────────────┐
│ AsyncRead │──>│ AsyncRead │──>│ AsyncWrite │
└───────────┘ └───────────┘ └────────────┘

數據從源文件,通過壓縮器進入目標文件。不同的管道將使用不同的組合AsyncReadSink。但是在所有模式中,將最後一個流傳遞給下一個構造函數是常見的,直到我們到達接收器。

管道雙工流

當涉及雙工流時,流模型變得有點棘手。讓我們假裝我們打開一個實現AsyncRead+ AsyncWrite 的套接字:

let mut sock = Socket::new("localhost:3000");
dbg!(sock) // implements AsyncRead + AsyncWrite

我們希望從套接字讀取數據,對每個值進行操作,並將數據寫回套接字。在Rust中,這會讓我們陷入困境,因為我們無法在兩個地方保持對相同值的可變引用。所以雙工流有一個方便的split方法將套接字分成Read/Write(讀寫)各半:

let mut sock = Socket::new("localhost:3000");
let (reader, writer) = &mut sock.split();

AsyncRead管道化為AsyncWrite

在上面的示例中,Socket雙工是源和接收器。這些方法都沒有包裝另一個流。有時我們只對流的讀取或寫入一半感興趣。這就是為什麼Duplex流在其構造函數中採用其他流的原因並不常見。

那麼我們如何寫數據呢?

嗯,Rust方便地有一個copy_into組合器用於這個目的。它從AsyncRead獲取數據,並將其寫入AsyncWrite

let mut sock = Socket::new("localhost:3000");
let (reader, writer) = &mut sock.split();
reader.copy_into(writer).await?;

將流管道化為AsyncWrite

如果我們想從StreamAsyncWrite寫數據,事情就變得很有點棘手。首先Stream應該輸出位元組(&[u8]Vec<u8>),因為IO設備只能讀取位元組。

但更重要的是:目前沒有可用的copy_into組合器!但我們可以通過轉換StreamAsyncRead,然後調用copy_into來解決這個問題:

stream
.map(io::Result::Ok) // convert each `Vec<u8>` to `Result<Vec<u8>>`
.into_async_read() // convert the stream to `AsyncRead`
.copy_into(writer) // copy the data to the sink
.await?; // start the pipeline

目前這個代碼確實遭受了雙緩衝錯誤,這使得它的效率低於實際。但是,如果 copy_into 能使Stream工作可能會具有最好效果:

stream.copy_into(writer).await?;

處理錯誤

Node.js在引入流時犯的最大錯誤之一就是pipe不會轉發錯誤。幸運的是,在Rust流中,解決了這個問題,在於如何在構造函數中包裝流。這意味著流自動轉發錯誤,管道處理它們。

錯誤處理的唯一困難是錯誤類型需要排隊。在創建管道包含io::Error錯誤以外的錯誤的時,這可能特別棘手。但是生態系統仍處於年輕狀態,模式仍在不斷湧現,所以並非所有內容都是流模式的,這一點也不足為奇。

編寫編解碼器

解析器協議通常被分成編碼器和解碼器。編碼器將結構轉換為位元組序列。解碼器將位元組轉換為結構。這可以很容易地在Rust中建模:

/// The type were converting to and from.
pub struct MyFrame;

/// Convert frames to bytes.
pub struct Encode;
impl Encode {
/// Take a stream of frames, and return a stream of bytes.
pub fn new(stream: impl Stream<Item = MyFrame>) -> Self;
}
impl Stream for Encode {
type Item = Result<Vec<u8>, Error>;
}

/// Convert bytes to frames.
pub struct Decode;
impl Decode {
/// Take a stream of bytes, and return a stream of frames.
pub fn new(reader: impl AsyncRead) -> Self;
}
impl Stream for Decode {
type Item = Result<MyFrame, Error>;
}

存在專門的包,旨在幫助創建編解碼器。但實際上編解碼器主要是一種設計模式,編寫它們的最簡單方法是直接使用標準流特質。

注意:根據您的使用情況,您可能需要在編寫解碼器時執行一些內部緩衝。但所有需要的是一個好的(環)緩衝區抽象,crates.io上有各種各樣的。

使用組合器的AD-HOC流

有時您希望快速操作流的輸出。它是否過濾掉您不感興趣的結果,連接項目或快速計算。Streams組合器允許您以很少的開銷執行這些任務。

假設我們想從文件中讀取數據,並按換行分割。該lines 組合程序規定:

let mut sock = Socket::new("localhost:3000");
let (reader, _) = &mut sock.split();

// This is returns a stream of `String`
let lines = reader.lines().await?;

現在如果我們想要解析這些行serde怎麼辦?提示map 組合器:

let mut sock = Socket::new("localhost:3000");
let (reader, _) = &mut sock.split();

#[derive(Deserialize)]
struct Pet {
name: String,
}

// This returns a stream of `Result<Pet>`
let pet_stream = reader
.lines()
.map(|line| serde_json::parse::<Pet>(line));

另一個有趣的事實是,Vec<u8>實現AsyncReadAsyncWrite,這意味著如果你想連接一個流的所有值,就可以直接使用緩衝區。

可能會添加更多的組合器,以及需要探索的模式。但Rust流的核心部件感覺非常穩固,隨著生態系統的發展,可以添加更多的組合器。

為什麼我們不談論接收器(Sink)特質

驚喜!還有另一個你應該知道的特質。它的名字是Sink,而且它是很奇怪的一個。大聲說出來會令人困惑(我們在談論Sync還是Sink?),但特質本身就在那裡。看看定義:

pub trait Sink<Item> {
type SinkError;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Contex
) -> Poll<Result<(), Self::SinkError>>;
fn start_send(
self: Pin<&mut Self>,
item: Item
) -> Result<(), Self::SinkError>;
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context
) -> Poll<Result<(), Self::SinkError>>;
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context
) -> Poll<Result<(), Self::SinkError>>;
}

無論何時實現Sink您都需要實現4個方法,1個關聯類型和1個范型參數。哦,還有一個強制性的內部緩衝區。因為特質定義中的所有這些方法都是特定生命周期的鉤子。通過該周期移動數據的唯一方法是在內部臨時存儲數據,並在稍後再次產生它。

也許你已經理解了,但Sink並不簡單。它的存在理由是成為一個AsyncWrite類型的對應物。它通常將編寫器包裝在其構造函數中,然後將類型序列化到其中。

在表面上這可能聽起來很吸引人。但實際上沒有人敢在沒有crates.io的嚴格幫助的情況下寫下這個怪物。或許這種複雜性實際上是值得的?,這就引出了一個問題。答案越來越多似乎是一個響亮的「不」。

Sink不會帶來任何使用3個標準流特質無法更優雅或更少儀式地解決問題的東西。所以省去一些麻煩,不要打擾Sink

下一步是什麼

非同步迭代語法

目前可以對流進行非同步迭代,但使用它並不一定很好。大多數用戶領域迭代流使用while let Some循環完成

let mut listener = TcpListener::bind("127.0.0.1:8081")?;
let incoming = listener.incoming();
while let Some(conn) in incoming.await {
let conn = conn?;
/* handle connection */
}

如果我們可以將其寫為for await循環,那就更好了:

let mut listener = TcpListener::bind("127.0.0.1:8081")?;
for conn.await? in listener.incoming() {
/* handle connection */
}

目前還不清楚何時會發生這種情況。但這絕對值得期待!

非同步特質流

說到改進,流特質本身可以做一些工作。目前這些特質與Future特質非常相似:

pub trait AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8]
) -> Poll<io::Result<usize>>;
}

是什麼讓這個特別棘手的是:定義self: Pin<&mut Self>。這意味著這種方法只對Self被 固定的實例實現。我不想讓你厭倦為什麼這很棘手,但我想提一下,最近我一直聽到關於這些特質可能簡化的對話。

原則上,流特質沒有任何關於它們的非同步。他們非同步的唯一原因是因為他們返回Future,可能需要在內部等待其他Future。這很重要,因為trait一旦直接允許async,似乎可以顯著簡化特質。

pub trait AsyncRead {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

這將是特別好的,因為這將意味著AsyncReadAsyncWriteStream將被定義和STD的ReadWriteIterator 以相同的方式,具有唯一的區別是在方法前async關鍵字。

pub trait Read {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

但是,沒有什麼可以確定的。但我對這裡的可能性持謹慎樂觀態度。

使用yield的匿名流

談到我們如何定義流的改進,已經討論過的另一件事是為生成器添加語法。生成器可能會使用yield關鍵字,我們可以想像一個流本質上是一個Future的生成器。就像async/await允許我們跳過圍繞構建Future的樣板,yield也一樣對於流:

async fn keep_squaring(mut val: u64) -> yield u64 {
loop {
val *= 2;
yield val;
}
}

for val.await in keep_squaring(4) {
dbg!(val);
}

這個可能會更進一步,但似乎它有可能提供一些受歡迎的工作流程改進。

零拷貝讀寫

一個很好的特性是AsyncReadAsyncWrite通過poll_read_vectored和poll_write_vectored支持向量IO。這樣可以優化特定應用程序的性能。

在將來添加可能有用的類似方法是 (poll_read_vecpoll_write_vec在名稱不太混淆的情況下)。這些方法允許將緩衝區直接傳遞給方法,並使用mem::swap技巧,防止memcpy在每個操作上執行一個額外的操作。允許我們顯著提高某些API的性能,而無需根據需要修改最終用戶API。

這在包裝同步API時尤為重要(目前這意味著:幾乎每個文件系統操作)。但更重要的是:與直接使用OS API相比,它將允許我們刪除Rust目前基於Future IO的額外開銷。

結論

在這篇文章中,我們討論了Rust的各種非同步流,討論了常見的模式和陷阱,並展望了流的未來。

Rust流的未來非常令人興奮!如果我們能夠將管道流的人體工程學結合在一起,憑藉Rust的可靠性保證,我們將更接近使Rust成為腳本語言傳統空間的絕佳選擇。

我們希望您喜歡閱讀 Rust流! - 祝你有個美好的一周!

感謝Irina Shestak,Nemo157,David Barsky,Stjepan Glavina和Hugh Kennedy閱讀並提供有關此帖子的多次迭代的反饋,想法和意見!

推薦閱讀:

相关文章