こんにちは。プラットフォーム技術戦略室の相沢です。
前回のデバイス接続編に続き、Raspberry PiからOPTiM Cloud IoT OS(以下CIOS)にセンサーデータをアップロードする方法について紹介します。
CIOSでは認証やアクセス権限の制御機能を備えているため安全にデータを溜めることができます。また、データを蓄積しておくことで可視化したり、異常時に警告したり、学習に活用することができます。
今回はRaspberry Piのみで取得できるWi-Fiの電波状況をセンサーデータとしました。定期的にセンサーデータを投げて、確認してみようと思います。
前回の記事を読んでいない方は先にこちらをご覧ください。
1. データ保存に用いられるCIOSのAPI
1.1 Collection API
今回、CIOSのCollection APIを使って、センサーデータを登録します。
Collection APIとは、時系列データを活用するためのAPIです。IoTで求められる大量のデータ保存や高頻度かつ低遅延での参照が可能です。
以下にGrafanaで心拍数、温度、湿度、気圧を可視化するイメージを載せています。このようにIoTデバイスから一定期間毎にデータをアップロードする場合に向いています。
Collection APIを利用する場合には、OPTiMにご連絡ください。
1.2 Messaging API
Messaging APIでもセンサーデータの確認が可能です。
Messaging APIとは、websocket接続を行い、チャネルを通してメッセージの送受信行うAPIです。メッセージングの永続化を行うとdatastoreにメッセージを保存でき、履歴も確認することができます。websocketでは一度接続できてしまえば、APIを叩かずに受信できるのでリアルタイムに確認したい場合に向いています。
2. CIOSの設定
2.1 チャネル作成
センサーデータを登録するためのチャネルを作成します。
IoT Explorerを開き、チャネル一覧>新規作成へ進みます。チャネル名とリソースオーナーの設定を行います。メッセージングの永続化にチェックを入れてください。後ほど環境変数としてチャネルIDを使います。
https://utility-iot-explorer.pre.cios.dev
2.2 ACLの設定
Collection APIを有効にするためにACL Managerで割り当て状況>メンバー>指定のユーザーを選択し、一般ユーザーの権限があることを確認します。 含まれていない場合は一般ユーザーの権限にチェックをいれてください。
https://acl-manager.pre.cios.dev
2.3 OAuthクライアント作成
CIOSの認証を行うためにOAuthクライアントを作成します。
Developer Centerを開き、OAuthクライアント>新規登録へ進みます。クライアントの表示名、アプリケーションの種別、APIスコープ、リソースオーナーを選択します。今回、種別はクライアントで行いました。また、APIスコープではCollectionにチェックを入れておきます。後ほど、環境変数としてクライアントID、クライアントシークレット、scopeを使います。
https://utility-developer-center.pre.cios.dev
3. Raspberry Piでのプログラム作成
接続されているRaspberry Piにアクセスするためにリモートシェルを使います。
IoT Explorerを開き、登録済みのデバイスを選択します。右上からリモートシェルを開きます。sudo apt-get install vim
でRaspberry Piにvimをインストールしておきます。
sudo vi /etc/vim/vimrc
でvimの設定ファイルを開き、以下を追加してシンタックスハイライトや行番号、自動インデントなどを有効にしておくとvimが多少使いやすくなります。
syntax on set number if has("autocmd") filetype plugin indent on endif
3.1 環境変数の設定
sudo vi .env
で環境変数を記述しておくためのenvファイルを作成します。
以下の環境変数を.envファイルに書き込みます。Developer Centerで作成したOAuthクライアントのクライアントID、クライアントシークレット、スコープとIoT Explorerで作成したチャネルのチャネルIDを入力します。
CLIENT_ID=(Developer Centerで作成したクライアントのクライアントID) CLIENT_SECRET=(Developer Centerで作成したクライアントのクライアントシークレット) SCOPE=messaging.publish+messaging.subscribe+datastore.download+collection.data.read+collection.data.write AUTH_BODY=grant_type=client_credentials&client_id=${CLIENT_ID}&client_secret=${CLIENT_SECRET}&scope=${SCOPE} CHANNEL_ID=(IoT Explorerで作成したチャネルのチャネルID) COLLECTION_ID=(コレクション名) # Collection APIを使う場合 RESOURCE_OWNER_ID=(リソースオーナーID) # Collection APIを使う場合 SERIES_ID=(時系列データの識別ID) # Collection APIを使う場合 AUTH_URL=https://auth.pre.cios.dev/connect/token MESSAGING_URL=https://messaging.preapis.cios.dev/v2/messaging # Messaging APIを使う場合 DATASTORE_URL=https://messaging.preapis.cios.dev/v2/datastore/channels/${CHANNEL_ID}/object_latest # Messaging APIを使う場合 COLLECTION_URL=https://collections.preapis.cios.dev/v2/collections/${COLLECTION_ID} # Collection APIを使う場合
3.2 センサーデータ送信プログラム作成
PythonでCIOSの作成したチャネルにセンサーデータを送信するプログラムを作成します。
sudo vi sample.py
などでファイルを作成します。
以下にWi-FiのSSID、利用周波数、伝送レート、受信電力値をセンサーデータとするサンプルコードを載せています。インターバルで設定した秒数毎にsudo iwconfig wlan0
を実行してセンサーデータとし、APIを叩いてチャネルにセンサーデータを登録しています。
Messaging APIを使う場合には、コメントアウトしてあるMessaging APIの箇所を解除してCollection APIの箇所をコメントアウトして実行してください。
import time import datetime import subprocess as spc import requests import json import os from dotenv import load_dotenv import jwt class Sensor(): def __init__(self): load_dotenv() self.authBody = os.getenv('AUTH_BODY') self.authUrl = os.getenv('AUTH_URL') self.accessToken = '' # --- Messaging APIに必要なパラメータ --- self.messagingUrl = os.getenv('MESSAGING_URL') self.datastoreUrl = os.getenv('DATASTORE_URL') self.channelId = os.getenv('CHANNEL_ID') # --- Collection APIに必要なパラメータ --- self.collectionUrl = os.getenv('COLLECTION_URL') self.seriesId = os.getenv('SERIES_ID') self.resourceOwnerId = os.getenv('RESOURCE_OWNER_ID') self.timeRange = self.makeTimeRange() # センサーデータを整形して投げる def postWlanData(self): result = spc.check_output(["sudo", "iwconfig", "wlan0"]) list = result.split() ssid = list[3][7:len(list[3])-1].decode() frequency = list[5][10:len(list[5])].decode() rate = list[11][5:len(list[11])].decode() rssi = list[29][6:len(list[29])].decode() now = time.time() * 1000 body = {'measurements': {'ssid': ssid, 'frequency': float(frequency), 'rate': float(rate), 'rssi': float(rssi)}, 'series_id': self.seriesId, 'timestamp': int(now)} if self.checkExpireTime(self.accessToken): self.accessToken = self.getAccessToken() ## Messaging APIを使う場合 # self.postMessaging(body) # self.getDatastore() ## Collection APIを使う場合 self.postCollection(body) self.getCollection() # データ取得期間の設定 def makeTimeRange(self): now = datetime.datetime.now() todayStart = datetime.datetime(now.year, now.month, now.day, 0, 0, 0) todayEnd = datetime.datetime(now.year, now.month, now.day, 23, 59, 59) startTime = int(time.mktime(todayStart.timetuple()) * 1000) endTime = int(time.mktime(todayEnd.timetuple()) * 1000) timeRange = str(startTime) + ':' + str(endTime) return timeRange # AccessToken取得 def getAccessToken(self): body = self.authBody header = {'Content-Type': 'application/x-www-form-urlencoded'} response = requests.post(self.authUrl, body, headers=header) resDic = json.loads(response.text) return resDic['access_token'] # アクセストークンの期限を確認 def checkExpireTime(self, accessToken): if len(accessToken) == 0: return True else: token = jwt.decode(accessToken, verify=False) if time.time() < token['exp']: return False else: return True # Messaging APIでセンサーデータを登録 def postMessaging(self, body): header = {'Authorization': 'Bearer ' + self.accessToken} response = requests.post(self.messagingUrl+'?channel_id='+self.channelId, json=body, headers=header) print('--- Post Sensor Data ---') print(body) print('------------------------') # Collection APIでセンサーデータを登録 def postCollection(self, body): url = self.collectionUrl + '/series' header = {'Authorization': 'Bearer ' + self.accessToken} params = {'resource_owner_id': self.resourceOwnerId} response = requests.post(url, json=body, headers=header, params=params) print('--- Post Sensor Data ---') print(body) print('------------------------') # Messaging APIでセンサーデータを取得 def getDatastore(self): header = {'Authorization': 'Bearer ' + self.accessToken} response = requests.get(self.datastoreUrl, headers=header) print('--- Get Datastore ---') print(response.text) print('---------------------') # Collection APIでセンサーデータを取得 def getCollection(self): url = self.collectionUrl + '/series/' + self.seriesId header = {'Authorization': 'Bearer ' + self.accessToken} params = {'resource_owner_id': self.resourceOwnerId, 'time_range': self.timeRange} response = requests.get(url, headers=header, params=params) print('--- Get Collection ---') print(response.text) print('----------------------') # Main postInterval = 10 #[s] raspberryPi = Sensor() while True: raspberryPi.postWlanData() time.sleep(postInterval)
3.3 起動時の実行
Raspberry Piを起動時にsample.py
の実行を始めたい場合、systemdを設定します。/etc/systemd/system
にsampleboot.service
を作成します。
以下に簡易なsystemdのサンプルコードを載せています。ネットワークの後にsample.py
を起動しています。
[Unit] Description=sample.py activation After=network.target [Service] ExecStart=/usr/bin/python3 /home/cios_rshell/sample.py [Install] WantedBy=multi-user.target
sudo systemctl enable sampleboot.service
を実行してsampleboot.service
を有効化し、sudo systemctl start sampleboot.service
で開始します。
4. センサーデータの確認
Collection APIでデータを取得する度に登録したデータが追加されていることが確認できました。
Collection APIを使うと一定期間のデータがまとめて取得できるので、グラフなどで可視化する時に便利ですね。
5. 最後に
今回はCollection APIを使って、CIOSにデータのアップロードとアップロードの確認をしました。
次回こそはセンサーデータの可視化方法について紹介したいと思います。
オプティムではCIOSの開発を行うエンジニアを募集しています。