NATS Request-Replyを利用してEdge処理を同期で呼び出す

こんにちは。プラットフォーム技術戦略室で、OPTiM Cloud IoT OSの製品企画などを担当している和田です。

今回は、Edge側処理の同期呼び出しを実現する方式の一つとして、NATS Request-Replyを試してみた例を紹介します。(なお、プロダクション環境でそのまま利用しているものではありません)

はじめに

IoTの環境では、各現場(建設現場、圃場、施設、店舗などなど)で稼働するEdgeやdeviceは、通常 固定のpublic IPアドレスを持っていません。そのため、クラウドとの通信時は、Edge側からクラウド側に接続を張ることになります。

Edgeからクラウドにセンサーデータなどを送信するだけでなく、クラウドからEdgeに向けて制御指示を出したり設定を配布することがあります。

f:id:optim-tech:20191102221810p:plain

この場合、Edgeからクラウドに向けてWebSocketやMQTTなどのセッションを張っておくか、Edgeからクラウドを定期的にポーリングして、クラウドからの情報をEdgeに伝達するのが方法の一つになります。これらの方式ではクラウドからのEdge処理呼び出しは基本的に非同期になります。

何らかの事情でレスポンスを同期で取得したい場合、WebSocketなどの上に同期処理を実装することも可能ですが、requestとresponseの紐付け/待ち合わせ処理、タイムアウト処理などを独自実装することになります。今回、Edge処理の同期呼び出しを簡単に実装する方式の一つとして、NATS Request-Replyを利用する方法を検討しました。

なお、MQTT version5ではRequest / Responseの仕様も検討されているようです。

NATS

NATSは軽量にメッセージングシステムを実現するミドルウェアの一つです。NATSについてはこちらの記事でも紹介していますので、よろしければご覧ください。

tech-blog.optim.co.jp

NATSは、メッセージブローカーとしての使い方が一般的だと思いますが、Request-Replyのメッセージングモデルにも対応しています。(と、上記 記事の著者から教えてもらいました)

今回は、このNATSのRequest-Replyを利用してEdge処理を同期的に呼び出してみます。

実装例

f:id:optim-tech:20191102221813p:plain

ユーザーによるWebブラウザからの操作に対して、Edge処理を呼び出して同期でレスポンスを返すユースケースを想定します。

クラウド側にRest APIを用意し、APIを呼び出すと、クラウドからEdge処理を同期で呼び出して(ここでNATSのRequest-Replyを利用)、結果をAPIのレスポンスとして返却します。

以下では、Node.jsでnats clientライブラリを利用して実装しています。NATSのclientライブラリは様々な言語で提供されています。

github.com

Edge側
const NATS = require('nats');
const nats_url = process.env.NATS_URL || 'nats://172.17.0.1:4222';
const nats = NATS.connect({url: nats_url});

const edge_id = process.env.EDGE_ID || 'xxxxx';

nats.subscribe(`greeting-${edge_id}`, (request, replyTo) => {
    nats.publish(replyTo, `Hello ${request}!!`);
});

環境変数でedge_idを与え、 greeting-{edge_id} というsubjectをsubscribeします。このsubjectから名前を受け取ると、 Hello {名前}!! を同期で返します。

API側
const express = require('express');
const app = express();
const port = process.env.PORT || 8888;
const server = app.listen(port, ()=> {
    console.log("Node.js is listening to PORT:" + server.address().port);
});

const NATS = require('nats');
const nats_url = process.env.NATS_URL || 'nats://172.17.0.1:4222';

const nats_reply_timeout = process.env.NATS_REPLY_TIEOUT || 1000;

app.get('/api/edge/:edge_id/greeting/:name', (req, res) => {
    const edge_id = req.params.edge_id;

    const nats = NATS.connect({url: nats_url});
    nats.requestOne(`greeting-${edge_id}`, req.params.name, {}, nats_reply_timeout, response => {
        if(response instanceof NATS.NatsError) {
            console.log(response);
            res.status(500);
            res.send('error');
            nats.close();
            return;
        }
        res.send(response);
        nats.close();
    });
});

処理を呼び出し対象のEdgeのedge_idをpath parameterにつけたAPIエンドポイントを作成しています。 nats.requestOne() がRequest-Replyの呼び出し部分です。

NATSとAPIサーバを起動します。docker-composeを利用するとこんな感じです。

FROM node:10
WORKDIR /app
COPY package*.json ./
RUN npm install
# .dockerignoreでnode_modulesは除外済
COPY . .
EXPOSE 8080
CMD [ "node", "app.js" ]
version: '3'

services:
  nats-server:
    image: nats
    ports:
      - 4222:4222

  api:
    build:
      context: ./api
      dockerfile: Dockerfile
    ports:
      - 8080:8080
    environment:
      PORT: 8080
      NATS_URL: nats://nats-server:4222
動作確認

環境変数に EDGE_ID=123456 を指定して、Edgeのプログラムを起動します。

curlでAPIサーバのAPIを呼び出すと、無事に同期でレスポンスが返却されました。

curl http://localhost:8080/api/edge/123456/greeting/wada
Hello wada!!

まとめ

NATSのRequest-Replyを利用することで、Edge処理の同期呼び出しを簡単に実装できました。 nats clientライブラリに、待ち合わせやタイムアウト処理を委ねられるため、独自実装が不要です。

なおプロダクション化するためには下記のような考慮が必要で、このまま利用することはできません。

  • EdgeからNATSへの接続
    • クライアント認証と通信暗号化 Authentication · NATS
    • 重複しないedge_idの払い出し、edge_idをキーにした各subjectに対するEdgeからのアクセス制御
  • API
    • API認証
    • 各edgeに対する処理呼び出しの認可制御

おわりに

オプティムでは、ユースケースごとに様々なミドルウェアを選定して利用しています。AWS/GCP/Azureなどが提供するマネジドサービスを利用するのはもちろん、必要があればVM上やkubernetes上でクラスタを組んで運用しています。

オプティムでは、枯れたOSSやミドルウェアを活用して、独自実装すべき領域に注力するエンジニアを募集しています。

www.optim.co.jp