Skip to main content

Rx.NET 技術指南

範圍:所有使用 GST 框架進行設備控制與監控的開發人員

對象:需要處理即時資料流、事件驅動邏輯的開發人員

先備知識:基本 C# 與 .NET 知識、async/await 概念

預估閱讀時間:1.5–2 小時(完整指南)

System.Reactive 版本:6.1.0(穩定版)


什麼是 Reactive Extensions

Reactive Extensions for .NET(Rx.NET) 是一套用於組合非同步與事件驅動程式的函式庫。它以 IObservable<T>IObserver<T> 為核心抽象,將事件流視為可查詢的資料序列,讓你用 LINQ 風格的 operator 來建立、過濾、轉換、合併資料流。

核心模型

Producer(資料來源)  ──push──▶  IObservable<T>  ──subscribe──▶  IObserver<T>(消費者)

兩個核心介面:

// 資料來源:可被訂閱的事件流
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}

// 消費者:接收事件通知
public interface IObserver<in T>
{
void OnNext(T value); // 收到新值
void OnError(Exception e); // 發生錯誤(流結束)
void OnCompleted(); // 正常完成(流結束)
}

關鍵特性:

  • Push-based:資料由來源主動推送,而非消費者拉取
  • OnError / OnCompleted 是終止信號:一旦觸發,流就結束,不會再有 OnNext
  • Subscribe 回傳 IDisposable:用來取消訂閱、釋放資源

為什麼選 Rx.NET

在工業自動化場景中,我們經常面對:PLC 數值變化、設備狀態事件、感測器資料流、通訊斷線重連。這些本質上都是「隨時間推移的事件序列」。

方案適合場景不適合場景
傳統 event簡單的一對一通知多事件組合、背壓控制、錯誤重試
async/await單次非同步操作持續的資料流、多來源合併
Timer + polling最簡單的定時讀取去重、節流、條件組合、錯誤恢復
Rx.NET事件流組合、多來源合併、背壓節流、自動重試單次 request-response

Rx.NET 的優勢在於 宣告式組合:你描述「要什麼」而不是「怎麼做」。

// 傳統做法:散落的狀態管理
Timer timer = new Timer(500);
string lastValue = null;
timer.Elapsed += (s, e) => {
var value = ReadPlcRegister();
if (value != lastValue) {
lastValue = value;
UpdateUI(value);
}
};

// Rx 做法:宣告式管道
Observable.Interval(TimeSpan.FromMilliseconds(500))
.SelectMany(_ => ReadPlcRegisterAsync())
.DistinctUntilChanged()
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(value => UpdateUI(value));

Operator Pipeline 訊號流

上面的 Rx 做法本質上是把資料源接上一連串 operator,訊號依序流經每個節點並被轉換。以 .Where → .Select → .Throttle → Subscriber 為例:

各節點的職責:

  • Source:資料源(如 Observable.IntervalSubject<T>FromEventPattern
  • .Where:過濾—只讓符合條件的值通過
  • .Select:轉換—把每個值映射為新的值
  • .Throttle:節流—在指定時間窗內只取最後一個值,避免下游被洗頻
  • .Subscribe:終點—實際消費訊號(UI 更新、log、落地等)

關鍵心智模型:operator 都是 lazy 的。在你呼叫 .Subscribe 之前,整條管道只是一個設定,不會開始產生值。完整 operator 清單見常用 Operators


Cold vs Hot Observable

這是 Rx.NET 最常搞混的概念之一。

Cold Observable

  • 每次 Subscribe 都會啟動一個新的資料流
  • 類似播放影片:每個觀眾從頭看起
  • 大部分 factory method 建立的都是 Cold(Observable.CreateObservable.IntervalObservable.Timer

下圖展示兩位訂閱者在不同時間點訂閱同一個 Cold Observable,各自都從 0 開始收到完整序列:

對應的程式碼:

var cold = Observable.Interval(TimeSpan.FromSeconds(1));

// 訂閱者 A 和 B 各自獨立,各自從 0 開始
cold.Subscribe(x => Console.WriteLine($"A: {x}"));
await Task.Delay(3000);
cold.Subscribe(x => Console.WriteLine($"B: {x}")); // B 從 0 開始,不是從 3

Hot Observable

  • 不論有沒有訂閱者都在產生資料
  • 類似現場直播:晚加入的觀眾錯過前面的內容
  • Subject<T>、事件轉換(FromEventPattern)、Publish().RefCount() 都是 Hot

同樣情境下,Hot Observable 的行為截然不同——後到的訂閱者只能收到訂閱之後的值:

對應的程式碼:

var subject = new Subject<int>();

subject.Subscribe(x => Console.WriteLine($"A: {x}"));
subject.OnNext(1); // A 收到 1
subject.OnNext(2); // A 收到 2

subject.Subscribe(x => Console.WriteLine($"B: {x}"));
subject.OnNext(3); // A 和 B 都收到 3,B 沒收到 1 和 2

Cold 轉 Hot

使用 Publish().RefCount() 將 Cold Observable 轉為 Hot,讓多個訂閱者共享同一個資料流:

var shared = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish()
.RefCount(); // 第一個訂閱者啟動,最後一個取消訂閱時停止

安裝方式

透過 NuGet 安裝:

dotnet add package System.Reactive

WPF 專案額外安裝(提供 DispatcherScheduler):

dotnet add package System.Reactive.Windows.Threading

常用 namespace:

using System.Reactive;
using System.Reactive.Linq; // 所有 operator
using System.Reactive.Disposables; // CompositeDisposable
using System.Reactive.Concurrency; // Scheduler
using System.Reactive.Subjects; // Subject<T>

Hello World

最簡單的範例——建立一個 Observable 並訂閱:

using System.Reactive.Linq;

// 建立一個發出 1, 2, 3 的 Observable
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return Disposable.Empty;
});

// 訂閱並印出
source.Subscribe(
value => Console.WriteLine($"收到: {value}"),
error => Console.WriteLine($"錯誤: {error.Message}"),
() => Console.WriteLine("完成")
);

// 輸出:
// 收到: 1
// 收到: 2
// 收到: 3
// 完成

一個更貼近實際場景的範例——每秒讀取設備溫度:

Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => ReadTemperatureAsync())
.Where(temp => temp > 80.0)
.Subscribe(temp =>
Console.WriteLine($"[警告] 溫度過高: {temp}°C"));

本指南結構

頁面內容
概觀(本頁)核心模型、為什麼選 Rx.NET、Cold vs Hot
常用 Operators建立、轉換、過濾、合併、聚合
錯誤處理Catch、Retry、Timeout、斷線重連
Scheduler 與線程管理SubscribeOn vs ObserveOn、WPF 整合
工業場景 Pattern輪詢、合併、緩衝、重連、去抖動、狀態機
測試 Rx 程式碼TestScheduler、虛擬時間測試
最佳實踐生命週期管理、常見陷阱