10 changes: 9 additions & 1 deletion apps/webapp/app/env.server.ts
@@ -533,8 +533,10 @@ const EnvironmentSchema = z
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB

MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(1_000), // 1 second
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
Comment on lines 536 to 539

⚠️ Potential issue | 🟠 Major

Defaulting MAXIMUM_DEV_QUEUE_SIZE changes enforcement behavior.

This turns previously-unlimited dev environments into a hard cap of 500 queued runs (enforced via guardQueueSizeLimitsForEnv). If that's not intentional, remove the default and require an explicit env var to enable the limit.

💡 Suggested change (avoid unintended hard limit)
-    MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
+    MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
🤖 Prompt for AI Agents
In `@apps/webapp/app/env.server.ts` around lines 536-539: the MAXIMUM_DEV_QUEUE_SIZE default of 500 introduces an unintended hard cap for dev environments, because guardQueueSizeLimitsForEnv reads this value and enforces it. Remove the .default(500) so MAXIMUM_DEV_QUEUE_SIZE remains optional/undefined unless explicitly set in the environment, leaving the z.coerce.number().int().optional() schema in place, and ensure any code calling guardQueueSizeLimitsForEnv continues to treat undefined as "no cap" (verify its behavior and update it only if it currently treats undefined incorrectly).
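
For context, the reviewer's point depends on the guard treating an unset limit as unlimited. guardQueueSizeLimitsForEnv itself is not part of this diff, so the following TypeScript sketch is only an assumption about its shape, illustrating why an undefined limit must short-circuit to "allowed":

// Hypothetical sketch, not code from this PR: the real guardQueueSizeLimitsForEnv
// lives elsewhere in the webapp. The key behaviour is that an undefined limit
// never rejects, so only an explicitly configured cap is enforced.
type QueueSizeGuardResult =
  | { ok: true }
  | { ok: false; maximumSize: number; queueSize: number };

function guardQueueSize(
  environmentType: "DEVELOPMENT" | "STAGING" | "PRODUCTION" | "PREVIEW",
  maximumDevQueueSize: number | undefined, // env.MAXIMUM_DEV_QUEUE_SIZE
  maximumDeployedQueueSize: number | undefined, // env.MAXIMUM_DEPLOYED_QUEUE_SIZE
  currentQueueSize: number
): QueueSizeGuardResult {
  const maximumSize =
    environmentType === "DEVELOPMENT" ? maximumDevQueueSize : maximumDeployedQueueSize;

  // undefined means no cap was configured, so never block the trigger
  if (maximumSize === undefined) {
    return { ok: true };
  }

  return currentQueueSize < maximumSize
    ? { ok: true }
    : { ok: false, maximumSize, queueSize: currentQueueSize };
}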

MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),

@@ -591,6 +593,12 @@ const EnvironmentSchema = z
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(),

// TTL System settings for automatic run expiration
RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false),
RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(),
RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000),
RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),

RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),
apps/webapp/app/presenters/v3/EnvironmentQueuePresenter.server.ts
@@ -1,6 +1,7 @@
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { marqs } from "~/v3/marqs/index.server";
import { engine } from "~/v3/runEngine.server";
import { getQueueSizeLimit } from "~/v3/utils/queueLimits.server";
import { BasePresenter } from "./basePresenter.server";

export type Environment = {
@@ -9,6 +10,7 @@ export type Environment = {
concurrencyLimit: number;
burstFactor: number;
runsEnabled: boolean;
queueSizeLimit: number | null;
};

export class EnvironmentQueuePresenter extends BasePresenter {
@@ -30,19 +32,24 @@ export class EnvironmentQueuePresenter extends BasePresenter {
},
select: {
runsEnabled: true,
maximumDevQueueSize: true,
maximumDeployedQueueSize: true,
},
});

if (!organization) {
throw new Error("Organization not found");
}

const queueSizeLimit = getQueueSizeLimit(environment.type, organization);

return {
running,
queued,
concurrencyLimit: environment.maximumConcurrencyLimit,
burstFactor: environment.concurrencyLimitBurstFactor.toNumber(),
runsEnabled: environment.type === "DEVELOPMENT" || organization.runsEnabled,
queueSizeLimit,
};
}
}
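
Both presenters import getQueueSizeLimit and getQueueSizeLimitSource from ~/v3/utils/queueLimits.server, but that file is not included in the excerpt. Judging from the call sites and the organization fields selected above, the helpers presumably look roughly like this (a sketch under those assumptions, not the PR's actual implementation):

// Sketch only: queueLimits.server.ts is not part of the excerpted diff.
import { type RuntimeEnvironmentType } from "@trigger.dev/database";
import { env } from "~/env.server";

type QueueLimitOrganization = {
  maximumDevQueueSize: number | null;
  maximumDeployedQueueSize: number | null;
};

// An org-level override wins; otherwise fall back to the instance-wide env default.
export function getQueueSizeLimit(
  environmentType: RuntimeEnvironmentType,
  organization: QueueLimitOrganization
): number | null {
  if (environmentType === "DEVELOPMENT") {
    return organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE ?? null;
  }
  return organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE ?? null;
}

export function getQueueSizeLimitSource(
  environmentType: RuntimeEnvironmentType,
  organization: QueueLimitOrganization
): "override" | "default" {
  const override =
    environmentType === "DEVELOPMENT"
      ? organization.maximumDevQueueSize
      : organization.maximumDeployedQueueSize;
  return override != null ? "override" : "default";
}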
51 changes: 36 additions & 15 deletions apps/webapp/app/presenters/v3/LimitsPresenter.server.ts
@@ -1,4 +1,5 @@
import { Ratelimit } from "@upstash/ratelimit";
import { RuntimeEnvironmentType } from "@trigger.dev/database";
import { createHash } from "node:crypto";
import { env } from "~/env.server";
import { getCurrentPlan } from "~/services/platform.v3.server";
@@ -12,6 +13,8 @@ import { BasePresenter } from "./basePresenter.server";
import { singleton } from "~/utils/singleton";
import { logger } from "~/services/logger.server";
import { CheckScheduleService } from "~/v3/services/checkSchedule.server";
import { engine } from "~/v3/runEngine.server";
import { getQueueSizeLimit, getQueueSizeLimitSource } from "~/v3/utils/queueLimits.server";

// Create a singleton Redis client for rate limit queries
const rateLimitRedisClient = singleton("rateLimitQueryRedisClient", () =>
@@ -66,8 +69,7 @@ export type LimitsResult = {
logRetentionDays: QuotaInfo | null;
realtimeConnections: QuotaInfo | null;
batchProcessingConcurrency: QuotaInfo;
devQueueSize: QuotaInfo;
deployedQueueSize: QuotaInfo;
queueSize: QuotaInfo;
};
features: {
hasStagingEnvironment: FeatureInfo;
@@ -84,11 +86,13 @@ export class LimitsPresenter extends BasePresenter {
organizationId,
projectId,
environmentId,
environmentType,
environmentApiKey,
}: {
organizationId: string;
projectId: string;
environmentId: string;
environmentType: RuntimeEnvironmentType;
environmentApiKey: string;
}): Promise<LimitsResult> {
// Get organization with all limit-related fields
@@ -167,6 +171,30 @@ export class LimitsPresenter extends BasePresenter {
batchRateLimitConfig
);

// Get current queue size for this environment
// We need the runtime environment fields for the engine query
const runtimeEnv = await this._replica.runtimeEnvironment.findFirst({
where: { id: environmentId },
select: {
id: true,
maximumConcurrencyLimit: true,
concurrencyLimitBurstFactor: true,
},
});

let currentQueueSize = 0;
if (runtimeEnv) {
const engineEnv = {
id: runtimeEnv.id,
type: environmentType,
maximumConcurrencyLimit: runtimeEnv.maximumConcurrencyLimit,
concurrencyLimitBurstFactor: runtimeEnv.concurrencyLimitBurstFactor,
organization: { id: organizationId },
project: { id: projectId },
};
currentQueueSize = (await engine.lengthOfEnvQueue(engineEnv)) ?? 0;
}

// Get plan-level limits
const schedulesLimit = limits?.schedules?.number ?? null;
const teamMembersLimit = limits?.teamMembers?.number ?? null;
@@ -282,19 +310,12 @@
canExceed: true,
isUpgradable: true,
},
devQueueSize: {
name: "Dev queue size",
description: "Maximum pending runs in development environments",
limit: organization.maximumDevQueueSize ?? null,
currentUsage: 0, // Would need to query Redis for this
source: organization.maximumDevQueueSize ? "override" : "default",
},
deployedQueueSize: {
name: "Deployed queue size",
description: "Maximum pending runs in deployed environments",
limit: organization.maximumDeployedQueueSize ?? null,
currentUsage: 0, // Would need to query Redis for this
source: organization.maximumDeployedQueueSize ? "override" : "default",
queueSize: {
name: "Max queued runs",
description: "Maximum pending runs per individual queue in this environment",
limit: getQueueSizeLimit(environmentType, organization),
currentUsage: currentQueueSize,
source: getQueueSizeLimitSource(environmentType, organization),
},
Comment on lines 313 to 319

⚠️ Potential issue | 🟡 Minor

Queue-size quota never shows an upgrade action.

isUpgradable is omitted, so the Upgrade column is empty even for plan-based limits. If upgrades should be offered, set it to true (and optionally canExceed).

💡 Suggested change
         queueSize: {
           name: "Max queued runs",
           description: "Maximum pending runs across all queues in this environment",
           limit: getQueueSizeLimit(environmentType, organization),
           currentUsage: currentQueueSize,
           source: getQueueSizeLimitSource(environmentType, organization),
+          isUpgradable: true,
         },
🤖 Prompt for AI Agents
In `@apps/webapp/app/presenters/v3/LimitsPresenter.server.ts` around lines 313-319: the queueSize quota object is missing the isUpgradable flag, so the UI never shows an Upgrade action. Update the queueSize payload returned by the presenter to include isUpgradable: true for plan-based limits (and add canExceed as appropriate), e.g. set isUpgradable to true when getQueueSizeLimitSource(environmentType, organization) indicates a plan-based source, and keep the UI-facing fields (queueSize.currentUsage, queueSize.limit, queueSize.source) unchanged.
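
If the conditional behaviour described in the prompt is preferred over the flat isUpgradable: true in the suggestion, a sketch could look like the following; it assumes the "default" source value is what counts as plan-based here:

// Sketch: only offer an upgrade when the limit comes from the plan default,
// not from an org-level override (assumes "default" is the plan-based source).
const queueSizeSource = getQueueSizeLimitSource(environmentType, organization);

const queueSize = {
  name: "Max queued runs",
  description: "Maximum pending runs per individual queue in this environment",
  limit: getQueueSizeLimit(environmentType, organization),
  currentUsage: currentQueueSize,
  source: queueSizeSource,
  isUpgradable: queueSizeSource === "default",
};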

},
features: {
(next file; filename not included in this excerpt)
@@ -82,6 +82,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
organizationId: project.organizationId,
projectId: project.id,
environmentId: environment.id,
environmentType: environment.type,
environmentApiKey: environment.apiKey,
})
);
@@ -507,9 +508,8 @@ function QuotasSection({
// Include batch processing concurrency
quotaRows.push(quotas.batchProcessingConcurrency);

// Add queue size quotas if set
if (quotas.devQueueSize.limit !== null) quotaRows.push(quotas.devQueueSize);
if (quotas.deployedQueueSize.limit !== null) quotaRows.push(quotas.deployedQueueSize);
// Add queue size quota if set
if (quotas.queueSize.limit !== null) quotaRows.push(quotas.queueSize);

return (
<div className="flex flex-col gap-3">
@@ -556,9 +556,12 @@ function QuotaRow({
billingPath: string;
}) {
// For log retention, we don't show current usage as it's a duration, not a count
// For queue size, we don't show current usage as the limit is per-queue, not environment-wide
const isRetentionQuota = quota.name === "Log retention";
const isQueueSizeQuota = quota.name === "Max queued runs";
const hideCurrentUsage = isRetentionQuota || isQueueSizeQuota;
const percentage =
!isRetentionQuota && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;
!hideCurrentUsage && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;

// Special handling for Log retention
if (quota.name === "Log retention") {
@@ -657,10 +660,10 @@ function QuotaRow({
alignment="right"
className={cn(
"tabular-nums",
isRetentionQuota ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
hideCurrentUsage ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
)}
>
{isRetentionQuota ? "–" : formatNumber(quota.currentUsage)}
{hideCurrentUsage ? "–" : formatNumber(quota.currentUsage)}
</TableCell>
<TableCell alignment="right">
<SourceBadge source={quota.source} />
(next file; filename not included in this excerpt)
@@ -345,7 +345,7 @@ export default function Page() {
<BigNumber
title="Queued"
value={environment.queued}
suffix={env.paused && environment.queued > 0 ? "paused" : undefined}
suffix={env.paused ? <span className="text-warning">paused</span> : undefined}
animate
accessory={
<div className="flex items-start gap-1">
@@ -509,7 +509,10 @@ export default function Page() {
{queues.length > 0 ? (
queues.map((queue) => {
const limit = queue.concurrencyLimit ?? environment.concurrencyLimit;
const isAtLimit = queue.running >= limit;
const isAtConcurrencyLimit = queue.running >= limit;
const isAtQueueLimit =
environment.queueSizeLimit !== null &&
queue.queued >= environment.queueSizeLimit;
const queueFilterableName = `${queue.type === "task" ? "task/" : ""}${
queue.name
}`;
@@ -535,7 +538,12 @@
Paused
</Badge>
) : null}
{isAtLimit ? (
{isAtQueueLimit ? (
<Badge variant="extra-small" className="text-error">
At queue limit
</Badge>
) : null}
{isAtConcurrencyLimit ? (
<Badge variant="extra-small" className="text-warning">
At concurrency limit
</Badge>
@@ -546,7 +554,8 @@
alignment="right"
className={cn(
"w-[1%] pl-16 tabular-nums",
queue.paused ? "opacity-50" : undefined
queue.paused ? "opacity-50" : undefined,
isAtQueueLimit && "text-error"
)}
>
{queue.queued}
@@ -557,7 +566,7 @@
"w-[1%] pl-16 tabular-nums",
queue.paused ? "opacity-50" : undefined,
queue.running > 0 && "text-text-bright",
isAtLimit && "text-warning"
isAtConcurrencyLimit && "text-warning"
)}
>
{queue.running}
@@ -577,7 +586,7 @@
className={cn(
"w-[1%] pl-16",
queue.paused ? "opacity-50" : undefined,
isAtLimit && "text-warning",
isAtConcurrencyLimit && "text-warning",
queue.concurrency?.overriddenAt && "font-medium text-text-bright"
)}
>
@@ -1118,3 +1127,4 @@ function BurstFactorTooltip({
/>
);
}

25 changes: 20 additions & 5 deletions apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
@@ -79,11 +79,26 @@ export class IdempotencyKeyConcern {
}

// We have an idempotent run, so we return it
const associatedWaitpoint = existingRun.associatedWaitpoint;
const parentRunId = request.body.options?.parentRunId;
const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;

//We're using `andWait` so we need to block the parent run with a waitpoint
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
if (resumeParentOnCompletion && parentRunId) {
// Get or create waitpoint lazily (existing run may not have one if it was standalone)
let associatedWaitpoint = existingRun.associatedWaitpoint;
if (!associatedWaitpoint) {
associatedWaitpoint = await this.engine.getOrCreateRunWaitpoint({
runId: existingRun.id,
projectId: request.environment.projectId,
environmentId: request.environment.id,
});
}

// If run already completed, return without blocking
if (!associatedWaitpoint) {
return { isCached: true, run: existingRun };
}

await this.traceEventConcern.traceIdempotentRun(
request,
parentStore,
Expand All @@ -98,13 +113,13 @@ export class IdempotencyKeyConcern {
request.options?.parentAsLinkType === "replay"
? event.spanId
: event.traceparent?.spanId
? `${event.traceparent.spanId}:${event.spanId}`
: event.spanId;
? `${event.traceparent.spanId}:${event.spanId}`
: event.spanId;

//block run with waitpoint
await this.engine.blockRunWithWaitpoint({
runId: RunId.fromFriendlyId(parentRunId),
waitpoints: associatedWaitpoint.id,
waitpoints: associatedWaitpoint!.id,
spanIdToComplete: spanId,
batch: request.options?.batchId
? {
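
The lazy-waitpoint change in idempotencyKeys.server.ts above only works if engine.getOrCreateRunWaitpoint returns nothing once the existing run has already completed. That engine method is not shown in this PR, so the contract below is an assumption inferred from the early-return check:

// Assumed contract (the engine method itself is not shown in this PR):
// resolves to the run's completion waitpoint, creating one if the run is
// still in progress, and to undefined once the run has already finished,
// which is what the "return without blocking" branch above relies on.
type GetOrCreateRunWaitpoint = (params: {
  runId: string;
  projectId: string;
  environmentId: string;
}) => Promise<{ id: string } | undefined>;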