跳至主要内容

測試 Rx 程式碼

Rx 程式碼涉及時間(IntervalTimeoutDelay),如果用真實時間跑測試,測一個 5 秒 Timeout 就要等 5 秒。TestScheduler 提供虛擬時間控制,讓你在毫秒內驗證數分鐘甚至數小時的邏輯。


安裝測試套件

dotnet add package Microsoft.Reactive.Testing

常用 namespace:

using Microsoft.Reactive.Testing;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

TestScheduler 概念

TestScheduler 是一個可以手動控制時間的 Scheduler。時間不會自動前進,只有你明確推進時才會觸發排程的工作。

核心方法:

方法說明
AdvanceTo(ticks)將虛擬時間推進到指定刻度
AdvanceBy(ticks)將虛擬時間往前推進指定長度
Start()推進時間直到所有排程工作完成
CreateObserver<T>()建立測試用 Observer,自動記錄所有通知
CreateHotObservable(...)建立 Hot Observable,事件在絕對時間觸發
CreateColdObservable(...)建立 Cold Observable,事件相對於訂閱時間觸發

時間單位:tick(1 tick = 100 奈秒)。TimeSpan.FromSeconds(1).Ticks = 10,000,000。


基本用法:測試 Interval

不需要真的等,虛擬時間瞬間推進:

[TestMethod]
public void Interval_ShouldEmitFiveValues()
{
var scheduler = new TestScheduler();
var results = new List<long>();

Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
.Take(5)
.Subscribe(results.Add);

// 推進到所有工作完成
scheduler.Start();

CollectionAssert.AreEqual(
new long[] { 0, 1, 2, 3, 4 }, results);
}

使用 CreateObserver 記錄通知

CreateObserver<T>() 回傳的 observer 會記錄每個通知的值和時間戳:

[TestMethod]
public void Timer_ShouldEmitAfterDelay()
{
var scheduler = new TestScheduler();
var observer = scheduler.CreateObserver<long>();

Observable.Timer(TimeSpan.FromSeconds(3), scheduler)
.Subscribe(observer);

// 2 秒時還沒有值
scheduler.AdvanceTo(TimeSpan.FromSeconds(2).Ticks);
Assert.AreEqual(0, observer.Messages.Count);

// 3 秒時收到值
scheduler.AdvanceTo(TimeSpan.FromSeconds(3).Ticks);
Assert.AreEqual(1, observer.Messages.Count);
Assert.AreEqual(
NotificationKind.OnNext,
observer.Messages[0].Value.Kind);
}

測試 Hot vs Cold Observable

Cold Observable

事件時間相對於訂閱時間。如果訂閱發生在 t=100,第一個 OnNext 排在 t=10,則實際觸發在 t=110。

[TestMethod]
public void ColdObservable_TimesAreRelativeToSubscription()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(
ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, "A"),
ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, "B"),
ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(3).Ticks));

var observer = scheduler.CreateObserver<string>();

// 在 t=0 訂閱
source.Subscribe(observer);

scheduler.Start();

Assert.AreEqual(3, observer.Messages.Count); // 2 OnNext + 1 OnCompleted
Assert.AreEqual("A", observer.Messages[0].Value.Value);
Assert.AreEqual("B", observer.Messages[1].Value.Value);
}

Hot Observable

事件在絕對時間觸發,錯過就是錯過:

[TestMethod]
public void HotObservable_LateSubscriberMissesEarlyEvents()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateHotObservable(
ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, "A"),
ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, "B"),
ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks, "C"));

var observer = scheduler.CreateObserver<string>();

// 在 1.5 秒時才訂閱,錯過 "A"
scheduler.ScheduleAbsolute(
TimeSpan.FromSeconds(1.5).Ticks,
() => source.Subscribe(observer));

scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks);

// 只收到 "B" 和 "C"
Assert.AreEqual(2, observer.Messages.Count);
Assert.AreEqual("B", observer.Messages[0].Value.Value);
Assert.AreEqual("C", observer.Messages[1].Value.Value);
}

實戰範例:測試設備輪詢邏輯

驗證輪詢 PatternDistinctUntilChanged 行為,不需要等真實的 500ms:

[TestMethod]
public void Polling_ShouldFilterDuplicateValues()
{
var scheduler = new TestScheduler();
var readIndex = 0;
// 模擬設備讀值:1, 1, 2, 2, 2, 3
var deviceValues = new[] { 1, 1, 2, 2, 2, 3 };

var pollingStream = Observable
.Interval(TimeSpan.FromMilliseconds(500), scheduler)
.Select(_ => deviceValues[Math.Min(readIndex++, deviceValues.Length - 1)])
.DistinctUntilChanged();

var observer = scheduler.CreateObserver<int>();
pollingStream.Subscribe(observer);

// 推進 3 秒(6 次輪詢)
scheduler.AdvanceTo(TimeSpan.FromSeconds(3).Ticks);

// DistinctUntilChanged 過濾掉重複值,只剩 1, 2, 3
var values = observer.Messages
.Where(m => m.Value.Kind == NotificationKind.OnNext)
.Select(m => m.Value.Value)
.ToList();

CollectionAssert.AreEqual(new[] { 1, 2, 3 }, values);
}

實戰範例:測試 Timeout 邏輯

驗證超時會正確觸發:

[TestMethod]
public void Timeout_ShouldFireWhenNoData()
{
var scheduler = new TestScheduler();

// 建立一個 5 秒內不發出任何值的 Observable
var silentSource = scheduler.CreateColdObservable<int>();

var observer = scheduler.CreateObserver<int>();

silentSource
.Timeout(TimeSpan.FromSeconds(5), scheduler)
.Subscribe(observer);

// 推進 6 秒
scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);

// 應該收到 TimeoutException
Assert.AreEqual(1, observer.Messages.Count);
Assert.AreEqual(NotificationKind.OnError, observer.Messages[0].Value.Kind);
Assert.IsInstanceOfType(
observer.Messages[0].Value.Exception,
typeof(TimeoutException));
}

實戰範例:測試 Buffer 批次邏輯

[TestMethod]
public void Buffer_ShouldCollectByTimeWindow()
{
var scheduler = new TestScheduler();
var batches = new List<IList<int>>();

// 每秒發出一個值
Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
.Select(i => (int)i)
.Buffer(TimeSpan.FromSeconds(3), scheduler)
.Subscribe(batch => batches.Add(batch));

// 推進 7 秒
scheduler.AdvanceTo(TimeSpan.FromSeconds(7).Ticks);

// 第一批 (0s-3s): [0, 1, 2]
// 第二批 (3s-6s): [3, 4, 5]
Assert.AreEqual(2, batches.Count);
CollectionAssert.AreEqual(new[] { 0, 1, 2 }, (List<int>)batches[0]);
CollectionAssert.AreEqual(new[] { 3, 4, 5 }, (List<int>)batches[1]);
}

測試最佳實踐

  1. 所有時間相關的 operator 都傳入 Scheduler 參數,讓程式碼可測試(參見 Scheduler 最佳實踐

  2. scheduler.Start() 跑完所有排程,或用 AdvanceTo / AdvanceBy 精確控制時間點

  3. CreateObserver<T>() 記錄結果,它會同時記錄值和時間戳

  4. 測試 OnError 路徑,不要只測 happy path

  5. 避免在測試中用真實 Task.Delay,搭配 TestScheduler 就不需要


延伸閱讀