【C#,WPF】MQTTクライアントの基本(MQTTnet)

.Net FrameWork

今回の記事はC#でMQTTプロトコルについてです。
環境は.NetFramewordk4.8です。
WPFを使用しました。
以前にC#のM2MQTTライブラリを使ってMQTT通信について書きました。

しかし、上記の記事を書いた後しばらく利用していたらいくつか困ったことがありました。

M2MQTTライブラリについて感想

M2MQTTライブラリを使った感想です。
▶ Mqttブローカー(サーバー側)へ接続する処理で非同期処理がない
▶ 接続失敗時のタイムアウト処理に関する時間設定ができない
▶ ライブラリがしばらく更新されていない(2023年に見た時)

上記理由から他に利用できそうなライブラリがないのか?と思い見つけたのが以下で説明するMQTTnetライブラリです。

MQTTnetライブラリの特徴

以下の理由でMQTTnetライブラリに決めました。
▶ MQTTに関する処理が非同期処理に対応している
▶ クライアント側は再接続処理が可能?
▶ 2023年現在も更新されているためMQTTバージョン5.0まで対応

上記の理由から今回はMQTTnetを使ってみます。
MQTTnetではAmazonAWSへの接続やセキュリティー面を考慮したTLSを使った接続など用意されてます。

事前準備

nugetから「mqtt」を検索して「MQTTnet」をインストール
※MITライセンスなので商用利用も可能です。

MQTTnetライブラリの各API

今回は、最も簡単な通常接続(セキュリティーなど考慮しない)のサンプルです。
MQTTのバージョンは3.1.1を使います。

クライアントの生成

MqttFactory()クラスを使ってMQTTクライアントを作成します。

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;
var _mqttfactory = new MqttFactory();
var _client             = _mqttfactory.CreateMqttClient();

クライアントオプション

接続処理を実行する前にMQTTクライアントのオプション設定を行います。(必要最低限です。)

//設定読み込み
//クライアントハンドル作成
//ホスト名とポート番号を設定
//ユーザー名とパスワード
var mqttoptions = new MqttClientOptionsBuilder()
        .WithClientId(Guid.NewGuid().ToString()) 
        .WithTcpServer(host, port)
        .WithCredentials("username", "password");
        .Build();
ポイント1

接続失敗時のタイムアウト設定
以下では10秒でタイムアウトするように設定しています。

mqttoptions.WithTimeout(TimeSpan.FromSeconds(10));
注意1

オプション設定時にMQTTバージョン3.1.1に対応していないものもあるので注意。
以下参考例

//MQTTversion5で対応
mqttoptions.WithUserProperty("name", "pass");

ユーザー情報をセットできると思い利用したが、例外発生して接続できずになるため注意。

イベントハンドラをセット

いくつかのイベントハンドラ処理を紹介します。
サンプルソースコードは後述。

イベント関数役割
_client.ConnectedAsync接続成功後のイベントハンドラ処理
_client.DisconnectedAsync切断後のイベントハンドラ処理
_client.ApplicationMessageReceivedAsyncメッセージ受信のイベントハンドラ処理

接続処理

ここまでで、MQTTクライアントのオプションとイベントハンドラの説明はできたので、次に接続処理についてです。

try
{
        //接続処理
        await _client.ConnectAsync(mqttoptions, CancellationToken.None);
}
catch
{
        //ブローカーが起動していない場合、例外発生.
        Console.WriteLine("### CONNECTING FAILED (SERVER ERROR) ###");
}
補足

CancellationTokenとは?
上記ではNoneとしていますが、セットすることも可能です。
非同期処理をキャンセルするための仕組みです。

切断

//切断処理
var mqttDisconnectOptions = _mqttfactory.CreateClientDisconnectOptionsBuilder();
await _client.DisconnectAsync(mqttDisconnectOptions.Build(), CancellationToken.None);
ポイント

切断処理のみだけであれば再接続可能です。
Dispose()まで行ってしまうとオブジェクト破棄になるため再接続処理はできません。

オブジェクト使用後はDisposeも忘れずに

//オブジェクト破棄
_client.Dispose();

MqttClientListenerクラスを実装してみた

上記で説明したイベントハンドラは以下のクラスでまとめて使用しているので以下を見て下さい。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Security.Authentication;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;
using System.Threading;
using System.Runtime.CompilerServices;
namespace mqtt_sample.Models
{
    public class MqttClientListener
    {
        public string host { get; set; }
        public int port { get; set; }
        public double timeout_sec { get; set; }
        public string topic { get; set; }
        private IMqttClient _client;
        private MqttFactory _mqttfactory;
        //コンストラクタ
        public MqttClientListener()
        {
            //デフォルト設定
            host           = "localhost";
            port           = 1883;
            timeout_sec    = 10;
            topic          = "#";
            //client生成
            _mqttfactory = new MqttFactory();
            _client = _mqttfactory.CreateMqttClient();
        }
        public async Task Connect()
        {
            //設定読み込み
            var mqttoptions = new MqttClientOptionsBuilder();
            mqttoptions.WithClientId(Guid.NewGuid().ToString());
            mqttoptions.WithTcpServer(host, port);
            mqttoptions.WithTimeout(TimeSpan.FromSeconds(timeout_sec));           
            //イベントハンドラセット
            _client.ConnectedAsync                  += ConnectedAsync;
            _client.DisconnectedAsync               += DisconnectedAsync;
            _client.ApplicationMessageReceivedAsync += ApplicationMessageReceivedAsync;
            
            try
            {
                //接続処理
                await _client.ConnectAsync(mqttoptions.Build(), CancellationToken.None);
            }
            catch
            {
                //ブローカーが起動していない場合、例外発生.
                Console.WriteLine("### CONNECTING FAILED (SERVER ERROR) ###");
            }
        }
        
        public async Task  Disconnect()
        {
            //切断処理
            var mqttDisconnectOptions = _mqttfactory.CreateClientDisconnectOptionsBuilder();
            await _client.DisconnectAsync(mqttDisconnectOptions.Build(), CancellationToken.None);
        }
        public void Dispose()
        {
            //オブジェクト破棄
            _client.Dispose();
        }
        public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel securelevel = MqttQualityOfServiceLevel.AtLeastOnce)
        {
            if (_client == null || !_client.IsConnected)
            {
                Console.WriteLine("### PUBLISH FAILED ###");
                return;
            }
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(securelevel)
                .WithRetainFlag(false)
                .Build();
            await _client.PublishAsync(message, CancellationToken.None).ConfigureAwait(false);
        }
       
        private Task ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            Console.WriteLine("### MQTT MESSAGER RECEVED ###");
            Console.WriteLine($"+topic:{arg.ApplicationMessage.Topic}");
            Console.WriteLine($"+payload:{arg.ApplicationMessage.ConvertPayloadToString()}");
            Console.WriteLine($"+Qos:{arg.ApplicationMessage.QualityOfServiceLevel}");
            Console.WriteLine($"+Retain:{arg.ApplicationMessage.Retain}");
            return Task.CompletedTask;
        }
        private async Task ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            //トピックをセット
            await _client.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
        }
        private async Task DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            //接続状況をチェック
            if (_client.IsConnected) { return; }
            //未接続が継続している場合は以下を続行.
            Console.WriteLine("### DISCONNECTING FROM SERVER ###");
            await Task.Delay(TimeSpan.FromSeconds(timeout_sec));
          
            try
            {
                var mqttoptions = new MqttClientOptionsBuilder();
                mqttoptions.WithClientId(Guid.NewGuid().ToString());
                mqttoptions.WithTcpServer(host, port);
                mqttoptions.WithTimeout(TimeSpan.FromSeconds(timeout_sec));
                //再接続
                await _client.ConnectAsync(mqttoptions.Build(), CancellationToken.None);
            }
            catch
            {
                Console.WriteLine("### RECONNECTING FAILED ###");
            }
        }
        
    }
}

コメント

タイトルとURLをコピーしました