【OPTiM Cloud IoT OS データ蓄積編】ラズパイのセンサーデータをCloud IoT OSにアップロードする方法

こんにちは。プラットフォーム技術戦略室の相沢です。
前回のデバイス接続編に続き、Raspberry PiからOPTiM Cloud IoT OS(以下CIOS)にセンサーデータをアップロードする方法について紹介します。 CIOSでは認証やアクセス権限の制御機能を備えているため安全にデータを溜めることができます。また、データを蓄積しておくことで可視化したり、異常時に警告したり、学習に活用することができます。
今回はRaspberry Piのみで取得できるWi-Fiの電波状況をセンサーデータとしました。定期的にセンサーデータを投げて、確認してみようと思います。
前回の記事を読んでいない方は先にこちらをご覧ください。

1. データ保存に用いられるCIOSのAPI

1.1 Collection API

今回、CIOSのCollection APIを使って、センサーデータを登録します。
Collection APIとは、時系列データを活用するためのAPIです。IoTで求められる大量のデータ保存や高頻度かつ低遅延での参照が可能です。 以下にGrafanaで心拍数、温度、湿度、気圧を可視化するイメージを載せています。このようにIoTデバイスから一定期間毎にデータをアップロードする場合に向いています。
Collection APIを利用する場合には、OPTiMにご連絡ください。

f:id:optim-tech:20201223113538p:plain www.optim.co.jp

1.2 Messaging API

Messaging APIでもセンサーデータの確認が可能です。
Messaging APIとは、websocket接続を行い、チャネルを通してメッセージの送受信行うAPIです。メッセージングの永続化を行うとdatastoreにメッセージを保存でき、履歴も確認することができます。websocketでは一度接続できてしまえば、APIを叩かずに受信できるのでリアルタイムに確認したい場合に向いています。

2. CIOSの設定

2.1 チャネル作成

センサーデータを登録するためのチャネルを作成します。
IoT Explorerを開き、チャネル一覧>新規作成へ進みます。チャネル名とリソースオーナーの設定を行います。メッセージングの永続化にチェックを入れてください。後ほど環境変数としてチャネルIDを使います。

f:id:optim-tech:20201218165657p:plain
f:id:optim-tech:20201221175021p:plain

https://utility-iot-explorer.pre.cios.dev

2.2 ACLの設定

Collection APIを有効にするためにACL Managerで割り当て状況>メンバー>指定のユーザーを選択し、一般ユーザーの権限があることを確認します。 含まれていない場合は一般ユーザーの権限にチェックをいれてください。

f:id:optim-tech:20201221175947p:plain

https://acl-manager.pre.cios.dev

2.3 OAuthクライアント作成

CIOSの認証を行うためにOAuthクライアントを作成します。
Developer Centerを開き、OAuthクライアント>新規登録へ進みます。クライアントの表示名、アプリケーションの種別、APIスコープ、リソースオーナーを選択します。今回、種別はクライアントで行いました。また、APIスコープではCollectionにチェックを入れておきます。後ほど、環境変数としてクライアントID、クライアントシークレット、scopeを使います。

f:id:optim-tech:20201221175012p:plain

https://utility-developer-center.pre.cios.dev

3. Raspberry Piでのプログラム作成

接続されているRaspberry Piにアクセスするためにリモートシェルを使います。
IoT Explorerを開き、登録済みのデバイスを選択します。右上からリモートシェルを開きます。sudo apt-get install vimでRaspberry Piにvimをインストールしておきます。
sudo vi /etc/vim/vimrcでvimの設定ファイルを開き、以下を追加してシンタックスハイライトや行番号、自動インデントなどを有効にしておくとvimが多少使いやすくなります。

syntax on
set number

if has("autocmd")
  filetype plugin indent on
endif

3.1 環境変数の設定

sudo vi .envで環境変数を記述しておくためのenvファイルを作成します。
以下の環境変数を.envファイルに書き込みます。Developer Centerで作成したOAuthクライアントのクライアントID、クライアントシークレット、スコープとIoT Explorerで作成したチャネルのチャネルIDを入力します。

CLIENT_ID=(Developer Centerで作成したクライアントのクライアントID)
CLIENT_SECRET=(Developer Centerで作成したクライアントのクライアントシークレット)
SCOPE=messaging.publish+messaging.subscribe+datastore.download+collection.data.read+collection.data.write
AUTH_BODY=grant_type=client_credentials&client_id=${CLIENT_ID}&client_secret=${CLIENT_SECRET}&scope=${SCOPE}
CHANNEL_ID=(IoT Explorerで作成したチャネルのチャネルID)
COLLECTION_ID=(コレクション名) # Collection APIを使う場合
RESOURCE_OWNER_ID=(リソースオーナーID) # Collection APIを使う場合
SERIES_ID=(時系列データの識別ID) # Collection APIを使う場合
AUTH_URL=https://auth.pre.cios.dev/connect/token
MESSAGING_URL=https://messaging.preapis.cios.dev/v2/messaging # Messaging APIを使う場合
DATASTORE_URL=https://messaging.preapis.cios.dev/v2/datastore/channels/${CHANNEL_ID}/object_latest # Messaging APIを使う場合
COLLECTION_URL=https://collections.preapis.cios.dev/v2/collections/${COLLECTION_ID} # Collection APIを使う場合

3.2 センサーデータ送信プログラム作成

PythonでCIOSの作成したチャネルにセンサーデータを送信するプログラムを作成します。
sudo vi sample.pyなどでファイルを作成します。 以下にWi-FiのSSID、利用周波数、伝送レート、受信電力値をセンサーデータとするサンプルコードを載せています。インターバルで設定した秒数毎にsudo iwconfig wlan0を実行してセンサーデータとし、APIを叩いてチャネルにセンサーデータを登録しています。
Messaging APIを使う場合には、コメントアウトしてあるMessaging APIの箇所を解除してCollection APIの箇所をコメントアウトして実行してください。

import time
import datetime
import subprocess as spc
import requests
import json
import os
from dotenv import load_dotenv
import jwt


class Sensor():
    def __init__(self):
        load_dotenv()
        self.authBody = os.getenv('AUTH_BODY')
        self.authUrl = os.getenv('AUTH_URL')
        self.accessToken = ''
        # --- Messaging APIに必要なパラメータ ---
        self.messagingUrl = os.getenv('MESSAGING_URL')
        self.datastoreUrl = os.getenv('DATASTORE_URL')
        self.channelId = os.getenv('CHANNEL_ID')
        # --- Collection APIに必要なパラメータ ---
        self.collectionUrl = os.getenv('COLLECTION_URL')
        self.seriesId = os.getenv('SERIES_ID')
        self.resourceOwnerId = os.getenv('RESOURCE_OWNER_ID')
        self.timeRange = self.makeTimeRange()


    # センサーデータを整形して投げる
    def postWlanData(self):
        result = spc.check_output(["sudo", "iwconfig", "wlan0"])
        list = result.split()
        ssid = list[3][7:len(list[3])-1].decode()
        frequency = list[5][10:len(list[5])].decode()
        rate = list[11][5:len(list[11])].decode()
        rssi = list[29][6:len(list[29])].decode()
        now = time.time() * 1000
        body = {'measurements': {'ssid': ssid, 'frequency': float(frequency), 'rate': float(rate), 'rssi': float(rssi)}, 'series_id': self.seriesId, 'timestamp': int(now)}

        if self.checkExpireTime(self.accessToken):
            self.accessToken = self.getAccessToken()

        ## Messaging APIを使う場合
        # self.postMessaging(body)
        # self.getDatastore()
        ## Collection APIを使う場合
        self.postCollection(body)
        self.getCollection()


    # データ取得期間の設定
    def makeTimeRange(self):
        now = datetime.datetime.now()
        todayStart = datetime.datetime(now.year, now.month, now.day, 0, 0, 0)
        todayEnd = datetime.datetime(now.year, now.month, now.day, 23, 59, 59)

        startTime = int(time.mktime(todayStart.timetuple()) * 1000)
        endTime = int(time.mktime(todayEnd.timetuple()) * 1000)
        timeRange = str(startTime) + ':' + str(endTime)

        return timeRange


    # AccessToken取得
    def getAccessToken(self):
        body = self.authBody
        header = {'Content-Type': 'application/x-www-form-urlencoded'}
        response = requests.post(self.authUrl, body, headers=header)
        resDic = json.loads(response.text)

        return resDic['access_token']


    # アクセストークンの期限を確認
    def checkExpireTime(self, accessToken):
        if len(accessToken) == 0:
            return True
        else:
            token = jwt.decode(accessToken, verify=False)
            if time.time() < token['exp']:
                return False
            else:
                return True


    # Messaging APIでセンサーデータを登録
    def postMessaging(self, body):
        header = {'Authorization': 'Bearer ' + self.accessToken}
        response = requests.post(self.messagingUrl+'?channel_id='+self.channelId, json=body, headers=header)
        print('--- Post Sensor Data ---')
        print(body)
        print('------------------------')


    # Collection APIでセンサーデータを登録
    def postCollection(self, body):
        url = self.collectionUrl + '/series'
        header = {'Authorization': 'Bearer ' + self.accessToken}
        params = {'resource_owner_id': self.resourceOwnerId}
        response = requests.post(url, json=body, headers=header, params=params)
        print('--- Post Sensor Data ---')
        print(body)
        print('------------------------')


    # Messaging APIでセンサーデータを取得
    def getDatastore(self):
        header = {'Authorization': 'Bearer ' + self.accessToken}
        response = requests.get(self.datastoreUrl, headers=header)
        print('--- Get Datastore ---')
        print(response.text)
        print('---------------------')


    # Collection APIでセンサーデータを取得
    def getCollection(self):
        url = self.collectionUrl + '/series/' + self.seriesId
        header = {'Authorization': 'Bearer ' + self.accessToken}
        params = {'resource_owner_id': self.resourceOwnerId, 'time_range': self.timeRange}
        response = requests.get(url, headers=header, params=params)
        print('--- Get Collection ---')
        print(response.text)
        print('----------------------')


# Main
postInterval = 10 #[s]
raspberryPi = Sensor()

while True:
    raspberryPi.postWlanData()
    time.sleep(postInterval)

3.3 起動時の実行

Raspberry Piを起動時にsample.pyの実行を始めたい場合、systemdを設定します。/etc/systemd/systemsampleboot.serviceを作成します。 以下に簡易なsystemdのサンプルコードを載せています。ネットワークの後にsample.pyを起動しています。

[Unit]
Description=sample.py activation
After=network.target

[Service]
ExecStart=/usr/bin/python3 /home/cios_rshell/sample.py

[Install]
WantedBy=multi-user.target

sudo systemctl enable sampleboot.serviceを実行してsampleboot.serviceを有効化し、sudo systemctl start sampleboot.serviceで開始します。

4. センサーデータの確認

f:id:optim-tech:20201218165716p:plain Collection APIでデータを取得する度に登録したデータが追加されていることが確認できました。
Collection APIを使うと一定期間のデータがまとめて取得できるので、グラフなどで可視化する時に便利ですね。

5. 最後に

今回はCollection APIを使って、CIOSにデータのアップロードとアップロードの確認をしました。
次回こそはセンサーデータの可視化方法について紹介したいと思います。

オプティムではCIOSの開発を行うエンジニアを募集しています。