こんにちは。プラットフォーム技術戦略室で、OPTiM Cloud IoT OSの製品企画などを担当している和田です。
今回は、Edge側処理の同期呼び出しを実現する方式の一つとして、NATS Request-Replyを試してみた例を紹介します。(なお、プロダクション環境でそのまま利用しているものではありません)
はじめに
IoTの環境では、各現場(建設現場、圃場、施設、店舗などなど)で稼働するEdgeやdeviceは、通常 固定のpublic IPアドレスを持っていません。そのため、クラウドとの通信時は、Edge側からクラウド側に接続を張ることになります。
Edgeからクラウドにセンサーデータなどを送信するだけでなく、クラウドからEdgeに向けて制御指示を出したり設定を配布することがあります。
この場合、Edgeからクラウドに向けてWebSocketやMQTTなどのセッションを張っておくか、Edgeからクラウドを定期的にポーリングして、クラウドからの情報をEdgeに伝達するのが方法の一つになります。これらの方式ではクラウドからのEdge処理呼び出しは基本的に非同期になります。
何らかの事情でレスポンスを同期で取得したい場合、WebSocketなどの上に同期処理を実装することも可能ですが、requestとresponseの紐付け/待ち合わせ処理、タイムアウト処理などを独自実装することになります。今回、Edge処理の同期呼び出しを簡単に実装する方式の一つとして、NATS Request-Replyを利用する方法を検討しました。
なお、MQTT version5ではRequest / Responseの仕様も検討されているようです。
NATS
NATSは軽量にメッセージングシステムを実現するミドルウェアの一つです。NATSについてはこちらの記事でも紹介していますので、よろしければご覧ください。
NATSは、メッセージブローカーとしての使い方が一般的だと思いますが、Request-Replyのメッセージングモデルにも対応しています。(と、上記 記事の著者から教えてもらいました)
今回は、このNATSのRequest-Replyを利用してEdge処理を同期的に呼び出してみます。
実装例
ユーザーによるWebブラウザからの操作に対して、Edge処理を呼び出して同期でレスポンスを返すユースケースを想定します。
クラウド側にRest APIを用意し、APIを呼び出すと、クラウドからEdge処理を同期で呼び出して(ここでNATSのRequest-Replyを利用)、結果をAPIのレスポンスとして返却します。
以下では、Node.jsでnats clientライブラリを利用して実装しています。NATSのclientライブラリは様々な言語で提供されています。
Edge側
const NATS = require('nats'); const nats_url = process.env.NATS_URL || 'nats://172.17.0.1:4222'; const nats = NATS.connect({url: nats_url}); const edge_id = process.env.EDGE_ID || 'xxxxx'; nats.subscribe(`greeting-${edge_id}`, (request, replyTo) => { nats.publish(replyTo, `Hello ${request}!!`); });
環境変数でedge_idを与え、 greeting-{edge_id}
というsubjectをsubscribeします。このsubjectから名前を受け取ると、 Hello {名前}!!
を同期で返します。
API側
const express = require('express'); const app = express(); const port = process.env.PORT || 8888; const server = app.listen(port, ()=> { console.log("Node.js is listening to PORT:" + server.address().port); }); const NATS = require('nats'); const nats_url = process.env.NATS_URL || 'nats://172.17.0.1:4222'; const nats_reply_timeout = process.env.NATS_REPLY_TIEOUT || 1000; app.get('/api/edge/:edge_id/greeting/:name', (req, res) => { const edge_id = req.params.edge_id; const nats = NATS.connect({url: nats_url}); nats.requestOne(`greeting-${edge_id}`, req.params.name, {}, nats_reply_timeout, response => { if(response instanceof NATS.NatsError) { console.log(response); res.status(500); res.send('error'); nats.close(); return; } res.send(response); nats.close(); }); });
処理を呼び出し対象のEdgeのedge_idをpath parameterにつけたAPIエンドポイントを作成しています。 nats.requestOne()
がRequest-Replyの呼び出し部分です。
NATSとAPIサーバを起動します。docker-composeを利用するとこんな感じです。
FROM node:10 WORKDIR /app COPY package*.json ./ RUN npm install # .dockerignoreでnode_modulesは除外済 COPY . . EXPOSE 8080 CMD [ "node", "app.js" ]
version: '3' services: nats-server: image: nats ports: - 4222:4222 api: build: context: ./api dockerfile: Dockerfile ports: - 8080:8080 environment: PORT: 8080 NATS_URL: nats://nats-server:4222
動作確認
環境変数に EDGE_ID=123456
を指定して、Edgeのプログラムを起動します。
curlでAPIサーバのAPIを呼び出すと、無事に同期でレスポンスが返却されました。
curl http://localhost:8080/api/edge/123456/greeting/wada Hello wada!!
まとめ
NATSのRequest-Replyを利用することで、Edge処理の同期呼び出しを簡単に実装できました。 nats clientライブラリに、待ち合わせやタイムアウト処理を委ねられるため、独自実装が不要です。
なおプロダクション化するためには下記のような考慮が必要で、このまま利用することはできません。
- EdgeからNATSへの接続
- クライアント認証と通信暗号化 Authentication · NATS
- 重複しないedge_idの払い出し、edge_idをキーにした各subjectに対するEdgeからのアクセス制御
- API
- API認証
- 各edgeに対する処理呼び出しの認可制御
おわりに
オプティムでは、ユースケースごとに様々なミドルウェアを選定して利用しています。AWS/GCP/Azureなどが提供するマネジドサービスを利用するのはもちろん、必要があればVM上やkubernetes上でクラスタを組んで運用しています。
オプティムでは、枯れたOSSやミドルウェアを活用して、独自実装すべき領域に注力するエンジニアを募集しています。