跳至主要内容

錯誤處理

Rx.NET 的錯誤處理模型與傳統 try/catch 不同。理解 OnError 的終止語意是避免踩坑的關鍵。


OnError 的終止語意

在 Rx 中,OnError 是一個終止信號。一旦 Observable 發出 OnError:

  1. 該 Observable 永久結束,不會再發出任何 OnNext
  2. 所有下游 operator 和 subscriber 都會收到錯誤
  3. 訂閱自動取消
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnError(new Exception("設備斷線"));
observer.OnNext(3); // 永遠不會被執行
return Disposable.Empty;
});

source.Subscribe(
x => Console.WriteLine($"收到: {x}"),
ex => Console.WriteLine($"錯誤: {ex.Message}"),
() => Console.WriteLine("完成") // 不會被呼叫
);

// 輸出:
// 收到: 1
// 收到: 2
// 錯誤: 設備斷線
注意

如果 Subscribe 時沒有提供 onError 處理器,未處理的 OnError 會拋出例外。在工業場景中,這可能導致整個應用程式崩潰。永遠提供 onError 處理器


錯誤傳播流程

下圖展示一條 Rx pipeline 從正常值流 → Source 拋出 OnError → 三種 recovery operator (Catch / Retry / OnErrorResumeNext) 的不同走向。理解這張圖能讓你決定「該在 pipeline 的哪個位置插哪一個 operator」:

讀圖重點:

Recovery operator觸發條件recover 動作對 Observer 而言
Catch<TException>(handler)接到指定型別的 OnError切換到 handler 回傳的新 Observable連續看到值切換來源;可能 OnCompleted 終止
Retry(N) / Retry()任意 OnError重新 subscribe 同一個 Source可能再次收到從頭開始的 OnNext 序列;超過 N 次後 OnError 透傳
RetryWhen(errors => signal)任意 OnErrorsignal 流發出值才重新 subscribe同上,但兩次 retry 之間可插入延遲 / Jitter / 條件
OnErrorResumeNext(next)任意 OnErrorOnCompleted接續訂閱下一個 Observable連續看到值,與來源切換無感

關鍵不變式:

  • OnError 是終止信號 → 一旦傳到某個 operator,該 operator 不會再收到原 Source 的 OnNext。recovery 必須切換到「新的訂閱對象」(fallback / 自己重訂 / 下一個)才能讓流繼續。
  • 位置決定保護範圍 → recovery operator 放在哪一個 operator 後面,就只能保護它上游的錯誤。例如 Source.Select(...).Catch(...).Where(...) 中,Where 拋出的錯誤不會Catch 吃掉。
  • 疊加多種策略 → 常見模式是 Source.Timeout(...).Retry(N).Catch(_ => Observable.Empty<T>()):先用 Timeout 把卡住的訂閱轉成 OnError,Retry 嘗試 N 次,仍失敗時 Catch 用空流結束。

Catch — 替換錯誤流

Catch 在發生錯誤時,用另一個 Observable 替換。原流結束,切換到替代流。

// 主設備讀取失敗,切換到備用設備
primaryDevice.GetDataStream()
.Catch<SensorData, CommunicationException>(ex =>
{
Logger.Warn($"主設備斷線: {ex.Message},切換到備用設備");
return backupDevice.GetDataStream();
})
.Subscribe(data => ProcessData(data));

Catch 也可以用 Observable.Empty 吞掉錯誤,讓流正常完成:

// 忽略特定類型的錯誤
dataStream
.Catch<int, TimeoutException>(_ => Observable.Empty<int>())
.Subscribe(x => Console.WriteLine(x));

OnErrorResumeNext

不管前一個 Observable 是正常完成還是錯誤,都接續下一個 Observable。

// 依序嘗試三個資料來源
Observable.OnErrorResumeNext(
source1,
source2,
source3
).Subscribe(data => ProcessData(data));

Retry — 自動重試

基本 Retry

錯誤發生時重新訂閱 Observable。

// 無限重試
deviceStream
.Retry()
.Subscribe(data => ProcessData(data));

// 最多重試 3 次
deviceStream
.Retry(3)
.Subscribe(
data => ProcessData(data),
ex => Logger.Error($"重試 3 次仍失敗: {ex.Message}"));
注意

Retry() 不加參數會無限重試。如果錯誤是永久性的(例如設定錯誤),會造成無窮迴圈。務必設定上限或搭配延遲。

RetryWhen — 帶條件的重試

RetryWhen 讓你控制重試的時機和策略。它接收一個錯誤流,你回傳一個信號流來觸發重試。

// 每次錯誤後等 2 秒再重試
deviceStream
.RetryWhen(errors => errors
.Do(ex => Logger.Warn($"通訊錯誤: {ex.Message},2 秒後重試"))
.SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(2))))
.Subscribe(data => ProcessData(data));

Timeout — 逾時控制

如果 Observable 在指定時間內沒有發出新值,就拋出 TimeoutException

// 設備必須在 5 秒內回應
deviceStream
.Timeout(TimeSpan.FromSeconds(5))
.Subscribe(
data => ProcessData(data),
ex =>
{
if (ex is TimeoutException)
Logger.Error("設備回應逾時");
});

也可以指定逾時後切換到備用流:

// 逾時後切換到快取資料
deviceStream
.Timeout(TimeSpan.FromSeconds(5), cachedDataStream)
.Subscribe(data => ProcessData(data));

組合技:工業場景的斷線重連

在工業自動化中,設備通訊不穩定是常態。以下是完整的斷線重連 pattern:

基本重連(固定延遲)

deviceStream
.Timeout(TimeSpan.FromSeconds(5))
.RetryWhen(errors => errors
.Do(ex => Logger.Warn($"連線失敗: {ex.Message}"))
.Delay(TimeSpan.FromSeconds(2)))
.Subscribe(data => ProcessData(data));

指數退避重連

錯誤越頻繁,等待時間越長,避免在設備持續離線時造成過多連線嘗試:

deviceStream
.Timeout(TimeSpan.FromSeconds(5))
.RetryWhen(errors => errors
.Select((error, retryCount) => (error, retryCount))
.SelectMany(t =>
{
var delay = TimeSpan.FromSeconds(
Math.Min(Math.Pow(2, t.retryCount), 60)); // 上限 60 秒
Logger.Warn($"第 {t.retryCount + 1} 次重連,{delay.TotalSeconds}s 後重試");
return Observable.Timer(delay);
}))
.Subscribe(data => ProcessData(data));

完整的通訊管道

結合多個錯誤處理策略:

Observable.Defer(() => ConnectToDeviceAsync().ToObservable())
.SelectMany(connection => connection.GetDataStream())
// 5 秒沒收到資料視為斷線
.Timeout(TimeSpan.FromSeconds(5))
// 指數退避重連
.RetryWhen(errors => errors
.Select((ex, i) => (ex, attempt: i))
.SelectMany(t =>
{
var delay = TimeSpan.FromSeconds(Math.Min(Math.Pow(2, t.attempt), 60));
Logger.Warn($"通訊異常 [{t.ex.GetType().Name}],第 {t.attempt + 1} 次重連({delay.TotalSeconds}s)");
return Observable.Timer(delay);
}))
// 連線狀態通知
.Do(
_ => ConnectionStatus.OnNext(true),
ex => ConnectionStatus.OnNext(false))
.Subscribe(
data => ProcessData(data),
ex => Logger.Error($"通訊永久失敗: {ex.Message}"));

錯誤處理決策樹

發生錯誤
├── 可以忽略? → Catch + Observable.Empty
├── 有備用來源? → Catch + 替代 Observable
├── 應該重試?
│ ├── 簡單重試 → Retry(n)
│ ├── 延遲重試 → RetryWhen + Delay
│ └── 指數退避 → RetryWhen + 遞增 Timer
├── 是逾時問題? → Timeout + Catch/Retry
└── 無法處理? → 讓 OnError 傳到 Subscribe 處理

延伸閱讀