Skip to content

Commit b77c910

Browse files
committed
should preserve runFriendlyId across retries when RunDuplicateIdempotencyKeyError is thrown
1 parent da247bf commit b77c910

File tree

2 files changed

+200
-1
lines changed

2 files changed

+200
-1
lines changed

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,13 @@ export class RunEngineTriggerTaskService {
364364
} catch (error) {
365365
if (error instanceof RunDuplicateIdempotencyKeyError) {
366366
//retry calling this function, because this time it will return the idempotent run
367-
return await this.call({ taskId, environment, body, options, attempt: attempt + 1 });
367+
return await this.call({
368+
taskId,
369+
environment,
370+
body,
371+
options: { ...options, runFriendlyId },
372+
attempt: attempt + 1,
373+
});
368374
}
369375

370376
if (error instanceof RunOneTimeUseTokenError) {

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,4 +717,197 @@ describe("RunEngineTriggerTaskService", () => {
717717
await engine.quit();
718718
}
719719
);
720+
721+
containerTest(
722+
"should preserve runFriendlyId across retries when RunDuplicateIdempotencyKeyError is thrown",
723+
async ({ prisma, redisOptions }) => {
724+
const engine = new RunEngine({
725+
prisma,
726+
worker: {
727+
redis: redisOptions,
728+
workers: 1,
729+
tasksPerWorker: 10,
730+
pollIntervalMs: 100,
731+
},
732+
queue: {
733+
redis: redisOptions,
734+
},
735+
runLock: {
736+
redis: redisOptions,
737+
},
738+
machines: {
739+
defaultMachine: "small-1x",
740+
machines: {
741+
"small-1x": {
742+
name: "small-1x" as const,
743+
cpu: 0.5,
744+
memory: 0.5,
745+
centsPerMs: 0.0001,
746+
},
747+
},
748+
baseCostInCents: 0.0005,
749+
},
750+
tracer: trace.getTracer("test", "0.0.0"),
751+
logLevel: "debug",
752+
});
753+
754+
const parentTask = "parent-task";
755+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
756+
const taskIdentifier = "test-task";
757+
758+
// Create background worker
759+
await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, taskIdentifier]);
760+
761+
// Create parent runs and start their attempts (required for resumeParentOnCompletion)
762+
const parentRun1 = await engine.trigger(
763+
{
764+
number: 1,
765+
friendlyId: "run_p1",
766+
environment: authenticatedEnvironment,
767+
taskIdentifier: parentTask,
768+
payload: "{}",
769+
payloadType: "application/json",
770+
context: {},
771+
traceContext: {},
772+
traceId: "t12345",
773+
spanId: "s12345",
774+
queue: `task/${parentTask}`,
775+
isTest: false,
776+
tags: [],
777+
workerQueue: "main",
778+
},
779+
prisma
780+
);
781+
782+
await setTimeout(500);
783+
const dequeued = await engine.dequeueFromWorkerQueue({
784+
consumerId: "test_12345",
785+
workerQueue: "main",
786+
});
787+
await engine.startRunAttempt({
788+
runId: parentRun1.id,
789+
snapshotId: dequeued[0].snapshot.id,
790+
});
791+
792+
const parentRun2 = await engine.trigger(
793+
{
794+
number: 2,
795+
friendlyId: "run_p2",
796+
environment: authenticatedEnvironment,
797+
taskIdentifier: parentTask,
798+
payload: "{}",
799+
payloadType: "application/json",
800+
context: {},
801+
traceContext: {},
802+
traceId: "t12346",
803+
spanId: "s12346",
804+
queue: `task/${parentTask}`,
805+
isTest: false,
806+
tags: [],
807+
workerQueue: "main",
808+
},
809+
prisma
810+
);
811+
812+
await setTimeout(500);
813+
const dequeued2 = await engine.dequeueFromWorkerQueue({
814+
consumerId: "test_12345",
815+
workerQueue: "main",
816+
});
817+
await engine.startRunAttempt({
818+
runId: parentRun2.id,
819+
snapshotId: dequeued2[0].snapshot.id,
820+
});
821+
822+
const queuesManager = new DefaultQueueManager(prisma, engine);
823+
const idempotencyKeyConcern = new IdempotencyKeyConcern(
824+
prisma,
825+
engine,
826+
new MockTraceEventConcern()
827+
);
828+
829+
const triggerRacepointSystem = new MockTriggerRacepointSystem();
830+
831+
// Track all friendlyIds passed to the payload processor
832+
const processedFriendlyIds: string[] = [];
833+
class TrackingPayloadProcessor implements PayloadProcessor {
834+
async process(request: TriggerTaskRequest): Promise<IOPacket> {
835+
processedFriendlyIds.push(request.friendlyId);
836+
return {
837+
data: JSON.stringify(request.body.payload),
838+
dataType: "application/json",
839+
};
840+
}
841+
}
842+
843+
const triggerTaskService = new RunEngineTriggerTaskService({
844+
engine,
845+
prisma,
846+
payloadProcessor: new TrackingPayloadProcessor(),
847+
queueConcern: queuesManager,
848+
idempotencyKeyConcern,
849+
validator: new MockTriggerTaskValidator(),
850+
traceEventConcern: new MockTraceEventConcern(),
851+
tracer: trace.getTracer("test", "0.0.0"),
852+
metadataMaximumSize: 1024 * 1024 * 1, // 1MB
853+
triggerRacepointSystem,
854+
});
855+
856+
const idempotencyKey = "test-preserve-friendly-id";
857+
const racepoint = triggerRacepointSystem.registerRacepoint("idempotencyKey", idempotencyKey);
858+
859+
// Trigger two concurrent requests with same idempotency key
860+
// One will succeed, one will fail with RunDuplicateIdempotencyKeyError and retry
861+
const childTriggerPromise1 = triggerTaskService.call({
862+
taskId: taskIdentifier,
863+
environment: authenticatedEnvironment,
864+
body: {
865+
payload: { test: "test1" },
866+
options: {
867+
idempotencyKey,
868+
parentRunId: parentRun1.friendlyId,
869+
resumeParentOnCompletion: true,
870+
},
871+
},
872+
});
873+
874+
const childTriggerPromise2 = triggerTaskService.call({
875+
taskId: taskIdentifier,
876+
environment: authenticatedEnvironment,
877+
body: {
878+
payload: { test: "test2" },
879+
options: {
880+
idempotencyKey,
881+
parentRunId: parentRun2.friendlyId,
882+
resumeParentOnCompletion: true,
883+
},
884+
},
885+
});
886+
887+
await setTimeout(500);
888+
889+
// Resolve the racepoint to allow both requests to proceed
890+
racepoint.resolve();
891+
892+
const result1 = await childTriggerPromise1;
893+
const result2 = await childTriggerPromise2;
894+
895+
// Both should return the same run (one created, one cached)
896+
expect(result1).toBeDefined();
897+
expect(result2).toBeDefined();
898+
expect(result1?.run.friendlyId).toBe(result2?.run.friendlyId);
899+
900+
// The key assertion: When a retry happens due to RunDuplicateIdempotencyKeyError,
901+
// the same friendlyId should be used. We expect exactly 2 calls to payloadProcessor
902+
// (one for each concurrent request), not 3 (which would indicate a new friendlyId on retry)
903+
// Since the retry returns early from the idempotency cache, payloadProcessor is not called again.
904+
expect(processedFriendlyIds.length).toBe(2);
905+
906+
// Verify that we have exactly 2 unique friendlyIds (one per original request)
907+
const uniqueFriendlyIds = new Set(processedFriendlyIds);
908+
expect(uniqueFriendlyIds.size).toBe(2);
909+
910+
await engine.quit();
911+
}
912+
);
720913
});

0 commit comments

Comments
 (0)