Skip to main content

Scheduler 與線程管理

Rx.NET 中的 Scheduler 控制「在哪個線程上」以及「在什麼時間點」執行工作。在 WPF 設備控制應用中,正確使用 Scheduler 是避免 UI 凍結和跨線程存取例外的關鍵。


什麼是 Scheduler

Scheduler 是 Rx 的抽象排程機制,負責:

  1. 決定工作在哪個線程執行(UI 線程?背景線程?線程池?)
  2. 控制時間相關操作IntervalTimerDelay 的計時)
  3. 支援測試TestScheduler 可以模擬時間流逝)

大部分帶時間參數的 operator 都有接受 IScheduler 的 overload:

// 使用預設 Scheduler
Observable.Interval(TimeSpan.FromSeconds(1));

// 明確指定 Scheduler(推薦,方便測試)
Observable.Interval(TimeSpan.FromSeconds(1), TaskPoolScheduler.Default);

內建 Scheduler

Scheduler用途特性
Scheduler.Default一般用途平台最佳預設排程器
TaskPoolScheduler.Default背景工作使用 TaskPool,適合 I/O 密集操作
NewThreadScheduler.Default專用線程每次排程建立新線程,適合長時間獨佔操作
ThreadPoolScheduler.Instance線程池使用 .NET ThreadPool
CurrentThreadScheduler.Instance當前線程在當前線程上排隊執行(trampoline)
ImmediateScheduler.Instance立即執行同步立即執行,不排隊
EventLoopScheduler事件迴圈專用單線程,保證順序執行

選擇建議

需要背景執行          → TaskPoolScheduler.Default
需要專用線程(不阻塞) → EventLoopScheduler
需要 UI 更新 → DispatcherScheduler.Current
需要測試 → new TestScheduler()

WPF 整合:DispatcherScheduler

WPF 的 UI 元素只能在建立它的線程(UI Dispatcher Thread)上存取。在背景線程產生的資料要更新到 UI,必須切回 UI 線程。

安裝 WPF 專用套件:

dotnet add package System.Reactive.Windows.Threading
using System.Reactive.Concurrency;

// 方法 1:使用 DispatcherScheduler
observable
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(value => TextBlock1.Text = value.ToString());

// 方法 2:使用 ObserveOnDispatcher 擴充方法
observable
.ObserveOnDispatcher()
.Subscribe(value => TextBlock1.Text = value.ToString());

SubscribeOn vs ObserveOn

這是 Rx.NET 中最常搞混的概念。兩者都是切換線程,但影響的位置不同。

ObserveOn — 控制下游通知的執行線程

ObserveOn 影響的是它之後的 operator 和 Subscribe 回呼在哪個線程執行。

sourceObservable              // 在任意線程產生資料
.Where(x => x > 100) // 在來源線程執行
.ObserveOn(DispatcherScheduler.Current) // ← 切換點
.Select(x => $"{x}°C") // 在 UI 線程執行
.Subscribe(text => Label.Content = text); // 在 UI 線程執行

SubscribeOn — 控制訂閱(啟動)的執行線程

SubscribeOn 影響的是整個訂閱鏈的啟動在哪個線程執行。這決定了 Observable.Create 內的程式碼、或事件的註冊在哪個線程發生。

Observable.Create<int>(observer =>
{
// 這段程式碼在 SubscribeOn 指定的線程執行
var value = ReadPlcRegister(); // 阻塞式 I/O
observer.OnNext(value);
observer.OnCompleted();
return Disposable.Empty;
})
.SubscribeOn(TaskPoolScheduler.Default) // 訂閱在背景線程
.ObserveOn(DispatcherScheduler.Current) // 通知切到 UI 線程
.Subscribe(value => DisplayValue.Text = value.ToString());

關鍵差異一覽

項目SubscribeOnObserveOn
影響什麼訂閱(啟動)的線程通知(OnNext/OnError/OnCompleted)的線程
放在哪裡都一樣?,位置不影響效果,只影響它之後的 operator
常用場景將阻塞式 I/O 移到背景將結果切回 UI 線程
使用頻率較少(大多非同步操作已在背景)較多(幾乎每個 UI 綁定都需要)

圖解

SubscribeOn(背景線程):
Subscribe() ─────────────────────────────── 背景線程


Create/事件註冊 ─────────────────────────── 背景線程


OnNext → Where → Select ─────────────────── 背景線程


ObserveOn(UI 線程) ──────────────────── 切換點


Subscribe 回呼 ──────────────────────────── UI 線程

最佳實踐

1. 只在最後一步切 UI 線程

ObserveOn(DispatcherScheduler.Current) 放在 Subscribe 之前,讓所有運算都在背景完成:

// ✅ 好:運算在背景,只有更新 UI 在 UI 線程
deviceStream
.Where(d => d.IsValid)
.Select(d => FormatReading(d))
.Buffer(TimeSpan.FromSeconds(1))
.ObserveOn(DispatcherScheduler.Current) // 最後才切
.Subscribe(batch => UpdateChart(batch));

// ❌ 差:太早切到 UI 線程,運算阻塞 UI
deviceStream
.ObserveOn(DispatcherScheduler.Current) // 太早!
.Where(d => d.IsValid) // 在 UI 線程過濾
.Select(d => FormatReading(d)) // 在 UI 線程轉換
.Buffer(TimeSpan.FromSeconds(1)) // 在 UI 線程緩衝
.Subscribe(batch => UpdateChart(batch));

2. 明確傳入 Scheduler 參數

讓時間相關的 operator 接受 Scheduler 參數,便於單元測試替換為 TestScheduler:

// ✅ 可測試:Scheduler 由外部傳入
public IObservable<DeviceReading> CreatePollingStream(
IScheduler scheduler = null)
{
scheduler ??= TaskPoolScheduler.Default;
return Observable.Interval(TimeSpan.FromMilliseconds(500), scheduler)
.SelectMany(_ => ReadDeviceAsync());
}

// 測試時
var testScheduler = new TestScheduler();
var stream = CreatePollingStream(testScheduler);

3. 長時間獨佔操作用 EventLoopScheduler

對於需要持續獨佔一個線程的操作(如持續的 serial port 讀取),使用 EventLoopScheduler 而非 NewThreadScheduler

// 專用線程持續讀取設備,保證順序
var deviceScheduler = new EventLoopScheduler();

serialPortStream
.SubscribeOn(deviceScheduler)
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(data => UpdateUI(data));

// 記得在結束時 Dispose
deviceScheduler.Dispose();

延伸閱讀