Skip to content
Draft
6 changes: 6 additions & 0 deletions .server-changes/mollifier-reads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier API read-fallback: serve buffered runs from synthetic run/trace/span data on the retrieve, trace, spans, and attempts endpoints.
184 changes: 181 additions & 3 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import {
logger,
} from "@trigger.dev/core/v3";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
type SyntheticRun,
} from "~/v3/mollifier/readFallback.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";
Expand Down Expand Up @@ -64,13 +69,34 @@ type CommonRelatedRun = Prisma.Result<
"findFirstOrThrow"
>;

type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>;
// Full shape returned by findRun() — the commonRunSelect fields plus the
// extras the route handler reads. Declared explicitly (not inferred via
// ReturnType<typeof findRun>) so findRun can return a synthesised buffered
// run without the type becoming self-referential.
type FoundRun = CommonRelatedRun & {
traceId: string;
payload: string;
payloadType: string;
output: string | null;
outputType: string;
error: Prisma.JsonValue;
attempts: { id: string }[];
attemptNumber: number | null;
engine: "V1" | "V2";
taskEventStore: string;
parentTaskRun: CommonRelatedRun | null;
rootTaskRun: CommonRelatedRun | null;
childRuns: CommonRelatedRun[];
};

export class ApiRetrieveRunPresenter {
constructor(private readonly apiVersion: API_VERSIONS) {}

public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) {
return $replica.taskRun.findFirst({
public static async findRun(
friendlyId: string,
env: AuthenticatedEnvironment,
): Promise<FoundRun | null> {
const pgRow = await $replica.taskRun.findFirst({
where: {
friendlyId,
runtimeEnvironmentId: env.id,
Expand Down Expand Up @@ -102,6 +128,23 @@ export class ApiRetrieveRunPresenter {
},
},
});

if (pgRow) return pgRow;

// Postgres miss → fall back to the mollifier buffer. When the gate
// diverted a trigger, the run lives in Redis until the drainer replays
// it through engine.trigger. Synthesise the FoundRun shape so call()
// returns a `QUEUED` (or `FAILED`) response with empty output, no
// attempts, no relations.
const buffered = await findRunByIdWithMollifierFallback({
runId: friendlyId,
environmentId: env.id,
organizationId: env.organizationId,
});

if (!buffered) return null;

return synthesiseFoundRunFromBuffer(buffered);
}

public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) {
Expand Down Expand Up @@ -475,3 +518,138 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction {
return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger";
}
}

// Build a FoundRun-shaped object from a buffered (mollified) run. The run
// is in the Redis buffer; engine.trigger hasn't created the Postgres row
// yet, so every field that comes from execution state (output, attempts,
// completedAt, cost, relations) takes a default. The presenter's call()
// handles QUEUED-state runs without surprise.
function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus {
switch (status) {
case "FAILED":
return "SYSTEM_FAILURE";
case "CANCELED":
return "CANCELED";
default:
return "PENDING";
}
}

// The PG path stores `TaskRun.payload` as `String?`, so in production
// the buffered snapshot's `payload` is always a string. We defensively
// coerce other types instead of silently dropping them: an object gets
// JSON-stringified (matches how the trigger path would serialise it),
// anything truly unrenderable falls back to an empty string. The log
// line surfaces format drift to ops without crashing the read path.
function synthesisePayload(buffered: SyntheticRun): string {
const payload = buffered.payload;
if (typeof payload === "string") return payload;
if (payload === undefined || payload === null) return "";
try {
const serialised = JSON.stringify(payload);
logger.warn("ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced", {
runFriendlyId: buffered.friendlyId,
payloadType: typeof payload,
});
return typeof serialised === "string" ? serialised : "";
} catch {
logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", {
runFriendlyId: buffered.friendlyId,
payloadType: typeof payload,
});
return "";
}
}

// Mirror synthesisePayload for metadata. The PG path stores
// `TaskRun.metadata` as `String?`, and the snapshot writes it from
// `metadataPacket.data` (also a string), so in production it is always a
// string or absent. We coerce defensively — an object gets JSON-stringified
// (matching how the trigger path serialises it) rather than silently
// dropped to null, and the log line surfaces format drift to ops.
function synthesiseMetadata(buffered: SyntheticRun): string | null {
const metadata = buffered.metadata;
if (typeof metadata === "string") return metadata;
if (metadata === undefined || metadata === null) return null;
try {
const serialised = JSON.stringify(metadata);
logger.warn("ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced", {
runFriendlyId: buffered.friendlyId,
metadataType: typeof metadata,
});
return typeof serialised === "string" ? serialised : null;
} catch {
logger.error("ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable", {
runFriendlyId: buffered.friendlyId,
metadataType: typeof metadata,
});
return null;
}
}

function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status);

const errorJson: Prisma.JsonValue = buffered.error
? {
type: "STRING_ERROR",
raw: `${buffered.error.code}: ${buffered.error.message}`,
}
: null;

const metadata: string | null = synthesiseMetadata(buffered);

return {
// `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId`
// is the user-facing `run_xxx` token. Downstream logging keyed off
// `taskRun.id` correlates with other systems via the cuid — using
// the friendlyId here breaks log correlation. `SyntheticRun` carries
// the cuid alongside the friendlyId for exactly this reason
// (RunId.fromFriendlyId in readFallback.server.ts).
id: buffered.id,
friendlyId: buffered.friendlyId,
status,
taskIdentifier: buffered.taskIdentifier ?? "",
createdAt: buffered.createdAt,
startedAt: null,
updatedAt: buffered.cancelledAt ?? buffered.createdAt,
completedAt: buffered.cancelledAt ?? null,
expiredAt: null,
delayUntil: buffered.delayUntil ?? null,
metadata,
metadataType: buffered.metadataType ?? "application/json",
ttl: buffered.ttl ?? null,
costInCents: 0,
baseCostInCents: 0,
usageDurationMs: 0,
idempotencyKey: buffered.idempotencyKey ?? null,
idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
isTest: buffered.isTest,
depth: buffered.depth,
scheduleId: null,
lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null,
resumeParentOnCompletion: buffered.resumeParentOnCompletion,
// Reconstruct the batch from the snapshot's internal id so a buffered
// run reports the same `batchId` / triggerFunction as it will once
// materialised, and so batch-scoped JWTs authorise against it (the
// route authorization callbacks read `run.batch?.friendlyId`).
batch: buffered.batchId
? { id: buffered.batchId, friendlyId: BatchId.toFriendlyId(buffered.batchId) }
: null,
runTags: buffered.tags,
traceId: buffered.traceId ?? "",
payload: synthesisePayload(buffered),
payloadType: buffered.payloadType ?? "application/json",
output: null,
outputType: "application/json",
error: errorJson,
attempts: [],
attemptNumber: null,
engine: "V2",
taskEventStore: "taskEvent",
workerQueue: buffered.workerQueue ?? "main",
parentTaskRun: null,
rootTaskRun: null,
childRuns: [],
};
}
82 changes: 72 additions & 10 deletions apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,68 @@ import {
} from "~/services/routeBuilders/apiBuilder.server";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";

const ParamsSchema = z.object({
runId: z.string(),
spanId: z.string(),
});

// Phase A2 — discriminated union for PG vs buffered runs. Buffered runs
// only have one valid spanId (the queued span recorded at gate time and
// reused as the run's root spanId when the drainer materialises). Any
// other spanId returns a deterministic 404; the queued span returns a
// minimal synthesised shape so the customer's SDK sees the same 200
// contract they'd get for a freshly-triggered run.
type ResolvedRun =
| { source: "pg"; run: Awaited<ReturnType<typeof findPgRun>> & {} }
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };

async function findPgRun(runId: string, environmentId: string) {
return $replica.taskRun.findFirst({
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
});
}

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
findResource: (params, auth) => {
return $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
runtimeEnvironmentId: auth.environment.id,
},
findResource: async (params, auth): Promise<ResolvedRun | null> => {
const pgRun = await findPgRun(params.runId, auth.environment.id);
if (pgRun) return { source: "pg", run: pgRun };

const buffered = await findRunByIdWithMollifierFallback({
runId: params.runId,
environmentId: auth.environment.id,
organizationId: auth.environment.organizationId,
});
if (buffered) return { source: "buffer", run: buffered };

return null;
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => {
resource: (resolved) => {
if (resolved.source === "pg") {
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
}
return anyResource(resources);
}
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
...run.tags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
Expand All @@ -44,7 +79,34 @@ export const loader = createLoaderApiRoute(
},
},
},
async ({ params, resource: run, authentication }) => {
async ({ params, resource: resolved, authentication }) => {
if (resolved.source === "buffer") {
// Buffered runs have exactly one valid spanId — the queued span the
// mollifier gate recorded at trigger time, which becomes the run's
// root spanId once the drainer materialises. Any other spanId is a
// deterministic 404. The matching spanId returns a minimal shape
// representing "span exists, no execution data yet."
if (resolved.run.spanId !== params.spanId) {
return json({ error: "Span not found" }, { status: 404 });
}
return json(
{
spanId: resolved.run.spanId,
parentId: resolved.run.parentSpanId ?? null,
runId: resolved.run.friendlyId,
message: resolved.run.taskIdentifier ?? "",
isError: false,
isPartial: resolved.run.status !== "CANCELED",
isCancelled: resolved.run.status === "CANCELED",
level: "TRACE",
startTime: resolved.run.createdAt,
durationMs: 0,
},
{ status: 200 }
);
}

const run = resolved.run;
const eventRepository = await getEventRepositoryForStore(
run.taskEventStore,
authentication.environment.organization.id
Expand Down
Loading