はじめに
本記事は、C#でMqttクライアント(M2Mqtt)を実装した時のメモ書きです。
今回は接続、切断などの基本的な部分の確認です。
M2Mqttライブラリをインストールするところから順番に確認していきます。
broker(サーバー側)はmosquittoを使います。
以下で紹介するコードの環境はWindows10でWPFアプリ(.Net Frameworl)を使っています。
mosquittoブローカー起動
インストールは以下から。
今回はWindows10なのでWindows64bit版をインストールします。
インストール後は、ディレクトリの移動
cd C:\Program Files\mosquitto
移動後、mosquittto起動
mosquitto.exe -v
起動に成功した場合、以下のような表示が続きに表示されます。
1680622327: mosquitto version 2.0.15 starting
1680622327: Using default config.
1680622327: Starting in local only mode. Connections will only be possible from clients running on this machine.
1680622327: Create a configuration file which defines a listener to allow remote access.
1680622327: For more details see https://mosquitto.org/documentation/authentication-methods/
1680622327: Opening ipv4 listen socket on port 1883.
1680622327: Opening ipv6 listen socket on port 1883.
1680622327: mosquitto version 2.0.15 running
ということでブローカー(サーバー側)の用意はこれで完了です。
次は、WPFでM2Mqttライブラリをインストールして接続する処理などを用意してきます。
プロジェクト作成
VisualStudioで新規の「WPFアプリ(.Net Frameworl)」を作成します。
M2Mqttインストール
NuGetの管理コンソールを開いて、「M2Mqtt」で検索しインストールを行います。
MqttClientクラス作成の基本構成
まずは、MqttClientクラスを作成。
クラス名は「MqttListener」としておきます。(適当なので各々クラス名は変更して)
※名前空間の追加忘れないでください(using uPLibrary.Networking.M2Mqtt;)
using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace WpfApp1
{
public class MqttListener
{
//Mqttクライアント
private MqttClient _client;
}
}
今回はMyMqttClientクラスに以下のメソッドを用意していきます。
- Connect()・・・接続
- Disconnect()・・・切断
- Subscribe()・・・受信
- Publish()・・・送信
- AddTopic()・・・トピック追加
- RemoveTopic()・・・トピック削除
- CheckConnectStatus()・・・接続状態確認
Connect()とDisconnect()
接続
/// <summary>
/// 接続
/// </summary>
/// <param name="host">ホスト名</param>
/// <param name="port">ポート番号</param>
/// <param name="username">ユーザー名</param>
/// <param name="userpass">ユーザーパス</param>
public bool Connect(string host, int port, string username, string userpass)
{
try
{
_client = new MqttClient(host, port, false, null, null, MqttSslProtocols.None);
if (_client == null)
{
return false;
}
byte ret = _client.Connect(Guid.NewGuid().ToString(), username, userpass);
//受信ハンドラ追加
//MqttReceived()は後述
_client.MqttMsgPublishReceived += MqttReceived;
return true;
}
catch
{
return false;
}
}
とりあえずここでは、接続までです。
受信とトピックの追加はあとで行います。
切断
/// <summary>
/// 切断
/// </summary>
public void Disconnect()
{
if (_client != null)
{
_client.Disconnect();
_client = null;
}
}
Subscribe()
受信したメッセージを処理する関数です。
/// <summary>
/// 受信ハンドラ
/// </summary>
private void MqttReceived(object sender, MqttMsgPublishEventArgs e)
{
string topic = e.Topic;
string message = Encoding.ASCII.GetString(e.Message);
//メッセージを使った処理
Console.WriteLine($"[topic]{topic} [msg]{message}");
}
AddTopic()とRemoveTopic()
トピック追加するときの気を付ける箇所が1つあります。
このあと、トピック追加するときにMqttQosというものを設定します。
MqttQosとは?
Mqttの通信の品質のことです。3段階あります。
- qos0
- qos1
- qos2
この後、利用するため簡単に以下で説明します。
Qos0
送信する回数は1回のみ(エラーで届かない可能性もある)。
メッセージが届くことが保証されていません。
Qos1
エラーなどでメッセージが届かなかった場合に再送される仕組みが含まれている
送信側はメッセージを送ると、受信側からACKが届くのを待つ。
メッセージは少なくとも1回は届く。
但し、重複チェックの機能はないため複数回メッセージが届いてしまう可能性もある。
Qos2
メッセージが必ず1回届く。そのために再送と重複チェックを行う仕組みが含まれている。
Qosまとめ
再送処理 | 重複チェック | m2mqtt | |
Qos0 | × | × | MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE |
Qos1 | 〇 | × | MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE |
Qos2 | 〇 | 〇 | MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE |
状況によって使い分けてください。
登録トピックの追加
ここでは、トピックの追加とあわせて、先ほど説明したMqttQosの設定も併せて行います。
以下では、qos0(MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE)をセットしています。
/// <summary>
/// トピック追加
/// </summary>
/// <param name="topic">追加トピック</param>
public bool AddTopic(string topic)
{
if (_client == null)
{
return false;
}
_client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
return true;
}
登録トピックの削除
/// <summary>
/// トピック削除
/// </summary>
/// <param name="topic">削除トピック</param>
public bool RemoveTopic(string topic)
{
if (_client == null)
{
return false;
}
var ret = _client.Unsubscribe(new string[] { topic });
return true;
}
Publish()
/// <summary>
/// 送信
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <returns></returns>
public bool Publish(string topic, string message)
{
if (_client == null)
{
return false;
}
_client.Publish(topic, Encoding.UTF8.GetBytes(message));
return true;
}
接続状態確認
/// <summary>
/// 接続状態チェック
/// </summary>
/// <returns></returns>
public bool CheckConnectStatus()
{
return _client.IsConnected;
}
MqttListenerクラス全体
上記で説明したものをまとめたものです。
例外処理やエラー処理などはほぼ入れていませんので利用される場合にはそれらを追加してください。
using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace WpfApp1
{
public class MqttListener
{
//Mqttクライアント
private MqttClient _client;
/// <summary>
/// コンストラクタ
/// </summary>
public MqttListener()
{
//処理なし
}
/// <summary>
/// 接続
/// </summary>
/// <param name="host">ホスト名</param>
/// <param name="port">ポート番号</param>
/// <param name="username">ユーザー名</param>
/// <param name="userpass">ユーザーパス</param>
public bool Connect(string host, int port, string username, string userpass)
{
try
{
_client = new MqttClient(host, port, false, null, null, MqttSslProtocols.None);
if (_client == null)
{
return false;
}
byte ret = _client.Connect(Guid.NewGuid().ToString(), username, userpass);
//受信ハンドラ追加
//MqttReceived()は後述
_client.MqttMsgPublishReceived += MqttReceived;
return true;
}
catch
{
return false;
}
}
/// <summary>
/// 切断
/// </summary>
public void Disconnect()
{
if (_client != null)
{
_client.Disconnect();
_client = null;
}
}
/// <summary>
/// 受信ハンドラ
/// </summary>
private void MqttReceived(object sender, MqttMsgPublishEventArgs e)
{
string topic = e.Topic;
string message = Encoding.ASCII.GetString(e.Message);
//メッセージを使った処理
Console.WriteLine($"[topic]{topic} [msg]{message}");
}
/// <summary>
/// トピック追加
/// </summary>
/// <param name="topic">追加トピック</param>
public bool AddTopic(string topic)
{
if (_client == null)
{
return false;
}
_client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
return true;
}
/// <summary>
/// トピック削除
/// </summary>
/// <param name="topic">削除トピック</param>
public bool RemoveTopic(string topic)
{
if (_client == null)
{
return false;
}
var ret = _client.Unsubscribe(new string[] { topic });
return true;
}
/// <summary>
/// 送信
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <returns></returns>
public bool Publish(string topic, string message)
{
if (_client == null)
{
return false;
}
_client.Publish(topic, Encoding.UTF8.GetBytes(message));
return true;
}
}
}
m2mqtt example
上記で紹介、作成したMqttListenerクラスの利用サンプルです。
bool ret;
//オブジェクト作成
MqttListener listener = new MqttListener();
//接続
//※ホスト名のところで指定しているIPアドレスなどは任意に書き換え
ret = listener.Connect("192.168.10.242", 1883, "username", "userpass");
if (!ret)
{
//接続失敗
return;
}
//トピック追加
listener.AddTopic("testA");
//トピックさらに追加
listener.AddTopic("testB");
//トピック削除したい
listener.RemoveTopic("testB");
//接続が切れるとループを抜ける
while (listener.CheckConnectStatus())
{
}
//切断
listener.Disconnect();
Mqttブローカーとの切断を検出
方法は2つあります。
1つは接続から別スレッドで行い、whileでIsConnectedプロパティをチェックする。
もう1つは、タイマー処理などで指定秒毎にIsConnectedプロパティをチェックする。
1つ目の別スレッドで接続して切断するまでの処理を行うのが無難かな…
【追記】MQTTnetライブラリの使い方
m2mqttライブラリの他にMQTTnetライブラリというものもありました。そちらでは非同期処理なども対応されているため個人的にはおすすめです。
コメント