From ec2e7d81455b642db273f63cb14af1f8c4e394ec Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Wed, 20 May 2026 07:11:06 +0530 Subject: [PATCH] send LLM use-case metadata through Rowboat gateway Attach the current analytics use-case context to Rowboat gateway requests so backend billing generation rows can capture use_case, sub_use_case, and agent_name. Wrap streamed agent calls and direct instrumented LLM call sites in explicit use-case context to keep metadata available when provider requests are created. --- apps/x/packages/core/src/agents/runtime.ts | 32 +++++++++++++------ .../core/src/application/lib/builtin-tools.ts | 12 ++++--- apps/x/packages/core/src/events/routing.ts | 6 ++-- .../core/src/knowledge/classify_thread.ts | 5 +-- .../core/src/knowledge/inline_tasks.ts | 5 +-- .../core/src/knowledge/summarize_meeting.ts | 5 +-- apps/x/packages/core/src/models/gateway.ts | 5 +++ 7 files changed, 48 insertions(+), 22 deletions(-) diff --git a/apps/x/packages/core/src/agents/runtime.ts b/apps/x/packages/core/src/agents/runtime.ts index cbc83532..809fb996 100644 --- a/apps/x/packages/core/src/agents/runtime.ts +++ b/apps/x/packages/core/src/agents/runtime.ts @@ -28,7 +28,7 @@ import { IAbortRegistry } from "../runs/abort-registry.js"; import { PrefixLogger } from "@x/shared"; import { parse } from "yaml"; import { captureLlmUsage } from "../analytics/usage.js"; -import { enterUseCase, type UseCase } from "../analytics/use_case.js"; +import { enterUseCase, withUseCase, type UseCase } from "../analytics/use_case.js"; import { getRaw as getNoteCreationRaw } from "../knowledge/note_creation.js"; import { getRaw as getLabelingAgentRaw } from "../knowledge/labeling_agent.js"; import { getRaw as getNoteTaggingAgentRaw } from "../knowledge/note_tagging_agent.js"; @@ -1292,14 +1292,28 @@ async function* streamLlm( ): AsyncGenerator, void, unknown> { const converted = convertFromMessages(messages); console.log(`! SENDING payload to model: `, JSON.stringify(converted)) - const { fullStream } = streamText({ - model, - messages: converted, - system: instructions, - tools, - stopWhen: stepCountIs(1), - abortSignal: signal, - }); + const streamResult = analytics + ? withUseCase({ + useCase: analytics.useCase, + ...(analytics.subUseCase ? { subUseCase: analytics.subUseCase } : {}), + ...(analytics.agentName ? { agentName: analytics.agentName } : {}), + }, () => streamText({ + model, + messages: converted, + system: instructions, + tools, + stopWhen: stepCountIs(1), + abortSignal: signal, + })) + : streamText({ + model, + messages: converted, + system: instructions, + tools, + stopWhen: stepCountIs(1), + abortSignal: signal, + }); + const { fullStream } = streamResult; for await (const event of fullStream) { // Check abort on every chunk for responsiveness signal?.throwIfAborted(); diff --git a/apps/x/packages/core/src/application/lib/builtin-tools.ts b/apps/x/packages/core/src/application/lib/builtin-tools.ts index 5b46cb01..149ccb4a 100644 --- a/apps/x/packages/core/src/application/lib/builtin-tools.ts +++ b/apps/x/packages/core/src/application/lib/builtin-tools.ts @@ -50,7 +50,7 @@ import { generateText } from "ai"; import { createProvider } from "../../models/models.js"; import { getDefaultModelAndProvider, resolveProviderConfig } from "../../models/defaults.js"; import { captureLlmUsage } from "../../analytics/usage.js"; -import { getCurrentUseCase } from "../../analytics/use_case.js"; +import { getCurrentUseCase, withUseCase } from "../../analytics/use_case.js"; import { isSignedIn } from "../../account/account.js"; import { getAccessToken } from "../../auth/tokens.js"; import { API_URL } from "../../config/env.js"; @@ -818,7 +818,12 @@ export const BuiltinTools: z.infer = { const userPrompt = prompt || 'Convert this file to well-structured markdown.'; - const response = await generateText({ + const ctx = getCurrentUseCase(); + const response = await withUseCase({ + useCase: ctx?.useCase ?? 'copilot_chat', + subUseCase: 'file_parse', + ...(ctx?.agentName ? { agentName: ctx.agentName } : {}), + }, () => generateText({ model, messages: [ { @@ -829,9 +834,8 @@ export const BuiltinTools: z.infer = { ], }, ], - }); + })); - const ctx = getCurrentUseCase(); captureLlmUsage({ useCase: ctx?.useCase ?? 'copilot_chat', subUseCase: 'file_parse', diff --git a/apps/x/packages/core/src/events/routing.ts b/apps/x/packages/core/src/events/routing.ts index 097bf9dd..5f9e4222 100644 --- a/apps/x/packages/core/src/events/routing.ts +++ b/apps/x/packages/core/src/events/routing.ts @@ -3,7 +3,7 @@ import type { LanguageModel } from 'ai'; import { events, PrefixLogger } from '@x/shared'; import type { RowboatEvent } from '@x/shared/dist/events.js'; import { captureLlmUsage } from '../analytics/usage.js'; -import type { UseCase } from '../analytics/use_case.js'; +import { withUseCase, type UseCase } from '../analytics/use_case.js'; import type { EventConsumerTarget } from './consumer.js'; const log = new PrefixLogger('Events:Routing'); @@ -89,12 +89,12 @@ export async function routeBatch( for (let i = 0; i < targets.length; i += BATCH_SIZE) { const batch = targets.slice(i, i + BATCH_SIZE); try { - const result = await generateObject({ + const result = await withUseCase({ useCase: opts.useCase, subUseCase: 'routing' }, () => generateObject({ model, system: systemPrompt, prompt: buildPrompt(event, batch, opts.entityPlural), schema: events.Pass1OutputSchema, - }); + })); captureLlmUsage({ useCase: opts.useCase, subUseCase: 'routing', diff --git a/apps/x/packages/core/src/knowledge/classify_thread.ts b/apps/x/packages/core/src/knowledge/classify_thread.ts index 69109e34..01fc7731 100644 --- a/apps/x/packages/core/src/knowledge/classify_thread.ts +++ b/apps/x/packages/core/src/knowledge/classify_thread.ts @@ -12,6 +12,7 @@ import { resolveProviderConfig, } from '../models/defaults.js'; import { captureLlmUsage } from '../analytics/usage.js'; +import { withUseCase } from '../analytics/use_case.js'; import type { GmailThreadSnapshot } from './sync_gmail.js'; const STYLE_GUIDE_PATH = path.join(WorkDir, 'knowledge', 'Agent Notes', 'style', 'email.md'); @@ -222,12 +223,12 @@ export async function classifyThread( ? `${SYSTEM_PROMPT}\n\n# Skip the draft\n\nThe user already has their own draft in progress for this thread — DO NOT generate a draftResponse. Always omit the draftResponse field.` : SYSTEM_PROMPT; - const result = await generateObject({ + const result = await withUseCase({ useCase: 'knowledge_sync', subUseCase: 'email_classifier' }, () => generateObject({ model, system: systemPrompt, prompt: buildPrompt(snapshot, userEmail, styleGuide, calendar), schema: ClassificationSchema, - }); + })); captureLlmUsage({ useCase: 'knowledge_sync', diff --git a/apps/x/packages/core/src/knowledge/inline_tasks.ts b/apps/x/packages/core/src/knowledge/inline_tasks.ts index 5a19e4bd..cd2f23c0 100644 --- a/apps/x/packages/core/src/knowledge/inline_tasks.ts +++ b/apps/x/packages/core/src/knowledge/inline_tasks.ts @@ -11,6 +11,7 @@ import { createProvider } from '../models/models.js'; import { inlineTask } from '@x/shared'; import { extractAgentResponse, waitForRunCompletion } from '../agents/utils.js'; import { captureLlmUsage } from '../analytics/usage.js'; +import { withUseCase } from '../analytics/use_case.js'; const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds const INLINE_TASK_AGENT = 'inline_task_agent'; @@ -664,11 +665,11 @@ Default end time (local): ${localEnd} Respond with ONLY valid JSON: either a schedule object or null. No other text.`; try { - const result = await generateText({ + const result = await withUseCase({ useCase: 'knowledge_sync', subUseCase: 'inline_task_classify' }, () => generateText({ model, system: systemPrompt, prompt: instruction, - }); + })); captureLlmUsage({ useCase: 'knowledge_sync', diff --git a/apps/x/packages/core/src/knowledge/summarize_meeting.ts b/apps/x/packages/core/src/knowledge/summarize_meeting.ts index cd84cdb5..dde7acb5 100644 --- a/apps/x/packages/core/src/knowledge/summarize_meeting.ts +++ b/apps/x/packages/core/src/knowledge/summarize_meeting.ts @@ -5,6 +5,7 @@ import { createProvider } from '../models/models.js'; import { getDefaultModelAndProvider, getMeetingNotesModel, resolveProviderConfig } from '../models/defaults.js'; import { WorkDir } from '../config/config.js'; import { captureLlmUsage } from '../analytics/usage.js'; +import { withUseCase } from '../analytics/use_case.js'; const CALENDAR_SYNC_DIR = path.join(WorkDir, 'calendar_sync'); @@ -152,11 +153,11 @@ export async function summarizeMeeting(transcript: string, meetingStartTime?: st const prompt = `Meeting recording started at: ${meetingStartTime || 'unknown'}\n\n${transcript}${calendarContext}`; - const result = await generateText({ + const result = await withUseCase({ useCase: 'meeting_note' }, () => generateText({ model, system: SYSTEM_PROMPT, prompt, - }); + })); captureLlmUsage({ useCase: 'meeting_note', diff --git a/apps/x/packages/core/src/models/gateway.ts b/apps/x/packages/core/src/models/gateway.ts index 6f613704..ee14a7fa 100644 --- a/apps/x/packages/core/src/models/gateway.ts +++ b/apps/x/packages/core/src/models/gateway.ts @@ -1,12 +1,17 @@ import { ProviderV2 } from '@ai-sdk/provider'; import { createOpenRouter } from '@openrouter/ai-sdk-provider'; import { getAccessToken } from '../auth/tokens.js'; +import { getCurrentUseCase } from '../analytics/use_case.js'; import { API_URL } from '../config/env.js'; const authedFetch: typeof fetch = async (input, init) => { const token = await getAccessToken(); const headers = new Headers(init?.headers); headers.set('Authorization', `Bearer ${token}`); + const ctx = getCurrentUseCase(); + if (ctx?.useCase) headers.set('x-rowboat-use-case', ctx.useCase); + if (ctx?.subUseCase) headers.set('x-rowboat-sub-use-case', ctx.subUseCase); + if (ctx?.agentName) headers.set('x-rowboat-agent-name', ctx.agentName); return fetch(input, { ...init, headers }); };