MQTT Version 3.1.1 をふりかえる

ご無沙汰してます。中野です。
相変わらずNATSやNSQやら、AWSやAzureやらと戯れてます。

はじめに

IoTの流行りとともに脚光を浴びた通信プロトコルにMQTTと呼ばれるものがあります。

MQTTがIoTのシーンにおいて、HTTPなどの他のプロトコルと比較して、
プロトコル的にどう優れているのか、どう劣っているのか、結局ユースケース次第なのでここでは割愛します。

ただ事実としてあるのは、

らが、IoT向けのメッセージブローカーとしてMQTTをサポートした以上、
MQTTというプロトコルの存在を無視はできないということです。
(製品に採用するかどうかは、ユースケース次第なのでまた別の話です。)
(MQTTそのものの良し悪しより、それぞれのクラウドのメッセージブローカーと連動した各サービスが便利だから使う、という理由が多い気がします。)

そのMQTTですが、2019/03/07に最新版であるver5.0が正式公開されました。
それまでは長い間、ver3.1.1が利用されてきました。

AWSらのメッセージブローカーで現在サポートしているMQTTのバージョンはver3.1.1です。
いずれこれらのメッセージブローカーもver5.0に対応し、
これらに接続するデバイスが利用するバージョンも緩やかに移行していくと考えられますが、
まだまだver3.1.1が現役として利用されていくことでしょう。

本稿では、これまでのver3.1.1とはどのようなものだったのか、振り返りたいと思います。
次回、ver5.0についての記事を書く予定です。
以後、MQTT ver3.1.1を前提に話をすすめます。

【2020/09/30追記】
MQTT Version 5.0についての記事を書きました。こちらもご覧いただければ幸いです。

tech-blog.optim.co.jp

MQTTとは

MQTTとは、ブローカー(中継者)が介在するPub/Sub型の、メッセージ伝送プロトコルです。
MQTTが何の略なのかは、混乱のもとになるので省きます。
MQTTはMQTT以外の何物でもありません。
HTTPが昨今『HTTP』という名称で定着し、HTTPが何の略だったのかを改めて意識することが少なくなってきた流れと同じだと考えましょう。

一般的に、ブローカーが介在するPub/Sub型のメッセージ伝送プロトコルには、

  • メッセージの送信者
  • メッセージの受信者
  • メッセージの中継者

という3つのアクターが存在します。
『ブローカーが介在する』という表現は、この『中継者』の存在を示唆します。

このモデルでは上記三者が下記図のような流れでメッセージを伝送します。
矢印はメッセージの流れる向きであって、リクエストの向きではありません。

figure001.png

一般的にはそれぞれを、

  • Publisher/パブリッシャー/発行者
  • Subscriber/サブスクライバー/購読者
  • Broker/ブローカー

や、

  • Producer/プロデューサー/生産者
  • Consumer/コンシューマー/消費者
  • Broker/ブローカー

という名前で呼ぶ傾向にあります。
ただし、MQTTのプロトコルの仕様上ではこのような名称を規定はされていません。
Publisher、Subscriberをまとめてクライアント、Brokerをサーバと称しています。
クライアントはどちらか一方にしかなれないのではなく、Publisher、Subscriberどちらにもなることができます。

ところで上記の図を見ただけでは、一見ただのロードバランサーやリバースプロキシではないか、と思われるかもしれません。
Pub/Sub型では、メッセージを受信したいクライアント側が、特定のメッセージを受信したいことをサーバに登録することではじめてサーバがメッセージを転送するようになります。
この処理を購読手続き、サブスクライブなどと呼び、サーバ上で保存されるその状態を購読、サブスクリプションと呼びます。
メッセージの特定のためのキーとなるものが、メッセージの発行時にメッセージそれぞれに付与されるラベルで、トピックやサブジェクトなどと呼ばれるものです。
MQTTではトピックと呼ばれます。
メッセージの受信者は購読手続きの際にこのトピックを指定することで、該当するトピックが付与されたメッセージを受信することができます。

MQTTというプロトコルは、このようなPub/Subを実現するためのプロトコルの一つです。
Pub/Subを実現可能な実装は多々あり、中には標準化されているもの、されていないもの、様々ですが、MQTTはそのうちの標準化されているものの一つ、という立ち位置となります。
MQTTを標準化しているのはOASISという団体です。

MQTTの仕様は下記URLに存在します。

MQTT Version 3.1.1

MQTTは、順序が保証され、データの欠落がないことが保証され、双方向通信であるようなネットワークプロトコル上に実装されます。
TCPやWebSocketがその代表例です。
一般的にはTCP上で動かしますが、エンタープライズなネットワーク空間などを想定すると、ファイアウォールを突破できる可能性の高い80番上で動かしたいケースもあり、その場合WebSocketが利用されます。
(ただしこの場合暗号化されていないので、WebSocketだと判定されると弾かれる可能性はありますが。)

また、暗号化オプションも利用され、その場合TLSやWebSocket Secure(wss)なども利用されます。

MQTTの機能

メッセージの発行と購読

任意のクライアントはメッセージを発行(Publish)することができます。
メッセージの発行とは、あるメッセージの最初の送信者が、ブローカーたるサーバにメッセージを伝送することを指します。
このときに、全てのメッセージにはそれぞれのメッセージの内容に関連するラベルとしてトピックを付与します。
このトピックがPub/Subにおける論理的なメッセージの伝送路となります。

figure003.png

ある特定のトピックが付与されたメッセージを受信したいクライアントは、そのトピックを受信するためのトピックフィルタを作成し、購読手続き(Subscribe)を行います。
この手続きを行うことで、手続き以後、サーバはメッセージの送信者からメッセージを受信した際にその購読を登録しているクライアントにメッセージを伝送するようになります。

figure004.png

figure005.png

ある特定のトピックのメッセージの受信を停止したい場合は、購読解除手続きを行うことで、停止させる事ができます。

これらはすべて、1つのTCPやWebSocketコネクション上で実行されます。

購読の永続性

購読の永続性とは、ある特定のトピックが付与されたメッセージを受信したいクライアントが、何かしらの理由でサーバと接続できていなかった期間に、該当するトピック向けに発行されたメッセージをサーバ上で保持する機能です。
俗にDurable Subscriptionと呼ばれる機能です。

購読が永続化されている場合、クライアントが再接続したときに改めて購読手続きをせずとも、メッセージの送信が再開されます。
また、切断していた間に発行されていたメッセージも伝送されます。

MQTTでは、個別の購読を永続化制御することはできません。
メッセージのやり取りを行うサーバとの接続、つまりコネクション単位で永続化するかどうかを定めます。
この永続化の単位をセッションと呼称します。
接続時にCleanSessionと呼ばれる情報を有効化(CleanSession=1)して送ることで、既存のセッションを破棄し、新しいセッションを開始することができます。
(具体的なフローはもう少し複雑です。)

CleanSessionを常に有効化しておくと、購読は常にコネクションの切断とともに削除されます。
俗にNondurable Subscriptionと呼ばれる状態になります。

最新メッセージの受信

クライアントが新たに購読を作成したとき、その購読で指定しているトピック宛ての最新(最後)のメッセージを受信する機能があります。
これをRetainと呼びます。

この受信を有効にするには、もとのメッセージの発行者がRetainフラグを有効化したメッセージを発行する必要があります。
すべてのメッセージが自動的にRetainなメッセージになるわけではないことに注意が必要です。
あくまでもクライアントが明示する必要があります。

また、Retainメッセージはトピックあたり最大1つのみ保持されます。
Retainフラグが有効化されたメッセージをすべて保持するのではなく、最新(最後)のメッセージのみを保持します。
つまり、Retainメッセージは常に新しいものに上書きされていくことを意味します。

永続化が有効なセッションでは、最初に購読を作成したときにはRetainメッセージを受け取ることができますが、再接続した際に再度Retainメッセージが送られることはありません。
逆に、CleanSessionを常に有効化した永続化無効なセッションでは、コネクションを確立するたびに購読を作り直さねばならない反面、毎回Retainのメッセージを受信することができます。

メッセージの伝送保証

MQTTにおけるメッセージの転送には、発行におけるクライアントからサーバへの伝送、購読におけるサーバからクライアントへの伝送の2つがあります。
これらのメッセージの伝送において、MQTTにはその伝達保証を行う機能があります。
これをQuarity of Service(QoS)と言います。
このQoSには以下3つがあり、QoS Levelとして分類されています。

QoS Level 意味 確認応答 メッセージの再送 メッセージの重複
0 At Most Once
最大1回
なし なし なし
1 At Least Once
少なくとも1回
あり あり あり
2 Exactly Once
正確に1回
あり あり なし

QoSが1以上のとき、メッセージの送信者は受信者からの確認応答を求めます。
適切な確認応答がないとき、メッセージの送信者はメッセージを再送します。

QoS Levelが上がるほど、1つのメッセージあたりのMQTTプロトコル上での手続きが多くなるため、オーバーへッドが大きくなります。

QoS Levelは、メッセージの発行者とサーバ間、メッセージの購読者とサーバ間、それぞれ別に設定されます。
発行時にはそれぞれのメッセージごとに設定します。
購読時には、購読したいトピックの指定時にクライアントが希望する最大QoSを指定し、その希望するQoS Levelを見てサーバが総合的に判断して決めた最大QoS Levelを利用します。
購読として受信するそれぞれのメッセージは、購読時に決定した最大QoSと、発行者が指定したメッセージのQoSの、低い方のQoSで受信することになります。

そのため購読手続きで、本来メッセージに付与されるはずのQoSを上回る最大QoSを指定したとしても、メッセージの受信時のQoSは発行時のQoSを超えることはできません。 これはつまり、発行者の意図するところから購読者が意図的にQoSを下げることはできても、発行者の意図するところより上回るQoSを購読者が指定することはできないということです。
例えば、最大QoS 1で購読手続きをしていたとしても、QoS 0で発行されたメッセージは、QoS 1で受信することはできません。
逆に、QoS 1で発行されたメッセージを、明示的にQoS 0で購読することはできます。
このQoSが下がる現象を俗にQoS Downgradeといいます。

ただし実際には、製品によってはQoS 0で発行されたメッセージをQoS 1で購読するできるようにする設定があるものもあります。
例えばEMQなどがこれに該当します。
これはMQTTの仕様外の挙動です。
このような、購読時に設定した最大QoS Levelにもとづいてメッセージ受信時のQoSを強制的に上げる変更をQoS Upgradeと呼びます。

結果的にメッセージの伝送に利用されるQoSに依存はありますが、必ずしも発行者の意図したQoSで購読者が受信するとは限らないことに注意が必要です。

遺言

ブローカー有りのPub/Sub型では、サーバには様々なクライアントが接続し、サーバを経由してメッセージを交換しあいます。
その中で、ネットワークや電源などの何かしらの理由で、あるクライアントのコネクションが意図せず切断されたとき、クライアント間は直接接続しているわけではないため、現存するクライアントがそのコネクションの切断を知る直接的な手段が存在しません。
アプリケーションロジックで、定期的に相互にメッセージを送りあって死活監視するしかありません。

MQTTでは、このようなユースケースをカバーするために、Will(遺言)と呼ばれる機能があります。
このWillは、クライアントの接続が切れたタイミングでサーバによって発行されるメッセージのことで、クライアントの接続時にWillメッセージを予め登録することで実現されます。
Willは任意のトピックに発行でき、任意のQoSを設定できます。
これによってアプリケーションロジックで死活監視をしなくても特定のトピックを購読さえしておけば、他のクライアントの切断を検出することができます。

クライアントがコネクション明示的に閉じたとき、それは正常系の動作であるためWillメッセージは発行されません。
Willが発行されるのはあくまでも異常系のときとなります。

余談ですが、Willはクライアントが事前に登録して意図せず切断した後に発行されるため、ダイイングメッセージではなく、遺言という名称がついています。

MQTTの制御パケット

ここから少々具体的な話になっていきます。

MQTTには14種の制御パケットがあり、これらをTCPやWebSocket上でやり取りすることでPub/Subを実現しています。
制御パケット番号0と15は将来の利用のために予約されており、現在は利用されません。

制御パケット番号 制御パケット名 利用シーン
1 CONNECT 接続
2 CONNACK 接続応答
3 PUBLISH メッセージ伝送
4 PUBACK QoS=1における確認応答
5 PUCREC QoS=2における確認応答
6 PUCREL QoS=2における確認応答
7 PUBCOMP QoS=2における確認応答
8 SUBSCRIBE 購読手続き
9 SUBACK 購読手続き応答
10 UNSUBSCRIBE 購読解除手続き
11 UNSUBACK 購読解除手続き応答
12 PINGREQ PING要求
13 PINGRESP PING応答
14 DISCONNECT 切断

以下図は、各種制御パケットに共通する構造です。固定ヘッダ、可変ヘッダ、ペイロードの3つの部位からなります。 figure002.png

固定ヘッダはどの制御パケットにも存在する部位です。
そのためFixed、固定という名称がついています。
この固定ヘッダに含まれるRemaining Lengthという領域は、固定ヘッダに続く、可変ヘッダ、ペイロード部を合わせた領域の長さを示す部位となります。
このRemaining Length領域は可変長であり、1〜4バイトで表現されます。
各バイトの7ビット目がRemaining Lengthのバイト数を示す領域となっており、各バイトは8ビットのうち7ビット分が長さを示す情報として利用されます。
最大4バイトで28ビットとなるため、228で256MBが、可変ヘッダとペイロードを合わせた長さの最大値となります。
このRemaining Length部の長さの幅があることから、このヘッダ部位は固定『長』ではないことに注意してください。

可変ヘッダは、制御パケットの追加のメタデータを格納する領域となります。
可変ヘッダ、ペイロード部の内容は、制御パケットの種類によって変わります。
また、可変ヘッダ、ペイロード部は、制御パケットの種類によっては存在しないことがあります。

以下に、制御パケットと各部位の有無の対応を記載します。

制御パケット番号 制御パケット名 固定ヘッダ部 可変ヘッダ部 ペイロード部
1 CONNECT Required Required Required
2 CONNACK Required Required None
3 PUBLISH Required Required Optional
4 PUBACK Required Required None
5 PUCREC Required Required None
6 PUCREL Required Required None
7 PUBCOMP Required Required None
8 SUBSCRIBE Required Required Required
9 SUBACK Required Required Required
10 UNSUBSCRIBE Required Required Required
11 UNSUBACK Required Required None
12 PINGREQ Required None None
13 PINGRESP Required None None
14 DISCONNECT Required None None

PUBLISH制御パケットのペイロードがOptionalとなっているのは、長さ0のペイロード、つまり空文字なメッセージを発行する事が可能なためです。
(メッセージに付与されるトピックは可変ヘッダに含まれます。)

また、制御パケットの種類によっては伝送される向きが決まっており、下記のようになります。

  • クライアント→サーバのみ

    • CONNECT
    • SUBSCRIBE
    • UNSUBSCRIBE
    • PINGREQ
    • DISCONNECT
  • サーバ→クライアントのみ

    • CONNACK
    • SUBACK
    • UNSUBACK
    • PINGRESP
  • 双方向

    • PUBLISH
    • PUBACK
    • PUBREC
    • PUBREL
    • PUBCOMP

MQTTの挙動

接続と切断

クライアントはサーバにTCPやWebSocketで接続した後、CONNECT制御パケットを送り、サーバに接続初期化を伝達します。

サーバはCONNECT制御パケットの内容をもとに初期化処理を行い、CONNACK制御パケットで応答します。
この初期化処理では、MQTTバージョンの特定、クライアントの識別、認証、Willの登録、既存のセッションの破棄判定などの処理が行われます。

クライアントもサーバも、CONNACK制御パケットの送受信後も確立したコネクションを維持します。
このコネクションは以後、後述のDISCONNECT制御パケットが処理されるまで維持し、その間メッセージの発行や購読に利用されます。

上記の処理のイメージが下記図となります。

img

CONNACKメッセージはエラーを返すこともあります。このエラーには、

  • サーバがサポートしていないMQTTのバージョンである
  • 不正なクライアント識別子である
  • サーバが利用不可な状態にある
  • 認証情報が不正である
  • そのクライアント識別子での接続は許可されていない

などがあります。
エラーなCONNACK制御パケットを受け取ったクライアントは、この場合、コネクションを閉じて通信を終了します。

またCONNACK制御パケットには、指定したクライアントIDに対する既存のセッションがサーバ上にあるかどうかを示すフラグが含まれます。
クライアントはこの情報により、改めて購読手続きを行う必要があるかどうかを知ることができます。

メッセージの発行

メッセージの発行は、クライアントからサーバにPUBLISH制御パケットを伝送することを意味します。
このPUBLISH制御パケットには、トピック、メッセージ本体、QoS Level、再送フラグ、Retainフラグが含まれます。
下記図はそのフローを示しています。

img

QoS Levelが1以上の場合は追加の情報が存在し、可変ヘッダに確認応答に利用するパケット識別子が含まれます。

また、上図のようなPUBLISH制御パケットの送信に加え、それぞれのQoS Levelに応じた確認応答を行います。

QoS 1の場合、サーバはPUBACK制御パケットによる応答を行います。
下記図がそのイメージです。

img

PUBACK制御パケットは可変ヘッダに、どのPUBLISH制御パケットに対する応答かを判定するために、対応するPUBLISH制御パケットに含まれていたパケット識別子を含んでいます。

QoS 2の場合、サーバはPUBREC制御パケットによる応答を行います。
さらにサーバからのPUBREC制御パケットを受け取ったクライアントは、続いてPUBREL制御パケットを伝送します。
PUBREL制御パケットを受信したサーバは、PUBCOMP制御パケットをクライアントに伝送します。 一連の流れを下記図で示しています。

img

PUBREC制御パケット、PUBREL制御パケット、PUBCOMP制御パケットはそれぞれ、対応する最初のPUBLISH制御パケットに含まれていたパケット識別子を可変ヘッダに含んでいます。

QoSが1以上の場合、メッセージが再送される可能性があります。
時として、クライアント側の実装がそのメッセージを再送であると判定する必要があるケースがありえます。
メッセージの受信者は、固定ヘッダの再送フラグ(DUPフラグ)を参照することで、そのメッセージが再送かどうかを判定することができます。

そもそもMQTTはTCPやWebSocket上に実装されるため、なぜMQTTが再送処理を持っているのか疑問に思われるかもしれません。
これはアプリケーションロジックが関係します。 メッセージの送信者と受信者の間にはそれぞれ、メッセージを発行するための一連の処理、メッセージを受信してからの一連の処理がアプリケーションごとに、アプリケーションロジックとして存在します。
たとえば、

  • 受信者は、ネットワーク的には正しくメッセージは受け取ったもののディスクやデータベースへの永続化で失敗して結果的にメッセージをロストする
  • 送信者は、このような受信者側の事情でネットワーク的にはデータを送れているものの実際的には再送せねばならないがこれを検知できない

などのケースが有りえます。
TCPなどのネットワーク的には正常に送れた事になっているからこそ、MQTTでエラーを返し、再送を要求する必要があるということです。
要するに、HTTPでステータスコードで500などが返ってきたときに再送するロジックと同じです。

QoS 1と2の違いは、QoS 1の場合はPUBACK制御パケットに対する応答がないため、PUBACK制御パケットがロストすると送信側が再送を試みるため受信者側は同じメッセージの重複があり得るのに対して、QoS 2はそこを多重化して起こり得ないようにしているという点です。

メッセージの購読

あるトピックに関するメッセージを受信したいクライアントは、サーバにSUBSCRIBE制御パケットを伝送します。
サーバはSUBSCRIBE制御パケットを受け取りサーバ内で適切に処理した後、SUBACK制御パケットで応答します。
下記図はその流れを示しています。

img

SUBSCRIBE制御パケットでトピックを指定するとき、そのトピックに関するメッセージをサーバがクライアントに伝送してくる時の、『希望する』『最大』QoS Levelも指定する必要があります。
これは、購読側のアプリケーションロジックがそのトピックのメッセージをどの程度の伝達保証で受信したいかを示すものです。
正確性を重んじるために高いQoS Levelを採用することもあれば、性能重視でオーバーヘッドを少なくするためにあえてQoS Levelを下げることもあります。

具体的に採用された最大QoS Levelは、SUBACK制御パケットにて返答されます。 これは、MQTTサーバの具体的なプロダクトやユースケースにより、サーバがすべてのQoS Levelをサポートしていないことがあることに起因しています。
実際、サーバ側がQoS 2をサポートしていない、あるいは無効化している例も少なくありません。
クライアントが要求したQoS Levelはあくまでも希望であり、具体的にどのQoS Levelが利用できるようになったのかをクライアントが知るには、総合的にサーバが利用可能なQoS Levelを決定した後に、そのQoS Levelをクライアントに伝達する必要があります。
つまり、サーバはQoS 2をサポートしていないが、クライアントがQoS 2を要求してきたとき、利用可能なQoS Levelは1であることをSUBACK制御パケットでクライアントに伝達するわけです。

ただし実際には、必ずしもここで成立した最大QoS Levelでメッセージが送られてくるわけでは有りません。
これは既に紹介した内容ですが、購読者に届けられるそれぞれのメッセージは、購読が成立した際の最大QoS Levelと、発行者が指定したそれぞれのメッセージのそれぞれQoSの、低い方のQoSで受信することになるためです。

メッセージの転送を停止するには、UNSUBSCRIBE制御パケットを送ります。
サーバはUNSUBSCRIBE制御パケットを処理すると、UNSUBACK制御パケットで応答します。
その流れは下記図に示しています。

img

上記より、クライアントがメッセージを受信できる期間は、SUBSCRIBE制御パケットを処理してから、UNSUBSCRIBE処理を処理するまでの間となります。
下記図はそのイメージです。

img

このメッセージの伝送が有効な期間、具体的にメッセージがサーバからクライアントにどのように送られてくるか、若干混乱しがちです。
SUBSCRIBE制御パケット、SUBACK制御パケットにはメッセージの本文は含まれません。
あくまでもこれらは、購読手続きのための制御パケットであるためです。
具体的なメッセージの伝送は、前述のPUBLISH制御パケットを用いて行われます。
サーバとクライアントの立場が逆となりますが、基本的なメカニズムは同じとなります。

下記図は、QoS 0の購読におけるメッセージの伝送を示しています。

img

下記図は、QoS 1の購読におけるメッセージの伝送を示しています。

img

下記図は、QoS 2の購読におけるメッセージの伝送を示しています。

img

発行者と購読者をともに記載した例が下記図です。
発行者からのPUBLISH制御パケットを受け取ったサーバは、同じようにPUBLISH制御パケットを用いて購読者に届けます。
ただし、QoS Levelを除いて、両者のPUBLISH制御パケットのやり取りに直接的な関係はありません。

img

発行者側と購読者側のQoS Levelが一致しない場合の例を下記に示します。
発行者はQoS 1で発行しますが、購読者は最大QoS Levelを0として購読します。
この場合事実上購読者にはQoS 0で届くという意味合いになります。
サーバは、QoS 0で購読者にメッセージを届けます。
これがQoS Downgradeです。

img

SUBSCRIBE制御パケットにより興味のあるトピックを登録してはじめてメッセージを受信できるようになりますが、MQTTのセッションの機能を利用すれば、この購読の状態をコネクションをまたいで持続させることができます。
ただしこの状態は保持されますが、QoS 0のメッセージに関してはコネクションが切れている間の転送は失敗しており、かつ再送も行われないため、欠落することになります。
(正確にはMQTTの仕様上、明らかにまだ送っていないことがわかっているQoS 0のメッセージに関しては再送がオプションとなっており、具体的なMQTTサーバの実装に依存します。)
欠落を回避したい場合はQoS 1で購読する必要がありますが、元のメッセージの発行者側もQoS 1で発行していなければこれができません。
購読時のQoS Levelは発行時のそれを超えることができないためです。
ただし、プロダクトによってはそれを許可しているものもあります。
それができる場合、購読手続き時のQoS Levelに従って転送時のQoS Levelを上げます。
これがQoS Upgradeです。

死活監視

例えばプログラムのバグなどにより、MQTTに利用するコネクションは生きているけれども、実際のMQTT通信ができないような不正状態に陥ることがあります。
俗に言うゾンビ状態ですが、MQTTにはこれを監視するための仕組みがあります。

実は接続時に伝送されるCONNECT制御パケットには、Keep Aliveという情報も含まれます。
これがそのゾンビ状態を監視するために利用される情報で、クライアントとサーバ間の無通信状態をどのくらい猶予とするかを示しています。
つまり、Keep Aliveは時間を示す情報となります。
Keep Aliveは秒を単位とする数値であり、この時間を超えても一切の制御パケットが伝送されない場合、サーバはその接続がゾンビであると認識し、そのコネクションを切断します。
ただし実際には、Keep Alive値の1.5倍待ってからこの切断処理が行われます。
何かしらの制御パケットをサーバが受け取ったとき、そのコネクションの無通信状態カウントが0にリセットされ、そこからまた再計上されていきます。
サーバから切断されないようにするには、Keep Aliveの値を適切に設定して、このリセットを適切に制御する必要があります。

ただしKeep Aliveで指定できる秒数は、最大18時間12分15秒です。
これはKeep Alive値のデータ領域のバイト数に起因する最大値です。 MQTTを利用するケースで、コネクションは維持しつつもデータの通信がない状態がこの値を超えるケースはそうそうありませんが、一応、そういうケースのための手段が提供されています。
また、データの転送は周期が長いが、ゾンビ状態をもっと短いスパンで検知する必要がある際にも利用する機能です。

その死活監視のための手段が、PINGREQ制御パケットです。
アプリケーション、つまりクライアント側の事情でその期間に伝送すべきデータがない場合、PINGREQ制御パケットを送ることで無通信状態のカウントがリセットされ、サーバからの切断を回避できます。
また、PINGREQ制御パケットに対するサーバからの応答として、PINGRESP制御パケットが送られてきます。
もし仮にクライアントは正常で、PINGREQ制御パケットを送ったけれどもPINGRESP制御パケットが返ってこない場合は、サーバがゾンビ状態やほか不正な状態になっていることを示すため、クライアントはそれを検知して適切な処理を施す必要があります。
下記図は、PINGREQ制御パケット、PINGRESP制御パケットのやり取りを示しています。

img

またKeep Aliveの値を0に設定すると、この死活監視の機能が無効となり、これ起因でサーバから切断されないようになります。

切断

サーバに接続しているクライアントが正常終了する場合は、DISCONNECT制御パケットを送ります。
クライアントはDISCONNECT制御パケットを送った後、接続を閉じます。
下記図がそのイメージです。

img

MQTT ver3.1.1では、クライアントのみDISCONNECT制御パケットを送ることが許されています。
つまり、サーバからの切断はプロトコルエラー、あるいは死活監視によるもの以外には無いということです。

また、DISCONNECT制御パケットを伝送したことによるコネクションの切断は、正しく切断されることを意味するため、Willによるメッセージの発行は行われません。

MQTTを利用する

利用にあたって

MQTTを利用するにあたって、把握すべき細かな仕様を一部紹介します。
今回全てを紹介はしていないため、是非、正式な仕様を一読されることをおすすめします。

MQTT over WebSocket

MQTTをWebSocket上で利用するには、WebSocketのサブプロトコルに『mqtt』を指定する必要があります。

MQTTの制御パケットはWebSocketのバイナリフレームで処理されます。
WebSocketはあくまでMQTTのバイナリストリームをプロキシしているだけであり、WebSocketの各フレームにMQTTの各パケットがきれいに収まるような規定があるわけではないことに注意してください。

また、特にWebSocketのエンドポイントがルートである必要はないため、MQTT over WebSocketを/hogehogeのようなパスに実装することも可能です。

URIスキーム

現状、MQTT向けのオフィシャルなURIスキームは用意されていません。(参考)

慣習的に、暗号化なしの利用に『mqtt』を、暗号化ありの利用に『mqtts』を採用する傾向にあります。
実装によっては『tcp』『ssl』『tls』『tcps』なども利用されます。
WebSocket利用時には『ws』『wss』が、UNIXソケットによる場合は『unix』も利用されます。

まとめると下記のような状況となります。

URIスキーム プロトコル
mqtt TCP
mqtts TLS
tcp TCP
ssl TLS
tls TLS
tcps TLS
ws WebSocket
wss WebSocket Secure
unix UNIX Socket

ただしこれは、URIとしての可読性重視での採用が主であるため、具体的に利用するクライアントの実装で処理できるかどうかを確認する必要があります。

ポート

MQTTが利用するポート番号はIANAに登録されており、下記で参照することができます。
これによればTCP接続は1883番ポートを、TLS接続は8883番ポートを利用することになっています。

Service Name and Transport Protocol Port Number Registry

また、MQTT over WebSocketに関してはWEBに従います。
つまり、暗号化なしの接続は80番ポートを、暗号化を伴う接続は443番ポートを利用します。

まとめると下記となります。

ポート番号 プロトコル
1883 MQTT (TCP)
8883 Secure MQTT (TLS)
80 MQTT over WebSocket
443 MQTT over WebSocket Secure

ただし、サービスやプロダクトによっては上記とは異なることがあります。

クライアントの認証

MQTTを利用するクライアントの認証には、下記に挙げるようないくつかの方法があります。

  • MQTTのUsername/Passwordの認証を使う
  • TLSのクライアント認証を使う
  • MQTT over WebSocketの場合、WEBの認証を使う

AWSやAzure、GCPでもMQTTメッセージブローカーへの接続時の認証のアプローチが異なるため、利用する製品やクラウドサービスの仕様を把握する必要があります。

文字列のエンコードと長さ

PUBLISH制御パケットのメッセージ本体を格納する領域に相当するペイロードな部分以外の、クライアントIDやトピックなどのMQTTの文字列データ領域は、すべてUTF-8でエンコードされている必要があります。

また、各文字列データにはそれぞれ2バイトの長さを示すデータ領域が存在しますが、それ故に文字列の長さは最大65535文字で制限されます。

クライアントID

サーバ実装によっては、クライアントIDは空文字も設定できます。
その場合、サーバ側で内部的にはランダムな一意のIDが振られている状態となります。

また、セッションの管理はクライアントIDをキーとして行われます。
そのためクライアントIDに空文字を設定した場合、CleanSession=0で接続していたとしても再度同じクライアントIDで接続する手段がないため、結果的に再接続時にそのセッションに復帰することはできない、セッションは永続化されないことになります。
(実際には仕様上、クライアントIDが空文字の場合はCleanSession=1にしなければなりませんが。)

パケット識別子

パケット識別子は0ではない2バイトの符号なし整数です。
クライアントとサーバ間の接続ごとに独立で、メッセージ伝送の処理中のパケット識別子は全て一意な値となっています。
QoS Levelに則った伝送処理が正しく終了したとき、そこで利用されたパケット識別子は解放され、再利用可能となります。

パケット識別子は2バイトしかないため、同時にInflight可能な制御パケット数は65535個が上限となります。
それ以上の制御パケットがどのような扱いになるのかはMQTTの仕様に存在しないため、同時に大量の転送を行うようなケースで利用するのであれば、その実装を確認するようにしてください。

トピック名とワイルドカード

トピック名は空文字は許されておらず、1文字以上の長さである必要があります。 また、トピック名の長さを示す領域も2バイトであるため、65535文字が最大長です。

トピックのような論理空間は、階層的に指定できるとより効率的に管理することができます。
MQTTでは『/』を利用して、論理空間を階層化することができます。
また、購読手続きのトピックフィルタでは『+』と『#』によるワイルドカードを利用することができ、複数のトピックを対象としたメッセージを1つの購読で受信することができます。
ワイルドカード『+』はある位置の階層に関する任意の名前を指します。
ワイルドカード『#』はある位置以下の階層と、その1つ上の階層を指します。

間違えやすい例として、『/』の1文字のみのトピック名は有効です。
また、『hogehoge』と『『hogehoge/』はトピックとしては別物です。
トピックフィルタに『hogehoge/+』を指定したとき、『hogehoge/』や『hogehoge/fugafuga』は受信できますが、『hogehoge』は受信できません。
トピックフィルタに『hogehoge/#』を指定したとき、『hogehoge/』、『hogehoge/fugafuga』、『hogehoge』が受信できます。

すべてのメッセージを受信したいときは、トピックフィルタを『#』1文字で作成します。

制約

冒頭でも述べていたように、MQTTを利用するシーンはAWSやAzureのIoT向けメッセージブローカーを利用するためであるケースが多くなっています。

ただ現実には、これらのMQTTは一部制約であったり、仕様から逸脱している挙動があることがあります。
ここではAWSに関して、それらの一部をピックアップしてまとめています。
メッセージブローカーなエンドポイントへの同時接続可能数などのクォータ系はピックアップしていません。

AzureやGCPに関してはいつの日か追記するかもしれません。

AWSの制約

AWSにおけるMQTTメッセージブローカーの利用時の制約が、下記URLに記載されています。

要約すると、下記のようになります。

  • MQTTプロトコル上は約256MBまでのサイズのメッセージを送ることができますが、AWS IoTでは最大128 KBとなります
  • Keep Aliveの上限は20分です
  • Keep Aliveを30秒未満で指定すると、30秒に設定されます
  • WebSocketは最長で1日しか接続を維持しません
  • トピックの階層化は、最大8層("/"を最大7回)となります
  • QoS 2はサポートされていません
  • Retainはサポートされていません

上記のような制約を眺めると、もはやMQTTではないのではないか、と思われるかもしれません。
同じような制約はAzureやGCPにもあります。
要するにこれらのメッセージブローカーは、IoTでのデバイスとの常時接続で双方向な通信経路を確保するために、都合よく存在したMQTTの皮をかぶっているだけとも言えます。

よく陥る罠

クライアントライブラリの選択

実際にはクライアント側はセロベースでフルスクラッチするのではなく、既存のライブラリを利用するケースがほとんどでしょう。
MQTTの仕様は一見簡単なため、クライアントライブラリの実装も比較的楽に構築できます。

しかしそれ故か、中途半端な実装のものが多く、細かい挙動までをカバーしたライブラリ自体は少ないのもまた事実です。
MQTTの仕様では、例えばReservedな値が意図しないものであったとき、サーバ側が接続を閉じる必要があるという決まりも多々あるのですが、これらを適切にフォローしたライブラリはそう多く有りません。
そのためライブラリの選定を適切に行わなければ、これら起因で意図しないエラーや切断が発生するなどの障害を引き起こす可能性があります。

最終的にどのようなクライアントライブラリを利用するにせよ、まず動作検証の観点で最も実績のある下記のライブラリで試してみるのをおすすめします。

Eclipse Paho - MQTT and MQTT-SN software

ペイロード処理周りの実装

MQTTの仕様上、約256MBのメッセージが送ることができるわけですが、多くの実装はバイト配列としてペイロードを処理しています。
そのため、この規模のデータを受け取ると一気にメモリを消費してしまいます。

例えばGoだと、io.Writerio.Readerのようなストリーム処理を利用すればよりメモリに効率的に処理することができます。
もし仮にクライアントライブラリのようなものを利用する場合、どの程度のサイズのメッセージが流れうるのかユースケースを洗い出し、最適なライブラリを利用するように注意する必要があります。

ただし、よく利用されるMQTTメッセージブローカーは結局AWSらのものなので、上記で紹介したような制約でこの規模のメッセージを送れないようになっており、問題になることはあまりありませんが…
また、仮に仕様や制約上送れたとしても、MQTTでそれを送るのが最適かどうかという話もあります。

さいごに

正直MQTTの仕様そのものに思うところは大してないのですが、AWSらのMQTTメッセージブローカーが便利だなと言う事実が全てかと思います。
何かしらPub/Subなメッセージブローカーを自前で構築する必要があるのであれば、私ならNATSなどを立てると思うので、あくまでもIoTでAWSらの恩恵にあずかりたい場合にMQTTを利用する感じです。

オプティムでは、実際にプロダクトに利用するための技術の深掘りだけでなく、今後の見込みのある技術やプロダクト、新たな可能性を持った技術やプロダクトなどを広く調査し、実際に検証することも積極的に行っています。自分もやってみたい、興味のある、という方は、是非こちらをご覧ください。

www.optim.co.jp

参考文献