Scheduler 與線程管理
Rx.NET 中的 Scheduler 控制「在哪個線程上」以及「在什麼時間點」執行工作。在 WPF 設備控制應用中,正確使用 Scheduler 是避免 UI 凍結和跨線程存取例外的關鍵。
什麼是 Scheduler
Scheduler 是 Rx 的抽象排程機制,負責:
- 決定工作在哪個線程執行(UI 線程?背景線程?線程池?)
- 控制時間相關操作(
Interval、Timer、Delay的計時) - 支援測試(TestScheduler 可以模擬時間流逝)
大部分帶時間參數的 operator 都有接受 IScheduler 的 overload:
// 使用預設 Scheduler
Observable.Interval(TimeSpan.FromSeconds(1));
// 明確指定 Scheduler(推薦,方便測試)
Observable.Interval(TimeSpan.FromSeconds(1), TaskPoolScheduler.Default);
內建 Scheduler
| Scheduler | 用途 | 特性 |
|---|---|---|
Scheduler.Default | 一般用途 | 平台最佳預設排程器 |
TaskPoolScheduler.Default | 背景工作 | 使用 TaskPool,適合 I/O 密集操作 |
NewThreadScheduler.Default | 專用線程 | 每次排程建立新線程,適合長時間獨佔操作 |
ThreadPoolScheduler.Instance | 線程池 | 使用 .NET ThreadPool |
CurrentThreadScheduler.Instance | 當前線程 | 在當前線程上排隊執行(trampoline) |
ImmediateScheduler.Instance | 立即執行 | 同步立即執行,不排隊 |
EventLoopScheduler | 事件迴圈 | 專用單線程,保證順序執行 |
選擇建議
需要背景執行 → TaskPoolScheduler.Default
需要專用線程(不阻塞) → EventLoopScheduler
需要 UI 更新 → DispatcherScheduler.Current
需要測試 → new TestScheduler()
WPF 整合:DispatcherScheduler
WPF 的 UI 元素只能在建立它的線程(UI Dispatcher Thread)上存取。在背景線程產生的資料要更新到 UI,必須切回 UI 線程。
安裝 WPF 專用套件:
dotnet add package System.Reactive.Windows.Threading
using System.Reactive.Concurrency;
// 方法 1:使用 DispatcherScheduler
observable
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(value => TextBlock1.Text = value.ToString());
// 方法 2:使用 ObserveOnDispatcher 擴充方法
observable
.ObserveOnDispatcher()
.Subscribe(value => TextBlock1.Text = value.ToString());
SubscribeOn vs ObserveOn
這是 Rx.NET 中最常搞混的概念。兩者都是切換線程,但影響的位置不同。
ObserveOn — 控制下游通知的執行線程
ObserveOn 影響的是它之後的 operator 和 Subscribe 回呼在哪個線程執行。
sourceObservable // 在任意線程產生資料
.Where(x => x > 100) // 在來源線程執行
.ObserveOn(DispatcherScheduler.Current) // ← 切換點
.Select(x => $"{x}°C") // 在 UI 線程執行
.Subscribe(text => Label.Content = text); // 在 UI 線程執行
SubscribeOn — 控制訂閱(啟動)的執行線程
SubscribeOn 影響的是整個訂閱鏈的啟動在哪個線程執行。這決定了 Observable.Create 內的程式碼、或事件的註冊在哪個線程發生。
Observable.Create<int>(observer =>
{
// 這段程式碼在 SubscribeOn 指定的線程執行
var value = ReadPlcRegister(); // 阻塞式 I/O
observer.OnNext(value);
observer.OnCompleted();
return Disposable.Empty;
})
.SubscribeOn(TaskPoolScheduler.Default) // 訂閱在背景線程
.ObserveOn(DispatcherScheduler.Current) // 通知切到 UI 線程
.Subscribe(value => DisplayValue.Text = value.ToString());
關鍵差異一覽
| 項目 | SubscribeOn | ObserveOn |
|---|---|---|
| 影響什麼 | 訂閱(啟動)的線程 | 通知(OnNext/OnError/OnCompleted)的線程 |
| 放在哪裡都一樣? | 是,位置不影響效果 | 否,只影響它之後的 operator |
| 常用場景 | 將阻塞式 I/O 移到背景 | 將結果切回 UI 線程 |
| 使用頻率 | 較少(大多非同步操作已在背景) | 較多(幾乎每個 UI 綁定都需要) |
圖解
SubscribeOn(背景線程):
Subscribe() ─────────────────────────────── 背景線程
│
▼
Create/事件註冊 ─────────────────────────── 背景線程
│
▼
OnNext → Where → Select ─────────────────── 背景線程
│
▼
ObserveOn(UI 線程) ──────────────────── 切換點
│
▼
Subscribe 回呼 ──────────────────────────── UI 線程
最佳實踐
1. 只在最後一步切 UI 線程
將 ObserveOn(DispatcherScheduler.Current) 放在 Subscribe 之前,讓所有運算都在背景完成:
// ✅ 好:運算在背景,只有更新 UI 在 UI 線程
deviceStream
.Where(d => d.IsValid)
.Select(d => FormatReading(d))
.Buffer(TimeSpan.FromSeconds(1))
.ObserveOn(DispatcherScheduler.Current) // 最後才切
.Subscribe(batch => UpdateChart(batch));
// ❌ 差:太早切到 UI 線程,運算阻塞 UI
deviceStream
.ObserveOn(DispatcherScheduler.Current) // 太早!
.Where(d => d.IsValid) // 在 UI 線程過濾
.Select(d => FormatReading(d)) // 在 UI 線程轉換
.Buffer(TimeSpan.FromSeconds(1)) // 在 UI 線程緩衝
.Subscribe(batch => UpdateChart(batch));
2. 明確傳入 Scheduler 參數
讓時間相關的 operator 接受 Scheduler 參數,便於單元測試替換為 TestScheduler:
// ✅ 可測試:Scheduler 由外部傳入
public IObservable<DeviceReading> CreatePollingStream(
IScheduler scheduler = null)
{
scheduler ??= TaskPoolScheduler.Default;
return Observable.Interval(TimeSpan.FromMilliseconds(500), scheduler)
.SelectMany(_ => ReadDeviceAsync());
}
// 測試時
var testScheduler = new TestScheduler();
var stream = CreatePollingStream(testScheduler);
3. 長時間獨佔操作用 EventLoopScheduler
對於需要持續獨佔一個線程的操作(如持續的 serial port 讀取),使用 EventLoopScheduler 而非 NewThreadScheduler:
// 專用線程持續讀取設備,保證順序
var deviceScheduler = new EventLoopScheduler();
serialPortStream
.SubscribeOn(deviceScheduler)
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(data => UpdateUI(data));
// 記得在結束時 Dispose
deviceScheduler.Dispose();
延伸閱讀:
- 測試 Rx 程式碼 — 用 TestScheduler 測試時間相關邏輯
- 工業場景 Pattern — Scheduler 在實際場景中的應用
- 最佳實踐 — 更多線程管理建議