Skip to content

Commit 7e073b4

Browse files
committed
fix batch queue redis config
1 parent 82da484 commit 7e073b4

File tree

3 files changed

+15
-33
lines changed

3 files changed

+15
-33
lines changed

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export class BatchQueue {
4545
private fairQueue: FairQueue<typeof BatchItemPayloadSchema>;
4646
private completionTracker: BatchCompletionTracker;
4747
private logger: Logger;
48-
private concurrencyRedis: import("@internal/redis").Redis;
48+
private concurrencyRedis: Redis;
4949
private defaultConcurrency: number;
5050

5151
private processItemCallback?: ProcessBatchItemCallback;
@@ -91,22 +91,11 @@ export class BatchQueue {
9191
},
9292
});
9393

94-
// Create DRR scheduler
95-
const redisOptions: RedisOptions = {
96-
host: options.redis.host,
97-
port: options.redis.port,
98-
username: options.redis.username,
99-
password: options.redis.password,
100-
keyPrefix: options.redis.keyPrefix,
101-
enableAutoPipelining: options.redis.enableAutoPipelining,
102-
...(options.redis.tls ? { tls: {} } : {}),
103-
};
104-
10594
// Create a separate Redis client for concurrency lookups
106-
this.concurrencyRedis = createRedisClient(redisOptions);
95+
this.concurrencyRedis = createRedisClient(options.redis);
10796

10897
const scheduler = new DRRScheduler({
109-
redis: redisOptions,
98+
redis: options.redis,
11099
keys: keyProducer,
111100
quantum: options.drr.quantum,
112101
maxDeficit: options.drr.maxDeficit,
@@ -118,7 +107,7 @@ export class BatchQueue {
118107

119108
// Create FairQueue with telemetry and environment-based concurrency limiting
120109
const fairQueueOptions: FairQueueOptions<typeof BatchItemPayloadSchema> = {
121-
redis: redisOptions,
110+
redis: options.redis,
122111
keys: keyProducer,
123112
scheduler,
124113
payloadSchema: BatchItemPayloadSchema,
@@ -161,7 +150,7 @@ export class BatchQueue {
161150

162151
// Create completion tracker
163152
this.completionTracker = new BatchCompletionTracker({
164-
redis: redisOptions,
153+
redis: options.redis,
165154
logger: {
166155
debug: (msg, ctx) => this.logger.debug(msg, ctx),
167156
info: (msg, ctx) => this.logger.info(msg, ctx),
@@ -599,15 +588,18 @@ export class BatchQueue {
599588
// For inline payloads, store the full payload - it's under the offload threshold anyway
600589
const payloadStr =
601590
typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
591+
602592
processedCount = await this.completionTracker.recordFailure(batchId, {
603593
index: itemIndex,
604594
taskIdentifier: item.task,
605595
payload: payloadStr,
606-
options: item.options as Record<string, unknown>,
596+
options: item.options,
607597
error: result.error,
608598
errorCode: result.errorCode,
609599
});
600+
610601
this.itemsFailedCounter?.add(1, { envId: meta.environmentId, errorCode: result.errorCode });
602+
611603
this.logger.error("Batch item processing failed", {
612604
batchId,
613605
itemIndex,
@@ -621,14 +613,16 @@ export class BatchQueue {
621613
// For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
622614
const payloadStr =
623615
typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
616+
624617
processedCount = await this.completionTracker.recordFailure(batchId, {
625618
index: itemIndex,
626619
taskIdentifier: item.task,
627620
payload: payloadStr,
628-
options: item.options as Record<string, unknown>,
621+
options: item.options,
629622
error: error instanceof Error ? error.message : String(error),
630623
errorCode: "UNEXPECTED_ERROR",
631624
});
625+
632626
this.itemsFailedCounter?.add(1, { envId: meta.environmentId, errorCode: "UNEXPECTED_ERROR" });
633627
this.logger.error("Unexpected error processing batch item", {
634628
batchId,

internal-packages/run-engine/src/batch-queue/types.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { RuntimeEnvironmentType } from "@trigger.dev/database";
33
import { Logger, LogLevel } from "@trigger.dev/core/logger";
44
import { GlobalRateLimiter } from "@trigger.dev/redis-worker";
55
import { Meter, Tracer } from "@internal/tracing";
6+
import { RedisOptions } from "@internal/redis";
67

78
// ============================================================================
89
// Batch Item Schemas
@@ -188,15 +189,7 @@ export type CompleteBatchResult = {
188189
*/
189190
export type BatchQueueOptions = {
190191
/** Redis connection options */
191-
redis: {
192-
host: string;
193-
port: number;
194-
username?: string;
195-
password?: string;
196-
keyPrefix?: string;
197-
tls?: boolean;
198-
enableAutoPipelining?: boolean;
199-
};
192+
redis: RedisOptions;
200193
/** DRR configuration */
201194
drr: DRRConfig;
202195
/** Number of consumer loops to run */

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,8 @@ export class RunEngine {
324324

325325
this.batchQueue = new BatchQueue({
326326
redis: {
327-
host: options.batchQueue?.redis.host ?? "localhost",
328-
port: options.batchQueue?.redis.port ?? 6379,
329-
username: options.batchQueue?.redis.username,
330-
password: options.batchQueue?.redis.password,
331327
keyPrefix: `${options.batchQueue?.redis.keyPrefix ?? ""}batch-queue:`,
332-
enableAutoPipelining: options.batchQueue?.redis.enableAutoPipelining ?? true,
333-
tls: options.batchQueue?.redis.tls !== undefined,
328+
...options.batchQueue?.redis,
334329
},
335330
drr: {
336331
quantum: options.batchQueue?.drr?.quantum ?? 5,

0 commit comments

Comments
 (0)