メインコンテンツまでスキップ

常用 Operators

Rx.NET 的強大之處在於豐富的 operator 組合能力。本頁按分類整理最常用的 operators,每個附上簡短範例。

所有 operator 都在 System.Reactive.Linq namespace 下。


分類總覽

實務上挑 operator 時,先想「這個 operator 屬於哪一類?」可以大幅縮小搜尋範圍。下圖把常用 operator 依用途分成五類,並標示它們在 pipeline 中的位置(介於 Source 與 Subscriber 之間,可任意串接):

各分類的選用時機:

分類「我想做什麼?」代表 operator
Transform「把每個值轉成另一個 / 拆解成多個」Select, SelectMany, Scan, Buffer, Window
Filter「只保留符合條件的值 / 去重」Where, DistinctUntilChanged, Take, Skip, Sample
Combine「把多個 Observable 合成一個」Merge, Zip, CombineLatest, Concat, Switch, Amb
Time「依時間節流 / 延遲 / 等待 / 加時間戳」Throttle, Debounce, Delay, Timeout, TimeInterval, Timestamp
Aggregate「對整段流做總和 / 折疊」Count, Sum, Min, Max, Aggregate

:本頁文件章節依照「建立 → 轉換 → 過濾 → 合併 → 聚合」的傳統順序組織(Creation 獨立成一節);上圖則以用途分類呈現,目的是幫助選擇 operator。下方各章節會詳列每個 operator 的參數與範例。


建立(Creation)

Observable.Create

手動建立 Observable,完全控制何時推送值。

var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnCompleted();
return Disposable.Empty;
});

Observable.Return

建立只發出單一值然後完成的 Observable。

var single = Observable.Return(42);
// 發出 42,然後 OnCompleted

Observable.Empty

建立不發出任何值、直接完成的 Observable。

var empty = Observable.Empty<int>();
// 直接 OnCompleted

Observable.Timer

延遲一段時間後發出單一值(0L),然後完成。

// 3 秒後發出 0L
var delayed = Observable.Timer(TimeSpan.FromSeconds(3));

Observable.Interval

每隔固定時間發出遞增的 long 值(0, 1, 2...),不會自動完成。

// 每 500ms 發出一個值
var ticker = Observable.Interval(TimeSpan.FromMilliseconds(500));

Observable.FromEventPattern

將 .NET 事件轉換為 Observable。

// 將按鈕 Click 事件轉為 Observable
var clicks = Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
h => button.Click += h,
h => button.Click -= h);

clicks.Subscribe(ep => Console.WriteLine("按鈕被點擊"));

Observable.FromAsync

將 async 方法包裝為 Observable,每次 Subscribe 執行一次。

var result = Observable.FromAsync(() => ReadDeviceStatusAsync());
// 訂閱時執行 ReadDeviceStatusAsync,完成後發出結果

轉換(Transformation)

Select

對每個元素套用轉換函式(等同 LINQ 的 Select / map)。

Observable.Range(1, 5)
.Select(x => x * 10)
.Subscribe(x => Console.WriteLine(x));
// 10, 20, 30, 40, 50

SelectMany

將每個元素展開為一個 Observable,然後攤平合併。常用於非同步操作。

Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => Observable.FromAsync(() => ReadSensorAsync()))
.Subscribe(value => Console.WriteLine($"感測器: {value}"));

Scan

累積運算,每個元素都輸出當前累積值(類似 LINQ 的 Aggregate,但每步都有輸出)。

// 計算累積產量
productionEvents
.Scan(0, (total, batch) => total + batch.Quantity)
.Subscribe(total => Console.WriteLine($"累積產量: {total}"));

Buffer

將元素收集到緩衝區,按時間或數量分批輸出。

// 每 5 秒或每 100 筆收集一批
sensorData
.Buffer(TimeSpan.FromSeconds(5), 100)
.Subscribe(batch => Console.WriteLine($"收到 {batch.Count} 筆"));

Window

類似 Buffer,但輸出的是 IObservable<IObservable<T>>(巢狀 Observable),適合需要對每個子視窗套用 operator 的場景。

sensorData
.Window(TimeSpan.FromSeconds(10))
.SelectMany(window => window.Average())
.Subscribe(avg => Console.WriteLine($"10 秒平均值: {avg}"));

過濾(Filtering)

Where

過濾元素,只保留符合條件的。

temperatureStream
.Where(temp => temp > 80.0)
.Subscribe(temp => Console.WriteLine($"高溫警告: {temp}°C"));

DistinctUntilChanged

過濾連續重複值,只在值改變時推送。設備輪詢必備。

// PLC 每 500ms 讀一次,值沒變就不推送
plcPoller
.DistinctUntilChanged()
.Subscribe(value => UpdateDisplay(value));

Throttle

在元素之後等待一段靜默期,如果靜默期內有新元素就重新計時。只推送每段密集事件的最後一個

// 設備狀態快速變化時,只取穩定後的最終狀態
deviceStatus
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe(status => LogStableStatus(status));

Debounce

Throttle 的別名。在 Rx.NET 中 Throttle 就是 Debounce 語意。

Sample

固定間隔取樣,取該間隔內的最新值。

// 高頻資料流每秒取樣一次
highFrequencyStream
.Sample(TimeSpan.FromSeconds(1))
.Subscribe(value => UpdateChart(value));

Take / Skip

Take(n):只取前 n 個元素。Skip(n):跳過前 n 個元素。

// 開機後跳過前 5 筆不穩定的讀數
sensorStream
.Skip(5)
.Take(100)
.Subscribe(value => Calibrate(value));

合併(Combination)

Merge

將多個 Observable 合併為一個,誰先來就先推。

// 多台設備的警報合併到統一頻道
var allAlarms = Observable.Merge(plc1Alarms, plc2Alarms, plc3Alarms);
allAlarms.Subscribe(alarm => NotifyOperator(alarm));

CombineLatest

當任一來源有新值時,將所有來源的最新值組合推送。

// 組合溫度和壓力的最新讀數
temperatureStream.CombineLatest(pressureStream,
(temp, pressure) => new { Temperature = temp, Pressure = pressure })
.Subscribe(reading => UpdateDashboard(reading));

Zip

將多個來源依序配對,每個來源各取一個元素。

// 將命令和回應配對
commands.Zip(responses,
(cmd, resp) => new { Command = cmd, Response = resp })
.Subscribe(pair => LogCommandResult(pair));

Concat

依序串接多個 Observable,前一個完成後才開始下一個。

// 初始化流程:先連線,再校準,再開始讀值
var startup = connectStream
.Concat(calibrateStream)
.Concat(readStream);

Switch

訂閱最新的內部 Observable,自動取消前一個訂閱。

// 使用者切換監控設備時,自動切換資料流
selectedDevice
.Select(device => device.GetDataStream())
.Switch()
.Subscribe(data => UpdateDisplay(data));

Amb

多個來源競爭,只保留最先發出值的那個,其他全部取消。

// 主備伺服器,誰先回應用誰
var fastest = Observable.Amb(primaryServer, backupServer);

聚合(Aggregation)

聚合 operators 等待 Observable 完成後才輸出結果。適合有限序列。

Count

計算元素總數。

batchEvents
.Count()
.Subscribe(count => Console.WriteLine($"本批共 {count} 個事件"));

Sum

計算數值總和。

productionOutput
.Sum()
.Subscribe(total => Console.WriteLine($"總產量: {total}"));

Aggregate

自訂累積運算(類似 LINQ 的 Aggregate / reduce)。

// 計算最大偏差
measurements
.Aggregate(0.0, (maxDev, m) => Math.Max(maxDev, Math.Abs(m.Deviation)))
.Subscribe(maxDev => Console.WriteLine($"最大偏差: {maxDev}"));

快速查找表

需求Operator頁面
定時執行Interval, Timer本頁
值沒變就不推DistinctUntilChanged本頁
事件太快要節流Throttle, Sample本頁
多來源合併Merge本頁
非同步操作串接SelectMany本頁
批次收集Buffer本頁
錯誤重試Retry, Catch錯誤處理
切線程到 UIObserveOnScheduler
逾時控制Timeout錯誤處理

下一步錯誤處理 — 學習如何優雅處理資料流中的錯誤