diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index d927d56d67..bc6925e553 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -10,6 +10,7 @@ import { getIndexQueue, getGenerateLlmsTxtQueue, getDeepResearchQueue, + getBillingQueue, } from "./services/queue-service"; import { v0Router } from "./routes/v0"; import os from "os"; @@ -58,6 +59,7 @@ const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({ new BullAdapter(getIndexQueue()), new BullAdapter(getGenerateLlmsTxtQueue()), new BullAdapter(getDeepResearchQueue()), + new BullAdapter(getBillingQueue()), ], serverAdapter: serverAdapter, }); diff --git a/apps/api/src/services/billing/batch_billing.ts b/apps/api/src/services/billing/batch_billing.ts new file mode 100644 index 0000000000..caa2c0e0c9 --- /dev/null +++ b/apps/api/src/services/billing/batch_billing.ts @@ -0,0 +1,327 @@ +import { logger } from "../../lib/logger"; +import { redisConnection } from "../queue-service"; +import { supabase_service } from "../supabase"; +import * as Sentry from "@sentry/node"; +import { Queue } from "bullmq"; +import { withAuth } from "../../lib/withAuth"; +import { getACUC, setCachedACUC } from "../../controllers/auth"; + +// Configuration constants +const BATCH_KEY = "billing_batch"; +const BATCH_LOCK_KEY = "billing_batch_lock"; +const BATCH_SIZE = 50; // Batch size for processing +const BATCH_TIMEOUT = 15000; // 15 seconds processing interval +const LOCK_TIMEOUT = 30000; // 30 seconds lock timeout + +// Define interfaces for billing operations +interface BillingOperation { + team_id: string; + subscription_id: string | null; + credits: number; + is_extract: boolean; + timestamp: string; +} + +// Grouped billing operations for batch processing +interface GroupedBillingOperation { + team_id: string; + subscription_id: string | null; + total_credits: number; + is_extract: boolean; + operations: BillingOperation[]; +} + +// Function to acquire a lock for batch processing +async function acquireLock(): Promise { + const redis = redisConnection; + // Set lock with NX (only if it doesn't exist) and PX (millisecond expiry) + const result = await redis.set(BATCH_LOCK_KEY, "1", "PX", LOCK_TIMEOUT, "NX"); + const acquired = result === "OK"; + if (acquired) { + logger.info("🔒 Acquired billing batch processing lock"); + } + return acquired; +} + +// Function to release the lock +async function releaseLock() { + const redis = redisConnection; + await redis.del(BATCH_LOCK_KEY); + logger.info("🔓 Released billing batch processing lock"); +} + +// Main function to process the billing batch +export async function processBillingBatch() { + const redis = redisConnection; + + // Try to acquire lock + if (!(await acquireLock())) { + return; + } + + try { + // Get all operations from Redis list + const operations: BillingOperation[] = []; + while (operations.length < BATCH_SIZE) { + const op = await redis.lpop(BATCH_KEY); + if (!op) break; + operations.push(JSON.parse(op)); + } + + if (operations.length === 0) { + logger.info("No billing operations to process in batch"); + return; + } + + logger.info(`📦 Processing batch of ${operations.length} billing operations`); + + // Group operations by team_id and subscription_id + const groupedOperations = new Map(); + + for (const op of operations) { + const key = `${op.team_id}:${op.subscription_id ?? 'null'}:${op.is_extract}`; + + if (!groupedOperations.has(key)) { + groupedOperations.set(key, { + team_id: op.team_id, + subscription_id: op.subscription_id, + total_credits: 0, + is_extract: op.is_extract, + operations: [] + }); + } + + const group = groupedOperations.get(key)!; + group.total_credits += op.credits; + group.operations.push(op); + } + + // Process each group of operations + for (const [key, group] of groupedOperations.entries()) { + logger.info(`🔄 Billing team ${group.team_id} for ${group.total_credits} credits`, { + team_id: group.team_id, + subscription_id: group.subscription_id, + total_credits: group.total_credits, + operation_count: group.operations.length, + is_extract: group.is_extract + }); + + // Skip billing for preview teams + if (group.team_id === "preview" || group.team_id.startsWith("preview_")) { + logger.info(`Skipping billing for preview team ${group.team_id}`); + continue; + } + + try { + // Execute the actual billing + await withAuth(supaBillTeam, { success: true, message: "No DB, bypassed." })( + group.team_id, + group.subscription_id, + group.total_credits, + logger, + group.is_extract + ); + + logger.info(`✅ Successfully billed team ${group.team_id} for ${group.total_credits} credits`); + } catch (error) { + logger.error(`❌ Failed to bill team ${group.team_id}`, { error, group }); + Sentry.captureException(error, { + data: { + operation: "batch_billing", + team_id: group.team_id, + credits: group.total_credits + } + }); + } + } + + logger.info("✅ Billing batch processing completed successfully"); + } catch (error) { + logger.error("Error processing billing batch", { error }); + Sentry.captureException(error, { + data: { + operation: "batch_billing_process" + } + }); + } finally { + await releaseLock(); + } +} + +// Start periodic batch processing +let batchInterval: NodeJS.Timeout | null = null; + +export function startBillingBatchProcessing() { + if (batchInterval) return; + + logger.info("🔄 Starting periodic billing batch processing"); + batchInterval = setInterval(async () => { + const queueLength = await redisConnection.llen(BATCH_KEY); + logger.info(`Checking billing batch queue (${queueLength} items pending)`); + await processBillingBatch(); + }, BATCH_TIMEOUT); + + // Unref to not keep process alive + batchInterval.unref(); +} + +// Add a billing operation to the queue +export async function queueBillingOperation( + team_id: string, + subscription_id: string | null | undefined, + credits: number, + is_extract: boolean = false +) { + // Skip queuing for preview teams + if (team_id === "preview" || team_id.startsWith("preview_")) { + logger.info(`Skipping billing queue for preview team ${team_id}`); + return { success: true, message: "Preview team, no credits used" }; + } + + logger.info(`Queueing billing operation for team ${team_id}`, { + team_id, + subscription_id, + credits, + is_extract + }); + + try { + const operation: BillingOperation = { + team_id, + subscription_id: subscription_id ?? null, + credits, + is_extract, + timestamp: new Date().toISOString() + }; + + // Add operation to Redis list + const redis = redisConnection; + await redis.rpush(BATCH_KEY, JSON.stringify(operation)); + const queueLength = await redis.llen(BATCH_KEY); + logger.info(`📥 Added billing operation to queue (${queueLength} total pending)`, { + team_id, + credits + }); + + // Start batch processing if not already started + startBillingBatchProcessing(); + + // If we have enough items, trigger immediate processing + if (queueLength >= BATCH_SIZE) { + logger.info("🔄 Billing queue reached batch size, triggering immediate processing"); + await processBillingBatch(); + } + // TODO is there a better way to do this? + + // Update cached credits used immediately to provide accurate feedback to users + // This is optimistic - actual billing happens in batch + // Should we add this? + // I guess batch is fast enough that it's fine + + + // if (process.env.USE_DB_AUTHENTICATION === "true") { + // (async () => { + // // Get API keys for this team to update in cache + // const { data } = await supabase_service + // .from("api_keys") + // .select("key") + // .eq("team_id", team_id); + + // for (const apiKey of (data ?? []).map(x => x.key)) { + // await setCachedACUC(apiKey, (acuc) => + // acuc + // ? { + // ...acuc, + // credits_used: acuc.credits_used + credits, + // adjusted_credits_used: acuc.adjusted_credits_used + credits, + // remaining_credits: acuc.remaining_credits - credits, + // } + // : null, + // ); + // } + // })().catch(error => { + // logger.error("Failed to update cached credits", { error, team_id }); + // }); + // } + + return { success: true }; + } catch (error) { + logger.error("Error queueing billing operation", { error, team_id }); + Sentry.captureException(error, { + data: { + operation: "queue_billing", + team_id, + credits + } + }); + return { success: false, error }; + } +} + +// Modified version of the billing function for batch operations +async function supaBillTeam( + team_id: string, + subscription_id: string | null | undefined, + credits: number, + __logger?: any, + is_extract: boolean = false, +) { + const _logger = (__logger ?? logger).child({ + module: "credit_billing", + method: "supaBillTeam", + teamId: team_id, + subscriptionId: subscription_id, + credits, + }); + + if (team_id === "preview" || team_id.startsWith("preview_")) { + return { success: true, message: "Preview team, no credits used" }; + } + + _logger.info(`Batch billing team ${team_id} for ${credits} credits`); + + // Perform the actual database operation + const { data, error } = await supabase_service.rpc("bill_team_w_extract_3", { + _team_id: team_id, + sub_id: subscription_id ?? null, + fetch_subscription: subscription_id === undefined, + credits, + is_extract_param: is_extract, + }); + + if (error) { + Sentry.captureException(error); + _logger.error("Failed to bill team.", { error }); + return { success: false, error }; + } + + // Update cached ACUC to reflect the new credit usage + (async () => { + for (const apiKey of (data ?? []).map((x) => x.api_key)) { + await setCachedACUC(apiKey, (acuc) => + acuc + ? { + ...acuc, + credits_used: acuc.credits_used + credits, + adjusted_credits_used: acuc.adjusted_credits_used + credits, + remaining_credits: acuc.remaining_credits - credits, + } + : null, + ); + } + })().catch(error => { + _logger.error("Failed to update cached credits", { error, team_id }); + }); + + return { success: true, data }; +} + +// Cleanup on exit +process.on("beforeExit", async () => { + if (batchInterval) { + clearInterval(batchInterval); + batchInterval = null; + logger.info("Stopped periodic billing batch processing"); + } + await processBillingBatch(); +}); \ No newline at end of file diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index d3827f3093..47b5eeab36 100644 --- a/apps/api/src/services/billing/credit_billing.ts +++ b/apps/api/src/services/billing/credit_billing.ts @@ -10,6 +10,7 @@ import { issueCredits } from "./issue_credits"; import { redlock } from "../redlock"; import { autoCharge } from "./auto_charge"; import { getValue, setValue } from "../redis"; +import { queueBillingOperation } from "./batch_billing"; import type { Logger } from "winston"; // Deprecated, done via rpc @@ -25,14 +26,16 @@ export async function billTeam( logger?: Logger, is_extract: boolean = false, ) { - return withAuth(supaBillTeam, { success: true, message: "No DB, bypassed." })( - team_id, - subscription_id, - credits, - logger, - is_extract, - ); + // Maintain the withAuth wrapper for authentication + return withAuth( + async (team_id, subscription_id, credits, logger, is_extract) => { + // Within the authenticated context, queue the billing operation + return queueBillingOperation(team_id, subscription_id, credits, is_extract); + }, + { success: true, message: "No DB, bypassed." } + )(team_id, subscription_id, credits, logger, is_extract); } + export async function supaBillTeam( team_id: string, subscription_id: string | null | undefined, @@ -40,6 +43,8 @@ export async function supaBillTeam( __logger?: Logger, is_extract: boolean = false, ) { + // This function should no longer be called directly + // It has been moved to batch_billing.ts const _logger = (__logger ?? logger).child({ module: "credit_billing", method: "supaBillTeam", @@ -48,39 +53,16 @@ export async function supaBillTeam( credits, }); - if (team_id === "preview" || team_id.startsWith("preview_")) { - return { success: true, message: "Preview team, no credits used" }; - } - _logger.info(`Billing team ${team_id} for ${credits} credits`); - - const { data, error } = await supabase_service.rpc("bill_team_w_extract_3", { - _team_id: team_id, - sub_id: subscription_id ?? null, - fetch_subscription: subscription_id === undefined, - credits, - is_extract_param: is_extract, + _logger.warn("supaBillTeam was called directly. This function is deprecated and should only be called from batch_billing.ts"); + queueBillingOperation(team_id, subscription_id, credits, is_extract).catch((err) => { + _logger.error("Error queuing billing operation", { err }); + Sentry.captureException(err); }); - - if (error) { - Sentry.captureException(error); - _logger.error("Failed to bill team.", { error }); - return; - } - - (async () => { - for (const apiKey of (data ?? []).map((x) => x.api_key)) { - await setCachedACUC(apiKey, (acuc) => - acuc - ? { - ...acuc, - credits_used: acuc.credits_used + credits, - adjusted_credits_used: acuc.adjusted_credits_used + credits, - remaining_credits: acuc.remaining_credits - credits, - } - : null, - ); - } - })(); + // Forward to the batch billing system + return { + success: true, + message: "Billing operation queued", + }; } export type CheckTeamCreditsResponse = { diff --git a/apps/api/src/services/billing/queue-billing.ts b/apps/api/src/services/billing/queue-billing.ts new file mode 100644 index 0000000000..28c4e02958 --- /dev/null +++ b/apps/api/src/services/billing/queue-billing.ts @@ -0,0 +1,49 @@ +import { logger } from "../../lib/logger"; +import { getBillingQueue } from "../queue-service"; +import { v4 as uuidv4 } from "uuid"; +import * as Sentry from "@sentry/node"; + +/** + * Adds a job to the billing queue to trigger batch processing + * This can be used when we want to ensure billing is processed without waiting for the next interval + */ +export async function addBillingBatchJob() { + try { + const jobId = uuidv4(); + logger.info("Adding billing batch job to queue", { jobId }); + + await getBillingQueue().add( + "process-batch", + { + timestamp: new Date().toISOString(), + }, + { + jobId, + priority: 10, + } + ); + + return { success: true, jobId }; + } catch (error) { + logger.error("Error adding billing batch job", { error }); + Sentry.captureException(error, { + data: { + operation: "add_billing_batch_job" + } + }); + return { success: false, error }; + } +} + +/** + * Trigger immediate processing of any pending billing operations + * This is useful for ensuring billing operations are processed without delay + */ +export async function triggerImmediateBillingProcess() { + try { + return await addBillingBatchJob(); + } catch (error) { + logger.error("Error triggering immediate billing process", { error }); + return { success: false, error }; + } +} \ No newline at end of file diff --git a/apps/api/src/services/indexing/index-worker.ts b/apps/api/src/services/indexing/index-worker.ts index af9e456da5..0b38b90a75 100644 --- a/apps/api/src/services/indexing/index-worker.ts +++ b/apps/api/src/services/indexing/index-worker.ts @@ -7,8 +7,11 @@ import { redisConnection, indexQueueName, getIndexQueue, + billingQueueName, + getBillingQueue, } from "../queue-service"; import { saveCrawlMap } from "./crawl-maps-index"; +import { processBillingBatch, queueBillingOperation, startBillingBatchProcessing } from "../billing/batch_billing"; import systemMonitor from "../system-monitor"; import { v4 as uuidv4 } from "uuid"; @@ -61,6 +64,59 @@ const processJobInternal = async (token: string, job: Job) => { return err; }; +// Create a processor for billing jobs +const processBillingJobInternal = async (token: string, job: Job) => { + if (!job.id) { + throw new Error("Job has no ID"); + } + + const logger = _logger.child({ + module: "billing-worker", + method: "processBillingJobInternal", + jobId: job.id, + }); + + const extendLockInterval = setInterval(async () => { + logger.info(`🔄 Worker extending lock on billing job ${job.id}`); + await job.extendLock(token, jobLockExtensionTime); + }, jobLockExtendInterval); + + let err = null; + try { + // Check job type - it could be either a batch processing trigger or an individual billing operation + if (job.name === "process-batch") { + // Process the entire batch + logger.info("Received batch process trigger job"); + await processBillingBatch(); + } else if (job.name === "bill_team") { + // This is an individual billing operation that should be queued for batch processing + const { team_id, subscription_id, credits, is_extract } = job.data; + + logger.info(`Adding team ${team_id} billing operation to batch queue`, { + credits, + is_extract, + originating_job_id: job.data.originating_job_id, + }); + + // Add to the REDIS batch queue + await queueBillingOperation(team_id, subscription_id, credits, is_extract); + } else { + logger.warn(`Unknown billing job type: ${job.name}`); + } + + await job.moveToCompleted({ success: true }, token, false); + } catch (error) { + logger.error("Error processing billing job", { error }); + Sentry.captureException(error); + err = error; + await job.moveToFailed(error, token, false); + } finally { + clearInterval(extendLockInterval); + } + + return err; +}; + let isShuttingDown = false; process.on("SIGINT", () => { @@ -75,7 +131,8 @@ process.on("SIGTERM", () => { let cantAcceptConnectionCount = 0; -const workerFun = async (queue: Queue) => { +// Generic worker function that can process different job types +const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) => Promise) => { const logger = _logger.child({ module: "index-worker", method: "workerFun" }); const worker = new Worker(queue.name, null, { @@ -139,13 +196,13 @@ const workerFun = async (queue: Queue) => { }, }, async () => { - await processJobInternal(token, job); + await jobProcessor(token, job); }, ); }, ); } else { - await processJobInternal(token, job); + await jobProcessor(token, job); } if (job.id) { @@ -168,7 +225,15 @@ const workerFun = async (queue: Queue) => { process.exit(0); }; -// Start the worker +// Start the workers (async () => { - await workerFun(getIndexQueue()); + // Start index worker + const indexWorkerPromise = workerFun(getIndexQueue(), processJobInternal); + + // Start billing worker and batch processing + startBillingBatchProcessing(); + const billingWorkerPromise = workerFun(getBillingQueue(), processBillingJobInternal); + + // Wait for both workers to complete (which should only happen on shutdown) + await Promise.all([indexWorkerPromise, billingWorkerPromise]); })(); diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts index b910fa4b4e..e84c006c6b 100644 --- a/apps/api/src/services/queue-service.ts +++ b/apps/api/src/services/queue-service.ts @@ -10,6 +10,7 @@ let loggingQueue: Queue; let indexQueue: Queue; let deepResearchQueue: Queue; let generateLlmsTxtQueue: Queue; +let billingQueue: Queue; export const redisConnection = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, @@ -21,6 +22,7 @@ export const loggingQueueName = "{loggingQueue}"; export const indexQueueName = "{indexQueue}"; export const generateLlmsTxtQueueName = "{generateLlmsTxtQueue}"; export const deepResearchQueueName = "{deepResearchQueue}"; +export const billingQueueName = "{billingQueue}"; export function getScrapeQueue() { if (!scrapeQueue) { @@ -112,6 +114,24 @@ export function getDeepResearchQueue() { return deepResearchQueue; } +export function getBillingQueue() { + if (!billingQueue) { + billingQueue = new Queue(billingQueueName, { + connection: redisConnection, + defaultJobOptions: { + removeOnComplete: { + age: 90000, // 25 hours + }, + removeOnFail: { + age: 90000, // 25 hours + }, + }, + }); + logger.info("Billing queue created"); + } + return billingQueue; +} + // === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE // import { QueueEvents } from 'bullmq'; // export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() }); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index cb967b57d5..9842ac5077 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -12,6 +12,7 @@ import { deepResearchQueueName, getIndexQueue, getGenerateLlmsTxtQueue, + getBillingQueue, } from "./queue-service"; import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; @@ -1118,16 +1119,38 @@ async function processJob(job: Job & { id: string }, token: string) { creditsToBeBilled = 5; } - if (job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID!) { - billTeam(job.data.team_id, undefined, creditsToBeBilled, logger).catch( - (error) => { - logger.error( - `Failed to bill team ${job.data.team_id} for ${creditsToBeBilled} credits`, - { error }, - ); - // Optionally, you could notify an admin or add to a retry queue here - }, - ); + if (job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! && process.env.USE_DB_AUTHENTICATION === "true") { + try { + const billingJobId = uuidv4(); + logger.debug(`Adding billing job to queue for team ${job.data.team_id}`, { + billingJobId, + credits: creditsToBeBilled, + is_extract: job.data.scrapeOptions.extract, + }); + + // Add directly to the billing queue - the billing worker will handle the rest + await getBillingQueue().add( + "bill_team", + { + team_id: job.data.team_id, + subscription_id: undefined, + credits: creditsToBeBilled, + is_extract: job.data.scrapeOptions.extract, + timestamp: new Date().toISOString(), + originating_job_id: job.id + }, + { + jobId: billingJobId, + priority: 10, + } + ); + } catch (error) { + logger.error( + `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`, + { error }, + ); + Sentry.captureException(error); + } } }