【ラズパイ x IoTネットワーク】MQTTをローカルネットワーク間で通信してみる・その2 〜 ブローカーの構築と動作確認


2020/09/11

前回までは、パブリッシャーからサブスクライバーまでで最低限のMQTTによる通信を解説しました。

ただし、パブリッシャー兼ブローカーとして利用してたので、クライアントからの双方通信という形にはなっていませんでした。

今回はブローカーはブローカーのみの機能を持たせた独立したMQTTサーバーとして立てておいて、そのブローカーを通じて2つのクライアント間のMQTT通信させるやり方を検証します。


自宅でできるMQTTネットワークの構築

今回のMQTTネットワークの構成は以下のようになります。

合同会社タコスキングダム|蛸壺の中の工作室

前回の内容からラズパイをもう一つ増やして、これをMQTTサーバーとして利用します。もちろん余分なラズパイがない場合には、別のパソコンでも利用できます。

図のように今回のMQTTでは、ブローカー(MQTTサーバー)を介してメッセージをやり取りします。とりあえず通信の方向から、送信者側のクライアントをパブリッシャー、受信側のクライアントをサブスクライバーと呼んで区別します。

通常MQTTクライアントはパブリッシャーにもサブスクライバーの両方になることができます。

MQTTネットワーク内にいる複数のパブリッシャーやサブスクライバーは、トピックという電文につけた文字列を使って自分の欲しいトピックを探します。

合同会社タコスキングダム|蛸壺の中の工作室

上の図の例ではパブリッシャーAがhogeというトピック名のメッセージを送信した時、サブスクライバーBはhogeというトピックには興味がない(piyoが欲しい)のでhogeは受け取りません。hogeに興味があるのはサブスクライバーAなので、hogeを含むメッセージはそちらに流れます。

他方、サブスクライバーBの欲しいpiyoを含むメッセージはパブリッシャーBから送信されますので、そちらはサブスクライバーBに流され、サブスクライバーAはスキップされます。

ということで、ブローカーはトピックに応じてパブリッシャーのメッセージを仕分けして、サブスクライバーの興味のあるトピックに合わせて転送する役割を持ったサーバーといえます。

開発環境 vs. 本番環境

本ブログ内容は自宅のローカルなインターネット環境で構築して無料のMQTT開発環境を構築するのが狙いですが、実際にはIoT専用の保護機能のない家庭内ネットワークはセキュリティ的に不安な面が多いのが実情です。

よって最終的な本番環境(=プロダクト)というのは、よりセキュアなパブリッククラウド上にMQTTサーバー機能を構築することを当初から意識しておいた方が良いと思います。

ほとんどの場合にはファイヤーウォールなどの設定を除くと、ローカルネットワークで開発・検修したものをそのままプロダクトとして利用できるはずです。最初はセキュリティーのことはあまり気にせず、MQTTで動くモノを作ることに専念しましょう。


Moscaを使う

Moscaは、Node.jsで使用できるブローカー用の高機能ライブラリです。

前回は
mosquittoを利用してLinux内で常駐するデーモンとして利用してきましたが、javascriptコードからMQTTサーバーを柔軟に設定・起動したい場合にはMoscaを使って拡張することができます。

※注: 2020/9月時点でMoscaの開発は残念ながら終了しています。

Moscaは
Aedesというより進化した別のプロジェクトへ移行しました。ということで、この記事の鮮度は古くなるのですが、Aedesは使い方がMoscaより複雑化しており、また別の機会に説明したいとおもいます。

インストール

先程も述べたように既にMosca自体が古いので、現行のnode環境では使えなくなりつつあります。公式にも説明書きがあるように、Mosca2.0.0+ targets node v6, v4 and v0.12ですが、すでに依存パッケージのいくつかでnode-gypのエラーが発生して動かなくなっているので要注意です。

インストールはブローカーとして扱う個体に、npmで以下のように行います。

            $ npm install mosca --save
        

ブローカーの実装

まずはブローカーの準備から行います。

以下のようなファイルで
broker.jsという名前で中身の編集/保存しておきます。

            const mosca = require('mosca');
const server = new mosca.Server({
    port: 1883,
});

server.on('ready', function() {
    console.log('Server is ready.');
});

server.on('clientConnected', function(client) {
    console.log('Client connected.', 'Client ID:', client.id);
});

server.on('clientDisconnected', function(client) {
    console.log('Client disconnected.', 'Client ID:', client.id);
});

server.on('subscribed', function(topic, client) {
    console.log('Client subscribed.', 'Client ID:', client.id, ', TOPIC:', topic);
});

server.on('unsubscribed', function(topic, client) {
    console.log('Client unsubscribed.', 'Client ID:', client.id);
});

server.on('published', function(packet, client) {
    if (/\/new\//.test(packet.topic) || /\/disconnect\//.test(packet.topic)){
        return;
    }
    console.log('Client published.', 'C ID:', client.id);
});

const http = require('http');
const httpServer = http.createServer();
server.attachHttpServer(httpServer);

const hostname = '192.168.0.200'; //👈サーバー機のIPアドレス
const port = 3000;
httpServer.listen(port, hostname, () => {
    console.log(`Server running at http://${hostname}:${port}/`);
});
        
まずnew mosca.Server()でインスタンス生成します。ポート番号はこの場合に任意に割り振ることができます。

基本的には
server.onメソッドの第一引数で各コールバック関数の処理を記述していきます。コールバックの固有名と機能は以下のようになります。

            ready:
    ブローカーが立ち上がったときの処理
clientConnected:
    クライアントが接続した場合に処理
clientDisconnected:
    クライアントが切断された時に処理
subscribed:
    サブスクライバーが購読をリクエストした時の処理
    第二引数にとる関数には、トピックを含める
unsubscribed:
    サブスクライバーが購読を取り止める時に処理
    第二引数にとる関数には、トピックを含める
published:
    パブリッシャーがメッセージをブローカーに送信した時の処理
    第二引数にとる関数には、メッセージパケットを含める
    この場合トピックはpacket.topicから取り出し可能
        

サーバーの起動

先程のbroker.jsのコードの下の方にあった、

            const http = require('http');
const httpServer = http.createServer();
server.attachHttpServer(httpServer);

const hostname = '192.168.0.200'; //👈サーバー機のIPアドレス
const port = 3000;
httpServer.listen(port, hostname, () => {
    console.log(`Server running at http://${hostname}:${port}/`);
});
        
の部分に着目します。

この時
serverは、MQTTサーバのインスタンスです。attachHttpServer()でHTTP サーバのインスタンスhttpServerと結びつけます。

ここで指定しているホストのIP
192.168.0.200は、著者の手元のサーバーに割り振っているアドレス値なので、ここはお手元のサーバー機のアドレスに変更する必要があります。

今回のコードでは、MQTTサーバーの外部へのサービスポートとして、3000番を指定して
httpServer.listen()で外部に公開しています。指定するポート番号はクライアント側に晒しているポートですので、パブリッシャー・サブスクライバー側に実装するコードとも整合させないといけませんのでご注意ください。

さっそくブローカーの起動を行います。サーバー機のコンソールから以下を実行します。

            $ node broker.js
Server running at http://192.168.0.200:3000/
Server is ready.
        
これでブラウザのパブリッシャーでメッセージ送信すると、ブラウザのサブスクライバーでメッセージ受信するはずです。


パブリッシャーとサブスクライバーの実装

前回の内容で紹介したソースコードを少し手直しして利用します。

サブスクライバー

先にサブスクライバー側として利用する個体からブローカーに接続してみます。

前回の
subscrive.jsを以下のように手直しします。

            const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://192.168.0.200:3000');
const topic = 'hoge/piyo/fuga';

client.on('connect', function() {
    console.log('subscriber connected.');
});

client.subscribe(topic, function(err, granted) {
    console.log('subscriber subscribed.');
});

client.on('message', function(topic_, message) {
    console.log('subscriber received topic:', topic_, 'message:', message.toString());
});
        
...前とほとんど同じですが、MQTTクライアントの接続先がmqtt://192.168.0.200からmqtt://192.168.0.200:3000が変わっています。

これは先程注意点に挙げた、MQTTのサービスポート番号を追記する必要があるためです。

サブスクライバーを起動してみます。

            $ node subscrive.js
subscriber subscribed.
subscriber connected.
        
正常ならば、ブローカーに接続されているレスポンスは返ってきます。

パブリッシャー

パブリッシャー側の個体からも設定していきます。

こちらも前回とほぼ同じソースコードです。

MQTTクライアントの接続先が
mqtt://192.168.0.200:3000に変わっただけです。

            const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://192.168.0.200:3000');

const message = 'SAY HELLO TO MQTT';
const topic = 'hoge/piyo/fuga'

client.on('connect', function() {
    console.log('publisher connected.');
    client.publish(topic, message);
    console.log('send topic:', topic, ', message:', message);
});
        
パブリッシャーを起動・実行します。

            $ node pubish.js
publisher connected.
send topic: hoge/piyo/fuga , message: SAY HELLO TO MQTT
        
正常にサーバーに接続したら、メッセージが送信されています。

ブローカーの動作

参考までにですが、データのまとめ役になっているブローカーが監視している状態になっているので、クライアントの動作履歴がコンソールの出力は以下のように出力されていました。

            $ node broker.js
Server running at http://192.168.0.200:3000/
Server is ready.
Client connected. Client ID: mqttjs_cc488bf5
Client subscribed. Client ID: mqttjs_cc488bf5 , TOPIC: hoge/piyo/fuga
Client connected. Client ID: mqttjs_8be2d518
Client published. C ID: mqttjs_8be2d518
Client disconnected. Client ID: mqttjs_8be2d518
Client unsubscribed. Client ID: mqttjs_cc488bf5
Client disconnected. Client ID: mqttjs_cc488bf5
        


まとめ

今回はmosquittoを使わずオールnode.js体制で、パブリッシャー/サブスクライバー/ブローカーを構築しました。ブローカーもnode.jsでサーバー化できたことにより、開発者がカスタマイズできる機能が増えて、MQTTでできる応用の幅が色々と広がりそうです。

今回はMoscaを利用しましたが、次回はAedesの方へ移行する記事を特集します。

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

電子工作を身近に知っていただけるように、材料調達からDIYのハウツーまで気になったところをできるだけ細かく記事にしてブログ配信してます。