【ラズパイ x IoTネットワーク】MQTTをローカルネットワーク間で通信してみる・その2 〜 ブローカーの構築と動作確認
※ 当ページには【広告/PR】を含む場合があります。
2020/09/11
ただし、パブリッシャー兼ブローカーとして利用してたので、クライアントからの双方通信という形にはなっていませんでした。
今回はブローカーはブローカーのみの機能を持たせた独立したMQTTサーバーとして立てておいて、そのブローカーを通じて2つのクライアント間のMQTT通信させるやり方を検証します。
自宅でできるMQTTネットワークの構築
今回のMQTTネットワークの構成は以下のようになります。

前回の内容からラズパイをもう一つ増やして、これをMQTTサーバーとして利用します。 もちろん余分なラズパイがない場合には、別のパソコンでも利用できます。
図のように今回のMQTTでは、ブローカー(MQTTサーバー)を介してメッセージをやり取りします。 とりあえず通信の方向から、送信者側のクライアントをパブリッシャー、受信側のクライアントをサブスクライバーと呼んで区別します。
通常MQTTクライアントはパブリッシャーにもサブスクライバーの両方になることができます。
MQTTネットワーク内にいる複数のパブリッシャーやサブスクライバーは、トピックという電文につけた文字列を使って自分の欲しいトピックを探します。

上の図の例ではパブリッシャーAがhogeというトピック名のメッセージを送信した時、サブスクライバーBはhogeというトピックには興味がない(piyoが欲しい)のでhogeは受け取りません。 hogeに興味があるのはサブスクライバーAなので、hogeを含むメッセージはそちらに流れます。
他方、サブスクライバーBの欲しいpiyoを含むメッセージはパブリッシャーBから送信されますので、そちらはサブスクライバーBに流され、サブスクライバーAはスキップされます。
ということで、ブローカーはトピックに応じてパブリッシャーのメッセージを仕分けして、サブスクライバーの興味のあるトピックに合わせて転送する役割を持ったサーバーといえます。
開発環境 vs. 本番環境
本ブログ内容は自宅のローカルなインターネット環境で構築して無料のMQTT開発環境を構築するのが狙いですが、実際にはIoT専用の保護機能のない家庭内ネットワークはセキュリティ的に不安な面が多いのが実情です。
よって最終的な本番環境(=プロダクト)というのは、よりセキュアなパブリッククラウド上にMQTTサーバー機能を構築することを当初から意識しておいた方が良いと思います。
ほとんどの場合にはファイヤーウォールなどの設定を除くと、ローカルネットワークで開発・検修したものをそのままプロダクトとして利用できるはずです。 最初はセキュリティーのことはあまり気にせず、MQTTで動くモノを作ることに専念しましょう。
Moscaを使う
前回は
mosquitto
Mosca
※注: 2020/9月時点でMoscaの開発は残念ながら終了しています。
Moscaは
インストール
先程も述べたように既に
Mosca
Mosca2.0.0+ targets node v6, v4 and v0.12
インストールはブローカーとして扱う個体に、npmで以下のように行います。
$ npm install mosca --save
ブローカーの実装
まずはブローカーの準備から行います。
以下のようなファイルで
broker.js
const mosca = require('mosca');
const server = new mosca.Server({
port: 1883,
});
server.on('ready', function() {
console.log('Server is ready.');
});
server.on('clientConnected', function(client) {
console.log('Client connected.', 'Client ID:', client.id);
});
server.on('clientDisconnected', function(client) {
console.log('Client disconnected.', 'Client ID:', client.id);
});
server.on('subscribed', function(topic, client) {
console.log('Client subscribed.', 'Client ID:', client.id, ', TOPIC:', topic);
});
server.on('unsubscribed', function(topic, client) {
console.log('Client unsubscribed.', 'Client ID:', client.id);
});
server.on('published', function(packet, client) {
if (/\/new\//.test(packet.topic) || /\/disconnect\//.test(packet.topic)){
return;
}
console.log('Client published.', 'C ID:', client.id);
});
const http = require('http');
const httpServer = http.createServer();
server.attachHttpServer(httpServer);
const hostname = '192.168.0.200'; //👈サーバー機のIPアドレス
const port = 3000;
httpServer.listen(port, hostname, () => {
console.log(`Server running at http://${hostname}:${port}/`);
});
まず
new mosca.Server()
基本的には
server.on
ready:
ブローカーが立ち上がったときの処理
clientConnected:
クライアントが接続した場合に処理
clientDisconnected:
クライアントが切断された時に処理
subscribed:
サブスクライバーが購読をリクエストした時の処理
第二引数にとる関数には、トピックを含める
unsubscribed:
サブスクライバーが購読を取り止める時に処理
第二引数にとる関数には、トピックを含める
published:
パブリッシャーがメッセージをブローカーに送信した時の処理
第二引数にとる関数には、メッセージパケットを含める
この場合トピックはpacket.topicから取り出し可能
サーバーの起動
先程の
broker.js
const http = require('http');
const httpServer = http.createServer();
server.attachHttpServer(httpServer);
const hostname = '192.168.0.200'; //👈サーバー機のIPアドレス
const port = 3000;
httpServer.listen(port, hostname, () => {
console.log(`Server running at http://${hostname}:${port}/`);
});
の部分に着目します。
この時
server
attachHttpServer()
httpServer
ここで指定しているホストのIP
192.168.0.200
今回のコードでは、MQTTサーバーの外部へのサービスポートとして、3000番を指定して
httpServer.listen()
さっそくブローカーの起動を行います。 サーバー機のコンソールから以下を実行します。
$ node broker.js
Server running at http://192.168.0.200:3000/
Server is ready.
これでブラウザのパブリッシャーでメッセージ送信すると、ブラウザのサブスクライバーでメッセージ受信するはずです。
パブリッシャーとサブスクライバーの実装
サブスクライバー
先にサブスクライバー側として利用する個体からブローカーに接続してみます。
前回の
subscrive.js
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://192.168.0.200:3000');
const topic = 'hoge/piyo/fuga';
client.on('connect', function() {
console.log('subscriber connected.');
});
client.subscribe(topic, function(err, granted) {
console.log('subscriber subscribed.');
});
client.on('message', function(topic_, message) {
console.log('subscriber received topic:', topic_, 'message:', message.toString());
});
...前とほとんど同じですが、MQTTクライアントの接続先が
mqtt://192.168.0.200
mqtt://192.168.0.200:3000
これは先程注意点に挙げた、MQTTのサービスポート番号を追記する必要があるためです。
サブスクライバーを起動してみます。
$ node subscrive.js
subscriber subscribed.
subscriber connected.
正常ならば、ブローカーに接続されているレスポンスは返ってきます。
パブリッシャー
パブリッシャー側の個体からも設定していきます。
こちらも前回とほぼ同じソースコードです。
MQTTクライアントの接続先が
mqtt://192.168.0.200:3000
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://192.168.0.200:3000');
const message = 'SAY HELLO TO MQTT';
const topic = 'hoge/piyo/fuga'
client.on('connect', function() {
console.log('publisher connected.');
client.publish(topic, message);
console.log('send topic:', topic, ', message:', message);
});
パブリッシャーを起動・実行します。
$ node pubish.js
publisher connected.
send topic: hoge/piyo/fuga , message: SAY HELLO TO MQTT
正常にサーバーに接続したら、メッセージが送信されています。
ブローカーの動作
参考までにですが、データのまとめ役になっているブローカーが監視している状態になっているので、クライアントの動作履歴がコンソールの出力は以下のように出力されていました。
$ node broker.js
Server running at http://192.168.0.200:3000/
Server is ready.
Client connected. Client ID: mqttjs_cc488bf5
Client subscribed. Client ID: mqttjs_cc488bf5 , TOPIC: hoge/piyo/fuga
Client connected. Client ID: mqttjs_8be2d518
Client published. C ID: mqttjs_8be2d518
Client disconnected. Client ID: mqttjs_8be2d518
Client unsubscribed. Client ID: mqttjs_cc488bf5
Client disconnected. Client ID: mqttjs_cc488bf5
まとめ
今回はmosquittoを使わずオールnode.js体制で、パブリッシャー/サブスクライバー/ブローカーを構築しました。 ブローカーもnode.jsでサーバー化できたことにより、開発者がカスタマイズできる機能が増えて、MQTTでできる応用の幅が色々と広がりそうです。
今回はMoscaを利用しましたが、次回はAedesの方へ移行する記事を特集します。