Rustの未来いわゆるFuture

Rust 1.36がリリースされ、皆さん待望のFuture関連のAPIが安定化されました。 この記事ではFuture関連APIを巡る過去と未来を紹介します。

Q&A(TL; DR)

長くなってしまったので要約をまとめました。 細かい話は目次以降をご覧ください。

Futureは何のために必要?

Futureは非同期処理を抽象化したもので、Rustにおけるあらゆる非同期処理はFutureの上に成り立ちます。 処理をFutureによって隠蔽することで、 ただ非同期に処理するだけでなく処理のパイプライン化、 すなわち単一のスレッドで複数の処理を実行出来るようになります。

futuresクレートではだめなの?

async/await構文を入れるために言語コアに取り込まれました。

futures 0.1と合わせて使える?

futures 0.3には互換性レイヤーがあり、相互に変換出来るため問題ありません。

tokioなどは対応している?

残念ながらtokioの対応版は執筆時点ではリリースされていませんが、着々と進行中のようです。 ネットワーク限定で良ければromioというクレートもあります。 hyperなど、その他のクレートはtokioが対応次第順次アップデートされると思われます。

f:id:optim-tech:20190703140304p:plain

Futureとは?

Rust 1.36から導入された、非同期処理を抽象化したクラスです。 単体では「非同期処理が完了したかどうかを問い合わせる関数」しかなく、 非同期処理の結果から処理を継続するようなプログラムを書くには別途futuresクレートなどが必要です。

これはlibcoreで実装するための制約で、その代わりに#![no_std]環境でも使うことが出来ます。 つまり、組み込み環境でも非同期処理が書けると言うことです *1

非同期処理の何が嬉しいのか?

例えばチャットサーバーを作ろうとすると、今までのRust標準ではクライアントごとにスレッドを立て、 その中でソケット通信を実行するのがオーソドックスでした。 このアーキテクチャは至極シンプルでとても作りやすいものですが、 メモリ消費のオーバーヘッドやOSの制限などによりクライアントが大量になってくると限界を迎え、 処理が進まなくなる、あるいはプログラムごと落ちる可能性があります。 いわゆるC10K問題です。

チャットというのはメッセージを送る以外は処理を行わないため、各スレッドは基本的に待機しているものです。 であればその待機する時間を有効に使い、スレッド数を減らしてしまおうというのが非同期処理(とりわけ非同期I/O)の要です。 Unix系であればselect(2)epoll(2)*2の システムコールを使うことで、ソケットを始めとした複数のI/O処理を一度に実行出来るのです。

Client A
待機
送信
待機
Client B
待機
送信
待機
Client C
待機
送信
待機
パイプライン化
送信
待機
送信
待機
送信
待機

非同期処理する関数を自分で扱うと処理が複雑化してしまうため、 現代のプログラミング言語の多くではこれを抽象化してFuturePromiseとして提供しています。 また、この抽象化によって全く異なる処理(例えばチャットと音声通話の非同期実行など)もパイプライン化して 非同期に実行することが出来るようになります。

Rustでも以前からfuturesクレートによってFutureが使えましたが、 所有権や使い勝手の問題からRust標準に取り入れ、より書きやすく、よりハイパフォーマンスな処理を実現しようとしています。

futuresクレートから取り込まれた理由

1.36より前のRustでは一般的に、futuresクレート(バージョン0.1)により提供されていたFutureを使用していました。 ライブラリとしては必要十分な機能が提供されていましたが、借用と相性が良くありませんでした。 これは言語仕様上どうしようもなく、かつメソッドチェーンによってコードの見通しが悪くなると言う問題もありました。 これを多くの言語でも導入されているasync/await構文によって解消する過程で、 Futureが外部ライブラリにあると言語機能として参照できないため言語ライブラリとして取り入れることになりました。

過去の経緯については@qnighyさんの下記ツイートスレッドに詳しいため、そちらもご参考にどうぞ。

Futureの定義

Futureの定義の肝を抜粋します。

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

pub struct Context<'a> { /* fields omitted */ }

impl<'a> Context<'a>
    pub fn from_waker(waker: &'a Waker) -> Context<'a>;
    pub fn waker(&self) -> &'a Waker;
}

pub struct Waker { /* fields omitted */ }

impl Waker {
  pub fn wake(self);
  pub fn wake_by_ref(&self);
  pub fn will_wake(&self, other: &Waker) -> bool;
  pub unsafe fn from_raw(waker: RawWaker) -> Waker;
}

Futureには関連型としてOutputが定義されています。 これはIteratorItemと同じ様なもので、そのFutureの戻り値の型が入ります。

Future::pollはコンテキストを引数に取り、処理の結果を取り出します。 この時、処理が進行中であればPoll::Pendingが返り、処理が完了していれば値と共にPoll::Readyが返ります。

Future::pollの定義を見ると、self&mut selfではなくself: Pin<&mut self>になっていることに気付きます。 これは自己参照構造体を実現するための機能で、async関数を実現するために必要なものです。 async関数はawaitの前後で変数の借用を持ち越す必要があり、そのためには自己参照構造体が必要になるのです。 詳しい話は下記記事を参照してください。なお、少し古い記事のため、定義が多少異なっていることに注意してください。

次にFuture::pollの引数の&mut Context<'_>ですが、これはFutureを実行するランタイムが用意するコンテキストです。 実行ランタイムとは複数のFutureを1つのスレッドでまとめてパイプライン実行するモジュールのことで、 実行すべきFuture(未実行のFutureなど)を選別して実行し、実行すべきFutureがない場合はスレッドを停止させます。 この時、Futureの処理が完了して停止したスレッドを再開させてFutureの処理を継続する場合にコンテキストを使用します。

futuresクレートとの組み合わせ

Futureは単体では使いません。 futuresクレートと組み合わせることで各種処理を記述出来るようになります。

futuresクレートはFutureExtトレイトとTryFutureトレイト、及びTryFutureExtトレイトを提供しています。

FutureExtFutureを便利に使うためのトレイトで、 専ら非同期処理の結果を受け取り、その値を加工するための機能などを提供します。 例えばFutureExt::mapは処理済みの値を加工するためのメソッドで、Iterator::mapに近しいです。 FutureExt::thenFutureExt::mapのようなものですが、戻り値としてFutureを返すことが出来ます。 これにより、非同期処理の結果を使って非同期処理を実行することが可能です。

fn plus_1<F: Future<Output=u32>>(f: F) -> impl Future<Output=u32> {
  f.map(|v| v + 1)
}

TryFutureResultを返すFutureへの特化したトレイトで、Future<Output=Result<T, E>>に自動的に実装されます。 TryFuture::OkTが関連付けられ、TryFuture::ErrorEが関連付けられます。 このトレイト自体は何ら機能を持たず、TryFutureExtと組み合わせて使います。

TryFutureExtTryFutureを便利に使うためのトレイトで、 Futureの処理が成功した場合のみ処理を続行する機能や、エラー型をIntoトレイトによって変換する機能などを提供します。 Resultに実装されているメソッドの一部が(場合によっては別名で)TryFutureExtにも実装されています。 例えばTryFutureExt::map_okFutureOkを返した場合のみ実行される、値を加工するためのメソッドです。 TryFutureExt::and_thenTryFutureExt::map_okのようなものですが、戻り値としてFutureを返すことが出来ます。

fn plus_1<E, F: TryFuture<Ok=u32, Error=E>>(f: F) -> impl TryFuture<Ok=u32, Error=E> {
  f.map_ok(|v| v + 1)
}

futures 0.1との違い

以前のRustでfuturesクレートを使っていた場合、少し定義が異なるため混乱するかも知れません。 以前は下記のような定義でした。

pub enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

pub trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

主な違いは下記の3つです。

  1. 戻り値がResultだった:
    以前のFutureでは戻り値がResult限定であり、失敗しない関数を書くには少し不便でした
  2. self&mut selfだった:
    futuresクレートの公開当初はPinがなかったこともあり、self&mut selfによって定義されていました
  3. 引数にコンテキストがない:
    コンテキストは暗黙的に定義され、別途関数で取得していました

StreamFutureに合わせてResultを返さないものに変更されており、 TryFutureに対するTryStreamが追加されています。

他にもfutures::executorを始めとした多くのモジュールへの変更もあります。

futures 0.1との互換性

Rust 1.36が浸透するまでのしばらくの間はfutures 0.1も並行してして使われるものと思われます。 ただし、futures 0.1までのFutureとRust 1.36のFutureは全く別の型であり、互換性はありません。

それらに互換性を持たせるためにfutures 0.3には互換性レイヤーが導入されており、 futures 0.1のFutureとRust 1.36のFutureを相互に変換して使うことが出来るようになっています。 詳細は下記ブログを参照してください。

tokioなどのクレートの対応状況

肝心要のファイル操作やネットワーク通信といった非同期I/Oに関してはtokioを使います。 ただし、執筆時点ではまだfutures 0.1からの移行中のようです。

なおRomioというクレートもあり、これがあればネットワークだけは非同期I/Oを使用することが出来ます。

非同期I/Oに関してはもうしばらく待つ必要がありそうです。

その他、いくつかのクレートも進行中のようです。

ちょっと先の未来

ちょっと先の9/27(金)にリリースされるRust 1.38で安定化予定のasync/await構文を見てみましょう。

async fn get() -> u32 {
  0
}

async fn get_and_plus_1() -> u32 {
  get().await + 1
}

関数をasync fnで宣言すると戻り値がimpl Future<Output=T>として扱われます。 そしてasync fnの中ではfuture.awaitによってFutureを待つことが出来ます。 フィールドアクセスのように見えますが、この構文によって、 いくつかの言語で採用されているようなawait futureの形よりもメソッドチェーンが読みやすくなります。

async fn hoge() -> u32 {
  foo().await.bar().await.baz().await
  // vs await (await (await foo()).bar()).baz()
}

また、Runtimeというクレートを使うとmain関数からasync fn化することが出来ます。 とても読みやすい非同期コードですね!

#![feature(async_await)]

use runtime::net::UdpSocket;

#[runtime::main]
async fn main() -> std::io::Result<()> {
    let mut socket = UdpSocket::bind("127.0.0.1:8080")?;
    let mut buf = vec![0u8; 1024];

    println!("Listening on {}", socket.local_addr()?);

    loop {
        let (recv, peer) = socket.recv_from(&mut buf).await?;
        let sent = socket.send_to(&buf[..recv], &peer).await?;
        println!("Sent {} out of {} bytes to {}", sent, recv, peer);
    }
}
https://github.com/rustasync/runtime#examplesより

はじまりのFuture

実はRustのリリース当初、具体的には1.3まではNightlyにFutureAPIがありました。 しかし、このAPIは現在とは全く異なったAPIで、トレイトではなく構造体で、 処理が完了したことも観測できない、使い勝手の悪いものでした。

さいごに

Rust標準にFutureが入った結果、既存のクレートでも多くの非互換な変更が引き起こされました。 幸い互換性レイヤーがあるため大きな混乱はないと思われますが、移行には大きな苦しみが伴いそうです。 オプティムのプロジェクトにもfutures/tokioにべったり依存しているものがありますが、移行にはしばらく掛かりそうです・・・。 どうせ移行するなら1.38を待ってasync/awaitにリファクタリングするのも良いかも知れませんね。

オプティムでは未来志向のエンジニアを募集しています。

ライセンス表記

  • 冒頭の画像中にはRust公式サイトで配布されているロゴを使用しており、 このロゴはMozillaによってCC-BYの下で配布されています
  • 冒頭の画像はいらすとやさんの画像を使っています。いつもありがとうございます

*1:もちろんFutureを実行するランタイムを用意する必要があります

*2:BSD系ならkqueue(2)、WindowsならI/O完了ポート