AWS IoT Core + MQTT + Protocol Buffersで実現するメッセージング

はじめに

はじめまして、フロントエンドエンジニアの菅原です。 普段はHardware My Portalという製品のフロントエンドやデバイスエージェントの開発を担当しています。

Hardware My Portalは名前の通り、ハードウェアに関連する要素を持つため、IoT的な側面も備えています。 例として、デバイスに接続したIoTゲートウェイ上でプログラムを実行し、データをクラウド上に送信する機能も実装しています。 そのような処理を実装するために使用したサービスや技術について、サンプルも交えながらお話しできればと思います!

技術紹介

Hardware My PortalのIoT関連の実装で利用している技術について、軽く紹介したいと思います。

MQTT

MQTTは、PubSub型のメッセージングプロトコルです。 メッセージを送受信するクライアント(Publisher/Subscriber)と、クライアント同士のやり取りを仲介するブローカーにより構成されます。 メッセージの送受信の際には、「トピック」と呼ばれるラベルのようなものを指定します。

device/01/sensor/temperature

Publisherはトピックを指定してメッセージを送信し、Subscriberはあらかじめメッセージを受信したいトピックをサブスクライブしておくことで当該トピックにメッセージが配信された時にそのメッセージを受信することができます。

TECH BLOGでは他の方が書かれたより詳しい解説記事があるので、興味のある方はそちらもご覧いただけますと幸いです!

AWS IoT Core

AWS IoT Coreは、IoTデバイスをクラウドに接続するための機能を提供するマネージドサービスです。 MQTTブローカーとしての機能を持っているため、AWS上でMQTTを使用したサービスを作成したいといった場合に利用できます。 AWS IoT Coreの特徴を一部挙げると、

  • デバイス証明書による接続・送受信の許可
  • ルールエンジンによるメッセージの処理

が可能となっています。 デバイス証明書に対してポリシーを設定しておくことでデバイスがメッセージを受信できるトピックを制限するなど、安全にメッセージをやり取りすることができます。 また、ルールエンジン(またはルールアクション)は特定のトピックに流れてきたメッセージをLambdaで処理する、データベースに格納するといったデータ利用のための仕組みとなっています。

Protocol Buffers

Protocol Buffersは、Googleが開発した構造化データをシリアライズするための仕組みです。

.protoファイルに対してメッセージ構造を定義し、その定義からさまざまな言語(Go、C++、Javaなど)向けの型定義を生成することができます。 型定義をもとにデータをやり取りすることで、送信・受信側の両方で共通の型定義を参照しながら型安全にメッセージをやり取りすることができます。 メッセージ構造の定義は比較的読みやすく、int64stringなどプログラミング経験がある方であれば事前知識がなくても理解しやすい内容です。

これらの技術を採用した理由

MQTT・AWS IoT Coreを採用した理由としては、証明書を持つデバイスのみが接続できるためセキュリティを確保しやすいと考えたことと、今回は紹介していませんがフリートプロビジョニング1という機能を用いることで、大量のデバイス追加に対応しやすいと考えたためです。

Protocol Buffersについては、バイナリ形式に変換できデータサイズ削減が期待できることや送受信側で共通のメッセージ定義を参照できることから採用しています。

前者については、IoTデバイスはSIMを使用して通信している場合などネットワークに制約が課されているケースがあるため、通信で使用するデータサイズを削減できることはコスト面、性能面でメリットがあると考えています。

後者については、デバイスから送信されたデータを別のプログラムで処理したいといったケースにおいて、共通のメッセージ定義を参照できるため「どんな構造でデータが送られてくるんだっけ?」という状況にならずに済みます。 実装面でも、メッセージが構造体などで表現されるため型がわかりやすい点も個人的には嬉しいです。

実際にやってみた

ここからはGoによるサンプルコードを交えながら実際にAWS IoTとProtocol Buffersを使ってみたいと思います。 作成するプログラムの処理としては、Protocol Buffersを使用してメッセージをシリアライズ、AWS IoT Coreに送信します。 送信したメッセージはルールアクションにより処理し、S3への保存とレスポンス用のトピックへの再送信を実行します。

構成

MQTTクライアント(Go言語)によりメッセージを送信→トピックに送信したメッセージをルールにより処理→S3とレスポンス用トピックに対して送信→MQTTクライアントで受信するという構成です。

トピック中に${ClientId}を含めていますが、ここにはMQTTクライアントを一意に識別するためのIDであるクライアントIDが入る想定です。 また、クライアントから送信されるメッセージはProtocol Buffersによりバイナリ形式に変換されており、そのままでは人が読むことができないためルールアクションによりJSONに変換してから出力します。

  • 送信用トピック: dt/optim/tech_book/iot/${ClientId}/message
  • 受信用トピック: dt/optim/tech_book/iot/${ClientId}/message/response

環境

開発には以下の環境を使用しています。

  • Mac OS: Sequoia 15.5
  • go: 1.25.0
  • buf 1.57.2

今回は、Protocol Buffers関連のツールとしてbufというツールを使用します。 .protoファイルの変換にはprotocと呼ばれるコンパイラが提供されていますが、bufも同様にコンパイルに使用できるほか、リンターやフォーマッターとしての機能も備えているため、こちらを採用しています。

ディレクトリ構成

最終的には以下のような構成になりました。

.
├── golang
│   ├── cert
│   │   ├── AmazonRootCA1.pem
│   │   ├── device.pem.crt
│   │   └── private.pem.key
│   ├── go.mod
│   ├── go.sum
│   └── main.go
├── proto
│   ├── buf.gen.yaml
│   ├── buf.yaml
│   ├── gen
│   │   └── golang
│   │       └── tech_book
│   │           └── v1
│   │               └── message.pb.go
│   ├── go.mod
│   ├── go.sum
│   ├── message.desc
│   └── tech_book
│       └── v1
│           └── message.proto
└── README.md

Protocol Buffersのメッセージ定義

protoディレクトリに移動し、必要なファイルを作成していきます。

cd proto
mkdir -p tech_book/v1
touch tech_book/v1/message.proto

メッセージ定義として、以下のようなmessage.protoを定義します。 headerとbodyから構成されるDeviceDataというメッセージをクライアントから送信します。

syntax = "proto3";

package tech_book.v1;

message Header {
    string id = 1;
    uint64 timestamp = 2;
}

message Body {
    string device_name = 1;
    uint32 cpu_usage = 2;
    uint32 memory_usage = 3;
}

message DeviceData {
    Header header = 1;
    Body body = 2;
}

続いて、bufの設定ファイルを作成します。 また、生成したファイルをGoから利用するため、go.modも併せて作成します。

buf config init
touch buf.gen.yaml
go mod init <YOUR_GIT_REPOSITORY>/proto

buf.yamlは自動で生成されたものをそのまま使用します。 buf.gen.yamlではGo言語のファイルを生成するための設定を記述しています。

# For details on buf.yaml configuration, visit https://buf.build/docs/configuration/v2/buf-yaml
version: v2
lint:
  use:
    - STANDARD
breaking:
  use:
    - FILE
version: v2
managed:
  enabled: true
  override:
    # パッケージ名をgitlabリポジトリのパスで上書き
    - file_option: go_package_prefix
      value: <YOUR_GIT_REPOSITORY>/proto/gen
plugins:
  - remote: buf.build/protocolbuffers/go
    out: gen/golang
    opt: paths=source_relative
inputs:
  - directory: ./
module <YOUR_GIT_REPOSITORY>/proto

go 1.25.0

require google.golang.org/protobuf v1.36.1

設定ファイル作成後、Goからパッケージとして読み込むためのファイルとAWS IoTルール上でのデコードに使用する記述子ファイルを作成します。

buf generate tech_book/v1/message.proto
buf build tech_book/v1/message.proto --as-file-descriptor-set -o message.desc

コマンドの実行が成功していれば、gen/tech_book/v1/message.pb.goというファイルとmessage.descというファイルが生成されていると思います。

以上でProtocol Buffers側の作業は完了になります。 作成したmessage.descは後ほどS3バケットに配置します。

MQTTクライアントの実装

続いて、Go言語側の作業を進めていきます。

cd ../golang
go mod init <YOUR_GIT_REPOSITORY>/golang
touch main.go

今回MQTTクライアントを実装するにあたり、paho.mqtt.golangを使用します。

MQTT関連の処理に加え、AWS IoT Coreに接続する際には後に作成する証明書が必要になるため、証明書を読み込むための処理も合わせて実装しています。 また、先ほどProtocol Buffers側の作業で生成したgen/tech_book/v1/message.pb.goをパッケージとして読み込んでいます。 gen/tech_book/v1/message.pb.goにはProtocol Buffersで定義したメッセージが構造体として定義されており、この構造体に値を格納・シリアライズすることでバイナリ形式に変換します。

package main

import (
    "crypto/tls"
    "crypto/x509"
    "errors"
    "fmt"
    "log/slog"
    "os"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/google/uuid"
    "google.golang.org/protobuf/proto"

    tech_book_v1 "<YOUR_GIT_REPOSITORY>/proto/gen/golang/tech_book/v1"
)

func main() {
    // 接続情報の読み込み
    endpoint := os.Getenv("AWS_IOT_ENDPOINT")
    clientID := os.Getenv("AWS_IOT_CLIENT_ID")
    certFile := os.Getenv("AWS_IOT_CERT_FILE")
    keyFile := os.Getenv("AWS_IOT_KEY_FILE")
    caFile := os.Getenv("AWS_IOT_CA_FILE")

    if endpoint == "" || clientID == "" || certFile == "" || keyFile == "" || caFile == "" {
        slog.Error("failed to get env")
        os.Exit(1)
    }

    // 証明書の読み込み
    tlsConfig, err := loadTLSConfig(certFile, keyFile, caFile)
    if err != nil {
        slog.Error("failed to load cert files", "error", err)
        os.Exit(1)
    }

    // MQTTクライアントの作成
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("ssl://%s:8883", endpoint)) // AWS IoTのエンドポイントは8883ポートを使用
    opts.SetClientID(clientID)
    opts.SetTLSConfig(tlsConfig)
    client := mqtt.NewClient(opts)

    // AWS IoT Coreへ接続
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        slog.Error("failed to connect", "error", token.Error())
        os.Exit(1)
    }

    // トピックをサブスクライブ
    responseTopic := fmt.Sprintf("dt/optim/tech_book/iot/%s/message/response", clientID)
    if token := client.Subscribe(responseTopic, 0, func(client mqtt.Client, msg mqtt.Message) {
        slog.Info("received message",
            "topic", msg.Topic(),
            "payload", string(msg.Payload()),
        )
    }); token.Wait() && token.Error() != nil {
        slog.Error("failed to subscribe", "error", token.Error())
        os.Exit(1)
    }

    // メッセージの構築
    reqId, err := uuid.NewRandom()
    if err != nil {
        slog.Error("failed to generate uuid", "error", err)
        os.Exit(1)
    }
    deviceData := &tech_book_v1.DeviceData{
        Header: &tech_book_v1.Header{
            Id:        reqId.String(),
            Timestamp: uint64(time.Now().Unix()),
        },
        Body: &tech_book_v1.Body{
            DeviceName:  "test-device-01",
            CpuUsage:    45,
            MemoryUsage: 78,
        },
    }

    // メッセージをシリアライズ
    messageBytes, err := proto.Marshal(deviceData)
    if err != nil {
        slog.Error("failed to marshal DeviceData", "error", err)
        os.Exit(1)
    }

    // トピックに対してメッセージを送信
    topic := fmt.Sprintf("dt/optim/tech_book/iot/%s/message", clientID)
    token := client.Publish(topic, 0, false, messageBytes)
    token.Wait()
    if token.Error() != nil {
        slog.Error("failed to publish message", "error", token.Error())
        os.Exit(1)
    }

    // レスポンス待ち
    time.Sleep(time.Second)

    // 切断
    client.Disconnect(250)
}

func loadTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
    // クライアント証明書の読み込み
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return nil, fmt.Errorf("failed to load client cert file: %w", err)
    }

    // CA証明書の読み込み
    caCert, err := os.ReadFile(caFile)
    if err != nil {
        return nil, fmt.Errorf("failed to load ca cert file: %w", err)
    }

    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        return nil, errors.New("failed to append certs")
    }

    // TLS設定の構築
    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
    }

    return tlsConfig, nil
}

AWSのリソース作成

ここからはAWS側の作業になります。

ポリシーの作成

デバイスからAWS IoT Coreを利用するために必要なポリシーを作成します。 AWSコンソールから「AWS IoT -> セキュリティ -> ポリシー -> ポリシーを作成」と移動して、tech_book_iot_policyという名前で作成しています。

付与するポリシーをJSONで表現すると以下のようになります。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:iot:<YOUR_AWS_REGION>:<YOUR_AWS_ACCOUNT_ID>:client/*"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": "arn:aws:iot:<YOUR_AWS_REGION>:<YOUR_AWS_ACCOUNT_ID>:topic/dt/optim/tech_book/iot/${iot:ClientId}/message"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": "arn:aws:iot:<YOUR_AWS_REGION>:<YOUR_AWS_ACCOUNT_ID>:topicfilter/dt/optim/tech_book/iot/${iot:ClientId}/message/response"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Receive",
      "Resource": "arn:aws:iot:<YOUR_AWS_REGION>:<YOUR_AWS_ACCOUNT_ID>:topic/dt/optim/tech_book/iot/${iot:ClientId}/message/response"
    }
  ]
}

今回必要になるのは以下の4つのポリシーです。

iot:Connect

任意のクライアントIDを持つデバイスがAWS IoT Coreに接続することを許可しています。

iot:Publish

dt/optim/tech_book/iot/${iot:ClientId}/messageというトピックに対するパブリッシュを許可しています。 また、リソース中にポリシー変数である${iot:ClientId}を含めています。 ${iot:ClientId}はクライアントIDに置き換えられるため、クライアント自身のIDを含んだトピックに対してのみパブリッシュが可能になります。

iot:Subscribe/iot:Receive

パブリッシュと同様、自身のクライアントIDを含んだdt/optim/tech_book/iot/${iot:ClientId}/message/responseというトピックに対するサブスクライブおよびメッセージ受信を許可しています。

モノ(Thing)の作成

AWS IoT Coreに接続するためのモノと証明書を作成します。

AWS IoT Coreではモノの名前をMQTTのクライアントIDとして使用することが推奨されているためUUIDなどの方が望ましいですが、今回はわかりやすさを重視しtech_book_example_thingとしています。

AWSから「AWS IoT Core -> 管理 -> すべてのデバイス -> モノ -> モノを作成 -> 1つのモノを作成」と進み、作成を進めていきます。

  • モノの名前: tech_book_example_thing
  • デバイス証明書を設定: 新しい証明書を自動生成
  • 証明書にポリシーをアタッチ: 先ほど作成したtech_book_iot_policyを選択

最後に、証明書とキーをダウンロードするダイアログが表示されるので忘れずにダウンロードしておきます。

S3バケットの作成

メッセージのデシリアライズで使用する記述子ファイルを配置するためのバケットを作成します。

「Amazon S3 -> バケット -> バケットを作成」と進み、tech-book-example-bucketとしてバケットを作成します。 他の設定は一旦デフォルトのままにしておきます。

作成後、AWS IoTからS3へのアクセスを許可するバケットポリシーを設定し、message.descファイルをバケットに配置します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowDescFileRead",
            "Effect": "Allow",
            "Principal": {
                "Service": "iot.amazonaws.com"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::tech-book-example-bucket/*.desc"
        }
    ]
}

ルール・アクションの作成

AWS IoTに送信したメッセージを処理するためのルールとアクションを作成します。

「AWS IoT -> メッセージのルーティング -> ルール -> ルールを作成」と進み、tech_book_example_ruleというルール名で作成を進めていきます。

SQLステートメントでは処理対象となるトピックの指定やメッセージに対する処理を記載していきます。

SELECT VALUE decode(*, 'proto', 'tech-book-example-bucket', 'message.desc', 'message', 'DeviceData')
FROM 'dt/optim/tech_book/iot/+/message'
  • 処理対象のトピック: dt/optim/tech_book/iot/+/message

    • +はワイルドカードであり、任意のクライアントIDを持つデバイスが送信したメッセージを処理できるようにしています
  • 処理: decode()関数によるバイナリ→JSONへのデシリアライズ

    • 引数はdecode(<ENCODED DATA>, '<ENCODE>, '<S3 BUCKET NAME>', '<S3 OBJECT KEY>', '<PROTO NAME>', '<MESSAGE TYPE>')のようになっているため、以下のように設定します
引数名 説明
ENCODED DATA *を指定してペイロード全体を処理対象とします
ENCODE Protocol Buffersなのでprotoを指定します
S3 BUCKET NAME 先ほど作成したtech-book-example-bucketを指定します
S3 OBJECT KEY S3上に配置した記述子ファイルのキーを指定します
PROTO NAME 記述子ファイルの拡張子を除いた名前を指定します
MESSAGE TYPE DeviceDataを指定します

アクションについては、以下の2つを設定します。

  • デコードしたデータをS3に格納する
    • バケットは先ほど作成したtech-book-example-bucketを使用します
    • キーはdata/${parse_time('yyyy/MM/dd', timestamp(), 'Asia/Tokyo')}/message.jsonとし、日付ごとに分けてファイルが配置されるようにします
  • dt/optim/tech_book/iot/${clientid()}/message/responseに対してメッセージをリパブリッシュ(再送信)
    • アクションではProtocol Buffersのデシリアライズはできますが、シリアライズすることはできないため今回はJSONのまま送信します
    • ${clientid()}はメッセージの送信元のクライアントIDに置き換えられます

IAMロールは新しくtech_book_example_roleというロールを作成します。 ロールを作成すると自動で必要なポリシーが割り当てられますが、先ほどリパブリッシュのアクションで指定したトピックに${clientid()}という文字が埋め込まれたまま作成されてしまうため、作成後に手動でワイルドカードに修正します。 最終的には以下のようなポリシーになっていれば問題ないです。

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::tech-book-example-bucket/*"
    }
}
{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "iot:Publish",
        "Resource": "arn:aws:iot:<YOUR_AWS_REGION>:<YOUR_AWS_ACCOUNT_ID>:topic/dt/optim/tech_book/iot/*/message/response"
    }
}

以上でAWS側の操作は終了です。

コードの実行

ソースコードの作成とAWSのリソースの準備が完了したので、実際にプログラムを動かしていきます。

ソースコード中の環境変数については、以下のように設定します。 合わせて、モノの作成時にダウンロードした証明書をgolang/cert/に配置します。

変数名 説明
AWS_IOT_ENDPOINT 「AWS IoT->ドメイン設定->iot:Data-ATS」から取得した値
AWS_IOT_CLIENT_ID tech_book_example_thing
AWS_IOT_CERT_FILE ダウンロードしたデバイス証明書のパス
AWS_IOT_KEY_FILE ダウンロードしたプライベートキーファイルのパス
AWS_IOT_CA_FILE ダウンロードしたAmazon ルートCA証明書のパス

環境変数の設定およびファイルの配置が完了したらプログラムを実行します。

go run main.go

送信したメッセージをJSONに変換したメッセージがレスポンスとして受信できていること、またS3に受信したメッセージが配置されていることが確認できました!

2025/10/17 13:24:12 INFO received message topic=dt/optim/tech_book/iot/tech_book_example_thing/message/response payload="{\"header\":{\"id\":\"1d7a8670-60ca-4294-800e-338456e148fd\",\"timestamp\":\"1760675052\"},\"body\":{\"deviceName\":\"test-device-01\",\"cpuUsage\":45,\"memoryUsage\":78}}"

やってみた感想

良かったところ

  • AWS IoT ルールの宛先にいろいろなサービスが指定できるため、自由度は高そうな印象
    • 対応していないサービスでも一旦LambdaやAPIを経由させるなど、回避策はありそう
  • Protocol Buffersのメッセージ定義に従って実装するため、型安全にデータをやり取りしやすい
  • Protocol Buffersで後方互換性を担保できるため、古いメッセージ定義に従って送信されたデータでもデシリアライズ可能

大変だったところ

  • AWS IoT Core独自の制限があり、事前に把握しておかないと送受信の際に失敗することがある
    • メッセージサイズが最大128 KB、トピックに含められる/の数が最大7つであり拡張に限界があるなど
  • メッセージ送受信のたびにシリアライズ/デシリアライズが必要となるため、動作確認は多少手間がかかる
    • JSONと相互変換するためのクライアントを自作したり、MQTTXというGUIクライアントを使用して対応

まとめ

個人では触れる機会の少ないAWS IoT関連のサービスを実際に利用することができ、とても勉強になりました! また、トピック設計やメッセージ設計で色々工夫の余地がありそうなので、この辺りを突き詰めてみるのも面白いのではないかと思います。

おわりに

当社では、AWSを使って様々なサービスの開発を行っています。技術的な挑戦を楽しみながら、一緒にプロダクトを成長させていける仲間を募集していますので、ご興味のある方は、ぜひご連絡ください。

www.optim.co.jp


  1. 専用の証明書を利用して、あらかじめ作成したテンプレートに従いデバイス証明書の作成、モノの作成、ポリシーのアタッチを実行してくれる機能。個別にデバイス証明書やモノを作成する手間が削減できる。