MQTTnet MQTT 通訊指南
用途:輕量級 IoT 訊息協定的 .NET 客戶端 + Broker 實作
NuGet:
MQTTnet授權:MIT
支援:.NET 6+、.NET Framework 4.6.2+
GitHub:dotnet/MQTTnet
什麼是 MQTT
MQTT(Message Queuing Telemetry Transport)是一種超輕量的 IoT 訊息協定。它的核心設計目標是:低頻寬、低功耗、高延遲不可靠網路環境下也能可靠傳輸。
MQTT 採用 Publish/Subscribe(發佈/訂閱) 模式,與傳統的 Request/Response 完全不同。下圖以工廠監控為例展示典型拓撲:
- Publisher(發佈者):發送訊息到某個 Topic
- Subscriber(訂閱者):訂閱感興趣的 Topic(含萬用字元
+/#),自動收到匹配的訊息 - Broker(中介):負責接收訊息、依 Topic 比對訂閱者,並按 QoS 等級保證投遞
發佈者和訂閱者完全解耦 — 發佈者不知道誰在訂閱,訂閱者不知道誰在發佈,連結點都是 Broker 上的 Topic。
MQTTnet 是什麼
MQTTnet 是 .NET 生態最完整的 MQTT 實作,同時提供 Client 和 Server(Broker)功能。你可以用它連接外部 MQTT Broker(如 Mosquitto、EMQX),也可以在自己的應用程式中內嵌一個 Broker。
vs HTTP REST vs SignalR vs Modbus
| 比較項目 | MQTT | HTTP REST | SignalR | Modbus |
|---|---|---|---|---|
| 通訊模式 | Pub/Sub | Request/Response | Hub(雙向) | Master/Slave 輪詢 |
| 連線方式 | 長連線 | 短連線 | 長連線 | 輪詢 |
| 協定大小 | 極小(2 byte header) | 大(HTTP Header) | 中等 | 小 |
| 多對多 | ✅ 原生 | ❌ 需要輪詢 | ✅ 支援 | ❌ 點對點 |
| 離線訊息 | ✅ QoS + Retain | ❌ 無 | ❌ 無 | ❌ 無 |
| 適合裝置數 | 萬級 | 百級 | 千級 | 十級 |
| 適合場景 | IoT 大量設備、感測器 | Web API | 即時 Web UI | PLC 直連 |
什麼場景該用 MQTT
QoS 等級
MQTT 提供三種 Quality of Service 等級,平衡可靠性和效能:
| 等級 | 名稱 | 保證 | 適合場景 |
|---|---|---|---|
| QoS 0 | At Most Once | 最多一次(可能遺失) | 高頻感測器數據(丟一筆不影響) |
| QoS 1 | At Least Once | 至少一次(可能重複) | 告警、狀態變更(確保送達) |
| QoS 2 | Exactly Once | 剛好一次 | 控制命令(不能重複執行) |
公司場景建議
- 感測器數據上報:QoS 0 或 1(溫度、壓力等連續值,少一筆問題不大)
- 告警通知:QoS 1(確保警報送達,偶爾重複收到可以處理)
- 設備控制命令:QoS 2(啟動/停止命令不能重複執行)
Topic 命名慣例
Topic 是 MQTT 訊息的路由路徑,用 / 分隔層級。好的 Topic 設計讓訂閱更靈活:
{factory}/{line}/{device}/{datapoint}
範例:
gathertech/line1/plc01/temperature → PLC01 的溫度
gathertech/line1/plc01/pressure → PLC01 的壓力
gathertech/line1/plc02/temperature → PLC02 的溫度
gathertech/line1/+/temperature → Line1 所有設備的溫度(+ 是單層萬用字元)
gathertech/line1/# → Line1 所有數據(# 是多層萬用字元)
gathertech/alarm/critical → 所有 Critical 告警
gathertech/command/plc01 → PLC01 的控制命令
萬用字元:
+:匹配一層(line1/+/temperature匹配line1/plc01/temperature)#:匹配所有子層(line1/#匹配line1/plc01/temperature和line1/plc02/pressure)
安裝
dotnet add package MQTTnet
MQTTnet 單一套件同時包含 Client 和 Server 功能。
基本範例
連線 + 訂閱 + 發佈
using MQTTnet;
using MQTTnet.Protocol;
using System.Text;
using System.Text.Json;
// 建立客戶端
var mqttFactory = new MqttClientFactory();
using var mqttClient = mqttFactory.CreateMqttClient();
// 設定訊息處理(在連線前設定)
mqttClient.ApplicationMessageReceivedAsync += e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
Console.WriteLine($"收到: [{topic}] {payload}");
return Task.CompletedTask;
};
// 連線到 Broker
var options = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", 1883)
.WithClientId("equipment-monitor-01")
.Build();
await mqttClient.ConnectAsync(options);
Console.WriteLine("已連線到 MQTT Broker");
// 訂閱 Topic
var subscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter("gathertech/line1/+/temperature",
qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await mqttClient.SubscribeAsync(subscribeOptions);
// 發佈訊息
var sensorData = new { Value = 25.3, Unit = "celsius", Timestamp = DateTime.UtcNow };
var message = new MqttApplicationMessageBuilder()
.WithTopic("gathertech/line1/plc01/temperature")
.WithPayload(JsonSerializer.Serialize(sensorData))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await mqttClient.PublishAsync(message);
// 中斷連線
await mqttClient.DisconnectAsync();
延伸閱讀
- 公司場景 Pattern — 設備數據上報、內嵌 Broker、告警廣播、搭配 Rx.NET、遺言機制
- gRPC 高效通訊 — 另一種服務間通訊方案
- Modbus 通訊整合 — PLC 直連通訊
- Rx.NET 響應式程式設計 — MQTT 訊息轉 Observable