
Commit 75fdc44

Handle large payloads and correct the trace ID propagation to child runs
1 parent b360a5c commit 75fdc44

File tree

14 files changed: +472 −101 lines


apps/webapp/app/env.server.ts

Lines changed: 2 additions & 1 deletion

@@ -528,6 +528,7 @@ const EnvironmentSchema = z
    MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
    MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(10_000),
    TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
+   BATCH_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().optional(), // Defaults to TASK_PAYLOAD_OFFLOAD_THRESHOLD if not set
    TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
    BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
    TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB
@@ -539,7 +540,7 @@ const EnvironmentSchema = z

    // 2-phase batch API settings
    STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
-   STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(512_000), // 512KB max per NDJSON line
+   STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),

    REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
    REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
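
The new variable is optional so operators don't have to set it: batch items fall back to the single-task threshold. A minimal sketch of the resolution, mirroring the expression used by BatchPayloadProcessor below:

import { env } from "~/env.server";

// With neither variable overridden, batch items offload above 524_288 bytes (512KB),
// the same default as single task triggers.
const batchOffloadThreshold =
  env.BATCH_PAYLOAD_OFFLOAD_THRESHOLD ?? env.TASK_PAYLOAD_OFFLOAD_THRESHOLD;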

apps/webapp/app/routes/api.v2.batches.$batchId.ts

Lines changed: 1 addition & 4 deletions

@@ -38,10 +38,7 @@ export const loader = createLoaderApiRoute(
      updatedAt: batch.updatedAt,
      runCount: batch.runCount,
      runs: batch.runIds,
-     // Include error details for PARTIAL_FAILED batches
-     successfulRunCount: batch.successfulRunCount ?? undefined,
-     failedRunCount: batch.failedRunCount ?? undefined,
-     errors:
+     processingErrors:
        batch.errors.length > 0
          ? batch.errors.map((err) => ({
              index: err.index,
apps/webapp/app/runEngine/concerns/batchPayloads.server.ts (new file)

Lines changed: 165 additions & 0 deletions
@@ -0,0 +1,165 @@
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore, r2 } from "~/v3/r2.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";

export type BatchPayloadProcessResult = {
  /** The processed payload - either the original or an R2 path */
  payload: unknown;
  /** The payload type - "application/store" if offloaded to R2 */
  payloadType: string;
  /** Whether the payload was offloaded to R2 */
  wasOffloaded: boolean;
  /** Size of the payload in bytes */
  size: number;
};

/**
 * BatchPayloadProcessor handles payload offloading for batch items.
 *
 * When a batch item's payload exceeds the configured threshold, it's uploaded
 * to object storage (R2) and the payload is replaced with the storage path.
 * This aligns with how single task triggers work via DefaultPayloadProcessor.
 *
 * Path format: batch_{batchId}/item_{index}/payload.json
 */
export class BatchPayloadProcessor {
  /**
   * Check if object storage is available for payload offloading.
   * If not available, large payloads will be stored inline (which may fail for very large payloads).
   */
  isObjectStoreAvailable(): boolean {
    return r2 !== undefined && env.OBJECT_STORE_BASE_URL !== undefined;
  }

  /**
   * Process a batch item payload, offloading to R2 if it exceeds the threshold.
   *
   * @param payload - The raw payload from the batch item
   * @param payloadType - The payload type (e.g., "application/json")
   * @param batchId - The batch ID (internal format)
   * @param itemIndex - The item index within the batch
   * @param environment - The authenticated environment for R2 path construction
   * @returns The processed result with potentially offloaded payload
   */
  async process(
    payload: unknown,
    payloadType: string,
    batchId: string,
    itemIndex: number,
    environment: AuthenticatedEnvironment
  ): Promise<BatchPayloadProcessResult> {
    return startActiveSpan("BatchPayloadProcessor.process()", async (span) => {
      span.setAttribute("batchId", batchId);
      span.setAttribute("itemIndex", itemIndex);
      span.setAttribute("payloadType", payloadType);

      // Create the packet for size checking
      const packet = this.#createPayloadPacket(payload, payloadType);

      if (!packet.data) {
        return {
          payload,
          payloadType,
          wasOffloaded: false,
          size: 0,
        };
      }

      const threshold = env.BATCH_PAYLOAD_OFFLOAD_THRESHOLD ?? env.TASK_PAYLOAD_OFFLOAD_THRESHOLD;
      const { needsOffloading, size } = packetRequiresOffloading(packet, threshold);

      span.setAttribute("payloadSize", size);
      span.setAttribute("needsOffloading", needsOffloading);
      span.setAttribute("threshold", threshold);

      if (!needsOffloading) {
        return {
          payload,
          payloadType,
          wasOffloaded: false,
          size,
        };
      }

      // Check if object store is available
      if (!this.isObjectStoreAvailable()) {
        logger.warn("Payload exceeds threshold but object store is not available", {
          batchId,
          itemIndex,
          size,
          threshold,
        });

        // Return without offloading - the payload will be stored inline
        // This may fail downstream for very large payloads
        return {
          payload,
          payloadType,
          wasOffloaded: false,
          size,
        };
      }

      // Upload to R2
      const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;

      const [uploadError] = await tryCatch(
        uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment)
      );

      if (uploadError) {
        logger.error("Failed to upload batch item payload to object store", {
          batchId,
          itemIndex,
          error: uploadError instanceof Error ? uploadError.message : String(uploadError),
        });

        // Throw to fail this item - SDK can retry
        throw new Error(
          `Failed to upload large payload to object store: ${uploadError instanceof Error ? uploadError.message : String(uploadError)}`
        );
      }

      logger.debug("Batch item payload offloaded to R2", {
        batchId,
        itemIndex,
        filename,
        size,
      });

      span.setAttribute("wasOffloaded", true);
      span.setAttribute("offloadPath", filename);

      return {
        payload: filename,
        payloadType: "application/store",
        wasOffloaded: true,
        size,
      };
    });
  }

  /**
   * Create an IOPacket from payload for size checking.
   */
  #createPayloadPacket(payload: unknown, payloadType: string): IOPacket {
    if (payloadType === "application/json") {
      return { data: JSON.stringify(payload), dataType: "application/json" };
    }

    if (typeof payload === "string") {
      return { data: payload, dataType: payloadType };
    }

    // For other types, try to stringify
    try {
      return { data: JSON.stringify(payload), dataType: payloadType };
    } catch {
      return { dataType: payloadType };
    }
  }
}
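
A brief usage sketch of the processor above; the payload, batch ID, and item index are illustrative, `environment` stands in for the AuthenticatedEnvironment the calling service already has, and the call runs inside an async method:

import { BatchPayloadProcessor } from "../concerns/batchPayloads.server"; // path as imported by streamBatchItems.server.ts

const processor = new BatchPayloadProcessor();

// A payload large enough to cross the 512KB default threshold (illustrative data).
const result = await processor.process(
  { orders: new Array(50_000).fill({ sku: "sku_123", qty: 1 }) },
  "application/json",
  "batch_internal_id", // internal batch ID
  0,                   // item index within the batch
  environment
);

if (result.wasOffloaded) {
  // result.payload is now the object-store path, e.g. "batch_batch_internal_id/item_0/payload.json",
  // and result.payloadType is "application/store"; downstream code stores the path instead of the data.
}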

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 19 additions & 3 deletions

@@ -9,6 +9,7 @@ import { prisma, type PrismaClientOrTransaction } from "~/db.server";
  import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
  import { logger } from "~/services/logger.server";
  import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
+ import { BatchPayloadProcessor } from "../concerns/batchPayloads.server";

  export type StreamBatchItemsServiceOptions = {
    maxItemBytes: number;
@@ -28,8 +29,11 @@
   * providing backpressure through the async iterator pattern.
   */
  export class StreamBatchItemsService extends WithRunEngine {
+   private readonly payloadProcessor: BatchPayloadProcessor;
+
    constructor(protected readonly _prisma: PrismaClientOrTransaction = prisma) {
      super({ prisma });
+     this.payloadProcessor = new BatchPayloadProcessor();
    }

    /**
@@ -108,11 +112,23 @@
        );
      }

-     // Convert to BatchItem format
+     // Get the original payload type
+     const originalPayloadType = (item.options?.payloadType as string) ?? "application/json";
+
+     // Process payload - offload to R2 if it exceeds threshold
+     const processedPayload = await this.payloadProcessor.process(
+       item.payload,
+       originalPayloadType,
+       batchId,
+       item.index,
+       environment
+     );
+
+     // Convert to BatchItem format with potentially offloaded payload
      const batchItem: BatchItem = {
        task: item.task,
-       payload: item.payload,
-       payloadType: (item.options?.payloadType as string) ?? "application/json",
+       payload: processedPayload.payload,
+       payloadType: processedPayload.payloadType,
        options: item.options,
      };
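
For an item whose payload crossed the threshold, the BatchItem handed to the queue now carries the store path instead of the raw payload. A sketch of the resulting shape (values are illustrative; task and options are whatever the SDK sent):

// Illustrative BatchItem after offloading: the queue and run engine treat
// payloadType "application/store" as "the payload lives in object storage at this path".
const offloadedItem = {
  task: "process-order",
  payload: "batch_batch_internal_id/item_42/payload.json", // path produced by BatchPayloadProcessor
  payloadType: "application/store",
  options: { /* original per-item options, passed through unchanged */ },
};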

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 35 additions & 26 deletions

@@ -160,26 +160,23 @@ function createRunEngine() {
    },
    // BatchQueue with DRR scheduling for fair batch processing
    // Consumers are controlled by options.worker.disabled (same as main worker)
-   batchQueue:
-     env.BATCH_TRIGGER_WORKER_ENABLED === "true"
-       ? {
-           redis: {
-             keyPrefix: "engine:",
-             port: env.BATCH_TRIGGER_WORKER_REDIS_PORT ?? undefined,
-             host: env.BATCH_TRIGGER_WORKER_REDIS_HOST ?? undefined,
-             username: env.BATCH_TRIGGER_WORKER_REDIS_USERNAME ?? undefined,
-             password: env.BATCH_TRIGGER_WORKER_REDIS_PASSWORD ?? undefined,
-             enableAutoPipelining: true,
-             ...(env.BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
-           },
-           drr: {
-             quantum: env.BATCH_QUEUE_DRR_QUANTUM,
-             maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
-           },
-           consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
-           consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
-         }
-       : undefined,
+   batchQueue: {
+     redis: {
+       keyPrefix: "engine:",
+       port: env.BATCH_TRIGGER_WORKER_REDIS_PORT ?? undefined,
+       host: env.BATCH_TRIGGER_WORKER_REDIS_HOST ?? undefined,
+       username: env.BATCH_TRIGGER_WORKER_REDIS_USERNAME ?? undefined,
+       password: env.BATCH_TRIGGER_WORKER_REDIS_PASSWORD ?? undefined,
+       enableAutoPipelining: true,
+       ...(env.BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+     },
+     drr: {
+       quantum: env.BATCH_QUEUE_DRR_QUANTUM,
+       maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
+     },
+     consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
+     consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
+   },
  });

  // Set up BatchQueue callbacks if enabled
@@ -192,17 +189,24 @@

  /**
   * Normalize the payload from BatchQueue.
-  * The payload might be a JSON string if the SDK sent it pre-serialized.
-  * If it's a JSON string and payloadType is "application/json", parse it
-  * to avoid double-stringification in DefaultPayloadProcessor.
+  *
+  * Handles different payload types:
+  * - "application/store": Already offloaded to R2, payload is the path - pass through as-is
+  * - "application/json": May be a pre-serialized JSON string - parse to avoid double-stringification
+  * - Other types: Pass through as-is
+  *
+  * @param payload - The raw payload from the batch item
+  * @param payloadType - The payload type (e.g., "application/json", "application/store")
   */
  function normalizePayload(payload: unknown, payloadType?: string): unknown {
-   // Only normalize for JSON payloads
+   // For non-JSON payloads (including application/store for R2-offloaded payloads),
+   // return as-is - no normalization needed
    if (payloadType !== "application/json" && payloadType !== undefined) {
      return payload;
    }

-   // If payload is a string, try to parse it as JSON
+   // For JSON payloads, if payload is a string, try to parse it
+   // This handles pre-serialized JSON from the SDK
    if (typeof payload === "string") {
      try {
        return JSON.parse(payload);
@@ -218,14 +222,19 @@
  /**
   * Set up the BatchQueue processing callbacks.
   * These handle creating runs from batch items and completing batches.
+  *
+  * Payload handling:
+  * - If payloadType is "application/store", the payload is an R2 path (already offloaded)
+  * - DefaultPayloadProcessor in TriggerTaskService will pass it through without re-offloading
+  * - The run engine will download from R2 when the task executes
   */
  function setupBatchQueueCallbacks(engine: RunEngine) {
    // Item processing callback - creates a run for each batch item
    engine.setBatchProcessItemCallback(async ({ batchId, friendlyId, itemIndex, item, meta }) => {
      try {
        const triggerTaskService = new TriggerTaskService();

-       // Normalize payload to avoid double-stringification
+       // Normalize payload - for application/store (R2 paths), this passes through as-is
        const payload = normalizePayload(item.payload, item.payloadType);

        const result = await triggerTaskService.call(
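
The three cases in the updated normalizePayload docblock can be summarized with a few illustrative calls (inputs are made up):

// Pre-serialized JSON from the SDK is parsed once, avoiding double-stringification downstream.
normalizePayload('{"hello":"world"}', "application/json"); // => { hello: "world" }

// An R2-offloaded payload is just a store path with payloadType "application/store"; it passes through untouched.
normalizePayload("batch_abc/item_3/payload.json", "application/store"); // => the same string

// Any other payload type is returned as-is.
normalizePayload("plain text", "text/plain"); // => "plain text"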
