Rustの非同期プログラミングをマスターする

こんにちは、R&Dチームの齋藤(@aznhe21)です。

さあみなさん、ついにこの時がやってまいりました。 本日2019/11/8にリリースされたRust 1.39により、あらゆる環境で最高速な非同期プログラミングが可能になりました。 新たな時代に乗り遅れないよう、今のうちにRustでの非同期プログラミングをマスターしておきましょう。

なお、この記事は、先日開催したOPTiM TECH BLOG Meetupの内容を大幅に加筆修正した上でエントリに仕上げたものです。

8コアでマルチタスク

まず最初に伝えたいこと

Rust 1.39から使えるようになったasync/awaitですが、この「async」の読み方が分かるでしょうか? 日本ではよく「アシンク」と呼ばれることの多いこの単語ですが、英語圏では「エーシンク」と読まれるようです。

実際、YouTubeでasyncを検索すると、坂本龍一の楽曲*1と、 C#やTypeScriptなどでのasync/awaitの解説動画などが出てきます。 この解説動画を見てみると、確かに「エーシンク」と読んでいることが分かります。

多少違和感はあるでしょうが、「idea」が「イデア」ではなく「アイデア」になることを考えると 抵抗も少なくなるのではないでしょうか。

なお、「await」の読み方は「アウェイト」です。英語滅びろ

追記:下記ツイートで指摘を受けて確認したところ、asyncはギリシャ語由来、awaitはラテン語由来だそうです。 (´・ω・`)はあまじはあ

非同期の歴史

まず、Rustの非同期について説明する前に、 なぜプログラミング言語に非同期サポートが必要なのかをご説明しましょう。

2000年代後半から顕在化し始めたC10K問題により、1つのタスクに1つのスレッドやプロセスを割り当てる方式に限界が見えてきました。 この問題が発覚する以前は、例えばApacheであれば1つのリクエストに1つのプロセスが割り当てられ、 更にCGIによって1つのプロセスが割り当てられていました。 スレッドやプロセスを起動するには数KB〜数MBのメモリが必要なため、同時に数千〜数万のアクセスがあると 数十GBものメモリが必要になり、リソースが枯渇してしまうのです。

この問題を回避するため、nginxやNode.jsが誕生しました。 nginxは1つのリクエストに対して1つのプロセスを割り当てるのではなく、 スレッドプールとイベント駆動モデルを駆使し、スレッド数・プロセス数は固定のまま、 多くの処理を捌けるようにしたのです。

これらを図にまとめると下記のようになります。 それぞれのタスクでは待機時間が多くを占めており、 Apacheにおいては待機時間のために無駄にスレッドが生存していることが分かります。 逆に、nginxは待機時間中に他のタスクを実行することで、 1つのスレッドで3つのタスクを同時に実行できることが分かります。 このパイプライン化により、非同期プログラムは効率的な処理を実現するのです。

Apache#1
待機
受信
処理
送信
Apache#2
待機
受信
処理
送信
Apache#3
待機
受信
処理
送信
nginx
待機
#1
#1
#1
待機
#2
#2
#2
待機
#3
#3
#3

もちろん、これはいいことずくめではありません。 タスクの中の「待機時間」を他のタスクから使えるようにするためには どこが待機時間なのかを判別する必要がありますし、例え判別できたとしても、 他のタスクに「実行権限を移譲」する必要があります。 これは、従来のプログラミング言語の機能だけでは難しく、 実際、Node.jsの中身は大変複雑に実装されています。

ここで、様々な言語を経て洗練され、C#によって広く知れ渡ったasync/await構文が重要になってきます。 async/await構文によって「実行権限を移譲」する機能が簡単に実現されるのです。 この構文により非同期プログラミングが格段に楽になり、今日のハイパフォーマンスなサーバーサイドアプリケーションを支えているのです。

ちなみに、Go言語は「実行権限の移譲」を重厚なライブラリによって隠蔽しており、 「古い」プログラムのように書いても「今風の」プログラムのように実行されるようになっています *2。 この「あたかもOSのスレッドのように動作するが、実際は内部でうまく協調動作している」というものを グリーンスレッドと呼びます。 これはとても素晴らしい仕様のようにも思えますが、ライブラリが重厚になってしまうことを避けられず、 システムプログラム言語であるRustとは思想が合わないため実装されていません*3

Rustの非同期プログラミングの歴史

Rustの非同期プログラミングの歴史のうち、重要な部分をまとめました。

Rust 1.0以前

前章で書いたグリーンスレッドは、Rustの正式リリース前のバージョンである〜0.9の時代には標準で含まれていました。 しかし、これは各種オーバーヘッドを鑑み、Rustが正式バージョンになる前に削除されています。

Rust 1.0

時期:2015/5/15

Rustの正式版がリリースされました。

〜Rust 1.3

時期:〜2015/9/17

ほとんど余談ですが、Rustには1.3までFutureという名前の構造体が(未安定化の状態で)存在していました *4。 これはC++のstd::futureとC#のSystem.Threading.Tasks.Taskを 足して2で割ったようなインターフェースで、 「タスクの並列実行」と「タスクの結果をブロックして受け取る」という2つの主な機能から成り立っていました。

このFutureはnightly限定のまま、Rust 1.3で削除されています。

Rust 1.2あたり

時期:2015/8/7あたり

Rust標準から削除されたグリーンスレッドを非標準で実現するクレートである context-rs / rotor / mioco が登場し始めます。

これらのクレートはグリーンスレッドを実現するもので、グリーンスレッドであるためアプリケーションとしては楽に書けますが、 各タスクにスタックを確保する必要があったり、コンテキストスイッチのためにプラットフォームごとにアセンブリコードが必要になるなど、 Rustの特性に合わないものでした。

これらのクレートは、後に登場するクレートたちによって駆逐されていきます。

Rust 1.11あたり

時期:2016/8/18あたり

futures/tokioクレートが登場し始めます。

futuresクレートによって非同期プログラムのゼロコスト抽象化が実現されました。 これはスタックの確保もコンテキストスイッチも不要なため、 マイコンのような環境でもマルチタスクの実行が可能になり、 Rustの思想にあった非同期プログラミングが実現されました。

ただし、これはメソッドチェーンで処理を繋げる必要があり、 所有権も相まって実に使い勝手の悪いものでした。

Rust 1.26あたり

時期:2018/5/10あたり

futures 0.2が(一瞬だけ)登場しました。

futures 0.1にはコンテキストが暗黙的だったり、トレイトの定義が大きすぎるなどといった問題があったため、 それらを解決するためにfutures 0.2が登場しました。 ただし、これは実験目的で、数ヵ月以内に0.3を出すつもりだったもので、 その状態のものをcrates.io/docs.rsに公開すると利用者が混乱する*5ため futures 0.2は取り下げられ、futures-preview 0.2として再度公開されました。

また、その後Futureトレイトが標準ライブラリ入りする話も相まって、0.2から更にAPIが変更された0.3が出されることとなりました。

Rust 1.36

時期:2019/7/4

紆余曲折を経てようやく標準ライブラリとしてのFutureが安定化されました。

とは言えFutureを使うためのデファクトスタンダードライブラリであるfuturesクレートは正式版が出ず、 あくまで「準備が整っただけ」の状態です。

Rust 1.39

時期:2019/11/7

async/await構文がついに安定化されました!

この構文により非同期プログラミングが書きやすくなるだけではなく、 これまでは所有権からどうしてもコストが高い書き方をせざるを得なかった部分も直感的かつ低コストに実現できるようになります。

実にRust 1.0のリリース(2015/5/15)から4年半越しに、 快適な非同期プログラミングが実現されることになります。 長かった・・・!

Rustの非同期プログラミングの特徴

ここまではRustに非同期プログラミングが実装されるまでの経緯を紹介しました。 この章では、Rustの非同期プログラミングがどのような特徴を持っているのかをご紹介します。

ゼロコスト抽象化

これはRust自体の設計思想でもあるのですが、Rustの非同期プログラミングは他の言語とは違ってゼロコスト抽象化を謳い、 軽量かつ高速な動作を実現しています。

Rustで非同期なコードを書くとトレイトの仕組みによって関数呼び出した静的に解決されます。 これを静的ディスパッチと呼び、この仕組みがRustの高速性に寄与しています。

また、静的ディスパッチはヒープを使ったメモリ確保を必要としないためメモリ消費を抑えられるだけでなく、 メモリの確保が出来ないプラットフォームでの実行も可能にします。 これは次に説明する「プラットフォーム非依存」にも繋がってきます。

プラットフォーム非依存

Rustの非同期プログラミングはあらゆるプラットフォームで動くように設計されています。 一般的によく使われるWebサーバーやGUIアプリケーションはもちろん、 WebAssemblyでも薄いランタイムライブラリだけで動くようになっていますし、 果ては組み込み(ベアメタル)環境でも動くようになっているのです。

これは前述の静的ディスパッチだけでなくタスクの停止・再開など、非同期プログラミングに関する機能すべてがそのようになっており、 開発者が独自にタスクの制御が出来るようになっています。

これまでは非同期プログラミングがサポートされていなかった環境で、 同期的な記述を工夫して書いて非同期プログラミングを実現していたところも、 これからは直感的に書けるようになるのです。

Rustの非同期プログラミングで出来るようになること

Rustの非同期プログラミングの特徴が分かったところで、実際にどのような分野に応用できるのかを、妄想込みでお送りします。

組み込み

Rustの非同期プログラミングはランタイムライブラリを必要としないため、 組み込み分野においても使用されることが期待できます。

例えば、2つボタンがあり、それぞれボタンが押されたらI2Cで温度センサから値を読み込みんでシリアルに出力する、 というコードは下記のようになるでしょう(想像)。

async fn print_temp(bus: u8) -> impl Future<Output = ()> {
    let mut bus = i2c::Bus::new(bus);
    bus.write(reg);
    let temp = bus.read().await;
    println!("{}", temp);
}

fn main() {
    let task1 = async {
        let button = interrupt::enable(interrupt::buttons[0]);
        loop {
            // ボタン0が押されるのを待つ
            button.next().await;
            // センサから値を読み取ってシリアルに出力
            print_temp(0xE0).await;
        }
    };
    let task2 = async {
        let button = interrupt::enable(interrupt::buttons[1]);
        loop {
            // ボタン1が押されるのを待つ
            button.next().await;
            // センサから値を読み取ってシリアルに出力
            print_temp(0xE1).await;
        }
    };

    task::run(task1.join(task2));
}

interrupt::enableは割り込みテーブルにポートを登録し、wait_interruptによって割り込みを待つという動作を想定しています。 また、i2c::BusはI2Cのバスを表しており、writeで値の送信、readで値の受信をする想定です。

このコードは2つのタスクを同時に動かしています。 つまり、マイコンでマルチタスクが使えているのです(想像上の話です)。 さらに、このマルチタスクはプリエンプティブではないためスタックを確保する必要もなく、 メモリ消費量が決定的になり、メモリ管理も不要になるのです。

OS

RustはOSも作れる言語であり、実際OSを作るプロジェクトがいくつか存在します。

デバイスドライバを非同期で書くなど、カーネルでも非同期プログラミングが役に立つ可能性があります。

ゲーム

RustはAmethystPistonggezを始めとしたゲームエンジンが複数あり、2D/3Dゲーム開発にも使われています。

ゲームの世界でも非同期な処理は行われ、 例えばテクスチャの遅延読み込み(テクスチャストリーミング)や次シーンの先読み、 あるいはネットワーク経由での世界ランキングの取得などがあります。 その多くはゲームエンジンやライブラリにより隠蔽されますが、どうしても隠蔽できない部分は出てくるものです。

現在の主なゲームエンジンであるUnityとUnreal Engine 4の記述言語はC#とC++です。 C#には非同期プログラミングの機能が備わっていますが、その機能がUnityで使えるようになったのはごく最近*6ですし、 C++での非同期プログラミングは地獄です。 それらをRustで書けるとしたらどうでしょう?

例えば、Unityで非同期処理をすることを考えてみましょう。 Unityには非同期プログラミングを書くための機能StartCoroutineが備わっています。 これは当時採用していたランタイムの制約上、関数を中断する機能を流用したものであり、 非同期関数から戻り値を直接返せないという非常に使い勝手の悪いものです。

private void Start()
{
    StartCoroutine(LoadHtml());
}

private IEnumerator LoadHtml()
{
    Debug.Log("Start LoadHtml");
    using (WWW www = new WWW("http://example.com/"))
    {
        yield return www;
        Process(www.text);
    }
    Debug.Log("Done LoadHtml");
}

これをRustで書き直すとこのようになるでしょう(想像)。

fn start() {
    unity::task::spawn(async {
        unity::debug::log("start load_html");
        let text = unity::www::new("http://example.com/").get().await;
        process(text);
        unity::debug::log("done load_html");
    });
}

非同期以外でもメリットはあります。 Rustで書けば自ずとクラッシュしにくいコードが書けますし、自然に書いてもC#やC++よりもメモリ消費が減るでしょう。 となると、デバッグコストが大幅に減るのではないでしょうか? *7

Webアプリ(サーバー)

async/awaitの恩恵を最大限に受けるのはサーバーアプリでしょう。

Rustにはactix-webgothamなど、複数のWebフレームワークがあります。 これらを用いることで、サーバーのリソースをフルに使うことの出来る、大変高速なWebアプリケーションが作れます。

特にactix-webはベンチマークサイトでは最高スコアを記録し続けているWebフレームワークです。

今まではメソッドチェーンでコードが分かりにくくなっていましたが、 これからはasync/awaitで直感的かつ最高速のWebアプリを作ることが出来るのです。

Webアプリ(クライアント)

強力なWebAssemblyサポートによって、クライアントでもRustを使うことが出来ます。

wasm-bindgen-futuresによって、非同期関数をそのままJavaScriptから呼ぶことが出来るのです。

#[wasm_bindgen]
pub async fn run() -> Result<JsValue, Jsvalue> {
    let mut opts = RequestInit::new();
    opts.method("GET");
    opts.mode(RequestMode::Cors);

    let request = Request::new_with_str_and_init(
        "https://example.com/",
        &opts,
    )?;

    let window = web_sys::window().unwrap();
    let resp_value = JsFuture::from(window.fetch_with_request(&request)).await?;

    assert!(resp_value.is_instance_of::<Response>());
    let resp: Response = resp_value.dyn_into().unwrap();
    let text = JsFuture::from(resp.text()).await?;

    Ok(text)
}

サーバーサイドと合わせて最速のWebアプリを作ることも夢ではありません。

Rustの非同期の書き方

ここからは、Rustでの非同期プログラミングの書き方をご紹介します。

これまでとこれから

Rust 1.39から使えるようになったasync/awaitを紹介する前に、まずは以前の書き方と比べてそのありがたさを噛み締めましょう。

0〜9までの数値を1秒毎に表示するシンプルなプログラムを考えます。

これまでのRustでの書き方 with futures 0.1

やっていることはシンプルなのに、めちゃくちゃに長いです。 こんなコードはもう見たくもありません。

// [dependencies]
// tokio = "0.1"
// tokio-timer = "0.2"

use std::time::Duration;
use tokio::prelude::*;

fn print_1_10() -> impl Future<Item = (), Error = ()> {
    future::loop_fn(0, |i| {
        println!("{}", i);
        tokio_timer::sleep(Duration::from_secs(1))
            .map(move |()| {
                if i < 9 {
                    // iが9未満ならloop_fnにfuture::Loop::Continueを返し、処理を続行する
                    future::Loop::Continue(i + 1)
                } else {
                    // iが10以上ならfuture::Loop::Breakを返し、処理を中断する
                    future::Loop::Break(())
                }
            })
            .map_err(|e| Err(e).unwrap())
    })
}

fn main() {
    tokio::run(print_1_10());
}

Rust 1.39〜

ちょっとしたおまじないはあるものの、ごくごく自然な書き方になりました。 Rustが言語としてasync/awaitをサポートする意義が分かると思います。

// [dependencies]
// tokio = { version = "0.2", features = ["macros", "time"] }
// or
// tokio = { version = "0.2", features = ["full"] }

use std::time::Duration;

#[tokio::main]
async fn main() {
    for i in 0..10 {
        println!("{}", i);
        tokio::time::delay_for(Duration::from_secs(1)).await;
    }
}

Futureとは

async/awaitの書き方を解説する前に、ここまで時々登場していたFutureトレイトを説明しておきましょう。

Futureトレイトとは「そのうち完了するタスク」を抽象化したトレイトで、 Future::pollメソッドによって「タスクが完了した」あるいは「タスクが実行中」という状態を返します。

定義としてはこのようになっていますが、この辺りを直接使うことはまずないでしょう。

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

中身について詳しく知りたい方はFutureの解説記事をご覧ください。

async/awaitの書き方

非同期の基礎が分かったところで、非同期プログラムの書き方を紹介します。

非同期関数

前述したサンプルにおいて、Rust 1.39以降の書き方では関数にasyncという単語を付けています。 これによってその関数は「非同期関数」となり、Future(を実装した型)を返すようになります。

// このように書いたコードが...
async fn async_function() -> i32 {
    0
}

// コンパイラ内部でこのように変換される
fn async_function() -> impl Future<Output = i32> {
    UnknownObject::new()
}

このとき、async_function関数が返す型(上記ではUnknownObject)はコンパイラによって暗黙に生成されます。 この型をコードから直接参照することは出来ませんが、Futureトレイトを実装しているためimpl Traitを使って返すことが出来ます。 これはFnなどのトレイトを実装した暗黙型として扱われるクロージャと似ています。

// 非同期関数はFutureトレイトを実装した「よく分からない型」を返す
async fn async_function() -> impl Future<Output = i32> {
    delay_for(Duration::from_secs(1)).await;
    0
}

// クロージャはFnなどのトレイトを実装した「よく分からない型」となる
fn return_closure() -> impl Fn() -> i32 {
    || 0
}

非同期ブロック

ブロックにasyncキーワードを付けるとそのブロックは「非同期ブロック」となり、 非同期関数と同じようにFutureトレイトを実装した「よく分からない型」を返すようになります。 また、ブロックにmoveキーワードを付けると、クロージャーと同じように所有権を奪うブロックになります。

fn main() {
    // async_blockはasync_functionと同じように展開される
    // これもまたFutureトレイトを実装した「よく分からない型」となる
    // let async_block = UnknownObject::new();
    let async_block = async {
        delay_for(Duration::from_secs(1)).await;
        0
    };

    let s = "hoge".to_string();
    let move_block = async move {
        // このブロックはsの所有権を奪う
        println!("{}", s);
    };
    // ブロックが所有権を奪うのでsは使えなくなる
    // println!("{}", s);
}

非同期クロージャー

関数やブロックに非同期版があるのだから、クロージャーにもasyncを付けた非同期版があるように思えます。 残念ながらこれを書こうとするとコンパイラーに怒られます。

fn main() {
    let _ = async || {};
}
error[E0658]: async closures are unstable
 --> src/main.rs:2:13
  |
2 |     let _ = async || {};
  |             ^^^^^
  |
  = note: for more information, see https://github.com/rust-lang/rust/issues/62290

いくつかの問題があり、無効化されているようです。

これは普通のクロージャーから非同期ブロックを返すことで回避できます。

let closure = || async {};
closure().await;

// moveするときはasyncブロックにmoveを付けるだけでOK
let closure_move = || async move {};
closure_move().await;

非同期タスクの待ち方

ここからは、非同期関数や非同期ブロックなど、非同期に動作するタスクを「非同期タスク」と呼ぶことにします。

asyncを付けた関数やブロックの中ではawaitキーワードが使えるようになり、非同期タスクを待機することが出来るようになります。 awaitの使い方はFutureトレイトを実装する値に.awaitを付けるだけです。

async fn await_test() -> i32 {
    let b = async {
        0
    };

    // 非同期タスクbを待機し、その結果が関数の戻り値となる
    b.await
}

フィールド参照のようにも見えて気持ち悪いと思う方もいるかもしれません。 これはメソッドチェーンや?演算子を組み合わせたときの読みやすさを考えてこうなっています。

例えば、JavaScriptでfetch APIを使った時を考えます。

async function get() {
  return await (await fetch("http://example.com/")).text();
}

これをRustに置き換えるとこのようになります。

async fn get() -> String {
    fetch("http://example.com/")?.await.text()?.await
}

ね、読みやすいでしょう?

ランタイムで非同期タスクを起動する

ここまでで非同期関数や非同期ブロックで非同期タスクを作る方法を紹介しましたが、このままではタスクを実行することは出来ません。

例えば下記のコードを実行しても、表示されるのは一番下の"piyo"のみです。

async fn hoge() {
    println!("hoge");
}

fn main() {
    let _ = hoge();
    let _ = async {
        println!("fuga");
    };
    println!("piyo");
}

「Futureトレイトとは『そのうち完了するタスク』を抽象化したトレイト」と書いたように、 非同期タスクは正しく起動して実行を監視しなければ「タスクを完了」させることが出来ません。 非同期タスクを起動するためにはランタイムを用いる必要があります。

ランタイムとは「非同期タスクを実行するエンジン」であり、Rustでの非同期プログラミングには欠かせないものです。 ランタイムは標準ライブラリには含まれていないため、何らかのクレートを使用する必要があります。

例えばtokioクレートをランタイムとして使用する場合、このようになります。

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut rt = tokio::runtime::Runtime::new()?;

    // taskは非同期タスク
    let task = async {
        tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
        println!("hoge");
        Ok(())
    };

    // プログラムが実行されてから1秒後に"hoge"が表示される
    rt.block_on(task)
}

実行モデル

ランタイムによって非同期タスクは効率的に実行されるようになりますが、 どのようなランタイムを使うかによって非同期タスクが実行される方法が異なります。 非同期タスクの実行モデルを紹介しましょう。

1:1モデル

1つの非同期タスクを1つのOSスレッドに割り当てるランタイム。

非同期プログラミングの利点は全くありません。 実装としては「待機すべきときに待機する」だけで良いので、とてもシンプルですし、リソース消費もありません。

プログラムのエントリーポイント(main関数)では1:1モデルで起動し、のちに別のモデルに移行することが多いです。

M:1モデル

複数の非同期タスクを1つのOSスレッドに割り当てるランタイム。

リソースを制限しつつ非同期タスクを実行する際に有用です。

M:Nモデル(スレッドプール)

複数の非同期タスクを複数のOSスレッドに割り当てるランタイム。

細かいことを言うとワークスティーリング型もスレッドプールですが、ここでは便宜上こちらをスレッドプールと呼びます。

CPUリソースを効率的に使用することが出来る実行モデルです。 スレッドごとにキューを持ち、タスクを起動する際には最も数の少ないキューにタスクを入れるイメージです。

長期間タスク待機(.await)をしないタスクがいた場合、他のタスクの実行を妨げる可能性があります。

M:Nモデル(ワークスティーリング)

複数の非同期タスクを複数のOSスレッドに割り当てるランタイム。

CPUリソースをフルに使うことが出来る実行モデルです。 プログラム全体でキューを持ち、各スレッドでタスクが終わるごとにキューからタスクを取り出してタスクを実行するイメージです。

その性質上、タスク待機(.await)の前後で実行されるスレッドが異なる場合があります。

様々なランタイム

前述したサンプルではtokioを用いましたが、他にもいくつかのランタイムがあります。 これらは現時点で利用可能なものであって、将来的に消滅、あるいは新たに誕生する可能性があります。

tokio

ワークスティーリングのM:Nモデルで、効率的にタスクを実行できるランタイムです。 tokio-timerなど、いくつかの機能はtokioランタイムの上でしか動きません。

属性によるランタイムの起動の他、コードからも起動が出来ます。

// [dependencies]
// tokio = { version = "0.2", features = ["macros"] }
// or
// tokio = { version = "0.2", features = ["full"] }

// 属性による起動
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tokio::spawn(async {
        // do something
    });

    // do something
    Ok(())
}
// コードからの起動
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut rt = tokio::runtime::Runtime::new()?;
    rt.block_on(async {
        tokio::spawn(async {
            // do something
        });

        // do something
        Ok(())
    })
}
async-std

async_std::task::block_onは1:1モデル、 async_std::task::spawnはワークスティーリングのM:Nモデルです。

属性によるランタイムの起動の他、コードからも起動が出来ます。

// [dependencies]
// async-std = { version = "1.X", features = ["attributes"] }

// 属性による起動
#[async_std::main]
async fn main() {
    async_std::task::spawn(async {
        // do something
    });

    // do something
}
// コードからの起動
fn main() {
    async_std::task::block_on(async {
        async_std::task::spawn(async {
            // do something
        });

        // do something
    });
}
futures::executer

futures::executor::block_onは1:1モデル、 futures::executor::LocalPoolはM:1モデル、 futures::executor::ThreadPoolはスレッドプールのM:Nモデルです。

属性は用意されておらず、コードからのみ起動出来ます。

fn main() {
    let pool = futures::executor::ThreadPool::new().unwrap();
    pool.spawn_ok(async {
        // do something
    });

    // do something
}
wasm-bindgen

WebAssemblyとRustの架け橋となるwasm-bindgen(の一部であるwasm-bindgen-futures)もまた、ランタイムの機能を持っています。 これはJavaScriptのPromise.then(と、有効化されていればAtomics.waitAsync)によって実装されており、 M:1モデルで実行されます。

fn spawn_test() {
  wasm_bindgen_futures::spawn_local(async {
    // do something
  });
}

非同期タスクと組み合わせるトレイト

ここまでは非同期タスクの使い方を紹介してきました。 基本的にはこれまでの内容を覚えておけば問題ないのですが、 アプリケーション開発の際には更にいくつかのトレイトと組み合わせて使うことがほとんどです。 この章では、実際のアプリケーション開発で知っておくべきトレイトを紹介します。

futures::stream::Stream

Streamとは「任意個のデータを非同期に扱うオブジェクト」で、言わば「非同期版Iterator」です。 Streamにはnextメソッドが含まれて*8おり、 これを使うと「次のアイテムを取得するFuture」を取得できます。 このFutureの戻り値はOptionであり、次のデータがある場合はSomeでデータを、 データがない場合はNoneを返すため、これをwhileループで回します。

async fn stream_example() {
    let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");

    // linesはStreamを実装した型
    let mut lines = cursor.lines();
    // lines.next()はFuture<Output=Option<Result<String, Error>>>を返す
    while let Some(line) = lines.next().await {
        let line = line.unwrap();
        println!("{}", line);
    }
}

ちなみにstream.next()から返るFuturestreamを参照しているだけであるため、 下記のようにstream.next()の結果を破棄してもアイテムをスキップすることにはなりません。 アイテムをスキップするには.awaitした結果を破棄すると良いでしょう。

async fn stream_example() {
    let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");

    // linesはStreamを実装した型
    let mut lines = cursor.lines();
    // lines.next()はFuture<Item=Option<Result<String, Error>>>を返す
    let _ = lines.next();                                 // loremをスキップしたいが・・・
    println!("{}", lines.next().await.unwrap().unwrap()); // loremが出力される
    let _ = lines.next().await;                           // ipsumをスキップする
    println!("{}", lines.next().await.unwrap().unwrap()); // dolorが出力される
}

また、これらトレイトを実際に使う場合にはfutures::stream::Streamのインポートが必要です。 ただし、use futures::prelude::*;あるいはuse async_std::prelude::*;があれば問題ありません。

標準ライブラリ化の可能性

ここで説明したStreamはfuturesクレートに存在するもので、標準ライブラリのものではありません。 ただし、async/awaitのRFCはこれを標準化することを想定しており、 将来的にはforによるループも含めて標準入りする可能性はあります。 ただし標準入りさせる動きは今の所ないため、いつ入るか、あるいは本当に入るかどうかは不透明です。

// 標準入りしたらこんな感じ?
async fn stream_example() {
    let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");

    for await line in cursor.lines() {
        let line = line.unwrap();
        println!("{}", line);
    }
}

futures::io::AsyncReadなどのI/O系トレイト

futures::io::{AsyncRead, AsyncBufRead, AsyncWrite, AsyncSeek}は、 それぞれ標準ライブラリのstd::io::{Read, BufRead, Write, Seek}を非同期化したものです。

非同期タスク上でI/O処理をする際は標準ライブラリのトレイトの代わりにこれらのトレイトを使います。 これらトレイトにより、あらゆるI/O処理の効率的な非同期タスク化が実現されます。

use std::io::{Cursor, SeekFrom};
use futures::prelude::*;

let mut data = *b"lorem\nipsum\r\ndolor";

// cursorはAsyncRead / AsyncBufRead / AsyncWrite / AsyncSeekをすべて実装している
let mut cursor = Cursor::new(&mut data);

// AsyncWrite
cursor.write_all(b"rusty").await?;

// AsyncSeek
cursor.seek(SeekFrom::Start(0)).await?;

// AsyncRead
let mut buf = [0u8; 6];
cursor.read_exact(&mut buf).await?;
assert_eq!(&buf, b"rusty\n");

// AsyncBufRead
let line = cursor.lines().next().await.unwrap()?;
assert_eq!(line, "ipsum");

また、これらトレイトを実際に使う場合にはfutures::io::{AsyncReadExt, AsyncBufReadExt, AsyncWriteExt, AsyncSeekExt}のインポートが必要です。 ただし、use futures::prelude::*;あるいはuse async_std::prelude::*;があれば問題ありません。

futures::sink::Sink

Sinkトレイトはデータの送信する口を抽象化したものです。 AsyncWrite&[u8]を書き込むためのものですが、こちらは特定の型Tの送信に使います。

主にチャネルで使うことになるでしょう。

// txはSinkを実装している
let (mut tx, mut rx) = futures::channel::mpsc::unbounded();
tx.send(()).await?;
let () = rx.next().await.unwrap();
Ok(())

紛らわしいことに、futures::ioにもSinkがいます。 こちらはfutures::io::sink()によって作られる構造体で、どこにもデータを書き込まず、虚空に吐き捨てるためのものです。

また、このトレイトを実際に使う場合にはfutures::sink::SinkExtのインポートが必要です。 ただし、use futures::prelude::*;があれば問題ありません。

すべてを非同期化せよ

ここまでは純粋に非同期プログラミングの書き方を紹介してきました。 これで確かに非同期プログラミングを書けるようにはなるのですが、実際にコードを書く前に1つ注意点があります。 それは非同期タスクの中でブロッキングAPIを呼び出してはいけないということです。 これをすると同じスレッドで実行されている非同期タスクの実行もブロックされ、 非同期コードを書いたにも関わらずパフォーマンスが落ちます。 場合によってはデッドロックが起きることもあります。

例えば、ファイルを読んで、その中身をTCPソケットで送信することを考えます。

async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> {
    let s = std::fs::read(path)?; // ここで他のタスクも含めてブロックしてしまう!
    socket.write_all(s).await?;

    Ok(())
}

上記コードでは非同期タスクの中でstd::fs::readを実行してしまっています。 この関数の呼び出しに10秒掛かったとすれば、他の非同期タスクも10秒間実行されなくなってしまうのです。

これを回避するにはブロッキングタスクのstd::fs::readの代わりに非同期タスク(tokio::fs::readなど)を使うか、 ブロッキングタスクを「逃がす」APIを使います。

// [dependencies]
// tokio = { version = "0.2", features = ["fs", "io-util", "tcp"] }
// or
// tokio = { version = "0.2", features = ["full"] }

async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> {
    let s = tokio::fs::read(path).await?; // タスクを非同期化
    socket.write_all(&*s).await?;

    Ok(())
}

タスクを逃がす

std::fs::readのように代替関数があるなら良いのですが、中には外部クレートを呼び出しており、 どうしても非同期化出来ないタスクがあります。 そのようなときに使えるのが「同期タスクを逃がす」APIです。

同期タスクを逃がすAPIを使用することで、別の非同期タスクの実行を邪魔することなく同期タスクを実行することが出来ます。

tokioの場合

tokioではtokio::task::block_in_placeを使います。 この関数は現在タスクを実行しているスレッドに溜まっている非同期タスクを別のスレッドに委任し、 現在のスレッドで同期タスクを実行します。 現在のスレッドでタスクを実行するため戻り値はFutureではなく同期タスクの戻り値そのものです。 多くの場合は関係ありませんが、tokioのfeaturesにrt-threadedfullが含まれていない、 あるいはtokio::runtime::Builderthreaded_schedulerを指定していない場合は使うことが出来ません。

よく分からなければこちらを使うと良いでしょう。

// [dependencies]
// tokio = { version = "0.2", features = ["blocking", "io-util", "rt-threaded", "tcp"] }
// or
// tokio = { version = "0.2", features = ["full"] }

async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> {
    let s = tokio::task::block_in_place(move || {
        std::fs::read(path) // consider to use tokio::fs::read
    })?;
    socket.write_all(&*s).await?;

    Ok(())
}

また、クロージャーがSendトレイトと'static制約を満たせる(借用キャプチャをしないか、'static参照でのみ借用する)場合は tokio::task::spawn_blockingも使うことが出来ます。 この関数はタスクを実行するためのスレッドとは別の、同期タスク専用のスレッドを使ってタスクを実行します。 タスクとは別のスレッドで実行されるため、他のタスクの実行を妨げることがありません。

恒常的に同期タスクが発生する場合はこちらの使用を考えてみてください。

// [dependencies]
// tokio = { version = "0.2", features = ["blocking", "io-util", "tcp"] }
// or
// tokio = { version = "0.2", features = ["full"] }

async fn send_file(socket: &mut tokio::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> {
    // クロージャーを'staticにするために借用を断ち切る
    let path = path.to_path_buf();
    // このクロージャーはSendかつ'static
    let s = tokio::task::spawn_blocking(move || {
        std::fs::read(path) // consider to use tokio::fs::read
    }).await??;
    socket.write_all(&*s).await?;

    Ok(())
}
async-stdの場合

async-stdの場合はasync_std::task::spawn_blockingを使います。 この関数はタスクを実行するためのスレッドとは別の、同期タスク専用のスレッドを使ってタスクを実行します。 ただし、クロージャーがSendトレイトと'static制約を満たす(借用キャプチャをしないか、'static参照でのみ借用する)必要があります。

async fn send_file(socket: &mut async_std::net::TcpStream, path: &std::path::Path) -> std::io::Result<()> {
    // クロージャーを'staticにするために借用を断ち切る
    let path = path.to_path_buf();
    // このクロージャーはSendかつ'static
    let s = async_std::task::spawn_blocking(move || {
        std::fs::read(path) // consider to use async_std::fs::read
    }).await?;
    socket.write_all(&*s).await?;

    Ok(())
}

外部クレートにご注意

依存したクレートがタスクをブロックするかどうかは実装を見ないと分かりません。 実装を見ずに見分けるポイントとしては、その関数が「ファイルかインターネットにアクセスするかどうか」です。 その関数がファイルかインターネットにアクセスし、かつ戻り値がFutureではない場合、大抵その関数はブロックするでしょう。 もちろんその関数がブロックしない場合もありますし、あるいはそれ以外の関数がブロックしたりする場合もあるため、 非同期タスクの中での外部クレートの呼び出しには十分注意しましょう。

後方互換性

futures 0.3には以前のfuturesを使ったクレートとの互換性レイヤーが含まれており、 Rust 1.39以前のコードと非同期関数・ブロックを組み合わせることが出来ます。

これを使うためには、Cargo.tomlでfuturesにfeatureを追加する必要があります。

[dependencies]
futures = { version = "0.3", features = ["io-compat"] }
# compatでも良い
# futures = { version = "0.3", features = ["compat"] }

このとき、futuresという名前で複数のバージョン(0.1と0.3)に対して依存することは出来ません。 この問題に対処するためにfutures01というパッケージが用意されており、 これを使うとfutures 0.3とfutures 0.1を同時に利用することが出来ます。

[dependencies]
futures = { version = "0.3", features = ["io-compat"] }
futures01 = "0.1"

Rust 1.39以前の機能を非同期関数から使う

futures::compat::Future01CompatExtをインポートすることで、futures 0.1のFutureをRust 1.39以降のFutureに変換できます。 futures01::future::Future<Item = T, Error = E>std::future::Future<Output = Result<T, E>>に変換されます。

use futures::compat::Future01CompatExt;

let value = future01.compat().await?;

上記を含め、互換トレイトには下記があります。

元となるトレイト 互換トレイト 変換メソッド
futures01::future::Future futures::compat::Future01CompatExt compat
futures01::stream::Stream futures::compat::Stream01CompatExt compat
futures01::sink::Sink futures::compat::Sink01CompatExt sink_compat
futures01::future::Executor futures::compat::Executor01CompatExt compat
tokio_io::AsyncRead futures::compat::AsyncRead01CompatExt compat
tokio_io::AsyncWrite futures::compat::AsyncWrite01CompatExt compat

Rust 1.39以前の機能から非同期関数を使う

futures01::future::FutureにはResultのような機能が含まれているため、 std::future::Future<Output = O>におけるOResult<T, E>となっている必要があります。 ただし、OResultでなかったとしても、future.unit_error()を呼び出すことでResult<O, ()>に、 future.never_error()を呼び出すことでResult<O, Never>に変換することが出来ます。 Never型は「絶対にありえない型」を表しており、将来的には!型に置き換わることになっています *9

同時にfutures01::future::Futureに変換するには変換元の型がUnpinを実装している必要があるため、 Box化するためにfuture.boxed()を呼び出します。

上記2つを満たしてようやくfuture.compat()が呼び出せるようになります。 長々と書きましたが、要はfuture.boxed().compat()future.never_error().boxed().compat()を呼び出せば良いです。

// future03はFuture<Output = ()>
let future03 = async {
    println!("async block");
};
let future01 = future03
    .never_error()
    .boxed()
    .compat();

// future03はFuture<Output = Result<(), ()>>
let future03 = async {
    if true {
        Ok(())
    } else {
        Err(())
    }
];
let future01 = future03
    .boxed()
    .compat();

上記を含め下記の変換が行えます。

元となるトレイト 必要な制約
std::future::Future Output: Result<T, E>
futures::stream::Stream Item: Result<T, E>
futures::sink::Sink -
futures::io::AsyncRead -
futures::task::Spasn -

また、これらのトレイトに一度に変換したい場合はfutures::compat::Compatが使えます。

// obj01は、obj03がFutureを実装していればFutureの互換を実装し、
// obj03がStreamを実装していればStreamの互換を実装し、
// obj03がSinkを実装していればSinkの互換を実装する
let obj01 = futures::compat::Compat::new(obj03);

tokioのサンプルで解説

これまでに出てきたもののおさらいとして、tokioのサンプルコードを読んでみます。 なお、解説用に少し変更しています。

// [dependencies]
// futures = "0.3"
// tokio = { version = "0.2", features = ["full"] }
// tokio-util = { version = "0.2", features = ["full"] }

// futures::preludeではStreamExtなどの必須アイテムがエクスポートされている
use futures::prelude::*;
use tokio::net::TcpListener;
use tokio_util::codec::{BytesCodec, Decoder};

use std::env;

// 関数にasyncを付けることで、その関数を非同期化する。
// また、#[tokio::main]属性を指定することで非同期main関数を実行するランタイムを設定する。
// この属性を指定しない場合、自身でランタイムを起動する必要がある。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());

    // TCP接続を受け付けるリスナーを作成する。
    // TcpListener::bind()はFutureを返すため、.awaitを付けてタスクの完了を待機する。
    let mut listener = TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    loop {
        // メインタスク上で新たな接続を受け付ける。
        // listener.accept()はFutureを返すため、`.await`を付けてタスクの完了を待機する。
        let (socket, _) = listener.accept().await?;

        // タスクをtokio::spawnによってバックグラウンドタスクとして起動する。
        // タスクをメインループ上で実行した場合、タスクが終了するまで
        // 新たな接続を受け付けることが出来なくなってしまうため、バックグラウンドタスクとしての起動が必要。
        //
        // タスクはasyncブロックを使用し、moveを付けることでsocketの所有権を奪っている。
        tokio::spawn(async move {
            // `tokio::codec::BytesCodec`を使い、クライアントから送られてくるデータを処理する。
            // framedの型はStreamを実装している。
            let mut framed = BytesCodec::new().framed(socket);

            // クライアントから送られてくるデータを受信する。
            // `framed.next()`によってデータを受信するFutureを取得する。
            // このFutureはクライアントとの接続が切れた場合、Noneを返す。
            while let Some(message) = framed.next().await {
                match message {
                    // データを受信したので表示
                    Ok(bytes) => println!("bytes: {:?}", bytes),
                    // ソケットでエラーが発生した
                    Err(err) => println!("Socket closed with error: {:?}", err),
                }
            }
            println!("Socket received FIN packet and closed connection");
        });
    }
}

asyncで使えるクレートたち

Rust 1.39リリース時点でasync/await/futures 0.3に対応している主要なクレートを紹介します。 これらのクレートを使えば、futures 0.1との互換性で躓くこともないでしょう。

futures

バージョン0.3以降が必要です。

Futureを操作するために必要な機能が集まったクレートで、 非同期処理を行うあらゆるクレートが依存している、事実上の標準ライブラリです。

tokio

バージョン0.2以降が必要です。

ファイルやネットワークなどのI/O処理を行うためのクレートです。 futures 0.1時代ではfuturesと並んで事実上の標準ライブラリでしたが、 async時代はasync-stdの登場により競争が生まれそうです。

標準の状態では機能が制限されているため、Cargo.tomlにはこのように指定すると良いでしょう。

[dependencies]
tokio = { version = "0.2", features = ["full"] }

async-std

Rustの標準ライブラリを、使い勝手をそのままに非同期化するクレートです。 標準ライブラリと同じモジュール名、関数名、引数で使えるため、学習コストが低いことが特徴です。 面白いのがprintln!write!といったマクロなども非同期化されていることで、 すべてを非同期化するにはtokioよりもasync-stdの方が向いているかもしれません。

hyper

バージョン0.13以降が必要です。

HTTPクライアントやサーバーを構築するための基底クレートです。 tokioに依存しています。

reqwest

バージョン0.10以降が必要です。

HTTPをリクエストするために便利なクレートです。 hyperに依存しています。

thin_main_loop

バージョン0.2以降が必要です。

その名の通り「メインループ」を書くためのクレートで、主にGUIアプリケーションを想定しているようです。

tracing(旧tokio-trace)

アプリケーションを分析するためのトレーシングを提供するクレートです。 非同期関数に#[instrument]属性を付けると非同期関数であることを考慮して実行時間を計測してくれるなど 非同期タスクへのサポートがあります。

wasm-bindgen

RustでWebAssemblyを書くための必須クレートです。 wasm-bindgenのサブクレートであるwasm-bindgen-futuresクレートには、 JavaScriptのPromiseをRustのFutureに変換して非同期タスクで使用できるようになったり、 Futureを起動するランタイムがあるなどの非同期タスクへのサポートがあります。

async-trait

トレイトメソッドを非同期関数化するための#[async_trait]属性を提供するクレートです。

これがどういうことかというと、まず、トレイトメソッドは戻り値の型を具体的に指定しなければなりません。 しかし非同期関数の戻り値はimpl Future<Output=T>になるので型が分かりません。 結果としてトレイトメソッドを非同期関数化出来ません。

そこで、戻り値をトレイトオブジェクト化してやると戻り値の型を固定できます。 実際はPin<Box<dyn Future<Output=T> + Send + 'a>>のような型を用いますが、 この型にするためには次のようなコードを書く必要があります。

trait Trait {
    fn method(&self) -> Pin<Box<dyn Future<Output = i32 + Send>>>;
}

struct Hoge;

impl Trait for Hoge {
    fn method(&self) -> Pin<Box<dyn Future<Output = i32 + Send>>> {
        async {
            0
        }.boxed()
    }
}

これをいちいち書くのは大変です。 そこでasync-traitクレートを使うと簡単に書くことが出来ます。

// トレイトの定義に#[async_trait]属性を付ける
#[async_trait]
trait Trait {
    async fn method(&self) -> i32;
}

struct Hoge;

// 実装にも#[async_trait]属性を付ける
#[async_trait]
impl Trait for Hoge {
    async fn method(&self) -> i32 {
        0
    }
}

また、#[async_trait]の代わりに#[async_trait(?Send)]とすることでSendを要求しなくなります。 型としてはPin<Box<dyn Future<Output = T> + 'a>>のようになります。 非同期関数の中でSendを実装しない型(ポインタ型など)を使用する場合はタスクをSend化出来ないため、 この機能を用いることになるでしょう。

async-stream

Streamを簡単に書くためのクレートです。 async/awaitのRFCではジェネレーターを使ったStreamを提案していますが、 このクレートはマクロを使ってこの機能を実現しています。 ただし、RFCのように関数をStream化するのではなくブロックをStream化するものであることに注意が必要です。

// 0〜2の値を非同期に返す関数
fn zero_to_three() -> impl Stream<Item = u32> {
    // stream!の代わりにtry_stream!を使うと`?`を使ってエラー処理が出来るようになる
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = zero_to_three();

    pin_mut!(s); // ループに必要なおまじない
    while let Some(value) = s.next().await {
        println!("{}", value);
    }
}

ただし、これはマクロなので、内部でIDE支援が効かないというデメリットがあります。 ランタイムが固定できるなら、ちょっと煩雑にはなるものの、代わりにfutures::channel::mpsc::channel(0)を使っても良いと思います。

fn zero_to_three() -> impl Stream<Item = u32> {
    let (mut tx, rx) = futures::channel::mpsc::channel(0);
    // ランタイムはtokioを想定
    tokio::spawn(async move {
        for i in 0..3 {
            //  バッファサイズが0なのでデータが受信されるまで待機する
            if tx.send(i).await.is_err() {
                // Streamが切断された
                break;
            }
        }
    });

    rx
}

さいごに

これであなたも今日から非同期プログラミングを扱えるようになったはずです。 あなたはどのような分野で、Rustを使った非同期プログラムを書くでしょうか。

私はと言えば、組み込み分野ではESP32を使った温湿度モニターなどをRustで作ってみたいですし、 ゲーム分野ではE1K問題*10が叫ばれて久しい[要出典]EDF(私の好きなゲーム)も、この問題を解消できる可能性がなきにしもあらずということでRustで作ってみたいです。 オプティムの製品としても、AI Cameraをより効率的に実行できるようにしたいと思っています。

Rustの学習は困難とよく言われますが、今回のバージョンアップで今までと比べ物にならないほど魅力が増し、 一度覚えてしまえば極限に楽しいコーディングが実現されつつあります。 これまでは難易度から尻込みしていた方も、これを機に学習を始めてみてはいかがでしょうか。

オプティムでは最高速エンジニアを募集しています。

謝辞

Rustの非同期プログラミングの歴史の章はこちらの記事に大変お世話になりました。 ありがとうございます。

こちらも大変参考にさせていただきました。ありがとうございます。

この記事のもととなったOPTiM TECH BLOG Meetup #1に参加、 非同期プログラミングに興味を持っていただき、Qiitaに入門記事をアップしていただきました。 ありがとうございます。

ライセンス表記

  • 冒頭の画像中にはRust公式サイトで配布されているロゴを使用しており、 このロゴはMozillaによってCC-BYの下で配布されています
  • 冒頭の画像はいらすとやさんの画像を使っています。いつもありがとうございます

*1:そんな曲がΣ(´∀`;)

*2:File#ReadといったI/O処理関数などの中で他のgoroutineに実行権限を移譲するようになっています

*3:厳密に言えば正式リリース前は実装されていたが削除されました

*4:現在のFutureはトレイトです

*5:実際、gtk-rsがfutures 0.2を使ってしまっていました

*6:Unity 2018.1〜

*7:ゲームはあれほど巨大なシステムなのに滅多に落ちないのが不思議です。デバッグを超絶頑張っているのだと思いますが・・・

*8:厳密に言えばStreamExtに含まれる

*9:!型はRust 1.41を目標に安定化作業中の機能で、コンパイラが「絶対にありえない型」を認識することで広範な最適化が可能になります

*10:敵を1000体出すと流石に処理落ちがうざい問題