今回の記事はC#でMQTTプロトコルについてです。
環境は.NetFramewordk4.8です。
WPFを使用しました。
以前にC#のM2MQTTライブラリを使ってMQTT通信について書きました。
しかし、上記の記事を書いた後しばらく利用していたらいくつか困ったことがありました。
上記理由から他に利用できそうなライブラリがないのか?と思い見つけたのが以下で説明するMQTTnetライブラリです。
上記の理由から今回はMQTTnetを使ってみます。
MQTTnetではAmazonAWSへの接続やセキュリティー面を考慮したTLSを使った接続など用意されてます。
事前準備
nugetから「mqtt」を検索して「MQTTnet」をインストール
※MITライセンスなので商用利用も可能です。
MQTTnetライブラリの各API
今回は、最も簡単な通常接続(セキュリティーなど考慮しない)のサンプルです。
MQTTのバージョンは3.1.1を使います。
クライアントの生成
MqttFactory()クラスを使ってMQTTクライアントを作成します。
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;
var _mqttfactory = new MqttFactory();
var _client = _mqttfactory.CreateMqttClient();
クライアントオプション
接続処理を実行する前にMQTTクライアントのオプション設定を行います。(必要最低限です。)
//設定読み込み
//クライアントハンドル作成
//ホスト名とポート番号を設定
//ユーザー名とパスワード
var mqttoptions = new MqttClientOptionsBuilder()
.WithClientId(Guid.NewGuid().ToString())
.WithTcpServer(host, port)
.WithCredentials("username", "password");
.Build();
イベントハンドラをセット
いくつかのイベントハンドラ処理を紹介します。
サンプルソースコードは後述。
イベント関数 | 役割 |
_client.ConnectedAsync | 接続成功後のイベントハンドラ処理 |
_client.DisconnectedAsync | 切断後のイベントハンドラ処理 |
_client.ApplicationMessageReceivedAsync | メッセージ受信のイベントハンドラ処理 |
接続処理
ここまでで、MQTTクライアントのオプションとイベントハンドラの説明はできたので、次に接続処理についてです。
try
{
//接続処理
await _client.ConnectAsync(mqttoptions, CancellationToken.None);
}
catch
{
//ブローカーが起動していない場合、例外発生.
Console.WriteLine("### CONNECTING FAILED (SERVER ERROR) ###");
}
切断
//切断処理
var mqttDisconnectOptions = _mqttfactory.CreateClientDisconnectOptionsBuilder();
await _client.DisconnectAsync(mqttDisconnectOptions.Build(), CancellationToken.None);
オブジェクト使用後はDisposeも忘れずに
//オブジェクト破棄
_client.Dispose();
MqttClientListenerクラスを実装してみた
上記で説明したイベントハンドラは以下のクラスでまとめて使用しているので以下を見て下さい。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Security.Authentication;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;
using System.Threading;
using System.Runtime.CompilerServices;
namespace mqtt_sample.Models
{
public class MqttClientListener
{
public string host { get; set; }
public int port { get; set; }
public double timeout_sec { get; set; }
public string topic { get; set; }
private IMqttClient _client;
private MqttFactory _mqttfactory;
//コンストラクタ
public MqttClientListener()
{
//デフォルト設定
host = "localhost";
port = 1883;
timeout_sec = 10;
topic = "#";
//client生成
_mqttfactory = new MqttFactory();
_client = _mqttfactory.CreateMqttClient();
}
public async Task Connect()
{
//設定読み込み
var mqttoptions = new MqttClientOptionsBuilder();
mqttoptions.WithClientId(Guid.NewGuid().ToString());
mqttoptions.WithTcpServer(host, port);
mqttoptions.WithTimeout(TimeSpan.FromSeconds(timeout_sec));
//イベントハンドラセット
_client.ConnectedAsync += ConnectedAsync;
_client.DisconnectedAsync += DisconnectedAsync;
_client.ApplicationMessageReceivedAsync += ApplicationMessageReceivedAsync;
try
{
//接続処理
await _client.ConnectAsync(mqttoptions.Build(), CancellationToken.None);
}
catch
{
//ブローカーが起動していない場合、例外発生.
Console.WriteLine("### CONNECTING FAILED (SERVER ERROR) ###");
}
}
public async Task Disconnect()
{
//切断処理
var mqttDisconnectOptions = _mqttfactory.CreateClientDisconnectOptionsBuilder();
await _client.DisconnectAsync(mqttDisconnectOptions.Build(), CancellationToken.None);
}
public void Dispose()
{
//オブジェクト破棄
_client.Dispose();
}
public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel securelevel = MqttQualityOfServiceLevel.AtLeastOnce)
{
if (_client == null || !_client.IsConnected)
{
Console.WriteLine("### PUBLISH FAILED ###");
return;
}
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(securelevel)
.WithRetainFlag(false)
.Build();
await _client.PublishAsync(message, CancellationToken.None).ConfigureAwait(false);
}
private Task ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine("### MQTT MESSAGER RECEVED ###");
Console.WriteLine($"+topic:{arg.ApplicationMessage.Topic}");
Console.WriteLine($"+payload:{arg.ApplicationMessage.ConvertPayloadToString()}");
Console.WriteLine($"+Qos:{arg.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+Retain:{arg.ApplicationMessage.Retain}");
return Task.CompletedTask;
}
private async Task ConnectedAsync(MqttClientConnectedEventArgs arg)
{
//トピックをセット
await _client.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
}
private async Task DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
//接続状況をチェック
if (_client.IsConnected) { return; }
//未接続が継続している場合は以下を続行.
Console.WriteLine("### DISCONNECTING FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(timeout_sec));
try
{
var mqttoptions = new MqttClientOptionsBuilder();
mqttoptions.WithClientId(Guid.NewGuid().ToString());
mqttoptions.WithTcpServer(host, port);
mqttoptions.WithTimeout(TimeSpan.FromSeconds(timeout_sec));
//再接続
await _client.ConnectAsync(mqttoptions.Build(), CancellationToken.None);
}
catch
{
Console.WriteLine("### RECONNECTING FAILED ###");
}
}
}
}
コメント