公司場景 Pattern
QoS 等級實際時序
選擇 QoS 等級前,先了解三個等級在 wire 上實際發生的訊息交換。下圖把 Publisher ↔ Broker 之間的 PUBLISH / PUBACK / PUBREC / PUBREL / PUBCOMP 握手畫在一起對照:
時序揭示的取捨:
- QoS 0:1 次封包;零開銷,但網路抖動就會丟訊息——高頻感測器首選
- QoS 1:2 次封包(可能更多次重傳);保證送達但可能重複,Subscriber 必須冪等處理——告警 / 狀態變更首選
- QoS 2:4 次封包;完整去重,但延遲與頻寬最高——只在「不能重複執行」的場景使用,例如控制命令或金流相關訊息
實務提醒:QoS 是 Publisher↔Broker 與 Broker↔Subscriber 兩段獨立的協商;若 Subscriber 訂閱時用較低 QoS(如 0),Broker 會降級——也就是說「兩端的 QoS 取較小值」。
Pattern 1: 設備數據上報
場景:多台 PLC 透過 Gateway 將感測器數據上報到中央監控系統。每台設備以固定間隔 Publish 數據,監控端訂閱感興趣的設備。
Topic 設計
gathertech/{factory}/{line}/{device}/{datapoint}
範例:
gathertech/fab1/line1/plc01/temperature
gathertech/fab1/line1/plc01/pressure
gathertech/fab1/line1/plc01/status
gathertech/fab1/line1/plc02/temperature
設備端(Publisher)
public class PlcDataPublisher : BackgroundService
{
private readonly IMqttClient _mqttClient;
private readonly IModbusClient _modbusClient;
private readonly ILogger<PlcDataPublisher> _logger;
private readonly string _deviceId;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await ConnectMqttAsync();
while (!stoppingToken.IsCancellationRequested)
{
try
{
var registers = await _modbusClient.ReadHoldingRegistersAsync(0, 10);
var sensorData = new
{
Temperature = registers[0] * 0.1,
Pressure = registers[1] * 0.1,
FlowRate = registers[2],
Status = registers[3],
Timestamp = DateTimeOffset.UtcNow
};
var payload = JsonSerializer.Serialize(sensorData);
// 溫度、壓力用 QoS 0(高頻,偶爾遺失無妨)
await PublishAsync(
$"gathertech/fab1/line1/{_deviceId}/sensors",
payload,
MqttQualityOfServiceLevel.AtMostOnce);
// 狀態變更用 QoS 1(確保送達)
if (sensorData.Status != _lastStatus)
{
await PublishAsync(
$"gathertech/fab1/line1/{_deviceId}/status",
JsonSerializer.Serialize(new { Status = sensorData.Status }),
MqttQualityOfServiceLevel.AtLeastOnce);
_lastStatus = sensorData.Status;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to read/publish PLC data for {DeviceId}", _deviceId);
}
await Task.Delay(500, stoppingToken); // 每 500ms 上報
}
}
private async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.Build();
await _mqttClient.PublishAsync(message);
}
}
監控端(Subscriber)
public class CentralMonitorService : BackgroundService
{
private readonly IMqttClient _mqttClient;
private readonly ISensorDataRepository _repository;
private readonly ILogger<CentralMonitorService> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 訊息處理
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
_logger.LogDebug("MQTT received: [{Topic}] {Payload}", topic, payload);
// 解析 Topic 取得設備資訊
var parts = topic.Split('/');
var deviceId = parts[3]; // gathertech/fab1/line1/{deviceId}/sensors
var data = JsonSerializer.Deserialize<SensorData>(payload)!;
data.DeviceId = deviceId;
// 寫入資料庫
await _repository.InsertAsync(data);
};
// 連線
await ConnectMqttAsync();
// 訂閱所有 Line1 設備的感測器數據
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter("gathertech/fab1/line1/+/sensors",
qualityOfServiceLevel: MqttQualityOfServiceLevel.AtMostOnce)
.WithTopicFilter("gathertech/fab1/line1/+/status",
qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await _mqttClient.SubscribeAsync(subscribeOptions, stoppingToken);
// 保持運行
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
Pattern 2: 內嵌 MQTT Broker
場景:在本機或邊緣設備上內嵌一個 MQTT Broker,不需要安裝外部的 Mosquitto 或 EMQX。適合單機系統、邊緣計算、或離線環境。
public class EmbeddedBrokerService : BackgroundService
{
private readonly MqttServer _mqttServer;
private readonly ILogger<EmbeddedBrokerService> _logger;
public EmbeddedBrokerService(ILogger<EmbeddedBrokerService> logger)
{
_logger = logger;
var mqttServerFactory = new MqttServerFactory();
var options = new MqttServerOptionsBuilder()
.WithDefaultEndpoint() // Port 1883
.WithDefaultEndpointPort(1883)
.Build();
_mqttServer = mqttServerFactory.CreateMqttServer(options);
// 客戶端連線事件
_mqttServer.ClientConnectedAsync += e =>
{
_logger.LogInformation("MQTT client connected: {ClientId}", e.ClientId);
return Task.CompletedTask;
};
// 客戶端斷線事件
_mqttServer.ClientDisconnectedAsync += e =>
{
_logger.LogInformation(
"MQTT client disconnected: {ClientId}, reason: {Reason}",
e.ClientId, e.DisconnectType);
return Task.CompletedTask;
};
// 訊息攔截(可做存取控制)
_mqttServer.InterceptingPublishAsync += e =>
{
_logger.LogDebug(
"Message from {ClientId}: [{Topic}]",
e.ClientId, e.ApplicationMessage.Topic);
return Task.CompletedTask;
};
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _mqttServer.StartAsync();
_logger.LogInformation("Embedded MQTT Broker started on port 1883");
await Task.Delay(Timeout.Infinite, stoppingToken);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
await _mqttServer.StopAsync();
_logger.LogInformation("Embedded MQTT Broker stopped");
}
}
內嵌 vs 外部 Broker
- 單機設備應用(WPF 控制程式):內嵌 Broker,零外部依賴
- 多台設備 + 中央監控:使用外部 Broker(Mosquitto / EMQX),專業的叢集和管理功能
- 邊緣閘道器:內嵌 Broker 做本地匯聚,再轉發到雲端
Pattern 3: 告警廣播
場景:設備告警 Publish 到 alarm/ topic,所有監控端同時收到通知。
public class AlarmBroadcastService
{
private readonly IMqttClient _mqttClient;
public async Task BroadcastAlarmAsync(Alarm alarm)
{
var payload = JsonSerializer.Serialize(new
{
alarm.DeviceId,
alarm.Severity,
alarm.Message,
alarm.Source,
Timestamp = DateTimeOffset.UtcNow
});
// 依嚴重程度分 Topic
var topic = alarm.Severity switch
{
AlarmSeverity.Critical => $"gathertech/alarm/critical/{alarm.DeviceId}",
AlarmSeverity.Warning => $"gathertech/alarm/warning/{alarm.DeviceId}",
_ => $"gathertech/alarm/info/{alarm.DeviceId}"
};
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) // 告警必達
.WithRetainFlag(true) // Retain:新訂閱者立刻收到最新告警
.Build();
await _mqttClient.PublishAsync(message);
}
}
// 監控端訂閱
// 訂閱所有 Critical 告警
await mqttClient.SubscribeAsync("gathertech/alarm/critical/#",
MqttQualityOfServiceLevel.AtLeastOnce);
// 只訂閱特定設備的所有告警
await mqttClient.SubscribeAsync("gathertech/alarm/+/CMP-01",
MqttQualityOfServiceLevel.AtLeastOnce);
Pattern 4: 搭配 Rx.NET
場景:將 MQTT 接收到的訊息轉成 IObservable,用 Rx 做過濾、聚合、節流等串流處理。
public class MqttRxBridge : IDisposable
{
private readonly IMqttClient _mqttClient;
private readonly Subject<MqttSensorMessage> _sensorSubject = new();
public IObservable<MqttSensorMessage> SensorStream => _sensorSubject.AsObservable();
public MqttRxBridge(IMqttClient mqttClient)
{
_mqttClient = mqttClient;
// MQTT 訊息 → Rx Subject
_mqttClient.ApplicationMessageReceivedAsync += e =>
{
try
{
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
var data = JsonSerializer.Deserialize<MqttSensorMessage>(payload)!;
data.Topic = e.ApplicationMessage.Topic;
_sensorSubject.OnNext(data);
}
catch (Exception ex)
{
_sensorSubject.OnError(ex);
}
return Task.CompletedTask;
};
}
public void Dispose() => _sensorSubject.Dispose();
}
使用 Rx 處理:
var bridge = new MqttRxBridge(mqttClient);
// 溫度異常過濾
bridge.SensorStream
.Where(d => d.Temperature > 80)
.Throttle(TimeSpan.FromSeconds(5)) // 5 秒內同一設備只觸發一次
.Subscribe(d =>
{
logger.LogWarning("High temperature on {DeviceId}: {Temperature}°C",
d.DeviceId, d.Temperature);
});
// 每 10 秒計算平均溫度
bridge.SensorStream
.Where(d => d.DeviceId == "CMP-01")
.Buffer(TimeSpan.FromSeconds(10))
.Where(batch => batch.Count > 0)
.Select(batch => new
{
AvgTemperature = batch.Average(d => d.Temperature),
MaxTemperature = batch.Max(d => d.Temperature),
SampleCount = batch.Count
})
.Subscribe(async stats =>
{
await repository.InsertAggregateAsync(stats);
});
Pattern 5: 遺言機制(Last Will and Testament)
場景:設備斷線時,Broker 自動發布「設備離線」訊息。不需要監控端去輪詢設備狀態 — 如果設備意外斷開,Broker 會替它「遺言」。
// 設備端連線時設定遺言
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.local", 1883)
.WithClientId("plc-gateway-01")
// 遺言設定:斷線時 Broker 自動發布這則訊息
.WithWillTopic("gathertech/status/plc-gateway-01")
.WithWillPayload(JsonSerializer.Serialize(new
{
Status = "Offline",
Timestamp = DateTimeOffset.UtcNow,
Reason = "Unexpected disconnect"
}))
.WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithWillRetain(true) // 新訂閱者也能看到離線狀態
.Build();
await mqttClient.ConnectAsync(options);
// 正常運行時定期發布 Online 狀態
var onlineTimer = new PeriodicTimer(TimeSpan.FromSeconds(30));
while (await onlineTimer.WaitForNextTickAsync())
{
await mqttClient.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic("gathertech/status/plc-gateway-01")
.WithPayload(JsonSerializer.Serialize(new
{
Status = "Online",
Timestamp = DateTimeOffset.UtcNow
}))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true)
.Build());
}
監控端:訂閱 gathertech/status/# 就能即時知道所有設備的上線/離線狀態,不需要自己做心跳偵測。
與 Modbus 的共存策略
MQTT 和 Modbus 不是互斥的 — 它們解決的是不同層級的問題:
┌──────────────┐ MQTT ┌───────────┐ MQTT ┌──────────┐
│ PLC Gateway │ ──────────→ │ Broker │ ──────────→ │ 監控中心 │
│ (邊緣設備) │ │ │ │ 資料庫 │
└──────┬───────┘ └───────────┘ │ 告警服務 │
│ └──────────┘
Modbus
│
┌──────┴───────┐
│ PLC │
│ (現場控制器) │
└──────────────┘
- Modbus:Gateway ↔ PLC 之間的直連通訊(讀寫暫存器)
- MQTT:Gateway → Broker → 各服務之間的訊息分發(一對多)
// Gateway 的角色:Modbus 讀 PLC → MQTT 發佈到 Broker
public class PlcGatewayService : BackgroundService
{
private readonly IModbusClient _modbusClient; // 讀 PLC
private readonly IMqttClient _mqttClient; // 發佈到 Broker
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// 1. Modbus 讀取 PLC 資料
var registers = await _modbusClient.ReadHoldingRegistersAsync(0, 10);
// 2. 轉換為 MQTT 訊息發佈
var payload = ConvertToJson(registers);
await _mqttClient.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic($"gathertech/fab1/line1/{_deviceId}/sensors")
.WithPayload(payload)
.Build());
await Task.Delay(500, stoppingToken);
}
}
}
安全性
TLS 加密
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.local", 8883) // TLS port
.WithTlsOptions(tls =>
{
tls.WithSslProtocols(System.Security.Authentication.SslProtocols.Tls12);
tls.WithCertificateValidationHandler(context =>
{
// 生產環境需要正確驗證憑證
return context.SslPolicyErrors == System.Net.Security.SslPolicyErrors.None;
});
})
.Build();
使用者認證
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.local", 1883)
.WithCredentials("equipment-user", "secure-password")
.Build();
延伸閱讀
- 概觀 — MQTT 基礎、QoS、Topic 設計、安裝
- Rx.NET 響應式程式設計 — MQTT 訊息轉 Observable 做串流處理
- Modbus 通訊整合 — PLC 直連通訊(與 MQTT 互補)
- gRPC 高效通訊 — 服務間通訊的另一個選擇
- Polly 韌性策略 — MQTT 連線的重試策略