錯誤處理
Rx.NET 的錯誤處理模型與傳統 try/catch 不同。理解 OnError 的終止語意是避免踩坑的關鍵。
OnError 的終止語意
在 Rx 中,OnError 是一個終止信號。一旦 Observable 發出 OnError:
- 該 Observable 永久結束,不會再發出任何 OnNext
- 所有下游 operator 和 subscriber 都會收到錯誤
- 訂閱自動取消
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnError(new Exception("設備斷線"));
observer.OnNext(3); // 永遠不會被執行
return Disposable.Empty;
});
source.Subscribe(
x => Console.WriteLine($"收到: {x}"),
ex => Console.WriteLine($"錯誤: {ex.Message}"),
() => Console.WriteLine("完成") // 不會被呼叫
);
// 輸出:
// 收到: 1
// 收到: 2
// 錯誤: 設備斷線
如果 Subscribe 時沒有提供 onError 處理器,未處理的 OnError 會拋出例外。在工業場景中,這可能導致整個應用程式崩潰。永遠提供 onError 處理器。
錯誤傳播流程
下圖展示一條 Rx pipeline 從正常值流 → Source 拋出 OnError → 三種 recovery operator (Catch / Retry / OnErrorResumeNext) 的不同走向。理解這張圖能讓你決定「該在 pipeline 的哪個位置插哪一個 operator」:
讀圖重點:
| Recovery operator | 觸發條件 | recover 動作 | 對 Observer 而言 |
|---|---|---|---|
Catch<TException>(handler) | 接到指定型別的 OnError | 切換到 handler 回傳的新 Observable | 連續看到值切換來源;可能 OnCompleted 終止 |
Retry(N) / Retry() | 任意 OnError | 重新 subscribe 同一個 Source | 可能再次收到從頭開始的 OnNext 序列;超過 N 次後 OnError 透傳 |
RetryWhen(errors => signal) | 任意 OnError | 等 signal 流發出值才重新 subscribe | 同上,但兩次 retry 之間可插入延遲 / Jitter / 條件 |
OnErrorResumeNext(next) | 任意 OnError 或 OnCompleted | 接續訂閱下一個 Observable | 連續看到值,與來源切換無感 |
關鍵不變式:
OnError是終止信號 → 一旦傳到某個 operator,該 operator 不會再收到原 Source 的 OnNext。recovery 必須切換到「新的訂閱對象」(fallback / 自己重訂 / 下一個)才能讓流繼續。- 位置決定保護範圍 → recovery operator 放在哪一個 operator 後面,就只能保護它上游的錯誤。例如
Source.Select(...).Catch(...).Where(...)中,Where拋出的錯誤不會被Catch吃掉。 - 疊加多種策略 → 常見模式是
Source.Timeout(...).Retry(N).Catch(_ => Observable.Empty<T>()):先用 Timeout 把卡住的訂閱轉成 OnError,Retry 嘗試 N 次,仍失敗時 Catch 用空流結束。
Catch — 替換錯誤流
Catch 在發生錯誤時,用另一個 Observable 替換。原流結束,切換到替代流。
// 主設備讀取失敗,切換到備用設備
primaryDevice.GetDataStream()
.Catch<SensorData, CommunicationException>(ex =>
{
Logger.Warn($"主設備斷線: {ex.Message},切換到備用設備");
return backupDevice.GetDataStream();
})
.Subscribe(data => ProcessData(data));
Catch 也可以用 Observable.Empty 吞掉錯誤,讓流正常完成:
// 忽略特定類型的錯誤
dataStream
.Catch<int, TimeoutException>(_ => Observable.Empty<int>())
.Subscribe(x => Console.WriteLine(x));
OnErrorResumeNext
不管前一個 Observable 是正常完成還是錯誤,都接續下一個 Observable。
// 依序嘗試三個資料來源
Observable.OnErrorResumeNext(
source1,
source2,
source3
).Subscribe(data => ProcessData(data));
Retry — 自動重試
基本 Retry
錯誤發生時重新訂閱 Observable。
// 無限重試
deviceStream
.Retry()
.Subscribe(data => ProcessData(data));
// 最多重試 3 次
deviceStream
.Retry(3)
.Subscribe(
data => ProcessData(data),
ex => Logger.Error($"重試 3 次仍失敗: {ex.Message}"));
Retry() 不加參數會無限重試。如果錯誤是永久性的(例如設定錯誤),會造成無窮迴圈。務必設定上限或搭配延遲。
RetryWhen — 帶條件的重試
RetryWhen 讓你控制重試的時機和策略。它接收一個錯誤流,你回傳一個信號流來觸發重試。
// 每次錯誤後等 2 秒再重試
deviceStream
.RetryWhen(errors => errors
.Do(ex => Logger.Warn($"通訊錯誤: {ex.Message},2 秒後重試"))
.SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(2))))
.Subscribe(data => ProcessData(data));
Timeout — 逾時控制
如果 Observable 在指定時間內沒有發出新值,就拋出 TimeoutException。
// 設備必須在 5 秒內回應
deviceStream
.Timeout(TimeSpan.FromSeconds(5))
.Subscribe(
data => ProcessData(data),
ex =>
{
if (ex is TimeoutException)
Logger.Error("設備回應逾時");
});
也可以指定逾時後切換到備用流:
// 逾時後切換到快取資料
deviceStream
.Timeout(TimeSpan.FromSeconds(5), cachedDataStream)
.Subscribe(data => ProcessData(data));
組合技:工業場景的斷線重連
在工業自動化中,設備通訊不穩定是常態。以下是完整的斷線重連 pattern:
基本重連(固定延遲)
deviceStream
.Timeout(TimeSpan.FromSeconds(5))
.RetryWhen(errors => errors
.Do(ex => Logger.Warn($"連線失敗: {ex.Message}"))
.Delay(TimeSpan.FromSeconds(2)))
.Subscribe(data => ProcessData(data));
指數退避重連
錯誤越頻繁,等待時間越長,避免在設備持續離線時造成過多連線嘗試:
deviceStream
.Timeout(TimeSpan.FromSeconds(5))
.RetryWhen(errors => errors
.Select((error, retryCount) => (error, retryCount))
.SelectMany(t =>
{
var delay = TimeSpan.FromSeconds(
Math.Min(Math.Pow(2, t.retryCount), 60)); // 上限 60 秒
Logger.Warn($"第 {t.retryCount + 1} 次重連,{delay.TotalSeconds}s 後重試");
return Observable.Timer(delay);
}))
.Subscribe(data => ProcessData(data));
完整的通訊管道
結合多個錯誤處理策略:
Observable.Defer(() => ConnectToDeviceAsync().ToObservable())
.SelectMany(connection => connection.GetDataStream())
// 5 秒沒收到資料視為斷線
.Timeout(TimeSpan.FromSeconds(5))
// 指數退避重連
.RetryWhen(errors => errors
.Select((ex, i) => (ex, attempt: i))
.SelectMany(t =>
{
var delay = TimeSpan.FromSeconds(Math.Min(Math.Pow(2, t.attempt), 60));
Logger.Warn($"通訊異常 [{t.ex.GetType().Name}],第 {t.attempt + 1} 次重連({delay.TotalSeconds}s)");
return Observable.Timer(delay);
}))
// 連線狀態通知
.Do(
_ => ConnectionStatus.OnNext(true),
ex => ConnectionStatus.OnNext(false))
.Subscribe(
data => ProcessData(data),
ex => Logger.Error($"通訊永久失敗: {ex.Message}"));
錯誤處理決策樹
發生錯誤
├── 可以忽略? → Catch + Observable.Empty
├── 有備用來源? → Catch + 替代 Observable
├── 應該重試?
│ ├── 簡單重試 → Retry(n)
│ ├── 延遲重試 → RetryWhen + Delay
│ └── 指數退避 → RetryWhen + 遞增 Timer
├── 是逾時問題? → Timeout + Catch/Retry
└── 無法處理? → 讓 OnError 傳到 Subscribe 處理
延伸閱讀:
- 常用 Operators — 了解 SelectMany、Delay 等搭配用法
- 工業場景 Pattern — 更多斷線重連實戰範例
- 最佳實踐 — Subscribe 一定要提供 onError