Rust 1.36がリリースされ、皆さん待望のFuture
関連のAPIが安定化されました。
この記事ではFuture
関連APIを巡る過去と未来を紹介します。
Q&A(TL; DR)
長くなってしまったので要約をまとめました。 細かい話は目次以降をご覧ください。
Futureは何のために必要?
Future
は非同期処理を抽象化したもので、Rustにおけるあらゆる非同期処理はFuture
の上に成り立ちます。
処理をFuture
によって隠蔽することで、
ただ非同期に処理するだけでなく処理のパイプライン化、
すなわち単一のスレッドで複数の処理を実行出来るようになります。
futuresクレートではだめなの?
async/await構文を入れるために言語コアに取り込まれました。
futures 0.1と合わせて使える?
futures 0.3には互換性レイヤーがあり、相互に変換出来るため問題ありません。
tokioなどは対応している?
残念ながらtokioの対応版は執筆時点ではリリースされていませんが、着々と進行中のようです。 ネットワーク限定で良ければromioというクレートもあります。 hyperなど、その他のクレートはtokioが対応次第順次アップデートされると思われます。
Futureとは?
Rust 1.36から導入された、非同期処理を抽象化したクラスです。 単体では「非同期処理が完了したかどうかを問い合わせる関数」しかなく、 非同期処理の結果から処理を継続するようなプログラムを書くには別途futuresクレートなどが必要です。
これはlibcoreで実装するための制約で、その代わりに#![no_std]
環境でも使うことが出来ます。
つまり、組み込み環境でも非同期処理が書けると言うことです
*1。
非同期処理の何が嬉しいのか?
例えばチャットサーバーを作ろうとすると、今までのRust標準ではクライアントごとにスレッドを立て、 その中でソケット通信を実行するのがオーソドックスでした。 このアーキテクチャは至極シンプルでとても作りやすいものですが、 メモリ消費のオーバーヘッドやOSの制限などによりクライアントが大量になってくると限界を迎え、 処理が進まなくなる、あるいはプログラムごと落ちる可能性があります。 いわゆるC10K問題です。
チャットというのはメッセージを送る以外は処理を行わないため、各スレッドは基本的に待機しているものです。
であればその待機する時間を有効に使い、スレッド数を減らしてしまおうというのが非同期処理(とりわけ非同期I/O)の要です。
Unix系であればselect(2)
やepoll(2)
系*2の
システムコールを使うことで、ソケットを始めとした複数のI/O処理を一度に実行出来るのです。
非同期処理する関数を自分で扱うと処理が複雑化してしまうため、
現代のプログラミング言語の多くではこれを抽象化してFuture
やPromise
として提供しています。
また、この抽象化によって全く異なる処理(例えばチャットと音声通話の非同期実行など)もパイプライン化して
非同期に実行することが出来るようになります。
Rustでも以前からfuturesクレートによってFuture
が使えましたが、
所有権や使い勝手の問題からRust標準に取り入れ、より書きやすく、よりハイパフォーマンスな処理を実現しようとしています。
futuresクレートから取り込まれた理由
1.36より前のRustでは一般的に、futuresクレート(バージョン0.1)により提供されていたFuture
を使用していました。
ライブラリとしては必要十分な機能が提供されていましたが、借用と相性が良くありませんでした。
これは言語仕様上どうしようもなく、かつメソッドチェーンによってコードの見通しが悪くなると言う問題もありました。
これを多くの言語でも導入されているasync/await構文によって解消する過程で、
Future
が外部ライブラリにあると言語機能として参照できないため言語ライブラリとして取り入れることになりました。
過去の経緯については@qnighyさんの下記ツイートスレッドに詳しいため、そちらもご参考にどうぞ。
まず抽象非同期ライブラリのfuturesは2段階の破壊的なアップデートを控えている。
— Masaki Hara (@qnighy) 2018年5月31日
0.2は0.1内では適用できなかった細かい破壊的改良が盛り込まれている。(リリース済み)
0.3はより大きな変更になる予定で、async fn対応のためにコア部分が標準ライブラリに移され、APIにも重要な変更がある。
Futureの定義
Future
の定義の肝を抜粋します。
pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } pub enum Poll<T> { Ready(T), Pending, } pub struct Context<'a> { /* fields omitted */ } impl<'a> Context<'a> pub fn from_waker(waker: &'a Waker) -> Context<'a>; pub fn waker(&self) -> &'a Waker; } pub struct Waker { /* fields omitted */ } impl Waker { pub fn wake(self); pub fn wake_by_ref(&self); pub fn will_wake(&self, other: &Waker) -> bool; pub unsafe fn from_raw(waker: RawWaker) -> Waker; }
Future
には関連型としてOutput
が定義されています。
これはIterator
のItem
と同じ様なもので、そのFuture
の戻り値の型が入ります。
Future::poll
はコンテキストを引数に取り、処理の結果を取り出します。
この時、処理が進行中であればPoll::Pending
が返り、処理が完了していれば値と共にPoll::Ready
が返ります。
Future::poll
の定義を見ると、self
が&mut self
ではなくself: Pin<&mut self>
になっていることに気付きます。
これは自己参照構造体を実現するための機能で、async関数を実現するために必要なものです。
async関数はawaitの前後で変数の借用を持ち越す必要があり、そのためには自己参照構造体が必要になるのです。
詳しい話は下記記事を参照してください。なお、少し古い記事のため、定義が多少異なっていることに注意してください。
次にFuture::poll
の引数の&mut Context<'_>
ですが、これはFuture
を実行するランタイムが用意するコンテキストです。
実行ランタイムとは複数のFuture
を1つのスレッドでまとめてパイプライン実行するモジュールのことで、
実行すべきFuture
(未実行のFuture
など)を選別して実行し、実行すべきFuture
がない場合はスレッドを停止させます。
この時、Future
の処理が完了して停止したスレッドを再開させてFuture
の処理を継続する場合にコンテキストを使用します。
futuresクレートとの組み合わせ
Future
は単体では使いません。
futuresクレートと組み合わせることで各種処理を記述出来るようになります。
futuresクレートはFutureExt
トレイトとTryFuture
トレイト、及びTryFutureExt
トレイトを提供しています。
FutureExt
はFuture
を便利に使うためのトレイトで、
専ら非同期処理の結果を受け取り、その値を加工するための機能などを提供します。
例えばFutureExt::map
は処理済みの値を加工するためのメソッドで、Iterator::map
に近しいです。
FutureExt::then
はFutureExt::map
のようなものですが、戻り値としてFuture
を返すことが出来ます。
これにより、非同期処理の結果を使って非同期処理を実行することが可能です。
fn plus_1<F: Future<Output=u32>>(f: F) -> impl Future<Output=u32> { f.map(|v| v + 1) }
TryFuture
はResult
を返すFuture
への特化したトレイトで、Future<Output=Result<T, E>>
に自動的に実装されます。
TryFuture::Ok
にT
が関連付けられ、TryFuture::Error
にE
が関連付けられます。
このトレイト自体は何ら機能を持たず、TryFutureExt
と組み合わせて使います。
TryFutureExt
はTryFuture
を便利に使うためのトレイトで、
Future
の処理が成功した場合のみ処理を続行する機能や、エラー型をInto
トレイトによって変換する機能などを提供します。
Result
に実装されているメソッドの一部が(場合によっては別名で)TryFutureExt
にも実装されています。
例えばTryFutureExt::map_ok
はFuture
がOk
を返した場合のみ実行される、値を加工するためのメソッドです。
TryFutureExt::and_then
はTryFutureExt::map_ok
のようなものですが、戻り値としてFuture
を返すことが出来ます。
fn plus_1<E, F: TryFuture<Ok=u32, Error=E>>(f: F) -> impl TryFuture<Ok=u32, Error=E> { f.map_ok(|v| v + 1) }
futures 0.1との違い
以前のRustでfuturesクレートを使っていた場合、少し定義が異なるため混乱するかも知れません。 以前は下記のような定義でした。
pub enum Async<T> { Ready(T), NotReady, } type Poll<T, E> = Result<Async<T>, E>; pub trait Future { type Item; type Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error>; }
主な違いは下記の3つです。
- 戻り値が
Result
だった:
以前のFuture
では戻り値がResult
限定であり、失敗しない関数を書くには少し不便でした self
が&mut self
だった:
futuresクレートの公開当初はPin
がなかったこともあり、self
は&mut self
によって定義されていました- 引数にコンテキストがない:
コンテキストは暗黙的に定義され、別途関数で取得していました
Stream
もFuture
に合わせてResult
を返さないものに変更されており、
TryFuture
に対するTryStream
が追加されています。
他にもfutures::executor
を始めとした多くのモジュールへの変更もあります。
futures 0.1との互換性
Rust 1.36が浸透するまでのしばらくの間はfutures 0.1も並行してして使われるものと思われます。
ただし、futures 0.1までのFuture
とRust 1.36のFuture
は全く別の型であり、互換性はありません。
それらに互換性を持たせるためにfutures 0.3には互換性レイヤーが導入されており、
futures 0.1のFuture
とRust 1.36のFuture
を相互に変換して使うことが出来るようになっています。
詳細は下記ブログを参照してください。
tokioなどのクレートの対応状況
肝心要のファイル操作やネットワーク通信といった非同期I/Oに関してはtokioを使います。 ただし、執筆時点ではまだfutures 0.1からの移行中のようです。
なおRomioというクレートもあり、これがあればネットワークだけは非同期I/Oを使用することが出来ます。
非同期I/Oに関してはもうしばらく待つ必要がありそうです。
その他、いくつかのクレートも進行中のようです。
- hyper: https://github.com/hyperium/hyper/pull/1836
- tower: https://github.com/tower-rs/tower/pull/295
ちょっと先の未来
ちょっと先の9/27(金)にリリースされるRust 1.38で安定化予定のasync/await構文を見てみましょう。
async fn get() -> u32 { 0 } async fn get_and_plus_1() -> u32 { get().await + 1 }
関数をasync fn
で宣言すると戻り値がimpl Future<Output=T>
として扱われます。
そしてasync fn
の中ではfuture.await
によってFuture
を待つことが出来ます。
フィールドアクセスのように見えますが、この構文によって、
いくつかの言語で採用されているようなawait future
の形よりもメソッドチェーンが読みやすくなります。
async fn hoge() -> u32 { foo().await.bar().await.baz().await // vs await (await (await foo()).bar()).baz() }
また、Runtimeというクレートを使うとmain
関数からasync fn
化することが出来ます。
とても読みやすい非同期コードですね!
#![feature(async_await)] use runtime::net::UdpSocket; #[runtime::main] async fn main() -> std::io::Result<()> { let mut socket = UdpSocket::bind("127.0.0.1:8080")?; let mut buf = vec![0u8; 1024]; println!("Listening on {}", socket.local_addr()?); loop { let (recv, peer) = socket.recv_from(&mut buf).await?; let sent = socket.send_to(&buf[..recv], &peer).await?; println!("Sent {} out of {} bytes to {}", sent, recv, peer); } }https://github.com/rustasync/runtime#examplesより
はじまりのFuture
実はRustのリリース当初、具体的には1.3まではNightlyにFuture
APIがありました。
しかし、このAPIは現在とは全く異なったAPIで、トレイトではなく構造体で、
処理が完了したことも観測できない、使い勝手の悪いものでした。
さいごに
Rust標準にFuture
が入った結果、既存のクレートでも多くの非互換な変更が引き起こされました。
幸い互換性レイヤーがあるため大きな混乱はないと思われますが、移行には大きな苦しみが伴いそうです。
オプティムのプロジェクトにもfutures/tokioにべったり依存しているものがありますが、移行にはしばらく掛かりそうです・・・。
どうせ移行するなら1.38を待ってasync/awaitにリファクタリングするのも良いかも知れませんね。
オプティムでは未来志向のエンジニアを募集しています。