メインコンテンツまでスキップ

公司場景 Pattern

QoS 等級實際時序

選擇 QoS 等級前,先了解三個等級在 wire 上實際發生的訊息交換。下圖把 Publisher ↔ Broker 之間的 PUBLISH / PUBACK / PUBREC / PUBREL / PUBCOMP 握手畫在一起對照:

時序揭示的取捨:

  • QoS 0:1 次封包;零開銷,但網路抖動就會丟訊息——高頻感測器首選
  • QoS 1:2 次封包(可能更多次重傳);保證送達但可能重複,Subscriber 必須冪等處理——告警 / 狀態變更首選
  • QoS 2:4 次封包;完整去重,但延遲與頻寬最高——只在「不能重複執行」的場景使用,例如控制命令金流相關訊息

實務提醒:QoS 是 Publisher↔Broker 與 Broker↔Subscriber 兩段獨立的協商;若 Subscriber 訂閱時用較低 QoS(如 0),Broker 會降級——也就是說「兩端的 QoS 取較小值」。


Pattern 1: 設備數據上報

場景:多台 PLC 透過 Gateway 將感測器數據上報到中央監控系統。每台設備以固定間隔 Publish 數據,監控端訂閱感興趣的設備。

Topic 設計

gathertech/{factory}/{line}/{device}/{datapoint}

範例:
gathertech/fab1/line1/plc01/temperature
gathertech/fab1/line1/plc01/pressure
gathertech/fab1/line1/plc01/status
gathertech/fab1/line1/plc02/temperature

設備端(Publisher)

public class PlcDataPublisher : BackgroundService
{
private readonly IMqttClient _mqttClient;
private readonly IModbusClient _modbusClient;
private readonly ILogger<PlcDataPublisher> _logger;
private readonly string _deviceId;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await ConnectMqttAsync();

while (!stoppingToken.IsCancellationRequested)
{
try
{
var registers = await _modbusClient.ReadHoldingRegistersAsync(0, 10);

var sensorData = new
{
Temperature = registers[0] * 0.1,
Pressure = registers[1] * 0.1,
FlowRate = registers[2],
Status = registers[3],
Timestamp = DateTimeOffset.UtcNow
};

var payload = JsonSerializer.Serialize(sensorData);

// 溫度、壓力用 QoS 0(高頻,偶爾遺失無妨)
await PublishAsync(
$"gathertech/fab1/line1/{_deviceId}/sensors",
payload,
MqttQualityOfServiceLevel.AtMostOnce);

// 狀態變更用 QoS 1(確保送達)
if (sensorData.Status != _lastStatus)
{
await PublishAsync(
$"gathertech/fab1/line1/{_deviceId}/status",
JsonSerializer.Serialize(new { Status = sensorData.Status }),
MqttQualityOfServiceLevel.AtLeastOnce);
_lastStatus = sensorData.Status;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to read/publish PLC data for {DeviceId}", _deviceId);
}

await Task.Delay(500, stoppingToken); // 每 500ms 上報
}
}

private async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.Build();

await _mqttClient.PublishAsync(message);
}
}

監控端(Subscriber)

public class CentralMonitorService : BackgroundService
{
private readonly IMqttClient _mqttClient;
private readonly ISensorDataRepository _repository;
private readonly ILogger<CentralMonitorService> _logger;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 訊息處理
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

_logger.LogDebug("MQTT received: [{Topic}] {Payload}", topic, payload);

// 解析 Topic 取得設備資訊
var parts = topic.Split('/');
var deviceId = parts[3]; // gathertech/fab1/line1/{deviceId}/sensors

var data = JsonSerializer.Deserialize<SensorData>(payload)!;
data.DeviceId = deviceId;

// 寫入資料庫
await _repository.InsertAsync(data);
};

// 連線
await ConnectMqttAsync();

// 訂閱所有 Line1 設備的感測器數據
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter("gathertech/fab1/line1/+/sensors",
qualityOfServiceLevel: MqttQualityOfServiceLevel.AtMostOnce)
.WithTopicFilter("gathertech/fab1/line1/+/status",
qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce)
.Build();

await _mqttClient.SubscribeAsync(subscribeOptions, stoppingToken);

// 保持運行
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}

Pattern 2: 內嵌 MQTT Broker

場景:在本機或邊緣設備上內嵌一個 MQTT Broker,不需要安裝外部的 Mosquitto 或 EMQX。適合單機系統、邊緣計算、或離線環境。

public class EmbeddedBrokerService : BackgroundService
{
private readonly MqttServer _mqttServer;
private readonly ILogger<EmbeddedBrokerService> _logger;

public EmbeddedBrokerService(ILogger<EmbeddedBrokerService> logger)
{
_logger = logger;

var mqttServerFactory = new MqttServerFactory();
var options = new MqttServerOptionsBuilder()
.WithDefaultEndpoint() // Port 1883
.WithDefaultEndpointPort(1883)
.Build();

_mqttServer = mqttServerFactory.CreateMqttServer(options);

// 客戶端連線事件
_mqttServer.ClientConnectedAsync += e =>
{
_logger.LogInformation("MQTT client connected: {ClientId}", e.ClientId);
return Task.CompletedTask;
};

// 客戶端斷線事件
_mqttServer.ClientDisconnectedAsync += e =>
{
_logger.LogInformation(
"MQTT client disconnected: {ClientId}, reason: {Reason}",
e.ClientId, e.DisconnectType);
return Task.CompletedTask;
};

// 訊息攔截(可做存取控制)
_mqttServer.InterceptingPublishAsync += e =>
{
_logger.LogDebug(
"Message from {ClientId}: [{Topic}]",
e.ClientId, e.ApplicationMessage.Topic);
return Task.CompletedTask;
};
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _mqttServer.StartAsync();
_logger.LogInformation("Embedded MQTT Broker started on port 1883");

await Task.Delay(Timeout.Infinite, stoppingToken);
}

public override async Task StopAsync(CancellationToken cancellationToken)
{
await _mqttServer.StopAsync();
_logger.LogInformation("Embedded MQTT Broker stopped");
}
}
內嵌 vs 外部 Broker
  • 單機設備應用(WPF 控制程式):內嵌 Broker,零外部依賴
  • 多台設備 + 中央監控:使用外部 Broker(Mosquitto / EMQX),專業的叢集和管理功能
  • 邊緣閘道器:內嵌 Broker 做本地匯聚,再轉發到雲端

Pattern 3: 告警廣播

場景:設備告警 Publish 到 alarm/ topic,所有監控端同時收到通知。

public class AlarmBroadcastService
{
private readonly IMqttClient _mqttClient;

public async Task BroadcastAlarmAsync(Alarm alarm)
{
var payload = JsonSerializer.Serialize(new
{
alarm.DeviceId,
alarm.Severity,
alarm.Message,
alarm.Source,
Timestamp = DateTimeOffset.UtcNow
});

// 依嚴重程度分 Topic
var topic = alarm.Severity switch
{
AlarmSeverity.Critical => $"gathertech/alarm/critical/{alarm.DeviceId}",
AlarmSeverity.Warning => $"gathertech/alarm/warning/{alarm.DeviceId}",
_ => $"gathertech/alarm/info/{alarm.DeviceId}"
};

var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) // 告警必達
.WithRetainFlag(true) // Retain:新訂閱者立刻收到最新告警
.Build();

await _mqttClient.PublishAsync(message);
}
}

// 監控端訂閱
// 訂閱所有 Critical 告警
await mqttClient.SubscribeAsync("gathertech/alarm/critical/#",
MqttQualityOfServiceLevel.AtLeastOnce);

// 只訂閱特定設備的所有告警
await mqttClient.SubscribeAsync("gathertech/alarm/+/CMP-01",
MqttQualityOfServiceLevel.AtLeastOnce);

Pattern 4: 搭配 Rx.NET

場景:將 MQTT 接收到的訊息轉成 IObservable,用 Rx 做過濾、聚合、節流等串流處理。

public class MqttRxBridge : IDisposable
{
private readonly IMqttClient _mqttClient;
private readonly Subject<MqttSensorMessage> _sensorSubject = new();

public IObservable<MqttSensorMessage> SensorStream => _sensorSubject.AsObservable();

public MqttRxBridge(IMqttClient mqttClient)
{
_mqttClient = mqttClient;

// MQTT 訊息 → Rx Subject
_mqttClient.ApplicationMessageReceivedAsync += e =>
{
try
{
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
var data = JsonSerializer.Deserialize<MqttSensorMessage>(payload)!;
data.Topic = e.ApplicationMessage.Topic;
_sensorSubject.OnNext(data);
}
catch (Exception ex)
{
_sensorSubject.OnError(ex);
}
return Task.CompletedTask;
};
}

public void Dispose() => _sensorSubject.Dispose();
}

使用 Rx 處理:

var bridge = new MqttRxBridge(mqttClient);

// 溫度異常過濾
bridge.SensorStream
.Where(d => d.Temperature > 80)
.Throttle(TimeSpan.FromSeconds(5)) // 5 秒內同一設備只觸發一次
.Subscribe(d =>
{
logger.LogWarning("High temperature on {DeviceId}: {Temperature}°C",
d.DeviceId, d.Temperature);
});

// 每 10 秒計算平均溫度
bridge.SensorStream
.Where(d => d.DeviceId == "CMP-01")
.Buffer(TimeSpan.FromSeconds(10))
.Where(batch => batch.Count > 0)
.Select(batch => new
{
AvgTemperature = batch.Average(d => d.Temperature),
MaxTemperature = batch.Max(d => d.Temperature),
SampleCount = batch.Count
})
.Subscribe(async stats =>
{
await repository.InsertAggregateAsync(stats);
});

Pattern 5: 遺言機制(Last Will and Testament)

場景:設備斷線時,Broker 自動發布「設備離線」訊息。不需要監控端去輪詢設備狀態 — 如果設備意外斷開,Broker 會替它「遺言」。

// 設備端連線時設定遺言
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.local", 1883)
.WithClientId("plc-gateway-01")
// 遺言設定:斷線時 Broker 自動發布這則訊息
.WithWillTopic("gathertech/status/plc-gateway-01")
.WithWillPayload(JsonSerializer.Serialize(new
{
Status = "Offline",
Timestamp = DateTimeOffset.UtcNow,
Reason = "Unexpected disconnect"
}))
.WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithWillRetain(true) // 新訂閱者也能看到離線狀態
.Build();

await mqttClient.ConnectAsync(options);

// 正常運行時定期發布 Online 狀態
var onlineTimer = new PeriodicTimer(TimeSpan.FromSeconds(30));
while (await onlineTimer.WaitForNextTickAsync())
{
await mqttClient.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic("gathertech/status/plc-gateway-01")
.WithPayload(JsonSerializer.Serialize(new
{
Status = "Online",
Timestamp = DateTimeOffset.UtcNow
}))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true)
.Build());
}

監控端:訂閱 gathertech/status/# 就能即時知道所有設備的上線/離線狀態,不需要自己做心跳偵測。


與 Modbus 的共存策略

MQTT 和 Modbus 不是互斥的 — 它們解決的是不同層級的問題:

┌──────────────┐    MQTT     ┌───────────┐    MQTT     ┌──────────┐
│ PLC Gateway │ ──────────→ │ Broker │ ──────────→ │ 監控中心 │
│ (邊緣設備) │ │ │ │ 資料庫 │
└──────┬───────┘ └───────────┘ │ 告警服務 │
│ └──────────┘
Modbus

┌──────┴───────┐
│ PLC │
│ (現場控制器) │
└──────────────┘
  • Modbus:Gateway ↔ PLC 之間的直連通訊(讀寫暫存器)
  • MQTT:Gateway → Broker → 各服務之間的訊息分發(一對多)
// Gateway 的角色:Modbus 讀 PLC → MQTT 發佈到 Broker
public class PlcGatewayService : BackgroundService
{
private readonly IModbusClient _modbusClient; // 讀 PLC
private readonly IMqttClient _mqttClient; // 發佈到 Broker

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// 1. Modbus 讀取 PLC 資料
var registers = await _modbusClient.ReadHoldingRegistersAsync(0, 10);

// 2. 轉換為 MQTT 訊息發佈
var payload = ConvertToJson(registers);
await _mqttClient.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic($"gathertech/fab1/line1/{_deviceId}/sensors")
.WithPayload(payload)
.Build());

await Task.Delay(500, stoppingToken);
}
}
}

安全性

TLS 加密

var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.local", 8883) // TLS port
.WithTlsOptions(tls =>
{
tls.WithSslProtocols(System.Security.Authentication.SslProtocols.Tls12);
tls.WithCertificateValidationHandler(context =>
{
// 生產環境需要正確驗證憑證
return context.SslPolicyErrors == System.Net.Security.SslPolicyErrors.None;
});
})
.Build();

使用者認證

var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.local", 1883)
.WithCredentials("equipment-user", "secure-password")
.Build();

延伸閱讀