From 60a25cd3359e8f54965b120cf94937ad5de917e9 Mon Sep 17 00:00:00 2001 From: Wout Stiens <71498452+StiensWout@users.noreply.github.com> Date: Mon, 22 Jun 2026 15:54:53 +0000 Subject: [PATCH] fix: retry shell subscription failures Co-authored-by: Codex --- .../src/state/shell-sync.test.ts | 104 +++++++++++++----- packages/client-runtime/src/state/shell.ts | 1 + 2 files changed, 80 insertions(+), 25 deletions(-) diff --git a/packages/client-runtime/src/state/shell-sync.test.ts b/packages/client-runtime/src/state/shell-sync.test.ts index 2eab7214225..ccbca983094 100644 --- a/packages/client-runtime/src/state/shell-sync.test.ts +++ b/packages/client-runtime/src/state/shell-sync.test.ts @@ -8,8 +8,10 @@ import { describe, expect, it } from "@effect/vitest"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; import * as Queue from "effect/Queue"; +import * as Ref from "effect/Ref"; import * as Stream from "effect/Stream"; import * as SubscriptionRef from "effect/SubscriptionRef"; +import * as TestClock from "effect/testing/TestClock"; import { AVAILABLE_CONNECTION_STATE, @@ -46,6 +48,37 @@ function session(client: WsRpcProtocolClient): RpcSession.RpcSession { }; } +function makeTestShellState(client: WsRpcProtocolClient) { + return Effect.gen(function* () { + const supervisorState = yield* SubscriptionRef.make(AVAILABLE_CONNECTION_STATE); + const activeSession = yield* SubscriptionRef.make>( + Option.some(session(client)), + ); + const supervisor = EnvironmentSupervisor.EnvironmentSupervisor.of({ + target: TARGET, + state: supervisorState, + session: activeSession, + prepared: yield* SubscriptionRef.make(Option.none()), + connect: Effect.void, + disconnect: Effect.void, + retryNow: Effect.void, + } satisfies EnvironmentSupervisor.EnvironmentSupervisor["Service"]); + const cache = Persistence.EnvironmentCacheStore.of({ + loadShell: () => Effect.succeed(Option.none()), + saveShell: () => Effect.never, + loadThread: () => Effect.succeed(Option.none()), + saveThread: () => Effect.void, + removeThread: () => Effect.void, + clear: () => Effect.void, + }); + const shellState = yield* makeEnvironmentShellState().pipe( + Effect.provideService(EnvironmentSupervisor.EnvironmentSupervisor, supervisor), + Effect.provideService(Persistence.EnvironmentCacheStore, cache), + ); + return { shellState, supervisorState }; + }); +} + describe("environment shell synchronization", () => { it.effect("publishes live state before persistence and preserves it when ready", () => Effect.gen(function* () { @@ -53,31 +86,7 @@ describe("environment shell synchronization", () => { const client = { [ORCHESTRATION_WS_METHODS.subscribeShell]: () => Stream.fromQueue(events), } as unknown as WsRpcProtocolClient; - const supervisorState = yield* SubscriptionRef.make(AVAILABLE_CONNECTION_STATE); - const activeSession = yield* SubscriptionRef.make>( - Option.some(session(client)), - ); - const supervisor = EnvironmentSupervisor.EnvironmentSupervisor.of({ - target: TARGET, - state: supervisorState, - session: activeSession, - prepared: yield* SubscriptionRef.make(Option.none()), - connect: Effect.void, - disconnect: Effect.void, - retryNow: Effect.void, - } satisfies EnvironmentSupervisor.EnvironmentSupervisor["Service"]); - const cache = Persistence.EnvironmentCacheStore.of({ - loadShell: () => Effect.succeed(Option.none()), - saveShell: () => Effect.never, - loadThread: () => Effect.succeed(Option.none()), - saveThread: () => Effect.void, - removeThread: () => Effect.void, - clear: () => Effect.void, - }); - const shellState = yield* makeEnvironmentShellState().pipe( - Effect.provideService(EnvironmentSupervisor.EnvironmentSupervisor, supervisor), - Effect.provideService(Persistence.EnvironmentCacheStore, cache), - ); + const { shellState, supervisorState } = yield* makeTestShellState(client); yield* SubscriptionRef.set(supervisorState, { desired: true, @@ -117,4 +126,49 @@ describe("environment shell synchronization", () => { expect(Option.getOrThrow(state.snapshot)).toEqual(LIVE_SHELL_SNAPSHOT); }), ); + + it.effect("retries expected subscription failures and accepts the next live snapshot", () => + Effect.gen(function* () { + const events = yield* Queue.unbounded(); + const attempts = yield* Ref.make(0); + const client = { + [ORCHESTRATION_WS_METHODS.subscribeShell]: () => + Stream.unwrap( + Ref.getAndUpdate(attempts, (count) => count + 1).pipe( + Effect.map((attempt) => + attempt === 0 + ? Stream.fail(new Error("transient shell failure")) + : Stream.fromQueue(events), + ), + ), + ), + } as unknown as WsRpcProtocolClient; + const { shellState } = yield* makeTestShellState(client); + + yield* SubscriptionRef.changes(shellState).pipe( + Stream.filter((state) => Option.isSome(state.error)), + Stream.runHead, + ); + const failedState = yield* SubscriptionRef.get(shellState); + expect(failedState.status).toBe("empty"); + expect(Option.getOrThrow(failedState.error)).toBe("Could not synchronize environment data."); + expect(yield* Ref.get(attempts)).toBe(1); + + yield* TestClock.adjust("250 millis"); + yield* Queue.offer(events, { + kind: "snapshot", + snapshot: LIVE_SHELL_SNAPSHOT, + }); + yield* SubscriptionRef.changes(shellState).pipe( + Stream.filter((state) => state.status === "live"), + Stream.runHead, + ); + + const liveState = yield* SubscriptionRef.get(shellState); + expect(yield* Ref.get(attempts)).toBe(2); + expect(liveState.status).toBe("live"); + expect(liveState.error).toEqual(Option.none()); + expect(Option.getOrThrow(liveState.snapshot)).toEqual(LIVE_SHELL_SNAPSHOT); + }), + ); }); diff --git a/packages/client-runtime/src/state/shell.ts b/packages/client-runtime/src/state/shell.ts index 2b0ba6346f5..fcb37fa51a7 100644 --- a/packages/client-runtime/src/state/shell.ts +++ b/packages/client-runtime/src/state/shell.ts @@ -152,6 +152,7 @@ export const makeEnvironmentShellState = Effect.fn("EnvironmentShellState.make") {}, { onExpectedFailure: (cause) => setStreamError(Cause.squash(cause)), + retryExpectedFailureAfter: "250 millis", }, ).pipe(Stream.runForEach(applyItem), Effect.forkScoped); yield* SubscriptionRef.changes(supervisor.state).pipe(