最佳實踐與注意事項
本頁整理使用 Rx.NET 時最常踩的坑和對應的最佳實踐。
1. Subscribe 一定要管理生命週期
每次 Subscribe 都回傳一個 IDisposable。不 Dispose 就等於永遠監聽,造成 Memory Leak。
CompositeDisposable — 批次管理訂閱
public class DeviceMonitorViewModel : IDisposable
{
private readonly CompositeDisposable _disposables = new();
public void Initialize()
{
// 所有訂閱都加入 CompositeDisposable
temperatureStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(temp => Temperature = temp)
.DisposeWith(_disposables); // 擴充方法
pressureStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(p => Pressure = p)
.DisposeWith(_disposables);
}
public void Dispose()
{
// 一次清理所有訂閱
_disposables.Dispose();
}
}
DisposeWith 擴充方法:
public static class DisposableExtensions
{
public static T DisposeWith<T>(this T disposable, CompositeDisposable composite)
where T : IDisposable
{
composite.Add(disposable);
return disposable;
}
}
SerialDisposable — 替換式訂閱
當你需要「切換」訂閱(取消舊的、建立新的):
private readonly SerialDisposable _currentDevice = new();
public void SwitchDevice(IDevice device)
{
// 自動取消前一個訂閱
_currentDevice.Disposable = device.GetDataStream()
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(data => UpdateDisplay(data));
}
2. 避免在 Subscribe 裡做太多事
Subscribe 的回呼應該簡短。複雜的邏輯用 operator 組合來處理。
// ❌ 差:在 Subscribe 裡塞滿邏輯
deviceStream.Subscribe(data =>
{
if (data.Temperature > 80)
{
Logger.Warn($"高溫: {data.Temperature}");
AlarmService.Raise("HighTemp");
}
var formatted = $"{data.Temperature:F1}°C";
TemperatureLabel.Content = formatted;
Chart.AddPoint(data.Temperature);
});
// ✅ 好:用 operator 組合,Subscribe 只做最終動作
var tempStream = deviceStream
.Select(d => d.Temperature)
.Publish()
.RefCount();
// 告警管道
tempStream
.Where(t => t > 80)
.Subscribe(t =>
{
Logger.Warn($"高溫: {t}");
AlarmService.Raise("HighTemp");
})
.DisposeWith(_disposables);
// 顯示管道
tempStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(t => TemperatureLabel.Content = $"{t:F1}°C")
.DisposeWith(_disposables);
// 圖表管道
tempStream
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(t => Chart.AddPoint(t))
.DisposeWith(_disposables);
3. Hot vs Cold 陷阱
陷阱:多次 Subscribe Cold Observable = 多次執行
// ⚠️ 每次 Subscribe 都會啟動一個獨立的 Interval
var polling = Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => ReadDeviceAsync());
polling.Subscribe(x => UpdateUI(x)); // 每秒讀一次
polling.Subscribe(x => LogToFile(x)); // 又每秒讀一次 → 設備被讀兩次!
修正:用 Publish + RefCount 共享
var polling = Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => ReadDeviceAsync())
.Publish()
.RefCount(); // 第一個訂閱啟動,最後一個取消時停止
polling.Subscribe(x => UpdateUI(x)); // 共享同一個讀取
polling.Subscribe(x => LogToFile(x)); // 共享同一個讀取
何時需要 Publish + RefCount
| 情境 | 需要? |
|---|---|
| 多個 subscriber 消費同一個流 | 是 |
| 只有一個 subscriber | 不需要 |
| 來源本身就是 Hot(Subject、事件) | 不需要 |
| 來源有副作用(讀取設備、API 呼叫) | 是 |
4. Memory Leak 常見原因
忘記 Dispose
最常見的原因。特別是在 WPF 中,ViewModel 被回收但訂閱還在:
// ❌ 離開頁面後訂閱仍然活著
public class MonitorPage
{
public MonitorPage()
{
deviceStream.Subscribe(data => UpdateUI(data));
// 即使 MonitorPage 被 GC,這個訂閱仍持有對 UpdateUI 的引用
}
}
FromEventPattern 沒搭配 Dispose
FromEventPattern 內部會用 += 註冊事件。如果不 Dispose,事件處理器永遠掛著:
// ✅ 記得 Dispose
var subscription = Observable.FromEventPattern<EventHandler, EventArgs>(
h => device.StatusChanged += h,
h => device.StatusChanged -= h)
.Subscribe(ep => HandleStatusChange(ep.EventArgs));
// 清理時
subscription.Dispose(); // 會呼叫 h => device.StatusChanged -= h
無限 Observable 沒有結束條件
// ⚠️ 永遠不會結束
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(x => DoSomething(x));
// ✅ 加上結束條件
Observable.Interval(TimeSpan.FromSeconds(1))
.TakeUntil(shutdownSignal) // 收到關閉信號時結束
.Subscribe(x => DoSomething(x));
5. 不要在 Subscribe 裡 await
Subscribe 的 onNext 回呼是 Action<T>,不是 Func<T, Task>。在裡面用 async void 會導致錯誤無法被 OnError 捕捉。
// ❌ 危險:async void,例外會遺失
deviceStream.Subscribe(async data =>
{
await SaveToDbAsync(data); // 如果拋出例外,不會進入 OnError
});
// ✅ 用 SelectMany 處理非同步操作
deviceStream
.SelectMany(data => Observable.FromAsync(() => SaveToDbAsync(data)))
.Subscribe(
_ => { },
ex => Logger.Error($"儲存失敗: {ex.Message}"));
如果需要限制同時執行的非同步操作數量:
// 最多同時 3 個寫入操作
deviceStream
.Select(data => Observable.FromAsync(() => SaveToDbAsync(data)))
.Merge(maxConcurrent: 3)
.Subscribe();
6. Subject 的使用原則
Subject<T> 同時是 IObservable<T> 和 IObserver<T>,方便但容易濫用。
適合使用的場景
- 作為元件的內部事件匯流排
- 將非 Rx 的外部輸入橋接到 Observable 管道
BehaviorSubject<T>作為有初始值的狀態持有者
// ✅ 適當使用:橋接外部輸入
private readonly Subject<DeviceCommand> _commands = new();
public void SendCommand(DeviceCommand cmd) => _commands.OnNext(cmd);
public IObservable<DeviceCommand> Commands => _commands.AsObservable();
避免的使用方式
// ❌ 不要把 Subject 當作可變變數來用
var subject = new Subject<int>();
// 在各處散落的 OnNext 呼叫讓資料流變得難以追蹤
void Method1() => subject.OnNext(1);
void Method2() => subject.OnNext(2);
void Method3() => subject.OnNext(3);
Subject 家族
| 類型 | 特性 | 適用場景 |
|---|---|---|
Subject<T> | 無初始值,不重播 | 事件匯流排 |
BehaviorSubject<T> | 有初始值,新訂閱者收到最新值 | 狀態持有(連線狀態、當前設定) |
ReplaySubject<T> | 重播 N 個歷史值給新訂閱者 | 需要歷史紀錄的場景 |
AsyncSubject<T> | 只在 OnCompleted 時發出最後一個值 | 單次非同步結果 |
7. 常見錯誤速查
| 錯誤 | 原因 | 修正 |
|---|---|---|
| UI 線程例外 | 沒用 ObserveOn 切回 UI 線程 | 加 .ObserveOn(DispatcherScheduler.Current) |
| 設備被讀兩次 | Cold Observable 多次 Subscribe | 用 .Publish().RefCount() |
| Memory Leak | 忘記 Dispose | 用 CompositeDisposable 管理 |
| 例外吞掉了 | Subscribe 沒給 onError | 永遠提供 onError 處理器 |
| async 例外遺失 | 在 Subscribe 裡 async void | 用 SelectMany + FromAsync |
| 無限重試迴圈 | Retry() 沒有上限 | 用 Retry(n) 或 RetryWhen 加延遲 |
| Throttle 沒效果 | 搞混 Throttle 和 Sample | Throttle = 靜默期後觸發;Sample = 固定取樣 |
延伸閱讀:
- 概觀 — Cold vs Hot 基本概念
- 錯誤處理 — Retry 策略
- Scheduler 與線程管理 — ObserveOn 放在管道最後
- 工業場景 Pattern — 以上原則的實際應用