跳至主要内容

工業場景常用 Pattern

本頁收集公司實際需求中最常用的 Rx Pattern。每個 pattern 說明場景、核心思路和完整程式碼。


Pattern 1:設備輪詢(Polling)

場景

定時讀取 PLC 暫存器或設備狀態,值沒有變化就不推送給下游。這是工業場景中最基本的 pattern。

核心思路

Interval 定時觸發 → SelectMany 執行非同步讀取 → DistinctUntilChanged 去除重複值

程式碼

/// <summary>
/// 建立設備輪詢 Observable。
/// </summary>
/// <param name="readFunc">讀取設備的非同步方法</param>
/// <param name="interval">輪詢間隔</param>
/// <param name="scheduler">排程器(預設 TaskPool,測試時可替換)</param>
public static IObservable<T> CreatePollingStream<T>(
Func<Task<T>> readFunc,
TimeSpan interval,
IScheduler scheduler = null)
{
scheduler ??= TaskPoolScheduler.Default;

return Observable.Interval(interval, scheduler)
.SelectMany(_ => Observable.FromAsync(readFunc))
.DistinctUntilChanged();
}

使用範例:

// 每 500ms 讀取 PLC D100 暫存器
var plcStream = CreatePollingStream(
() => plcClient.ReadRegisterAsync("D100"),
TimeSpan.FromMilliseconds(500));

plcStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(value => DisplayD100.Text = value.ToString());

進階:多暫存器同時輪詢

var d100 = CreatePollingStream(() => plc.ReadAsync("D100"), TimeSpan.FromMilliseconds(500));
var d200 = CreatePollingStream(() => plc.ReadAsync("D200"), TimeSpan.FromMilliseconds(500));
var d300 = CreatePollingStream(() => plc.ReadAsync("D300"), TimeSpan.FromSeconds(1));

// 組合最新值到 ViewModel
d100.CombineLatest(d200, d300, (a, b, c) => new PlcSnapshot(a, b, c))
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(snapshot => ViewModel.UpdateSnapshot(snapshot));

Pattern 2:多設備合併(Event Aggregation)

場景

工廠有多台設備,需要把所有設備的資料流合併到統一儀表板或告警系統。

核心思路

Merge 將多個 Observable 合併為單一流

程式碼

// 各設備獨立的資料流
IObservable<DeviceEvent> plc1Stream = plc1.GetEventStream();
IObservable<DeviceEvent> plc2Stream = plc2.GetEventStream();
IObservable<DeviceEvent> plc3Stream = plc3.GetEventStream();

// 合併到統一頻道
var allDeviceEvents = Observable.Merge(plc1Stream, plc2Stream, plc3Stream);

// 統一處理
allDeviceEvents
.Subscribe(evt => Console.WriteLine(
$"[{evt.DeviceId}] {evt.Timestamp:HH:mm:ss} - {evt.Message}"));

進階:動態設備清單

當設備數量不固定(動態新增/移除)時:

// 設備清單是 Observable<IObservable<DeviceEvent>>
IObservable<IObservable<DeviceEvent>> deviceStreams = deviceManager
.DeviceConnected
.Select(device => device.GetEventStream()
.TakeUntil(deviceManager.DeviceDisconnected
.Where(d => d.Id == device.Id)));

// Merge 動態串流
deviceStreams
.Merge()
.Subscribe(evt => Dashboard.AddEvent(evt));

分流處理

合併後可以用 GroupBy 將事件依設備 ID 分流:

allDeviceEvents
.GroupBy(evt => evt.DeviceId)
.Subscribe(group =>
{
group.Subscribe(evt =>
Logger.Info($"[{group.Key}] {evt.Message}"));
});

Pattern 3:資料緩衝批次寫入(Buffered Write)

場景

感測器資料流頻率很高,逐筆寫入資料庫效能差。需要累積一定數量或時間後批次寫入。

核心思路

Buffer 按時間窗口或元素數量收集,批次推送

程式碼

sensorDataStream
// 每 10 秒或累積 100 筆(先到者為準)
.Buffer(TimeSpan.FromSeconds(10), 100)
// 過濾空批次
.Where(batch => batch.Count > 0)
// 批次寫入資料庫
.SelectMany(batch => Observable.FromAsync(
() => repository.BulkInsertAsync(batch)))
.Subscribe(
rowsAffected => Logger.Debug($"已寫入 {rowsAffected} 筆"),
ex => Logger.Error($"批次寫入失敗: {ex.Message}"));

進階:依設備分組後分別緩衝

allSensorData
.GroupBy(d => d.DeviceId)
.SelectMany(group => group
.Buffer(TimeSpan.FromSeconds(10), 100)
.Where(batch => batch.Count > 0)
.SelectMany(batch => Observable.FromAsync(
() => repository.BulkInsertAsync(group.Key, batch))))
.Subscribe();

Pattern 4:斷線重連(Reconnection)

場景

設備通訊(Modbus TCP、SECS/GEM)隨時可能斷線。需要自動重連,且避免斷線期間瘋狂重試。

核心思路

Timeout 偵測斷線 → RetryWhen 搭配指數退避延遲重連

程式碼

public IObservable<DeviceData> CreateResilientStream(IDeviceConnection device)
{
return Observable.Defer(() =>
Observable.FromAsync(() => device.ConnectAsync())
.SelectMany(_ => device.GetDataStream()))
// 超過 5 秒沒資料 → 視為斷線
.Timeout(TimeSpan.FromSeconds(5))
// 指數退避重連
.RetryWhen(errors => errors
.Select((error, retryCount) => (error, retryCount))
.SelectMany(t =>
{
// 退避時間:2s, 4s, 8s, 16s, 32s, 上限 60s
var delay = TimeSpan.FromSeconds(
Math.Min(Math.Pow(2, t.retryCount + 1), 60));

Logger.Warn(
$"連線失敗 ({t.error.GetType().Name}): {t.error.Message}," +
$"{delay.TotalSeconds}s 後第 {t.retryCount + 1} 次重連");

return Observable.Timer(delay);
}));
}

使用範例:

var disposable = CreateResilientStream(modbusDevice)
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(
data => UpdateUI(data),
ex => Logger.Error($"永久連線失敗: {ex.Message}"));

// 結束時取消訂閱
disposable.Dispose();

進階:搭配連線狀態通知

var connectionStatus = new BehaviorSubject<bool>(false);

CreateResilientStream(device)
.Do(
_ => { if (!connectionStatus.Value) connectionStatus.OnNext(true); },
_ => connectionStatus.OnNext(false))
.Subscribe(data => ProcessData(data));

// UI 綁定連線狀態
connectionStatus
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(connected =>
{
StatusIndicator.Fill = connected
? Brushes.Green
: Brushes.Red;
});

Pattern 5:去抖動(Debounce for UI)

場景

使用者在搜尋欄快速輸入,不應每次按鍵都觸發搜尋。只取使用者停止輸入後的最終文字。同樣適用於參數設定介面中的數值調整。

核心思路

Throttle(即 Debounce 語意)等待靜默期 → DistinctUntilChanged 避免重複搜尋 → SelectMany 執行搜尋 + Switch 取消前一次搜尋

程式碼

// 將 TextBox 的 TextChanged 事件轉為 Observable
var searchText = Observable.FromEventPattern<TextChangedEventHandler, TextChangedEventArgs>(
h => SearchBox.TextChanged += h,
h => SearchBox.TextChanged -= h)
.Select(_ => SearchBox.Text);

searchText
// 停止輸入 300ms 後才觸發
.Throttle(TimeSpan.FromMilliseconds(300))
// 文字沒變就不搜尋
.DistinctUntilChanged()
// 新的搜尋自動取消前一次
.Select(text => Observable.FromAsync(() => SearchDevicesAsync(text)))
.Switch()
// 結果更新到 UI
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(results => DeviceList.ItemsSource = results);

進階:數值調整去抖動

// 使用者調整 Slider 設定設備參數,避免每一格都送命令
var sliderValues = Observable.FromEventPattern<RoutedPropertyChangedEventHandler<double>,
RoutedPropertyChangedEventArgs<double>>(
h => ParameterSlider.ValueChanged += h,
h => ParameterSlider.ValueChanged -= h)
.Select(ep => ep.EventArgs.NewValue);

sliderValues
.Throttle(TimeSpan.FromMilliseconds(500))
.DistinctUntilChanged()
.SelectMany(value => Observable.FromAsync(
() => device.SetParameterAsync("Speed", value)))
.Subscribe(
_ => Logger.Info("參數已更新"),
ex => Logger.Error($"參數設定失敗: {ex.Message}"));

Pattern 6:設備狀態機監控(State Change Detection)

場景

設備有多種狀態(Idle、Running、Error、Maintenance),需要偵測狀態轉換並記錄轉換事件,例如記錄設備何時從 Running 變成 Error。

核心思路

DistinctUntilChanged 偵測變化 → ScanBuffer(2,1) 取得前後狀態

程式碼

public enum DeviceState { Idle, Running, Error, Maintenance }

public record StateTransition(
DeviceState Previous,
DeviceState Current,
DateTime Timestamp);

// 偵測狀態轉換
IObservable<StateTransition> stateTransitions = deviceStateStream
.DistinctUntilChanged()
// Buffer(2, 1) 產生滑動視窗:[A,B], [B,C], [C,D]...
.Buffer(2, 1)
.Where(pair => pair.Count == 2)
.Select(pair => new StateTransition(
pair[0], pair[1], DateTime.Now));

使用範例:

stateTransitions.Subscribe(transition =>
{
Logger.Info(
$"[狀態轉換] {transition.Previous} → {transition.Current} " +
$"({transition.Timestamp:HH:mm:ss})");

// 進入 Error 狀態時告警
if (transition.Current == DeviceState.Error)
{
AlarmService.Raise(
$"設備從 {transition.Previous} 進入 Error 狀態");
}
});

進階:狀態持續時間追蹤

// 計算每個狀態持續了多久
deviceStateStream
.DistinctUntilChanged()
.TimeInterval() // 加上與前一個值的時間間隔
.Subscribe(ti =>
{
Logger.Info(
$"狀態 {ti.Value} 持續了 {ti.Interval.TotalSeconds:F1}s");
});

進階:狀態停留超時告警

// 設備在 Error 狀態停留超過 30 秒就升級告警
deviceStateStream
.DistinctUntilChanged()
.SelectMany(state => state == DeviceState.Error
? Observable.Timer(TimeSpan.FromSeconds(30))
.Select(_ => state)
.TakeUntil(deviceStateStream
.Where(s => s != DeviceState.Error))
: Observable.Empty<DeviceState>())
.Subscribe(_ => AlarmService.Escalate("設備持續 Error 超過 30 秒"));

Pattern 速查表

Pattern核心 Operators頁面參考
設備輪詢Interval + SelectMany + DistinctUntilChanged常用 Operators
多設備合併Merge / Merge + GroupBy常用 Operators
批次寫入Buffer + SelectMany常用 Operators
斷線重連Timeout + RetryWhen + Timer錯誤處理
去抖動Throttle + DistinctUntilChanged + Switch常用 Operators
狀態機DistinctUntilChanged + Buffer(2,1) / TimeInterval常用 Operators

延伸閱讀