跳至主要内容

最佳實踐與注意事項

本頁整理使用 Rx.NET 時最常踩的坑和對應的最佳實踐。


1. Subscribe 一定要管理生命週期

每次 Subscribe 都回傳一個 IDisposable。不 Dispose 就等於永遠監聽,造成 Memory Leak。

CompositeDisposable — 批次管理訂閱

public class DeviceMonitorViewModel : IDisposable
{
private readonly CompositeDisposable _disposables = new();

public void Initialize()
{
// 所有訂閱都加入 CompositeDisposable
temperatureStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(temp => Temperature = temp)
.DisposeWith(_disposables); // 擴充方法

pressureStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(p => Pressure = p)
.DisposeWith(_disposables);
}

public void Dispose()
{
// 一次清理所有訂閱
_disposables.Dispose();
}
}

DisposeWith 擴充方法:

public static class DisposableExtensions
{
public static T DisposeWith<T>(this T disposable, CompositeDisposable composite)
where T : IDisposable
{
composite.Add(disposable);
return disposable;
}
}

SerialDisposable — 替換式訂閱

當你需要「切換」訂閱(取消舊的、建立新的):

private readonly SerialDisposable _currentDevice = new();

public void SwitchDevice(IDevice device)
{
// 自動取消前一個訂閱
_currentDevice.Disposable = device.GetDataStream()
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(data => UpdateDisplay(data));
}

2. 避免在 Subscribe 裡做太多事

Subscribe 的回呼應該簡短。複雜的邏輯用 operator 組合來處理。

// ❌ 差:在 Subscribe 裡塞滿邏輯
deviceStream.Subscribe(data =>
{
if (data.Temperature > 80)
{
Logger.Warn($"高溫: {data.Temperature}");
AlarmService.Raise("HighTemp");
}
var formatted = $"{data.Temperature:F1}°C";
TemperatureLabel.Content = formatted;
Chart.AddPoint(data.Temperature);
});

// ✅ 好:用 operator 組合,Subscribe 只做最終動作
var tempStream = deviceStream
.Select(d => d.Temperature)
.Publish()
.RefCount();

// 告警管道
tempStream
.Where(t => t > 80)
.Subscribe(t =>
{
Logger.Warn($"高溫: {t}");
AlarmService.Raise("HighTemp");
})
.DisposeWith(_disposables);

// 顯示管道
tempStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(t => TemperatureLabel.Content = $"{t:F1}°C")
.DisposeWith(_disposables);

// 圖表管道
tempStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(t => Chart.AddPoint(t))
.DisposeWith(_disposables);

3. Hot vs Cold 陷阱

陷阱:多次 Subscribe Cold Observable = 多次執行

// ⚠️ 每次 Subscribe 都會啟動一個獨立的 Interval
var polling = Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => ReadDeviceAsync());

polling.Subscribe(x => UpdateUI(x)); // 每秒讀一次
polling.Subscribe(x => LogToFile(x)); // 又每秒讀一次 → 設備被讀兩次!

修正:用 Publish + RefCount 共享

var polling = Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => ReadDeviceAsync())
.Publish()
.RefCount(); // 第一個訂閱啟動,最後一個取消時停止

polling.Subscribe(x => UpdateUI(x)); // 共享同一個讀取
polling.Subscribe(x => LogToFile(x)); // 共享同一個讀取

何時需要 Publish + RefCount

情境需要?
多個 subscriber 消費同一個流
只有一個 subscriber不需要
來源本身就是 Hot(Subject、事件)不需要
來源有副作用(讀取設備、API 呼叫)

4. Memory Leak 常見原因

忘記 Dispose

最常見的原因。特別是在 WPF 中,ViewModel 被回收但訂閱還在:

// ❌ 離開頁面後訂閱仍然活著
public class MonitorPage
{
public MonitorPage()
{
deviceStream.Subscribe(data => UpdateUI(data));
// 即使 MonitorPage 被 GC,這個訂閱仍持有對 UpdateUI 的引用
}
}

FromEventPattern 沒搭配 Dispose

FromEventPattern 內部會用 += 註冊事件。如果不 Dispose,事件處理器永遠掛著:

// ✅ 記得 Dispose
var subscription = Observable.FromEventPattern<EventHandler, EventArgs>(
h => device.StatusChanged += h,
h => device.StatusChanged -= h)
.Subscribe(ep => HandleStatusChange(ep.EventArgs));

// 清理時
subscription.Dispose(); // 會呼叫 h => device.StatusChanged -= h

無限 Observable 沒有結束條件

// ⚠️ 永遠不會結束
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(x => DoSomething(x));

// ✅ 加上結束條件
Observable.Interval(TimeSpan.FromSeconds(1))
.TakeUntil(shutdownSignal) // 收到關閉信號時結束
.Subscribe(x => DoSomething(x));

5. 不要在 Subscribe 裡 await

SubscribeonNext 回呼是 Action<T>,不是 Func<T, Task>。在裡面用 async void 會導致錯誤無法被 OnError 捕捉。

// ❌ 危險:async void,例外會遺失
deviceStream.Subscribe(async data =>
{
await SaveToDbAsync(data); // 如果拋出例外,不會進入 OnError
});

// ✅ 用 SelectMany 處理非同步操作
deviceStream
.SelectMany(data => Observable.FromAsync(() => SaveToDbAsync(data)))
.Subscribe(
_ => { },
ex => Logger.Error($"儲存失敗: {ex.Message}"));

如果需要限制同時執行的非同步操作數量:

// 最多同時 3 個寫入操作
deviceStream
.Select(data => Observable.FromAsync(() => SaveToDbAsync(data)))
.Merge(maxConcurrent: 3)
.Subscribe();

6. Subject 的使用原則

Subject<T> 同時是 IObservable<T>IObserver<T>,方便但容易濫用。

適合使用的場景

  • 作為元件的內部事件匯流排
  • 將非 Rx 的外部輸入橋接到 Observable 管道
  • BehaviorSubject<T> 作為有初始值的狀態持有者
// ✅ 適當使用:橋接外部輸入
private readonly Subject<DeviceCommand> _commands = new();

public void SendCommand(DeviceCommand cmd) => _commands.OnNext(cmd);

public IObservable<DeviceCommand> Commands => _commands.AsObservable();

避免的使用方式

// ❌ 不要把 Subject 當作可變變數來用
var subject = new Subject<int>();

// 在各處散落的 OnNext 呼叫讓資料流變得難以追蹤
void Method1() => subject.OnNext(1);
void Method2() => subject.OnNext(2);
void Method3() => subject.OnNext(3);

Subject 家族

類型特性適用場景
Subject<T>無初始值,不重播事件匯流排
BehaviorSubject<T>有初始值,新訂閱者收到最新值狀態持有(連線狀態、當前設定)
ReplaySubject<T>重播 N 個歷史值給新訂閱者需要歷史紀錄的場景
AsyncSubject<T>只在 OnCompleted 時發出最後一個值單次非同步結果

7. 常見錯誤速查

錯誤原因修正
UI 線程例外沒用 ObserveOn 切回 UI 線程.ObserveOn(DispatcherScheduler.Current)
設備被讀兩次Cold Observable 多次 Subscribe.Publish().RefCount()
Memory Leak忘記 DisposeCompositeDisposable 管理
例外吞掉了Subscribe 沒給 onError永遠提供 onError 處理器
async 例外遺失在 Subscribe 裡 async voidSelectMany + FromAsync
無限重試迴圈Retry() 沒有上限Retry(n)RetryWhen 加延遲
Throttle 沒效果搞混 Throttle 和 SampleThrottle = 靜默期後觸發;Sample = 固定取樣

延伸閱讀