工業場景常用 Pattern
本頁收集公司實際需求中最常用的 Rx Pattern。每個 pattern 說明場景、核心思路和完整程式碼。
Pattern 1:設備輪詢(Polling)
場景
定時讀取 PLC 暫存器或設備狀態,值沒有變化就不推送給下游。這是工業場景中最基本的 pattern。
核心思路
Interval 定時觸發 → SelectMany 執行非同步讀取 → DistinctUntilChanged 去除重複值
程式碼
/// <summary>
/// 建立設備輪詢 Observable。
/// </summary>
/// <param name="readFunc">讀取設備的非同步方法</param>
/// <param name="interval">輪詢間隔</param>
/// <param name="scheduler">排程器(預設 TaskPool,測試時可替換)</param>
public static IObservable<T> CreatePollingStream<T>(
Func<Task<T>> readFunc,
TimeSpan interval,
IScheduler scheduler = null)
{
scheduler ??= TaskPoolScheduler.Default;
return Observable.Interval(interval, scheduler)
.SelectMany(_ => Observable.FromAsync(readFunc))
.DistinctUntilChanged();
}
使用範例:
// 每 500ms 讀取 PLC D100 暫存器
var plcStream = CreatePollingStream(
() => plcClient.ReadRegisterAsync("D100"),
TimeSpan.FromMilliseconds(500));
plcStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(value => DisplayD100.Text = value.ToString());
進階:多暫存器同時輪詢
var d100 = CreatePollingStream(() => plc.ReadAsync("D100"), TimeSpan.FromMilliseconds(500));
var d200 = CreatePollingStream(() => plc.ReadAsync("D200"), TimeSpan.FromMilliseconds(500));
var d300 = CreatePollingStream(() => plc.ReadAsync("D300"), TimeSpan.FromSeconds(1));
// 組合最新值到 ViewModel
d100.CombineLatest(d200, d300, (a, b, c) => new PlcSnapshot(a, b, c))
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(snapshot => ViewModel.UpdateSnapshot(snapshot));
Pattern 2:多設備合併(Event Aggregation)
場景
工廠有多台設備,需要把所有設備的資料流合併到統一儀表板或告警系統。
核心思路
Merge 將多個 Observable 合併為單一流
程式碼
// 各設備獨立的資料流
IObservable<DeviceEvent> plc1Stream = plc1.GetEventStream();
IObservable<DeviceEvent> plc2Stream = plc2.GetEventStream();
IObservable<DeviceEvent> plc3Stream = plc3.GetEventStream();
// 合併到統一頻道
var allDeviceEvents = Observable.Merge(plc1Stream, plc2Stream, plc3Stream);
// 統一處理
allDeviceEvents
.Subscribe(evt => Console.WriteLine(
$"[{evt.DeviceId}] {evt.Timestamp:HH:mm:ss} - {evt.Message}"));
進階:動態設備清單
當設備數量不固定(動態新增/移除)時:
// 設備清單是 Observable<IObservable<DeviceEvent>>
IObservable<IObservable<DeviceEvent>> deviceStreams = deviceManager
.DeviceConnected
.Select(device => device.GetEventStream()
.TakeUntil(deviceManager.DeviceDisconnected
.Where(d => d.Id == device.Id)));
// Merge 動態串流
deviceStreams
.Merge()
.Subscribe(evt => Dashboard.AddEvent(evt));
分流處理
合併後可以用 GroupBy 將事件依設備 ID 分流:
allDeviceEvents
.GroupBy(evt => evt.DeviceId)
.Subscribe(group =>
{
group.Subscribe(evt =>
Logger.Info($"[{group.Key}] {evt.Message}"));
});
Pattern 3:資料緩衝批次寫入(Buffered Write)
場景
感測器資料流頻率很高,逐筆寫入資料庫效能差。需要累積一定數量或時間後批次寫入。
核心思路
Buffer 按時間窗口或元素數量收集,批次推送
程式碼
sensorDataStream
// 每 10 秒或累積 100 筆(先到者為準)
.Buffer(TimeSpan.FromSeconds(10), 100)
// 過濾空批次
.Where(batch => batch.Count > 0)
// 批次寫入資料庫
.SelectMany(batch => Observable.FromAsync(
() => repository.BulkInsertAsync(batch)))
.Subscribe(
rowsAffected => Logger.Debug($"已寫入 {rowsAffected} 筆"),
ex => Logger.Error($"批次寫入失敗: {ex.Message}"));
進階:依設備分組後分別緩衝
allSensorData
.GroupBy(d => d.DeviceId)
.SelectMany(group => group
.Buffer(TimeSpan.FromSeconds(10), 100)
.Where(batch => batch.Count > 0)
.SelectMany(batch => Observable.FromAsync(
() => repository.BulkInsertAsync(group.Key, batch))))
.Subscribe();
Pattern 4:斷線重連(Reconnection)
場景
設備通訊(Modbus TCP、SECS/GEM)隨時可能斷線。需要自動重連,且避免斷線期間瘋狂重試。
核心思路
Timeout 偵測斷線 → RetryWhen 搭配指數退避延遲重連
程式碼
public IObservable<DeviceData> CreateResilientStream(IDeviceConnection device)
{
return Observable.Defer(() =>
Observable.FromAsync(() => device.ConnectAsync())
.SelectMany(_ => device.GetDataStream()))
// 超過 5 秒沒資料 → 視為斷線
.Timeout(TimeSpan.FromSeconds(5))
// 指數退避重連
.RetryWhen(errors => errors
.Select((error, retryCount) => (error, retryCount))
.SelectMany(t =>
{
// 退避時間:2s, 4s, 8s, 16s, 32s, 上限 60s
var delay = TimeSpan.FromSeconds(
Math.Min(Math.Pow(2, t.retryCount + 1), 60));
Logger.Warn(
$"連線失敗 ({t.error.GetType().Name}): {t.error.Message}," +
$"{delay.TotalSeconds}s 後第 {t.retryCount + 1} 次重連");
return Observable.Timer(delay);
}));
}
使用範例:
var disposable = CreateResilientStream(modbusDevice)
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(
data => UpdateUI(data),
ex => Logger.Error($"永久連線失敗: {ex.Message}"));
// 結束時取消訂閱
disposable.Dispose();
進階:搭配連線狀態通知
var connectionStatus = new BehaviorSubject<bool>(false);
CreateResilientStream(device)
.Do(
_ => { if (!connectionStatus.Value) connectionStatus.OnNext(true); },
_ => connectionStatus.OnNext(false))
.Subscribe(data => ProcessData(data));
// UI 綁定連線狀態
connectionStatus
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(connected =>
{
StatusIndicator.Fill = connected
? Brushes.Green
: Brushes.Red;
});
Pattern 5:去抖動(Debounce for UI)
場景
使用者在搜尋欄快速輸入,不應每次按鍵都觸發搜尋。只取使用者停止輸入後的最終文字。同樣適用於參數設定介面中的數值調整。
核心思路
Throttle(即 Debounce 語意)等待靜默期 → DistinctUntilChanged 避免重複搜尋 → SelectMany 執行搜尋 + Switch 取消前一次搜尋
程式碼
// 將 TextBox 的 TextChanged 事件轉為 Observable
var searchText = Observable.FromEventPattern<TextChangedEventHandler, TextChangedEventArgs>(
h => SearchBox.TextChanged += h,
h => SearchBox.TextChanged -= h)
.Select(_ => SearchBox.Text);
searchText
// 停止輸入 300ms 後才觸發
.Throttle(TimeSpan.FromMilliseconds(300))
// 文字沒變就不搜尋
.DistinctUntilChanged()
// 新的搜尋自動取消前一次
.Select(text => Observable.FromAsync(() => SearchDevicesAsync(text)))
.Switch()
// 結果更新到 UI
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(results => DeviceList.ItemsSource = results);
進階:數值調整去抖動
// 使用者調整 Slider 設定設備參數,避免每一格都送命令
var sliderValues = Observable.FromEventPattern<RoutedPropertyChangedEventHandler<double>,
RoutedPropertyChangedEventArgs<double>>(
h => ParameterSlider.ValueChanged += h,
h => ParameterSlider.ValueChanged -= h)
.Select(ep => ep.EventArgs.NewValue);
sliderValues
.Throttle(TimeSpan.FromMilliseconds(500))
.DistinctUntilChanged()
.SelectMany(value => Observable.FromAsync(
() => device.SetParameterAsync("Speed", value)))
.Subscribe(
_ => Logger.Info("參數已更新"),
ex => Logger.Error($"參數設定失敗: {ex.Message}"));
Pattern 6:設備狀態機監控(State Change Detection)
場景
設備有多種狀態(Idle、Running、Error、Maintenance),需要偵測狀態轉換並記錄轉換事件,例如記錄設備何時從 Running 變成 Error。
核心思路
DistinctUntilChanged 偵測變化 → Scan 或 Buffer(2,1) 取得前後狀態
程式碼
public enum DeviceState { Idle, Running, Error, Maintenance }
public record StateTransition(
DeviceState Previous,
DeviceState Current,
DateTime Timestamp);
// 偵測狀態轉換
IObservable<StateTransition> stateTransitions = deviceStateStream
.DistinctUntilChanged()
// Buffer(2, 1) 產生滑動視窗:[A,B], [B,C], [C,D]...
.Buffer(2, 1)
.Where(pair => pair.Count == 2)
.Select(pair => new StateTransition(
pair[0], pair[1], DateTime.Now));
使用範例:
stateTransitions.Subscribe(transition =>
{
Logger.Info(
$"[狀態轉換] {transition.Previous} → {transition.Current} " +
$"({transition.Timestamp:HH:mm:ss})");
// 進入 Error 狀態時告警
if (transition.Current == DeviceState.Error)
{
AlarmService.Raise(
$"設備從 {transition.Previous} 進入 Error 狀態");
}
});
進階:狀態持續時間追蹤
// 計算每個狀態持續了多久
deviceStateStream
.DistinctUntilChanged()
.TimeInterval() // 加上與前一個值的時間間隔
.Subscribe(ti =>
{
Logger.Info(
$"狀態 {ti.Value} 持續了 {ti.Interval.TotalSeconds:F1}s");
});
進階:狀態停留超時告警
// 設備在 Error 狀態停留超過 30 秒就升級告警
deviceStateStream
.DistinctUntilChanged()
.SelectMany(state => state == DeviceState.Error
? Observable.Timer(TimeSpan.FromSeconds(30))
.Select(_ => state)
.TakeUntil(deviceStateStream
.Where(s => s != DeviceState.Error))
: Observable.Empty<DeviceState>())
.Subscribe(_ => AlarmService.Escalate("設備持續 Error 超過 30 秒"));
Pattern 速查表
| Pattern | 核心 Operators | 頁面參考 |
|---|---|---|
| 設備輪詢 | Interval + SelectMany + DistinctUntilChanged | 常用 Operators |
| 多設備合併 | Merge / Merge + GroupBy | 常用 Operators |
| 批次寫入 | Buffer + SelectMany | 常用 Operators |
| 斷線重連 | Timeout + RetryWhen + Timer | 錯誤處理 |
| 去抖動 | Throttle + DistinctUntilChanged + Switch | 常用 Operators |
| 狀態機 | DistinctUntilChanged + Buffer(2,1) / TimeInterval | 常用 Operators |
延伸閱讀:
- Scheduler 與線程管理 — ObserveOn 放在最後一步
- 錯誤處理 — 重連策略詳解
- 最佳實踐 — 生命週期管理與常見陷阱