Rx.NET 技術指南
範圍:所有使用 GST 框架進行設備控制與監控的開發人員
對象:需要處理即時資料流、事件驅動邏輯的開發人員
先備知識:基本 C# 與 .NET 知識、async/await 概念
預估閱讀時間:1.5–2 小時(完整指南)
System.Reactive 版本:6.1.0(穩定版)
什麼是 Reactive Extensions
Reactive Extensions for .NET(Rx.NET) 是一套用於組合非同步與事件驅動程式的函式庫。它以 IObservable<T> 和 IObserver<T> 為核心抽象,將事件流視為可查詢的資料序列,讓你用 LINQ 風格的 operator 來建立、過濾、轉換、合併資料流。
核心模型
Producer(資料來源) ──push──▶ IObservable<T> ──subscribe──▶ IObserver<T>(消費者)
兩個核心介面:
// 資料來源:可被訂閱的事件流
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
// 消費者:接收事件通知
public interface IObserver<in T>
{
void OnNext(T value); // 收到新值
void OnError(Exception e); // 發生錯誤(流結束)
void OnCompleted(); // 正常完成(流結束)
}
關鍵特性:
- Push-based:資料由來源主動推送,而非消費者拉取
- OnError / OnCompleted 是終止信號:一旦觸發,流就結束,不會再有 OnNext
- Subscribe 回傳 IDisposable:用來取消訂閱、釋放資源
為什麼選 Rx.NET
在工業自動化場景中,我們經常面對:PLC 數值變化、設備狀態事件、感測器資料流、通訊斷線重連。這些本質上都是「隨時間推移的事件序列」。
| 方案 | 適合場景 | 不適合場景 |
|---|---|---|
| 傳統 event | 簡單的一對一通知 | 多事件組合、背壓控制、錯誤重試 |
| async/await | 單次非同步操作 | 持續的資料流、多來源合併 |
| Timer + polling | 最簡單的定時讀取 | 去重、節流、條件組合、錯誤恢復 |
| Rx.NET | 事件流組合、多來源合併、背壓節流、自動重試 | 單次 request-response |
Rx.NET 的優勢在於 宣告式組合:你描述「要什麼」而不是「怎麼做」。
// 傳統做法:散落的狀態管理
Timer timer = new Timer(500);
string lastValue = null;
timer.Elapsed += (s, e) => {
var value = ReadPlcRegister();
if (value != lastValue) {
lastValue = value;
UpdateUI(value);
}
};
// Rx 做法:宣告式管道
Observable.Interval(TimeSpan.FromMilliseconds(500))
.SelectMany(_ => ReadPlcRegisterAsync())
.DistinctUntilChanged()
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(value => UpdateUI(value));
Operator Pipeline 訊號流
上面的 Rx 做法本質上是把資料源接上一連串 operator,訊號依序流經每個節點並被轉換。以 .Where → .Select → .Throttle → Subscriber 為例:
各節點的職責:
- Source:資料源(如
Observable.Interval、Subject<T>、FromEventPattern) .Where:過濾—只讓符合條件的值通過.Select:轉換—把每個值映射為新的值.Throttle:節流—在指定時間窗內只取最後一個值,避免下游被洗頻.Subscribe:終點—實際消費訊號(UI 更新、log、落地等)
關鍵心智模型:operator 都是 lazy 的。在你呼叫 .Subscribe 之前,整條管道只是一個設定,不會開始產生值。完整 operator 清單見常用 Operators。
Cold vs Hot Observable
這是 Rx.NET 最常搞混的概念之一。
Cold Observable
- 每次 Subscribe 都會啟動一個新的資料流
- 類似播放影片:每個觀眾從頭看起
- 大部分 factory method 建立的都是 Cold(
Observable.Create、Observable.Interval、Observable.Timer)
下圖展示兩位訂閱者在不同時間點訂閱同一個 Cold Observable,各自都從 0 開始收到完整序列:
對應的程式碼:
var cold = Observable.Interval(TimeSpan.FromSeconds(1));
// 訂閱者 A 和 B 各自獨立,各自從 0 開始
cold.Subscribe(x => Console.WriteLine($"A: {x}"));
await Task.Delay(3000);
cold.Subscribe(x => Console.WriteLine($"B: {x}")); // B 從 0 開始,不是從 3
Hot Observable
- 不論有沒有訂閱者都在產生資料
- 類似現場直播:晚加入的觀眾錯過前面的內容
Subject<T>、事件轉換(FromEventPattern)、Publish().RefCount()都是 Hot
同樣情境下,Hot Observable 的行為截然不同——後到的訂閱者只能收到訂閱之後的值:
對應的程式碼:
var subject = new Subject<int>();
subject.Subscribe(x => Console.WriteLine($"A: {x}"));
subject.OnNext(1); // A 收到 1
subject.OnNext(2); // A 收到 2
subject.Subscribe(x => Console.WriteLine($"B: {x}"));
subject.OnNext(3); // A 和 B 都收到 3,B 沒收到 1 和 2
Cold 轉 Hot
使用 Publish().RefCount() 將 Cold Observable 轉為 Hot,讓多個訂閱者共享同一個資料流:
var shared = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish()
.RefCount(); // 第一個訂閱者啟動,最後一個取消訂閱時停止
安裝方式
透過 NuGet 安裝:
dotnet add package System.Reactive
WPF 專案額外安裝(提供 DispatcherScheduler):
dotnet add package System.Reactive.Windows.Threading
常用 namespace:
using System.Reactive;
using System.Reactive.Linq; // 所有 operator
using System.Reactive.Disposables; // CompositeDisposable
using System.Reactive.Concurrency; // Scheduler
using System.Reactive.Subjects; // Subject<T>
Hello World
最簡單的範例——建立一個 Observable 並訂閱:
using System.Reactive.Linq;
// 建立一個發出 1, 2, 3 的 Observable
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return Disposable.Empty;
});
// 訂閱並印出
source.Subscribe(
value => Console.WriteLine($"收到: {value}"),
error => Console.WriteLine($"錯誤: {error.Message}"),
() => Console.WriteLine("完成")
);
// 輸出:
// 收到: 1
// 收到: 2
// 收到: 3
// 完成
一個更貼近實際場景的範例——每秒讀取設備溫度:
Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => ReadTemperatureAsync())
.Where(temp => temp > 80.0)
.Subscribe(temp =>
Console.WriteLine($"[警告] 溫度過高: {temp}°C"));
本指南結構
| 頁面 | 內容 |
|---|---|
| 概觀(本頁) | 核心模型、為什麼選 Rx.NET、Cold vs Hot |
| 常用 Operators | 建立、轉換、過濾、合併、聚合 |
| 錯誤處理 | Catch、Retry、Timeout、斷線重連 |
| Scheduler 與線程管理 | SubscribeOn vs ObserveOn、WPF 整合 |
| 工業場景 Pattern | 輪詢、合併、緩衝、重連、去抖動、狀態機 |
| 測試 Rx 程式碼 | TestScheduler、虛擬時間測試 |
| 最佳實踐 | 生命週期管理、常見陷阱 |