こんにちは、R&Dチームの齋藤(@aznhe21)です。
さあみなさん、ついにこの時がやってまいりました。 本日2019/11/8にリリースされたRust 1.39により、あらゆる環境で最高速な非同期プログラミングが可能になりました。 新たな時代に乗り遅れないよう、今のうちにRustでの非同期プログラミングをマスターしておきましょう。
なお、この記事は、先日開催したOPTiM TECH BLOG Meetupの内容を大幅に加筆修正した上でエントリに仕上げたものです。
- まず最初に伝えたいこと
- 非同期の歴史
- Rustの非同期プログラミングの歴史
- Rustの非同期プログラミングの特徴
- Rustの非同期プログラミングで出来るようになること
- Rustの非同期の書き方
- tokioのサンプルで解説
- asyncで使えるクレートたち
- さいごに
- 謝辞
- ライセンス表記
まず最初に伝えたいこと
Rust 1.39から使えるようになったasync/awaitですが、この「async」の読み方が分かるでしょうか? 日本ではよく「アシンク」と呼ばれることの多いこの単語ですが、英語圏では「エーシンク」と読まれるようです。
実際、YouTubeでasyncを検索すると、坂本龍一の楽曲*1と、 C#やTypeScriptなどでのasync/awaitの解説動画などが出てきます。 この解説動画を見てみると、確かに「エーシンク」と読んでいることが分かります。
多少違和感はあるでしょうが、「idea」が「イデア」ではなく「アイデア」になることを考えると 抵抗も少なくなるのではないでしょうか。
なお、「await」の読み方は「アウェイト」です。英語滅びろ
追記:下記ツイートで指摘を受けて確認したところ、asyncはギリシャ語由来、awaitはラテン語由来だそうです。 (´・ω・`)はあまじはあ
- asyncとawaitの由来を調べたところ,asyncとawaitのaは関係なかったことがわかりました,英語はクソ - ncaq
- await って言う単語 | ++C++; // 未確認飛行 C ブログ
しょうもない小ネタ披露しておくと
— yuki (@helloyuki_) 2019年11月9日
asyncのa-はたしかギリシャ語由来で、notくらいの意味、
awaitのaはad-でラテン語由来だったと記憶しています。ラテン語のad-はtowordだったかな
非同期の歴史
まず、Rustの非同期について説明する前に、 なぜプログラミング言語に非同期サポートが必要なのかをご説明しましょう。
2000年代後半から顕在化し始めたC10K問題により、1つのタスクに1つのスレッドやプロセスを割り当てる方式に限界が見えてきました。 この問題が発覚する以前は、例えばApacheであれば1つのリクエストに1つのプロセスが割り当てられ、 更にCGIによって1つのプロセスが割り当てられていました。 スレッドやプロセスを起動するには数KB〜数MBのメモリが必要なため、同時に数千〜数万のアクセスがあると 数十GBものメモリが必要になり、リソースが枯渇してしまうのです。
この問題を回避するため、nginxやNode.jsが誕生しました。 nginxは1つのリクエストに対して1つのプロセスを割り当てるのではなく、 スレッドプールとイベント駆動モデルを駆使し、スレッド数・プロセス数は固定のまま、 多くの処理を捌けるようにしたのです。
これらを図にまとめると下記のようになります。 それぞれのタスクでは待機時間が多くを占めており、 Apacheにおいては待機時間のために無駄にスレッドが生存していることが分かります。 逆に、nginxは待機時間中に他のタスクを実行することで、 1つのスレッドで3つのタスクを同時に実行できることが分かります。 このパイプライン化により、非同期プログラムは効率的な処理を実現するのです。
もちろん、これはいいことずくめではありません。 タスクの中の「待機時間」を他のタスクから使えるようにするためには どこが待機時間なのかを判別する必要がありますし、例え判別できたとしても、 他のタスクに「実行権限を移譲」する必要があります。 これは、従来のプログラミング言語の機能だけでは難しく、 実際、Node.jsの中身は大変複雑に実装されています。
ここで、様々な言語を経て洗練され、C#によって広く知れ渡ったasync/await構文が重要になってきます。 async/await構文によって「実行権限を移譲」する機能が簡単に実現されるのです。 この構文により非同期プログラミングが格段に楽になり、今日のハイパフォーマンスなサーバーサイドアプリケーションを支えているのです。
ちなみに、Go言語は「実行権限の移譲」を重厚なライブラリによって隠蔽しており、 「古い」プログラムのように書いても「今風の」プログラムのように実行されるようになっています *2。 この「あたかもOSのスレッドのように動作するが、実際は内部でうまく協調動作している」というものを グリーンスレッドと呼びます。 これはとても素晴らしい仕様のようにも思えますが、ライブラリが重厚になってしまうことを避けられず、 システムプログラム言語であるRustとは思想が合わないため実装されていません*3。
Rustの非同期プログラミングの歴史
Rustの非同期プログラミングの歴史のうち、重要な部分をまとめました。
Rust 1.0以前
前章で書いたグリーンスレッドは、Rustの正式リリース前のバージョンである〜0.9の時代には標準で含まれていました。 しかし、これは各種オーバーヘッドを鑑み、Rustが正式バージョンになる前に削除されています。
Rust 1.0
時期:2015/5/15
Rustの正式版がリリースされました。
〜Rust 1.3
時期:〜2015/9/17
ほとんど余談ですが、Rustには1.3までFuture
という名前の構造体が(未安定化の状態で)存在していました
*4。
これはC++のstd::future
とC#のSystem.Threading.Tasks.Task
を
足して2で割ったようなインターフェースで、
「タスクの並列実行」と「タスクの結果をブロックして受け取る」という2つの主な機能から成り立っていました。
このFuture
はnightly限定のまま、Rust 1.3で削除されています。
Rust 1.2あたり
時期:2015/8/7あたり
Rust標準から削除されたグリーンスレッドを非標準で実現するクレートである context-rs / rotor / mioco が登場し始めます。
これらのクレートはグリーンスレッドを実現するもので、グリーンスレッドであるためアプリケーションとしては楽に書けますが、 各タスクにスタックを確保する必要があったり、コンテキストスイッチのためにプラットフォームごとにアセンブリコードが必要になるなど、 Rustの特性に合わないものでした。
これらのクレートは、後に登場するクレートたちによって駆逐されていきます。
Rust 1.11あたり
時期:2016/8/18あたり
futures/tokioクレートが登場し始めます。
futuresクレートによって非同期プログラムのゼロコスト抽象化が実現されました。 これはスタックの確保もコンテキストスイッチも不要なため、 マイコンのような環境でもマルチタスクの実行が可能になり、 Rustの思想にあった非同期プログラミングが実現されました。
ただし、これはメソッドチェーンで処理を繋げる必要があり、 所有権も相まって実に使い勝手の悪いものでした。
Rust 1.26あたり
時期:2018/5/10あたり
futures 0.2が(一瞬だけ)登場しました。
futures 0.1にはコンテキストが暗黙的だったり、トレイトの定義が大きすぎるなどといった問題があったため、 それらを解決するためにfutures 0.2が登場しました。 ただし、これは実験目的で、数ヵ月以内に0.3を出すつもりだったもので、 その状態のものをcrates.io/docs.rsに公開すると利用者が混乱する*5ため futures 0.2は取り下げられ、futures-preview 0.2として再度公開されました。
また、その後Future
トレイトが標準ライブラリ入りする話も相まって、0.2から更にAPIが変更された0.3が出されることとなりました。
Rust 1.36
時期:2019/7/4
紆余曲折を経てようやく標準ライブラリとしてのFuture
が安定化されました。
とは言えFutureを使うためのデファクトスタンダードライブラリであるfuturesクレートは正式版が出ず、 あくまで「準備が整っただけ」の状態です。
Rust 1.39
時期:2019/11/7
async/await構文がついに安定化されました!
この構文により非同期プログラミングが書きやすくなるだけではなく、 これまでは所有権からどうしてもコストが高い書き方をせざるを得なかった部分も直感的かつ低コストに実現できるようになります。
実にRust 1.0のリリース(2015/5/15)から4年半越しに、 快適な非同期プログラミングが実現されることになります。 長かった・・・!
Rustの非同期プログラミングの特徴
ここまではRustに非同期プログラミングが実装されるまでの経緯を紹介しました。 この章では、Rustの非同期プログラミングがどのような特徴を持っているのかをご紹介します。
ゼロコスト抽象化
これはRust自体の設計思想でもあるのですが、Rustの非同期プログラミングは他の言語とは違ってゼロコスト抽象化を謳い、 軽量かつ高速な動作を実現しています。
Rustで非同期なコードを書くとトレイトの仕組みによって関数呼び出した静的に解決されます。 これを静的ディスパッチと呼び、この仕組みがRustの高速性に寄与しています。
また、静的ディスパッチはヒープを使ったメモリ確保を必要としないためメモリ消費を抑えられるだけでなく、 メモリの確保が出来ないプラットフォームでの実行も可能にします。 これは次に説明する「プラットフォーム非依存」にも繋がってきます。
プラットフォーム非依存
Rustの非同期プログラミングはあらゆるプラットフォームで動くように設計されています。 一般的によく使われるWebサーバーやGUIアプリケーションはもちろん、 WebAssemblyでも薄いランタイムライブラリだけで動くようになっていますし、 果ては組み込み(ベアメタル)環境でも動くようになっているのです。
これは前述の静的ディスパッチだけでなくタスクの停止・再開など、非同期プログラミングに関する機能すべてがそのようになっており、 開発者が独自にタスクの制御が出来るようになっています。
これまでは非同期プログラミングがサポートされていなかった環境で、 同期的な記述を工夫して書いて非同期プログラミングを実現していたところも、 これからは直感的に書けるようになるのです。
Rustの非同期プログラミングで出来るようになること
Rustの非同期プログラミングの特徴が分かったところで、実際にどのような分野に応用できるのかを、妄想込みでお送りします。
組み込み
Rustの非同期プログラミングはランタイムライブラリを必要としないため、 組み込み分野においても使用されることが期待できます。
例えば、2つボタンがあり、それぞれボタンが押されたらI2Cで温度センサから値を読み込みんでシリアルに出力する、 というコードは下記のようになるでしょう(想像)。
async fn print_temp(bus: u8) -> impl Future<Output = ()> { let mut bus = i2c::Bus::new(bus); bus.write(reg); let temp = bus.read().await; println!("{}", temp); } fn main() { let task1 = async { let button = interrupt::enable(interrupt::buttons[0]); loop { // ボタン0が押されるのを待つ button.next().await; // センサから値を読み取ってシリアルに出力 print_temp(0xE0).await; } }; let task2 = async { let button = interrupt::enable(interrupt::buttons[1]); loop { // ボタン1が押されるのを待つ button.next().await; // センサから値を読み取ってシリアルに出力 print_temp(0xE1).await; } }; task::run(task1.join(task2)); }
interrupt::enable
は割り込みテーブルにポートを登録し、wait_interrupt
によって割り込みを待つという動作を想定しています。
また、i2c::Bus
はI2Cのバスを表しており、write
で値の送信、read
で値の受信をする想定です。
このコードは2つのタスクを同時に動かしています。 つまり、マイコンでマルチタスクが使えているのです(想像上の話です)。 さらに、このマルチタスクはプリエンプティブではないためスタックを確保する必要もなく、 メモリ消費量が決定的になり、メモリ管理も不要になるのです。
OS
RustはOSも作れる言語であり、実際OSを作るプロジェクトがいくつか存在します。
デバイスドライバを非同期で書くなど、カーネルでも非同期プログラミングが役に立つ可能性があります。
ゲーム
RustはAmethystやPiston、ggezを始めとしたゲームエンジンが複数あり、2D/3Dゲーム開発にも使われています。
ゲームの世界でも非同期な処理は行われ、 例えばテクスチャの遅延読み込み(テクスチャストリーミング)や次シーンの先読み、 あるいはネットワーク経由での世界ランキングの取得などがあります。 その多くはゲームエンジンやライブラリにより隠蔽されますが、どうしても隠蔽できない部分は出てくるものです。
現在の主なゲームエンジンであるUnityとUnreal Engine 4の記述言語はC#とC++です。 C#には非同期プログラミングの機能が備わっていますが、その機能がUnityで使えるようになったのはごく最近*6ですし、 C++での非同期プログラミングは地獄です。 それらをRustで書けるとしたらどうでしょう?
例えば、Unityで非同期処理をすることを考えてみましょう。
Unityには非同期プログラミングを書くための機能StartCoroutine
が備わっています。
これは当時採用していたランタイムの制約上、関数を中断する機能を流用したものであり、
非同期関数から戻り値を直接返せないという非常に使い勝手の悪いものです。
private void Start() { StartCoroutine(LoadHtml()); } private IEnumerator LoadHtml() { Debug.Log("Start LoadHtml"); using (WWW www = new WWW("http://example.com/")) { yield return www; Process(www.text); } Debug.Log("Done LoadHtml"); }
これをRustで書き直すとこのようになるでしょう(想像)。
fn start() { unity::task::spawn(async { unity::debug::log("start load_html"); let text = unity::www::new("http://example.com/").get().await; process(text); unity::debug::log("done load_html"); }); }
非同期以外でもメリットはあります。 Rustで書けば自ずとクラッシュしにくいコードが書けますし、自然に書いてもC#やC++よりもメモリ消費が減るでしょう。 となると、デバッグコストが大幅に減るのではないでしょうか? *7
Webアプリ(サーバー)
async/awaitの恩恵を最大限に受けるのはサーバーアプリでしょう。
Rustにはactix-webやgothamなど、複数のWebフレームワークがあります。 これらを用いることで、サーバーのリソースをフルに使うことの出来る、大変高速なWebアプリケーションが作れます。
特にactix-webはベンチマークサイトでは最高スコアを記録し続けているWebフレームワークです。
今まではメソッドチェーンでコードが分かりにくくなっていましたが、 これからはasync/awaitで直感的かつ最高速のWebアプリを作ることが出来るのです。
Webアプリ(クライアント)
強力なWebAssemblyサポートによって、クライアントでもRustを使うことが出来ます。
wasm-bindgen-futuresによって、非同期関数をそのままJavaScriptから呼ぶことが出来るのです。
#[wasm_bindgen] pub async fn run() -> Result<JsValue, Jsvalue> { let mut opts = RequestInit::new(); opts.method("GET"); opts.mode(RequestMode::Cors); let request = Request::new_with_str_and_init( "https://example.com/", &opts, )?; let window = web_sys::window().unwrap(); let resp_value = JsFuture::from(window.fetch_with_request(&request)).await?; assert!(resp_value.is_instance_of::<Response>()); let resp: Response = resp_value.dyn_into().unwrap(); let text = JsFuture::from(resp.text()).await?; Ok(text) }
サーバーサイドと合わせて最速のWebアプリを作ることも夢ではありません。
Rustの非同期の書き方
ここからは、Rustでの非同期プログラミングの書き方をご紹介します。
これまでとこれから
Rust 1.39から使えるようになったasync/awaitを紹介する前に、まずは以前の書き方と比べてそのありがたさを噛み締めましょう。
0〜9までの数値を1秒毎に表示するシンプルなプログラムを考えます。
これまでのRustでの書き方 with futures 0.1
やっていることはシンプルなのに、めちゃくちゃに長いです。 こんなコードはもう見たくもありません。
// [dependencies] // tokio = "0.1" // tokio-timer = "0.2" use std::time::Duration; use tokio::prelude::*; fn print_1_10() -> impl Future<Item = (), Error = ()> { future::loop_fn(0, |i| { println!("{}", i); tokio_timer::sleep(Duration::from_secs(1)) .map(move |()| { if i < 9 { // iが9未満ならloop_fnにfuture::Loop::Continueを返し、処理を続行する future::Loop::Continue(i + 1) } else { // iが10以上ならfuture::Loop::Breakを返し、処理を中断する future::Loop::Break(()) } }) .map_err(|e| Err(e).unwrap()) }) } fn main() { tokio::run(print_1_10()); }
Rust 1.39〜
ちょっとしたおまじないはあるものの、ごくごく自然な書き方になりました。 Rustが言語としてasync/awaitをサポートする意義が分かると思います。
// [dependencies] // tokio = { version = "0.2", features = ["macros", "time"] } // or // tokio = { version = "0.2", features = ["full"] } use std::time::Duration; #[tokio::main] async fn main() { for i in 0..10 { println!("{}", i); tokio::time::delay_for(Duration::from_secs(1)).await; } }
Futureとは
async/awaitの書き方を解説する前に、ここまで時々登場していたFuture
トレイトを説明しておきましょう。
Future
トレイトとは「そのうち完了するタスク」を抽象化したトレイトで、
Future::poll
メソッドによって「タスクが完了した」あるいは「タスクが実行中」という状態を返します。
定義としてはこのようになっていますが、この辺りを直接使うことはまずないでしょう。
pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; } pub enum Poll<T> { Ready(T), Pending, }
中身について詳しく知りたい方はFutureの解説記事をご覧ください。
async/awaitの書き方
非同期の基礎が分かったところで、非同期プログラムの書き方を紹介します。
非同期関数
前述したサンプルにおいて、Rust 1.39以降の書き方では関数にasync
という単語を付けています。
これによってその関数は「非同期関数」となり、Future
(を実装した型)を返すようになります。
// このように書いたコードが... async fn async_function() -> i32 { 0 } // コンパイラ内部でこのように変換される fn async_function() -> impl Future<Output = i32> { UnknownObject::new() }
このとき、async_function
関数が返す型(上記ではUnknownObject
)はコンパイラによって暗黙に生成されます。
この型をコードから直接参照することは出来ませんが、Future
トレイトを実装しているためimpl Trait
を使って返すことが出来ます。
これはFn
などのトレイトを実装した暗黙型として扱われるクロージャと似ています。
// 非同期関数はFutureトレイトを実装した「よく分からない型」を返す async fn async_function() -> impl Future<Output = i32> { delay_for(Duration::from_secs(1)).await; 0 } // クロージャはFnなどのトレイトを実装した「よく分からない型」となる fn return_closure() -> impl Fn() -> i32 { || 0 }
非同期ブロック
ブロックにasync
キーワードを付けるとそのブロックは「非同期ブロック」となり、
非同期関数と同じようにFuture
トレイトを実装した「よく分からない型」を返すようになります。
また、ブロックにmove
キーワードを付けると、クロージャーと同じように所有権を奪うブロックになります。
fn main() { // async_blockはasync_functionと同じように展開される // これもまたFutureトレイトを実装した「よく分からない型」となる // let async_block = UnknownObject::new(); let async_block = async { delay_for(Duration::from_secs(1)).await; 0 }; let s = "hoge".to_string(); let move_block = async move { // このブロックはsの所有権を奪う println!("{}", s); }; // ブロックが所有権を奪うのでsは使えなくなる // println!("{}", s); }
非同期クロージャー
関数やブロックに非同期版があるのだから、クロージャーにもasync
を付けた非同期版があるように思えます。
残念ながらこれを書こうとするとコンパイラーに怒られます。
fn main() { let _ = async || {}; }
error[E0658]: async closures are unstable --> src/main.rs:2:13 | 2 | let _ = async || {}; | ^^^^^ | = note: for more information, see https://github.com/rust-lang/rust/issues/62290
いくつかの問題があり、無効化されているようです。
これは普通のクロージャーから非同期ブロックを返すことで回避できます。
let closure = || async {}; closure().await; // moveするときはasyncブロックにmoveを付けるだけでOK let closure_move = || async move {}; closure_move().await;
非同期タスクの待ち方
ここからは、非同期関数や非同期ブロックなど、非同期に動作するタスクを「非同期タスク」と呼ぶことにします。
async
を付けた関数やブロックの中ではawait
キーワードが使えるようになり、非同期タスクを待機することが出来るようになります。
await
の使い方はFuture
トレイトを実装する値に.await
を付けるだけです。
async fn await_test() -> i32 { let b = async { 0 }; // 非同期タスクbを待機し、その結果が関数の戻り値となる b.await }
フィールド参照のようにも見えて気持ち悪いと思う方もいるかもしれません。
これはメソッドチェーンや?
演算子を組み合わせたときの読みやすさを考えてこうなっています。
例えば、JavaScriptでfetch APIを使った時を考えます。
async function get() { return await (await fetch("http://example.com/")).text(); }
これをRustに置き換えるとこのようになります。
async fn get() -> String { fetch("http://example.com/")?.await.text()?.await }
ね、読みやすいでしょう?
ランタイムで非同期タスクを起動する
ここまでで非同期関数や非同期ブロックで非同期タスクを作る方法を紹介しましたが、このままではタスクを実行することは出来ません。
例えば下記のコードを実行しても、表示されるのは一番下の"piyo"
のみです。
async fn hoge() { println!("hoge"); } fn main() { let _ = hoge(); let _ = async { println!("fuga"); }; println!("piyo"); }
「Futureトレイトとは『そのうち完了するタスク』を抽象化したトレイト」と書いたように、 非同期タスクは正しく起動して実行を監視しなければ「タスクを完了」させることが出来ません。 非同期タスクを起動するためにはランタイムを用いる必要があります。
ランタイムとは「非同期タスクを実行するエンジン」であり、Rustでの非同期プログラミングには欠かせないものです。 ランタイムは標準ライブラリには含まれていないため、何らかのクレートを使用する必要があります。
例えばtokioクレートをランタイムとして使用する場合、このようになります。
fn main() -> Result<(), Box<dyn std::error::Error>> { let mut rt = tokio::runtime::Runtime::new()?; // taskは非同期タスク let task = async { tokio::time::delay_for(std::time::Duration::from_secs(1)).await; println!("hoge"); Ok(()) }; // プログラムが実行されてから1秒後に"hoge"が表示される rt.block_on(task) }
実行モデル
ランタイムによって非同期タスクは効率的に実行されるようになりますが、 どのようなランタイムを使うかによって非同期タスクが実行される方法が異なります。 非同期タスクの実行モデルを紹介しましょう。
1:1モデル
1つの非同期タスクを1つのOSスレッドに割り当てるランタイム。
非同期プログラミングの利点は全くありません。 実装としては「待機すべきときに待機する」だけで良いので、とてもシンプルですし、リソース消費もありません。
プログラムのエントリーポイント(main
関数)では1:1モデルで起動し、のちに別のモデルに移行することが多いです。
M:1モデル
複数の非同期タスクを1つのOSスレッドに割り当てるランタイム。
リソースを制限しつつ非同期タスクを実行する際に有用です。
M:Nモデル(スレッドプール)
複数の非同期タスクを複数のOSスレッドに割り当てるランタイム。
細かいことを言うとワークスティーリング型もスレッドプールですが、ここでは便宜上こちらをスレッドプールと呼びます。
CPUリソースを効率的に使用することが出来る実行モデルです。 スレッドごとにキューを持ち、タスクを起動する際には最も数の少ないキューにタスクを入れるイメージです。
長期間タスク待機(.await
)をしないタスクがいた場合、他のタスクの実行を妨げる可能性があります。
M:Nモデル(ワークスティーリング)
複数の非同期タスクを複数のOSスレッドに割り当てるランタイム。
CPUリソースをフルに使うことが出来る実行モデルです。 プログラム全体でキューを持ち、各スレッドでタスクが終わるごとにキューからタスクを取り出してタスクを実行するイメージです。
その性質上、タスク待機(.await
)の前後で実行されるスレッドが異なる場合があります。
様々なランタイム
前述したサンプルではtokioを用いましたが、他にもいくつかのランタイムがあります。 これらは現時点で利用可能なものであって、将来的に消滅、あるいは新たに誕生する可能性があります。
tokio
ワークスティーリングのM:Nモデルで、効率的にタスクを実行できるランタイムです。 tokio-timerなど、いくつかの機能はtokioランタイムの上でしか動きません。
属性によるランタイムの起動の他、コードからも起動が出来ます。
// [dependencies] // tokio = { version = "0.2", features = ["macros"] } // or // tokio = { version = "0.2", features = ["full"] } // 属性による起動 #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { tokio::spawn(async { // do something }); // do something Ok(()) }
// コードからの起動 fn main() -> Result<(), Box<dyn std::error::Error>> { let mut rt = tokio::runtime::Runtime::new()?; rt.block_on(async { tokio::spawn(async { // do something }); // do something Ok(()) }) }
async-std
async_std::task::block_on
は1:1モデル、
async_std::task::spawn
はワークスティーリングのM:Nモデルです。
属性によるランタイムの起動の他、コードからも起動が出来ます。
// [dependencies] // async-std = { version = "1.X", features = ["attributes"] } // 属性による起動 #[async_std::main] async fn main() { async_std::task::spawn(async { // do something }); // do something }
// コードからの起動 fn main() { async_std::task::block_on(async { async_std::task::spawn(async { // do something }); // do something }); }
futures::executer
futures::executor::block_on
は1:1モデル、
futures::executor::LocalPool
はM:1モデル、
futures::executor::ThreadPool
はスレッドプールのM:Nモデルです。
属性は用意されておらず、コードからのみ起動出来ます。
fn main() { let pool = futures::executor::ThreadPool::new().unwrap(); pool.spawn_ok(async { // do something }); // do something }
wasm-bindgen
WebAssemblyとRustの架け橋となるwasm-bindgen(の一部であるwasm-bindgen-futures)もまた、ランタイムの機能を持っています。
これはJavaScriptのPromise.then
(と、有効化されていればAtomics.waitAsync
)によって実装されており、
M:1モデルで実行されます。
fn spawn_test() { wasm_bindgen_futures::spawn_local(async { // do something }); }
非同期タスクと組み合わせるトレイト
ここまでは非同期タスクの使い方を紹介してきました。 基本的にはこれまでの内容を覚えておけば問題ないのですが、 アプリケーション開発の際には更にいくつかのトレイトと組み合わせて使うことがほとんどです。 この章では、実際のアプリケーション開発で知っておくべきトレイトを紹介します。
futures::stream::Stream
Stream
とは「任意個のデータを非同期に扱うオブジェクト」で、言わば「非同期版Iterator
」です。
Stream
にはnext
メソッドが含まれて*8おり、
これを使うと「次のアイテムを取得するFuture
」を取得できます。
このFuture
の戻り値はOption
であり、次のデータがある場合はSome
でデータを、
データがない場合はNone
を返すため、これをwhile
ループで回します。
async fn stream_example() { let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); // linesはStreamを実装した型 let mut lines = cursor.lines(); // lines.next()はFuture<Output=Option<Result<String, Error>>>を返す while let Some(line) = lines.next().await { let line = line.unwrap(); println!("{}", line); } }
ちなみにstream.next()
から返るFuture
はstream
を参照しているだけであるため、
下記のようにstream.next()
の結果を破棄してもアイテムをスキップすることにはなりません。
アイテムをスキップするには.await
した結果を破棄すると良いでしょう。
async fn stream_example() { let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); // linesはStreamを実装した型 let mut lines = cursor.lines(); // lines.next()はFuture<Item=Option<Result<String, Error>>>を返す let _ = lines.next(); // loremをスキップしたいが・・・ println!("{}", lines.next().await.unwrap().unwrap()); // loremが出力される let _ = lines.next().await; // ipsumをスキップする println!("{}", lines.next().await.unwrap().unwrap()); // dolorが出力される }
また、これらトレイトを実際に使う場合にはfutures::stream::Stream
のインポートが必要です。
ただし、use futures::prelude::*;
あるいはuse async_std::prelude::*;
があれば問題ありません。
標準ライブラリ化の可能性
ここで説明したStream
はfuturesクレートに存在するもので、標準ライブラリのものではありません。
ただし、async/awaitのRFCはこれを標準化することを想定しており、
将来的にはfor
によるループも含めて標準入りする可能性はあります。
ただし標準入りさせる動きは今の所ないため、いつ入るか、あるいは本当に入るかどうかは不透明です。
// 標準入りしたらこんな感じ? async fn stream_example() { let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); for await line in cursor.lines() { let line = line.unwrap(); println!("{}", line); } }
futures::io::AsyncReadなどのI/O系トレイト
futures::io::{AsyncRead, AsyncBufRead, AsyncWrite, AsyncSeek}
は、
それぞれ標準ライブラリのstd::io::{Read, BufRead, Write, Seek}
を非同期化したものです。
非同期タスク上でI/O処理をする際は標準ライブラリのトレイトの代わりにこれらのトレイトを使います。 これらトレイトにより、あらゆるI/O処理の効率的な非同期タスク化が実現されます。
use std::io::{Cursor, SeekFrom}; use futures::prelude::*; let mut data = *b"lorem\nipsum\r\ndolor"; // cursorはAsyncRead / AsyncBufRead / AsyncWrite / AsyncSeekをすべて実装している let mut cursor = Cursor::new(&mut data); // AsyncWrite cursor.write_all(b"rusty").await?; // AsyncSeek cursor.seek(SeekFrom::Start(0)).await?; // AsyncRead let mut buf = [0u8; 6]; cursor.read_exact(&mut buf).await?; assert_eq!(&buf, b"rusty\n"); // AsyncBufRead let line = cursor.lines().next().await.unwrap()?; assert_eq!(line, "ipsum");
また、これらトレイトを実際に使う場合にはfutures::io::{AsyncReadExt, AsyncBufReadExt, AsyncWriteExt, AsyncSeekExt}
のインポートが必要です。
ただし、use futures::prelude::*;
あるいはuse async_std::prelude::*;
があれば問題ありません。
futures::sink::Sink
Sink
トレイトはデータの送信する口を抽象化したものです。
AsyncWrite
は&[u8]
を書き込むためのものですが、こちらは特定の型T
の送信に使います。
主にチャネルで使うことになるでしょう。
// txはSinkを実装している let (mut tx, mut rx) = futures::channel::mpsc::unbounded(); tx.send(()).await?; let () = rx.next().await.unwrap(); Ok(())
紛らわしいことに、futures::io
にもSink
がいます。
こちらはfutures::io::sink()
によって作られる構造体で、どこにもデータを書き込まず、虚空に吐き捨てるためのものです。
また、このトレイトを実際に使う場合にはfutures::sink::SinkExt
のインポートが必要です。
ただし、use futures::prelude::*;
があれば問題ありません。
すべてを非同期化せよ
ここまでは純粋に非同期プログラミングの書き方を紹介してきました。 これで確かに非同期プログラミングを書けるようにはなるのですが、実際にコードを書く前に1つ注意点があります。 それは非同期タスクの中でブロッキングAPIを呼び出してはいけないということです。 これをすると同じスレッドで実行されている非同期タスクの実行もブロックされ、 非同期コードを書いたにも関わらずパフォーマンスが落ちます。 場合によってはデッドロックが起きることもあります。
例えば、ファイルを読んで、その中身をTCPソケットで送信することを考えます。
async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> { let s = std::fs::read(path)?; // ここで他のタスクも含めてブロックしてしまう! socket.write_all(s).await?; Ok(()) }
上記コードでは非同期タスクの中でstd::fs::read
を実行してしまっています。
この関数の呼び出しに10秒掛かったとすれば、他の非同期タスクも10秒間実行されなくなってしまうのです。
これを回避するにはブロッキングタスクのstd::fs::read
の代わりに非同期タスク(tokio::fs::read
など)を使うか、
ブロッキングタスクを「逃がす」APIを使います。
// [dependencies] // tokio = { version = "0.2", features = ["fs", "io-util", "tcp"] } // or // tokio = { version = "0.2", features = ["full"] } async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> { let s = tokio::fs::read(path).await?; // タスクを非同期化 socket.write_all(&*s).await?; Ok(()) }
タスクを逃がす
std::fs::read
のように代替関数があるなら良いのですが、中には外部クレートを呼び出しており、
どうしても非同期化出来ないタスクがあります。
そのようなときに使えるのが「同期タスクを逃がす」APIです。
同期タスクを逃がすAPIを使用することで、別の非同期タスクの実行を邪魔することなく同期タスクを実行することが出来ます。
tokioの場合
tokioではtokio::task::block_in_place
を使います。
この関数は現在タスクを実行しているスレッドに溜まっている非同期タスクを別のスレッドに委任し、
現在のスレッドで同期タスクを実行します。
現在のスレッドでタスクを実行するため戻り値はFuture
ではなく同期タスクの戻り値そのものです。
多くの場合は関係ありませんが、tokioのfeaturesにrt-threaded
やfull
が含まれていない、
あるいはtokio::runtime::Builder
にthreaded_scheduler
を指定していない場合は使うことが出来ません。
よく分からなければこちらを使うと良いでしょう。
// [dependencies] // tokio = { version = "0.2", features = ["blocking", "io-util", "rt-threaded", "tcp"] } // or // tokio = { version = "0.2", features = ["full"] } async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> { let s = tokio::task::block_in_place(move || { std::fs::read(path) // consider to use tokio::fs::read })?; socket.write_all(&*s).await?; Ok(()) }
また、クロージャーがSend
トレイトと'static
制約を満たせる(借用キャプチャをしないか、'static
参照でのみ借用する)場合は
tokio::task::spawn_blocking
も使うことが出来ます。
この関数はタスクを実行するためのスレッドとは別の、同期タスク専用のスレッドを使ってタスクを実行します。
タスクとは別のスレッドで実行されるため、他のタスクの実行を妨げることがありません。
恒常的に同期タスクが発生する場合はこちらの使用を考えてみてください。
// [dependencies] // tokio = { version = "0.2", features = ["blocking", "io-util", "tcp"] } // or // tokio = { version = "0.2", features = ["full"] } async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> { // クロージャーを'staticにするために借用を断ち切る let path = path.to_path_buf(); // このクロージャーはSendかつ'static let s = tokio::task::spawn_blocking(move || { std::fs::read(path) // consider to use tokio::fs::read }).await??; socket.write_all(&*s).await?; Ok(()) }
async-stdの場合
async-stdの場合はasync_std::task::spawn_blocking
を使います。
この関数はタスクを実行するためのスレッドとは別の、同期タスク専用のスレッドを使ってタスクを実行します。
ただし、クロージャーがSend
トレイトと'static
制約を満たす(借用キャプチャをしないか、'static
参照でのみ借用する)必要があります。
async fn send_file(socket: &mut async_std::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> { // クロージャーを'staticにするために借用を断ち切る let path = path.to_path_buf(); // このクロージャーはSendかつ'static let s = async_std::task::spawn_blocking(move || { std::fs::read(path) // consider to use async_std::fs::read }).await?; socket.write_all(&*s).await?; Ok(()) }
外部クレートにご注意
依存したクレートがタスクをブロックするかどうかは実装を見ないと分かりません。
実装を見ずに見分けるポイントとしては、その関数が「ファイルかインターネットにアクセスするかどうか」です。
その関数がファイルかインターネットにアクセスし、かつ戻り値がFuture
ではない場合、大抵その関数はブロックするでしょう。
もちろんその関数がブロックしない場合もありますし、あるいはそれ以外の関数がブロックしたりする場合もあるため、
非同期タスクの中での外部クレートの呼び出しには十分注意しましょう。
後方互換性
futures 0.3には以前のfuturesを使ったクレートとの互換性レイヤーが含まれており、 Rust 1.39以前のコードと非同期関数・ブロックを組み合わせることが出来ます。
これを使うためには、Cargo.tomlでfuturesにfeatureを追加する必要があります。
[dependencies] futures = { version = "0.3", features = ["io-compat"] } # compatでも良い # futures = { version = "0.3", features = ["compat"] }
このとき、futuresという名前で複数のバージョン(0.1と0.3)に対して依存することは出来ません。 この問題に対処するためにfutures01というパッケージが用意されており、 これを使うとfutures 0.3とfutures 0.1を同時に利用することが出来ます。
[dependencies] futures = { version = "0.3", features = ["io-compat"] } futures01 = "0.1"
Rust 1.39以前の機能を非同期関数から使う
futures::compat::Future01CompatExt
をインポートすることで、futures 0.1のFuture
をRust 1.39以降のFuture
に変換できます。
futures01::future::Future<Item = T, Error = E>
がstd::future::Future<Output = Result<T, E>>
に変換されます。
use futures::compat::Future01CompatExt; let value = future01.compat().await?;
上記を含め、互換トレイトには下記があります。
元となるトレイト | 互換トレイト | 変換メソッド |
---|---|---|
futures01::future::Future |
futures::compat::Future01CompatExt |
compat |
futures01::stream::Stream |
futures::compat::Stream01CompatExt |
compat |
futures01::sink::Sink |
futures::compat::Sink01CompatExt |
sink_compat |
futures01::future::Executor |
futures::compat::Executor01CompatExt |
compat |
tokio_io::AsyncRead |
futures::compat::AsyncRead01CompatExt |
compat |
tokio_io::AsyncWrite |
futures::compat::AsyncWrite01CompatExt |
compat |
Rust 1.39以前の機能から非同期関数を使う
futures01::future::Future
にはResult
のような機能が含まれているため、
std::future::Future<Output = O>
におけるO
はResult<T, E>
となっている必要があります。
ただし、O
がResult
でなかったとしても、future.unit_error()
を呼び出すことでResult<O, ()>
に、
future.never_error()
を呼び出すことでResult<O, Never>
に変換することが出来ます。
Never
型は「絶対にありえない型」を表しており、将来的には!
型に置き換わることになっています
*9。
同時にfutures01::future::Future
に変換するには変換元の型がUnpin
を実装している必要があるため、
Box
化するためにfuture.boxed()
を呼び出します。
上記2つを満たしてようやくfuture.compat()
が呼び出せるようになります。
長々と書きましたが、要はfuture.boxed().compat()
かfuture.never_error().boxed().compat()
を呼び出せば良いです。
// future03はFuture<Output = ()> let future03 = async { println!("async block"); }; let future01 = future03 .never_error() .boxed() .compat(); // future03はFuture<Output = Result<(), ()>> let future03 = async { if true { Ok(()) } else { Err(()) } ]; let future01 = future03 .boxed() .compat();
上記を含め下記の変換が行えます。
元となるトレイト | 必要な制約 |
---|---|
std::future::Future |
Output: Result<T, E> |
futures::stream::Stream |
Item: Result<T, E> |
futures::sink::Sink |
- |
futures::io::AsyncRead |
- |
futures::task::Spasn |
- |
また、これらのトレイトに一度に変換したい場合はfutures::compat::Compat
が使えます。
// obj01は、obj03がFutureを実装していればFutureの互換を実装し、 // obj03がStreamを実装していればStreamの互換を実装し、 // obj03がSinkを実装していればSinkの互換を実装する let obj01 = futures::compat::Compat::new(obj03);
tokioのサンプルで解説
これまでに出てきたもののおさらいとして、tokioのサンプルコードを読んでみます。 なお、解説用に少し変更しています。
// [dependencies] // futures = "0.3" // tokio = { version = "0.2", features = ["full"] } // tokio-util = { version = "0.2", features = ["full"] } // futures::preludeではStreamExtなどの必須アイテムがエクスポートされている use futures::prelude::*; use tokio::net::TcpListener; use tokio_util::codec::{BytesCodec, Decoder}; use std::env; // 関数にasyncを付けることで、その関数を非同期化する。 // また、#[tokio::main]属性を指定することで非同期main関数を実行するランタイムを設定する。 // この属性を指定しない場合、自身でランタイムを起動する必要がある。 #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); // TCP接続を受け付けるリスナーを作成する。 // TcpListener::bind()はFutureを返すため、.awaitを付けてタスクの完了を待機する。 let mut listener = TcpListener::bind(&addr).await?; println!("Listening on: {}", addr); loop { // メインタスク上で新たな接続を受け付ける。 // listener.accept()はFutureを返すため、`.await`を付けてタスクの完了を待機する。 let (socket, _) = listener.accept().await?; // タスクをtokio::spawnによってバックグラウンドタスクとして起動する。 // タスクをメインループ上で実行した場合、タスクが終了するまで // 新たな接続を受け付けることが出来なくなってしまうため、バックグラウンドタスクとしての起動が必要。 // // タスクはasyncブロックを使用し、moveを付けることでsocketの所有権を奪っている。 tokio::spawn(async move { // `tokio::codec::BytesCodec`を使い、クライアントから送られてくるデータを処理する。 // framedの型はStreamを実装している。 let mut framed = BytesCodec::new().framed(socket); // クライアントから送られてくるデータを受信する。 // `framed.next()`によってデータを受信するFutureを取得する。 // このFutureはクライアントとの接続が切れた場合、Noneを返す。 while let Some(message) = framed.next().await { match message { // データを受信したので表示 Ok(bytes) => println!("bytes: {:?}", bytes), // ソケットでエラーが発生した Err(err) => println!("Socket closed with error: {:?}", err), } } println!("Socket received FIN packet and closed connection"); }); } }
asyncで使えるクレートたち
Rust 1.39リリース時点でasync/await/futures 0.3に対応している主要なクレートを紹介します。 これらのクレートを使えば、futures 0.1との互換性で躓くこともないでしょう。
futures
バージョン0.3以降が必要です。
Future
を操作するために必要な機能が集まったクレートで、
非同期処理を行うあらゆるクレートが依存している、事実上の標準ライブラリです。
tokio
バージョン0.2以降が必要です。
ファイルやネットワークなどのI/O処理を行うためのクレートです。 futures 0.1時代ではfuturesと並んで事実上の標準ライブラリでしたが、 async時代はasync-stdの登場により競争が生まれそうです。
標準の状態では機能が制限されているため、Cargo.tomlにはこのように指定すると良いでしょう。
[dependencies] tokio = { version = "0.2", features = ["full"] }
async-std
Rustの標準ライブラリを、使い勝手をそのままに非同期化するクレートです。
標準ライブラリと同じモジュール名、関数名、引数で使えるため、学習コストが低いことが特徴です。
面白いのがprintln!
やwrite!
といったマクロなども非同期化されていることで、
すべてを非同期化するにはtokioよりもasync-stdの方が向いているかもしれません。
hyper
バージョン0.13以降が必要です。
HTTPクライアントやサーバーを構築するための基底クレートです。 tokioに依存しています。
reqwest
バージョン0.10以降が必要です。
HTTPをリクエストするために便利なクレートです。 hyperに依存しています。
thin_main_loop
バージョン0.2以降が必要です。
その名の通り「メインループ」を書くためのクレートで、主にGUIアプリケーションを想定しているようです。
tracing(旧tokio-trace)
アプリケーションを分析するためのトレーシングを提供するクレートです。
非同期関数に#[instrument]
属性を付けると非同期関数であることを考慮して実行時間を計測してくれるなど
非同期タスクへのサポートがあります。
wasm-bindgen
RustでWebAssemblyを書くための必須クレートです。
wasm-bindgenのサブクレートであるwasm-bindgen-futuresクレートには、
JavaScriptのPromise
をRustのFuture
に変換して非同期タスクで使用できるようになったり、
Future
を起動するランタイムがあるなどの非同期タスクへのサポートがあります。
async-trait
トレイトメソッドを非同期関数化するための#[async_trait]
属性を提供するクレートです。
これがどういうことかというと、まず、トレイトメソッドは戻り値の型を具体的に指定しなければなりません。
しかし非同期関数の戻り値はimpl Future<Output=T>
になるので型が分かりません。
結果としてトレイトメソッドを非同期関数化出来ません。
そこで、戻り値をトレイトオブジェクト化してやると戻り値の型を固定できます。
実際はPin<Box<dyn Future<Output=T> + Send + 'a>>
のような型を用いますが、
この型にするためには次のようなコードを書く必要があります。
trait Trait { fn method(&self) -> Pin<Box<dyn Future<Output = i32 + Send>>>; } struct Hoge; impl Trait for Hoge { fn method(&self) -> Pin<Box<dyn Future<Output = i32 + Send>>> { async { 0 }.boxed() } }
これをいちいち書くのは大変です。 そこでasync-traitクレートを使うと簡単に書くことが出来ます。
// トレイトの定義に#[async_trait]属性を付ける #[async_trait] trait Trait { async fn method(&self) -> i32; } struct Hoge; // 実装にも#[async_trait]属性を付ける #[async_trait] impl Trait for Hoge { async fn method(&self) -> i32 { 0 } }
また、#[async_trait]
の代わりに#[async_trait(?Send)]
とすることでSend
を要求しなくなります。
型としてはPin<Box<dyn Future<Output = T> + 'a>>
のようになります。
非同期関数の中でSend
を実装しない型(ポインタ型など)を使用する場合はタスクをSend
化出来ないため、
この機能を用いることになるでしょう。
async-stream
Streamを簡単に書くためのクレートです。 async/awaitのRFCではジェネレーターを使ったStreamを提案していますが、 このクレートはマクロを使ってこの機能を実現しています。 ただし、RFCのように関数をStream化するのではなくブロックをStream化するものであることに注意が必要です。
// 0〜2の値を非同期に返す関数 fn zero_to_three() -> impl Stream<Item = u32> { // stream!の代わりにtry_stream!を使うと`?`を使ってエラー処理が出来るようになる stream! { for i in 0..3 { yield i; } } } #[tokio::main] async fn main() { let s = zero_to_three(); pin_mut!(s); // ループに必要なおまじない while let Some(value) = s.next().await { println!("{}", value); } }
ただし、これはマクロなので、内部でIDE支援が効かないというデメリットがあります。
ランタイムが固定できるなら、ちょっと煩雑にはなるものの、代わりにfutures::channel::mpsc::channel(0)
を使っても良いと思います。
fn zero_to_three() -> impl Stream<Item = u32> { let (mut tx, rx) = futures::channel::mpsc::channel(0); // ランタイムはtokioを想定 tokio::spawn(async move { for i in 0..3 { // バッファサイズが0なのでデータが受信されるまで待機する if tx.send(i).await.is_err() { // Streamが切断された break; } } }); rx }
さいごに
これであなたも今日から非同期プログラミングを扱えるようになったはずです。 あなたはどのような分野で、Rustを使った非同期プログラムを書くでしょうか。
私はと言えば、組み込み分野ではESP32を使った温湿度モニターなどをRustで作ってみたいですし、 ゲーム分野ではE1K問題*10が叫ばれて久しい[要出典]EDF(私の好きなゲーム)も、この問題を解消できる可能性がなきにしもあらずということでRustで作ってみたいです。 オプティムの製品としても、AI Cameraをより効率的に実行できるようにしたいと思っています。
Rustの学習は困難とよく言われますが、今回のバージョンアップで今までと比べ物にならないほど魅力が増し、 一度覚えてしまえば極限に楽しいコーディングが実現されつつあります。 これまでは難易度から尻込みしていた方も、これを機に学習を始めてみてはいかがでしょうか。
オプティムでは最高速エンジニアを募集しています。
謝辞
Rustの非同期プログラミングの歴史の章はこちらの記事に大変お世話になりました。 ありがとうございます。
こちらも大変参考にさせていただきました。ありがとうございます。
この記事のもととなったOPTiM TECH BLOG Meetup #1に参加、 非同期プログラミングに興味を持っていただき、Qiitaに入門記事をアップしていただきました。 ありがとうございます。
ライセンス表記
- 冒頭の画像中にはRust公式サイトで配布されているロゴを使用しており、 このロゴはMozillaによってCC-BYの下で配布されています
- 冒頭の画像はいらすとやさんの画像を使っています。いつもありがとうございます
*1:そんな曲がΣ(´∀`;)
*2:File#ReadといったI/O処理関数などの中で他のgoroutineに実行権限を移譲するようになっています
*3:厳密に言えば正式リリース前は実装されていたが削除されました
*4:現在のFutureはトレイトです
*5:実際、gtk-rsがfutures 0.2を使ってしまっていました
*6:Unity 2018.1〜
*7:ゲームはあれほど巨大なシステムなのに滅多に落ちないのが不思議です。デバッグを超絶頑張っているのだと思いますが・・・
*8:厳密に言えばStreamExtに含まれる
*9:!型はRust 1.41を目標に安定化作業中の機能で、コンパイラが「絶対にありえない型」を認識することで広範な最適化が可能になります
*10:敵を1000体出すと流石に処理落ちがうざい問題