MQTT Version 5.0 でできること

ご無沙汰しております。中野です。
ボトルマンを予約しそびれてしまって意気消沈気味な昨今です。

今回の記事では、MQTT Version 5.0の紹介を軽くしたいと思います。


MQTT Version 5.0とは

昨年、Version 3.1.1から約5年ぶりに仕様が更新された、MQTTの最新バージョンです。

docs.oasis-open.org

もともとMQTT自体はシンプルで軽量で容易に実装可能であることを謳ったプロトコルでしたが、流石にシンプルすぎたのか、5年の月日を経て登場した最新バージョンでは利便性の高い機能が汎用化され追加されました。今回の記事では新たに追加された機能を中心に見ていきたいと思います。


MQTT Version 5.0とVersion 3.1.1の違い (サマリ)

まず、MQTT Version 3.1.1のプロトコル詳細については下記の記事で紹介しています。

tech-blog.optim.co.jp

MQTT Version 5.0は、この3.1.1をベースにしています。
制御パケットのやり取りにはほぼ変更がないため、上記の記事でその詳細を紹介していますので、MQTTとはなんぞや、という方はそちらをご参照ください。

端的に言うとMQTT Version 5.0での変更の多くは、各制御パケットに新規の情報が追加されたことで、各操作がよりリッチになったものだといえます。

Version 3.1.1に対して、Version 5.0でも変わらない点、Version 5.0で変わった点を下記にリストアップしています。
また、なくなった機能はありません。

Version 5.0でも(大きくは)変わらない点

  • ブローカーありのPublish/Subscribe型メッセージング
  • TCP(TLS)やWebSocket(WebSocket Secure)などの上で動く
  • 伝達保証
    • At Most Once (最大1回)
    • At Least Once (少なくとも1回)
    • Exactly Once (確実に1回)
  • Willメッセージ
  • Retainメッセージ
  • 認証
    • ユーザ名、パスワードによる、MQTTプロトコルレベルのでベーシック認証
    • TLSレイヤーでのクライアント認証
    • WEBにおける認証 (MQTT over WebSocketのみ)
  • ワイルドカードなトピックフィルタによる購読
  • Durable Subscription
  • ヘルスチェック
    • Keep-Alive
    • PingReq/PingResp
  • URIスキーム
  • ポート番号

Version 5.0で変わった点

  • CONNECTパケットで指定するProtocol Levelが5になった
  • 可変ヘッダにある程度自由に利用できるメタデータ領域(プロパティ)が設けられた
  • 上記メタデータ領域を利用して、利便性の高い操作が新規に汎用化された
    • 拡張認証
    • 共有サブスクリプション
    • サブスクリプション識別子
    • セッションのTTL
    • メッセージのTTL
    • サーバリダイレクト
    • トピックエイリアス
    • フロー制御
    • 最大パケットサイズ制限
    • リクエスト・レスポンスパターン
    • Willの遅延
    • 任意で利用するサーバ機能の利用可否伝達
    • サーバによるクライアントID付与
    • Keep Aliveの強制
    • ペイロードのフォーマットの識別子
    • 上記以外に任意に利用できるメタデータ領域(ユーザプロパティ)
  • 拡張認証のために制御パケットタイプにAUTHパケットが追加された
  • 応答系の制御パケットで、正常系、異常系ともに詳細情報を返せるようになった
    • Reason Code
    • Reason Stringプロパティ
  • セッションの再作成周りに関する挙動が変更された
  • サーバがDISCONNECTパケットを送ってくるようになった
  • 自分が発行したメッセージを自分で受け取らないように設定できるようになった
  • Retainメッセージの転送時に関するRETAINフラグの扱い方を購読オプションで指定できるようになった

MQTT Version 5.0における変更点は、OASIS仕様のAppendix Cにて簡潔にまとめられています。
こちらも参照ください。

docs.oasis-open.org

余談ですが、Version 3.1.1ではあくまでもサーバ、クライアントという表現で記載され、Publisher、Subscriberのような言葉は利用されていませんでしたが、Version 5.0の仕様ではPublisher、Subscriberという言葉が多用されています。

[Appendix] 可変ヘッダのプロパティ領域に関するサマリとパケット構造

下記のような項目がプロパティ領域の各プロパティとして定義されました。

これにより、Version 5.0における全制御パケットと、全領域全要素の対応表は下記となります。

また、各要素は次のように分類することができます。前述の対応表にこの分類を適用したものが下記対応表となります。

  • Reserved ... 必要かつ、値まで決まっているもの
  • Required ... 必要で、値はユースケース毎に変わりうるもの
  • Optional ... 任意で利用されるもの

結局、何ができるようになったのか

前置きが長くなってしまいましたが、上記の変わった点により、ユーザが何ができるようになったのかに焦点を当てて紹介していきたいと思います。
また、今回は大きめのできるようになったことを紹介しておりますので、その他の内容については仕様をご覧いただければと思います。

より多彩な認証を利用できるようになった

初っ端マニアックな話ですが、後述の内容に関係してくるので、最初に持ってきました。

Version 5.0では、Version 3.1.1で利用可能だった認証方法に加え、他の認証方法をサポートするための『枠組み』が用意されました。
この枠組みとは、プロパティ領域のいくつかのプロパティと、新設されたAUTH制御パケットが関係します。
これはEnhanced Authentication、『拡張認証』と定義されています。

開発者はこの枠組みを利用して、ユースケースやサービスに適した認証方法を実装することができるようになりました。 この枠組みが求められるのは多くの場合、何かしらのチャレンジ・レスポンス認証を必要としている場合となるかと思います。

一般に多くの場合、SaaSのMQTTサービスを利用したり、OSSなMQTTブローカーを利用することが多いと思います。
これらのサービスやツールが拡張認証を利用した何かしらの認証方式を提供する場合、具体的なやり取りに関してはこのVersion 5.0の仕様の範疇外であり、実装次第となりますので、各種サービスやツールの実装やドキュメントを参照することになります。

また、基本的にこの拡張認証を利用するのはマニアックなケースです。
多くの場合ではMQTTのユーザ名・パスワードによるベーシック認証、TLSのクライアント認証、WebSocket接続時のWEB認証のいずれかでカバーできます。

また、API提供されているMQTTサービスを利用する際、アクセストークンのようなものを利用したいケースもあると思います。
MQTTのユーザ名・パスワードはベーシック認証と銘打っているものの、アクセストークンの送信にも利用できます。
Version 3.1.1ではユーザ名なしのパスワード送信は許可されていませんでしたが、Version 5.0ではユーザ名なしのパスワード送信が許可されているため、パスワード領域にアクセストークンを格納して送ることでカバーできます。

応答系の制御パケットで、正常系、異常系ともに詳細情報を返せるようになった

Version 3.1.1では、正常系、異常系ともに必要最低限の情報以外の情報を返すことはできませんでした。
下記は、Version 3.1.1における応答系で返せる情報をまとめたものです。

制御パケット 返せる情報
CONNACK Connect Return Code
- 接続を受け入れた(正常系)
- MQTTレベル未サポート(異常系)
- クライアントID不正(異常系)
- MQTTサービス利用不可(異常系)
- ユーザID・パスワード認証失敗(異常系)
- 接続未認可(異常系)
PUBACK、PUBREC、PUBREL、PUBCOMP、UNSUBACK このパケットの有無が可否となる。追加の情報は含まない
SUBACK SubAck Return Code
- QoS 0を保証
- QoS 0-1を保証
- QoS 0-2を保証
- 失敗

おそらく多くの人は、アプリケーションメッセージ内でエラー情報を返せるようにフォーマットを工夫したり、エラー情報を返す専用のトピックを設けたりしていたのではないかと思います。

これに対し、Version 5.0では通常のやり取りの中でより多くの情報を返せるようになっています。
具体的には、とりわけ異常系にて、なぜ失敗したかの詳細な状況が汎化され、仕様として用意されました。
Version 3.1.1ではアプリケーションメッセージ内で実装していたものが、それがある程度プロトコルレベルでカバーされたということです。

下記は、Version 5.0における応答系で返せる情報をまとめたものです。(AUTH制御パケット専用のReason Codeは省いています)
また、Version 3.1.1ではReturn Codeと呼ばれていたものが、Version 5.0ではReason Codeという名前に変わっているので注意が必要です。

正常系Reason Code

正常系Reason Codeは、0x80未満の値のコードを持ちます。また、0x00に関しては制御パケット毎に意味が変わります。

異常系Reason Code

異常系Reason Codeは、0x80以上の値のコードを持ちます。

またReason Codeに加え、可変ヘッダ内のプロパティ領域にてReason Stringプロパティという項目が利用でき、個別具体的なエラーメッセージを返すこともできるようになっています。
ただし、より多彩な情報を返せるようになった反面、パケットサイズがやや膨れ上がる可能性があるため、パケットサイズにシビアな環境下での利用は吟味が必要です。

どのサブスクリプションに一致したメッセージかを識別できるようになった

ワイルドカードを利用したトピックフィルタなどにより、あるトピックが複数のトピックフィルタに引っかかる場合があります。
例えば、あるクライアントが下記のトピックフィルタで購読を行っているとします。

  • hogehoge/1
  • hogehoge/+
  • hogehoge/#

このとき、hogehoge/1というトピックにメッセージが発行されたとします。このトピックは上記の3つのトピックに一致するため、Subscriberはこのメッセージを受け取ることになるのですが、Version 3.1.1ではこれらのどのトピックフィルタに一致した結果としてメッセージを受けとったのか、判定することができませんでした。

Version 5.0では、可変ヘッダのメタデータ領域にサブスクリプション識別子プロパティが新設され、SUBSCRIBE処理の際にサブスクリプション識別子を登録しておくことで、サーバからSubscriberにメッセージが送られる際にメッセージのプロパティ領域にサブスクリプション識別子が設定されるため、Subscriberはそのメッセージがどのサブスクリプションに一致した結果なのかを知ることができます。

共有サブスクリプションが利用できるようになった

共有サブスクリプションとはなにかご存知無い方もいるかと思います。
Shared Subscription、共有サブスクリプションとは、1つのSubscriptionを複数のSubscriberで共有するということです。
具体的には、ある共有サブスクリプションを共有している複数のSubscriberがいたとした場合、そのトピックフィルタに一致するパケットは、ただ1つのいずれかのSubscriberのみに送信され、他のSubscriberには送信されません。 共有サブスクリプションが利用されるケースとは、多くのメッセージが流れるケースでSubscriber側の負荷分散をしたい場合や、ミッションクリティカルなケースでSubsciberの冗長化をやりたいケースなどがありえます。
逆に言えば、Version 3.1.1では一工夫しなければこれができませんでした。

メッセージの分配は、よくある形としてはReceive Maximum(後述)などを加味した上でのラウンドロビンの分配になるかと思います。
しかしVersion 5.0ではこのあたりの分配アルゴリズムに関しては言及されていません。基本的にサーバ実装に委ねられています。
いずれはHTTPプロキシサーバのように、ユーザプロパティのサーバ独自のカスタムプロパティを利用して、最も処理に余裕のあるSubscriptionを選択するアルゴリズムなども出てくるかもしれません。

共有サブスクリプションの機能はNATSなどにもあるのですが、NATSなどと違い、トピック名に明確に違いがあります。
NATSなどでは、共有型でも非共有型でもトピック名に区別はありませんが、MQTTは共有サブスクリプションのトピックには明確なルールがあります。

MQTT Version 5.0では、共有サブスクリプションは下記のようなトピック名である必要があります。

$share/{ShareName}/{filter}

ShareNameは、このサブスクリプションを共有するSubscriber間での共通の名前、いわゆる共有グループの識別子を設定します。
filterは、通常のトピックフィルタに相当するものです。
これはつまり、通常のトピックフィルタに、$share/{ShareName}/というプレフィックスがついたものとも言えます。

セッションに有効期限を設けることができるようになった

Version 3.1.1ではClean Sessionしない限り、セッションは半永久的に残り続ける仕様でした。
ただ実際には、現実的な落とし所としてサーバやサービスの実装や仕様次第という状況でした。

Version 5.0では、Session Expiry Intervalを設定することで、クライアントが切断されてからの時間的猶予を設ける事ができます。
逆に言えば、この時間が過ぎたら自動的にセッションが破棄されることになります。

Session Expiry Intervalを設定しなかったり、0に設定した場合、クライアントが切断されてから即座にセッションが破棄されます。
これは正常終了、異常終了に関わらず削除されます。
逆に、Session Expiry Intervalを最大値0xFFFFFFFFに設定すると、セッションの破棄は無効となります。

したがって、Session Expiry Intervalが有効な時間は、0x1〜0xFFFFFFFEとなります。
約135年近い時間が設定できるので、ほぼ不都合なく利用できるはずです。

メッセージに有効期限を設けることができるようになった

セッションと同様に、メッセージにも有効期限を設定できるようになりました。
これにより、サーバ内でSubscriberに送られるのを待っているメッセージや、クライアントには送られたがキューに溜め込まれクライアント側のアプリケーションがまだ読み込んでいないメッセージに対して、有効期限が設けられるようになりました。

基本的にこのオプションはメッセージの再送時に利用されるオプションです。
例えばDurable Subscription内で、セッションの生存期間内ではあるものの、メッセージは時間経過で無効化したい場合などにこのオプションは有効です。

トピックにエイリアスを設定し、パケットサイズを抑制できるようになった

特定の同一のトピックに繰り返しメッセージを送る場合、毎度トピックを送るのはパケットサイズの観点から非効率的です。
Version 5.0では、特定のトピックに対しエイリアスを設定することができ、これによりトピック相当の情報を2バイトにまで抑制することができます。

しかしトピックエイリアスは無制限に付与できるわけではなく、Topic Alias Maximumで設定されている数しか設定できません。
Topic Alias Maximumは受信側が設定するもので、CONNECT制御パケットや、CONNACK制御パケットのプロパティに含まれます。
送信側はこの情報を参照し、トピックエイリアスの数を調整しなければなりません。

※送信側、受信側と書いているのは、Publisher→Server、Server→Subscriber間でそれぞれ機能するためです。

接続先のサーバをプロトコルレベルで誘導できるようになった

HTTPで言うところの、300系のリダイレクト相当のことができるようになりました。
これにより、サーバ負荷を鑑みた接続先の分散や、サーバ更新などにおける一時退避などがやりやすくなりました。

In-flightなパケット数を制限できるようになった

Version 3.1.1では、In-flightな、つまり同時に並列して処理できるメッセージの数を調節することはできませんでした。
事実上、QoS 1以上の場合はPacket Identifierの最大値である65536個までという縛りや、あるいはサーバ実装で独自に上限を設けていることはありましたが、少なくとも仕様上にその制限はありませんでした。
そのため、サーバや、Subscriber側など、メッセージの受信側の性能に合わせた調整などできませんでした。

Version 5.0では、接続時にReceive Maximumという設定を交換することで、この上限を個別に設定できるようになりました。

ただし注意が必要なのは、この設定で制限されるのはQoS 1とQoS 2のパケットだけだと言うことです。QoS 0のパケットは一切の制限を受けません。
この点に関しては、Version 3.1.1と変わらないことになります。

最大パケットサイズ制限により、個別にパケットサイズを制限できるようになった

Version 3.1.1では、MQTTクライアントやサーバは送信時、受信時双方で扱うパケットにサイズ制限を設けることはできませんでした。
そのため、とりわけサーバやSubscriberなMQTTクライアントは望まずとも約256MBまでのデータが送られてきてしまう状況にありました。

Version 5.0では接続毎に最大パケットサイズを設定できるようになりました。この設定は、サーバを介してクライアント間で最大パケットサイズを交換するためのものではなく、クライアントがサーバに個別に最大パケットサイズを通知するだけに過ぎません。各クライアントが設定する最大パケットサイズはそれぞれ独立しています。
また同様に、サーバもサーバが受信できる最大パケットサイズをクライアントに知らせることができるようになりました。これにより、クライアントは超えるサイズのパケットを送ることを事前に抑制することができるようになりました。

もし仮に、Publisher側は送ることができ、Subscriber側が受け取れないようなサイズのメッセージをサーバがPublisherから受け取ったとき、サーバはPublisherからは正常に受け取りますが、このメッセージがSubscriberに送られることはありません。送信不可のメッセージとして破棄されます。

もし共有サブスクリプションであり、それを受け取れるSubscriberと受け取れないSubscriberが混在していた場合、メッセージは破棄されるか、受け取れるSubscriberに転送されます。注意すべき点は、これはサーバが選択するべきことでクライアントが指示することはできない要素であり、事実上サーバ実装がどの様になっているかに依存するということです。

破棄されたメッセージをどう扱うかは、Version 5.0の範疇外とされています。この点もサーバ実装によりますので、利用時に確認する必要があります。

Willメッセージの発出を遅延できるようになった

Version 3.1.1では、Willメッセージが設定されている場合、即座に発出される仕様でした。
そのため、デバイスの再起動などの一時的な切断でも発出されてしまう可能性があり、これを抑止する方法がありませんでした。

Version 5.0では、Will Delay Intervalを設定することで、Willメッセージを即座に送信するのではなく、設定した時間遅延させて送信させることができるようになりました。
これにより、もし仮に待機時間内にクライアントが再接続を果たした場合、Willメッセージの送信を中止することができます。

ただし1点注意事項としては、設定したWill Delay Intervalが経過するよりも前に、Session Expiry Intervalが経過するような場合は、Session Expiry Intervalが経過した時点でWillメッセージが発出されることに注意が必要です。

任意で利用するサーバ機能の利用可否伝達

サーバ側の特定の機能は、特定のユースケースで不要であるため無効化されていたりする場合があります。
Version 3.1.1ではこのような場合、無効化されているかどうかをクライアントが知るすべはありませんでした。

Version 5.0では、下記の機能に関して、サーバが機能を提供しているかどうかをクライアントに通知する手段が設けられました。

  • Retainメッセージ利用可否
  • ワイルドカード購読利用可否
  • サブスクリプション識別子利用可否
  • 共有サブスクリプション利用可否

このパラメータを参照することで、クライアントは機能が限られたサーバ上でもよりベターな形で処理することができます。

セッションの再作成をより簡潔に実装できるようになった

セッションの再作成周りに関する挙動が変更されました。

Version 3.1.1ではセッションを再作成する場合、Clean Sessionを1にした揮発性のセッションで再接続した後に再度切断、Clean Session 0にしたセッションで再々接続する必要がありました。

Version 5.0ではまずこのClean Sessionという設定がClean Startという名前に変わりました。このClean Startを1にしたとき、新しいセッションで開始されます。
もし揮発性セッションを利用したい場合、このClean Start=1とSession Expiry Interval=0を組み合わせて実現します。

サーバがクライアントIDをふれるようになった

Version 3.1.1でもクライアント側が長さ0のクライアントIDを送ることで、サーバが内部的にランダムなクライアントIDをふることは可能でした。 しかし、このクライアントIDをサーバからクライアントへ伝達する手段はなく、基本的に一時的な接続用でしか利用することしかできませんでした。

Version 5.0では、同様に長さ0のクライアントIDを送ることで、サーバ側でクライアントIDを降ることができ、かつさらに、サーバからクライアントにクライアントIDを伝達する事ができるようになりました。
また、揮発性のセッションに限る用途にしか使えなかった制約もなくなっており、このクライアントIDに紐づくセッションの有効期限はSession Expiry Intervalに依存します。

リクエスト・レスポンスパターンを利用できるようになった

リクエスト・レスポンスパターンとはなにかについては、弊ブログで紹介した記事がありますので、そちらを参照ください。

tech-blog.optim.co.jp

まず、Version 3.1.1ではリクエスト・レスポンスパターンはMQTTプロトコルレベルではサポートされていませんでした。
そのため、Version 3.1.1で実現しようとした場合、アプリケーションメッセージ内で独自にフォーマットを作成し、擬似的に再現する必要がありました。

Version 5.0ではリクエスト・レスポンスパターンにMQTTプロトコルレベルで対応しました。これによりアプリケーションメッセージ内で独自に実装する必要はなくなりました。

ペイロードのフォーマットを伝達できるようになった

Version 3.1.1では、PUBLISHパケットのペイロード、いわゆるアプリケーションメッセージの内容をどう解読するかはクライアントアプリケーションに完全に委ねられていました。そのため、完全にPublisher、Subscriber間で取り決めを持っておく必要がありました。

Version 5.0では、2つのフォーマット識別要素が追加されました。Packet Format IndicatorとCotent Typeです。

Packet Format Indicatorは、ペイロードの内容が、UTF-8な文字列か否かを示す最もプリミティブな識別要素です。これにより、例えばロガーのような、少なくとも文字列を扱える汎用的なSubscriberを扱いやすくなりました。

Content Typeは、ペイロードの中身のタイプをより具体的に示す識別要素です。Packet Format Indicatorより具体的に表せる反面、Packet Format Indicatorは1バイトしか消費しないという省エネ設計だったため、よりパケットサイズ的なコストはかかる形になります。
Content Typeの具体的な値についてはMIME Typeなどが使われる可能性が高くはありますが、基本的にはクライアントアプリケーション感の取り決めに依存しており、サーバは一切関与しません。この点は、Version 3.1.1のときと同様です。

正確なKeep Aliveの値を取得できるようになった

Keep Aliveとは、ゾンビ化した接続を正常に切断するためのヘルスチェックのためのパラメータで、一定期間操作がない場合に切断するための閾値となる秒数の値です。

Version 3.1.1では、Keep Aliveの値はクライアントからサーバに一方的に渡すものでした。そのため、サーバ側にKeep Aliveに設定できる値に制限があり、クライアントがこの範囲を超えた値を設定しても、エラーや値の上書きをクライアントが知るすべはありませんでした。(何かしらの形でエラーを得られる設計になっていることはあります)
このような制約は珍しくなく、例えばAWS IoTのMQTTエンドポイントでは、30〜1200秒の範囲でしか設定できません。

Version 5.0では、クライアントからサーバにKeep Aliveを送れるのは同じですが、もしサーバ側で別の設定がありクライアントが送ったものとは異なる設定のKeep Aliveが存在する場合、その値がCONNACKパケットで返されます。クライアントはそれを受け取ることで正しいKeep ALiveの値を知ることができ、それに合わせた動作を行うことができます。むしろ、クライアントが送ったKeep Aliveを破棄し、サーバから送られたKeep Aliveを利用しなければなりません。

自分が発行したメッセージを自分で受け取らないように設定できるようになった

購読時のオプションとして、要求QoS以外の購読オプションが用意され、その中にno Localというオプションが新設されました。

これは、あるクライアントIDで発行されたメッセージを、同じクライアントIDで購読している場合はそれを受信しない、というオプションです。
同じクライアントIDを利用して複数の接続は不可能であるため、事実上、ある接続の上であるトピックを購読し、自分でそこに発行した場合、それを受け取るかどうかを決められるオプションとなります。

no Localが1、つまりTrueの場合、自身のサブスクリプションに自信が発行したメッセージは伝送されなくなります。


Version 5.0を試す

Version 5.0に対応したMQTTブローカーはいくつかありますが、今回はEMQを使います。
もちろん、みんな大好きMosquittoもVersion 5.0に対応しているのでそちらを利用されても構いません。
余談ですがNATSが2021年度のMQTTサポートを謳っているので、非常に楽しみです。

github.comhub.docker.com

Dockerを利用し、EMQ Brokerを起動します。

docker run -d --name emqx -p 1883:1883 -p 18083:18083 emqx/emqx

1883ポートはTCP上のMQTTサービス、18083ポートはHTTP上の管理用ダッシュボードサービスです。
他ポートにも他サービスを備えていますが、今回は利用しないので記載していません。
各種ポートの情報は下記を参照してください。

docs.emqx.io

Subscriberを接続する

Pythonで、定番ライブラリ『paho-mqtt』を利用して試してみようと思います。

github.com

import paho.mqtt.client as mqtt


def on_message(client, userdata, msg):
    print("topic=%s payload=%s" % (msg.topic, msg.payload.decode()))


if __name__ == "__main__":
    client = mqtt.Client(protocol=mqtt.MQTTv5)
    client.on_message = on_message
    client.connect("<EMQのIPアドレス>", 1883, 60)

    client.subscribe("hogehoge")
    client.loop_forever()

よくある実装ですが、1点、mqtt.Clientを初期化するときにprotocol=mqtt.MQTTv5を指定していると思います。
pahoはバージョンの違いを内部的に吸収しているため、どのバージョンを利用するか、指定する必要があります。
具体的には、Version 3.1、Version 3.1.1も含めると、Protocol Levelの指定は下記が候補となります。

Version Protocol Level
3.1 3 ※
3.1.1 4
5.0 5

※ Version 3.1ではProtocol Levelという名称ではなく、Protocol Versionという名称でした。

pahoも含め、まだまだ多くのクライアント実装ではデフォルトのProtocol Levelが3.1.1向けの4を利用しているため、
今回の例と同じように、Version 5.0を利用するには明示的にProtocol Levelを指定しなければならないことに注意が必要です。

Publisherを接続する

Subscriberと同様に、paho-mqttを利用します。

import paho.mqtt.client as mqtt


if __name__ == "__main__":
    client = mqtt.Client(protocol=mqtt.MQTTv5)
    client.connect("<EMQのIPアドレス>", 1883, 60)

    info = client.publish("hogehoge", "hello world")
    info.wait_for_publish()

Subscriberと同様に、明示的にProtocol Levelを指定しています。

このPublisherを実行するたびに、Subscriber側に受け取ったペイロードが出力されるはずです。


さいごに

この記事では、Version 5.0の新機能を利用する場合の実装例には触れませんでした。それに関してはまた別の記事にて紹介できる日があればと思います。

以前の記事で、MQTTを利用する理由について少し触れていましたが、今でもやはりその辺の事情は変わっていません。
AWS、Azure、GCP等、強力なベンダーが提供するIoTサービスのプロトコルとして利用されているためであり、それ以上でもそれ以下でもありません。

そういう意味で、上記サービスのVersion 5.0サポートが望まれるところですが、残念ながらまだサポートされておりません。 また、クライアント側もVersion 5.0をサポートできているものもまだまだ少数です。

今後、時間をかけて、Version 5.0が広がっていく、移行していくのでしょう。

最後に余談ですが、オプティムでは、MQTTなどIoT技術に長けたエンジニアを募集しています。
ご興味あればご覧いただけると幸いです。ではまた次の記事にて。

www.optim.co.jp