常用 Operators
Rx.NET 的強大之處在於豐富的 operator 組合能力。本頁按分類整理最常用的 operators,每個附上簡短範例。
所有 operator 都在
System.Reactive.Linqnamespace 下。
分類總覽
實務上挑 operator 時,先想「這個 operator 屬於哪一類?」可以大幅縮小搜尋範圍。下圖把常用 operator 依用途分成五類,並標示它們在 pipeline 中的位置(介於 Source 與 Subscriber 之間,可任意串接):
各分類的選用時機:
| 分類 | 「我想做什麼?」 | 代表 operator |
|---|---|---|
| Transform | 「把每個值轉成另一個 / 拆解成多個」 | Select, SelectMany, Scan, Buffer, Window |
| Filter | 「只保留符合條件的值 / 去重」 | Where, DistinctUntilChanged, Take, Skip, Sample |
| Combine | 「把多個 Observable 合成一個」 | Merge, Zip, CombineLatest, Concat, Switch, Amb |
| Time | 「依時間節流 / 延遲 / 等待 / 加時間戳」 | Throttle, Debounce, Delay, Timeout, TimeInterval, Timestamp |
| Aggregate | 「對整段流做總和 / 折疊」 | Count, Sum, Min, Max, Aggregate |
註:本頁文件章節依照「建立 → 轉換 → 過濾 → 合併 → 聚合」的傳統順序組織(Creation 獨立成一節);上圖則以用途分類呈現,目的是幫助選擇 operator。下方各章節會詳列每個 operator 的參數與範例。
建立(Creation)
Observable.Create
手動建立 Observable,完全控制何時推送值。
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnCompleted();
return Disposable.Empty;
});
Observable.Return
建立只發出單一值然後完成的 Observable。
var single = Observable.Return(42);
// 發出 42,然後 OnCompleted
Observable.Empty
建立不發出任何值、直接完成的 Observable。
var empty = Observable.Empty<int>();
// 直接 OnCompleted
Observable.Timer
延遲一段時間後發出單一值(0L),然後完成。
// 3 秒後發出 0L
var delayed = Observable.Timer(TimeSpan.FromSeconds(3));
Observable.Interval
每隔固定時間發出遞增的 long 值(0, 1, 2...),不會自動完成。
// 每 500ms 發出一個值
var ticker = Observable.Interval(TimeSpan.FromMilliseconds(500));
Observable.FromEventPattern
將 .NET 事件轉換為 Observable。
// 將按鈕 Click 事件轉為 Observable
var clicks = Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
h => button.Click += h,
h => button.Click -= h);
clicks.Subscribe(ep => Console.WriteLine("按鈕被點擊"));
Observable.FromAsync
將 async 方法包裝為 Observable,每次 Subscribe 執行一次。
var result = Observable.FromAsync(() => ReadDeviceStatusAsync());
// 訂閱時執行 ReadDeviceStatusAsync,完成後發出結果
轉換(Transformation)
Select
對每個元素套用轉換函式(等同 LINQ 的 Select / map)。
Observable.Range(1, 5)
.Select(x => x * 10)
.Subscribe(x => Console.WriteLine(x));
// 10, 20, 30, 40, 50
SelectMany
將每個元素展開為一個 Observable,然後攤平合併。常用於非同步操作。
Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => Observable.FromAsync(() => ReadSensorAsync()))
.Subscribe(value => Console.WriteLine($"感測器: {value}"));
Scan
累積運算,每個元素都輸出當前累積值(類似 LINQ 的 Aggregate,但每步都有輸出)。
// 計算累積產量
productionEvents
.Scan(0, (total, batch) => total + batch.Quantity)
.Subscribe(total => Console.WriteLine($"累積產量: {total}"));
Buffer
將元素收集到緩衝區,按時間或數量分批輸出。
// 每 5 秒或每 100 筆收集一批
sensorData
.Buffer(TimeSpan.FromSeconds(5), 100)
.Subscribe(batch => Console.WriteLine($"收到 {batch.Count} 筆"));
Window
類似 Buffer,但輸出的是 IObservable<IObservable<T>>(巢狀 Observable),適合需要對每個子視窗套用 operator 的場景。
sensorData
.Window(TimeSpan.FromSeconds(10))
.SelectMany(window => window.Average())
.Subscribe(avg => Console.WriteLine($"10 秒平均值: {avg}"));
過濾(Filtering)
Where
過濾元素,只保留符合條件的。
temperatureStream
.Where(temp => temp > 80.0)
.Subscribe(temp => Console.WriteLine($"高溫警告: {temp}°C"));
DistinctUntilChanged
過濾連續重複值,只在值改變時推送。設備輪詢必備。
// PLC 每 500ms 讀一次,值沒變就不推送
plcPoller
.DistinctUntilChanged()
.Subscribe(value => UpdateDisplay(value));
Throttle
在元素之後等待一段靜默期,如果靜默期內有新元素就重新計時。只推送每段密集事件的最後一個。
// 設備狀態快速變化時,只取穩定後的最終狀態
deviceStatus
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe(status => LogStableStatus(status));
Debounce
Throttle 的別名。在 Rx.NET 中 Throttle 就是 Debounce 語意。
Sample
固定間隔取樣,取該間隔內的最新值。
// 高頻資料流每秒取樣一次
highFrequencyStream
.Sample(TimeSpan.FromSeconds(1))
.Subscribe(value => UpdateChart(value));
Take / Skip
Take(n):只取前 n 個元素。Skip(n):跳過前 n 個元素。
// 開機後跳過前 5 筆不穩定的讀數
sensorStream
.Skip(5)
.Take(100)
.Subscribe(value => Calibrate(value));
合併(Combination)
Merge
將多個 Observable 合併為一個,誰先來就先推。
// 多台設備的警報合併到統一頻道
var allAlarms = Observable.Merge(plc1Alarms, plc2Alarms, plc3Alarms);
allAlarms.Subscribe(alarm => NotifyOperator(alarm));
CombineLatest
當任一來源有新值時,將所有來源的最新值組合推送。
// 組合溫度和壓力的最新讀數
temperatureStream.CombineLatest(pressureStream,
(temp, pressure) => new { Temperature = temp, Pressure = pressure })
.Subscribe(reading => UpdateDashboard(reading));
Zip
將多個來源依序配對,每個來源各取一個元素。
// 將命令和回應配對
commands.Zip(responses,
(cmd, resp) => new { Command = cmd, Response = resp })
.Subscribe(pair => LogCommandResult(pair));
Concat
依序串接多個 Observable,前一個完成後才開始下一個。
// 初始化流程:先連線,再校準,再開始讀值
var startup = connectStream
.Concat(calibrateStream)
.Concat(readStream);
Switch
訂閱最新的內部 Observable,自動取消前一個訂閱。
// 使用者切換監控設備時,自動切換資料流
selectedDevice
.Select(device => device.GetDataStream())
.Switch()
.Subscribe(data => UpdateDisplay(data));
Amb
多個來源競爭,只保留最先發出值的那個,其他全部取消。
// 主備伺服器,誰先回應用誰
var fastest = Observable.Amb(primaryServer, backupServer);
聚合(Aggregation)
聚合 operators 等待 Observable 完成後才輸出結果。適合有限序列。
Count
計算元素總數。
batchEvents
.Count()
.Subscribe(count => Console.WriteLine($"本批共 {count} 個事件"));
Sum
計算數值總和。
productionOutput
.Sum()
.Subscribe(total => Console.WriteLine($"總產量: {total}"));
Aggregate
自訂累積運算(類似 LINQ 的 Aggregate / reduce)。
// 計算最大偏差
measurements
.Aggregate(0.0, (maxDev, m) => Math.Max(maxDev, Math.Abs(m.Deviation)))
.Subscribe(maxDev => Console.WriteLine($"最大偏差: {maxDev}"));
快速查找表
| 需求 | Operator | 頁面 |
|---|---|---|
| 定時執行 | Interval, Timer | 本頁 |
| 值沒變就不推 | DistinctUntilChanged | 本頁 |
| 事件太快要節流 | Throttle, Sample | 本頁 |
| 多來源合併 | Merge | 本頁 |
| 非同步操作串接 | SelectMany | 本頁 |
| 批次收集 | Buffer | 本頁 |
| 錯誤重試 | Retry, Catch | 錯誤處理 |
| 切線程到 UI | ObserveOn | Scheduler |
| 逾時控制 | Timeout | 錯誤處理 |
下一步:錯誤處理 — 學習如何優雅處理資料流中的錯誤