mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-22 18:45:19 +02:00
Send LLM use-case metadata through the Rowboat gateway
Attach the current analytics use-case context to Rowboat gateway requests (as the x-rowboat-use-case, x-rowboat-sub-use-case, and x-rowboat-agent-name headers) so that backend billing generation rows can capture use_case, sub_use_case, and agent_name. Wrap streamed agent calls and directly instrumented LLM call sites in an explicit use-case context (withUseCase) so the metadata is still available when the provider requests are created.
This commit is contained in:
parent
aba65843c2
commit
ec2e7d8145
7 changed files with 48 additions and 22 deletions
|
|
@ -28,7 +28,7 @@ import { IAbortRegistry } from "../runs/abort-registry.js";
|
||||||
import { PrefixLogger } from "@x/shared";
|
import { PrefixLogger } from "@x/shared";
|
||||||
import { parse } from "yaml";
|
import { parse } from "yaml";
|
||||||
import { captureLlmUsage } from "../analytics/usage.js";
|
import { captureLlmUsage } from "../analytics/usage.js";
|
||||||
import { enterUseCase, type UseCase } from "../analytics/use_case.js";
|
import { enterUseCase, withUseCase, type UseCase } from "../analytics/use_case.js";
|
||||||
import { getRaw as getNoteCreationRaw } from "../knowledge/note_creation.js";
|
import { getRaw as getNoteCreationRaw } from "../knowledge/note_creation.js";
|
||||||
import { getRaw as getLabelingAgentRaw } from "../knowledge/labeling_agent.js";
|
import { getRaw as getLabelingAgentRaw } from "../knowledge/labeling_agent.js";
|
||||||
import { getRaw as getNoteTaggingAgentRaw } from "../knowledge/note_tagging_agent.js";
|
import { getRaw as getNoteTaggingAgentRaw } from "../knowledge/note_tagging_agent.js";
|
||||||
|
|
@ -1292,14 +1292,28 @@ async function* streamLlm(
|
||||||
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
|
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
|
||||||
const converted = convertFromMessages(messages);
|
const converted = convertFromMessages(messages);
|
||||||
console.log(`! SENDING payload to model: `, JSON.stringify(converted))
|
console.log(`! SENDING payload to model: `, JSON.stringify(converted))
|
||||||
const { fullStream } = streamText({
|
const streamResult = analytics
|
||||||
model,
|
? withUseCase({
|
||||||
messages: converted,
|
useCase: analytics.useCase,
|
||||||
system: instructions,
|
...(analytics.subUseCase ? { subUseCase: analytics.subUseCase } : {}),
|
||||||
tools,
|
...(analytics.agentName ? { agentName: analytics.agentName } : {}),
|
||||||
stopWhen: stepCountIs(1),
|
}, () => streamText({
|
||||||
abortSignal: signal,
|
model,
|
||||||
});
|
messages: converted,
|
||||||
|
system: instructions,
|
||||||
|
tools,
|
||||||
|
stopWhen: stepCountIs(1),
|
||||||
|
abortSignal: signal,
|
||||||
|
}))
|
||||||
|
: streamText({
|
||||||
|
model,
|
||||||
|
messages: converted,
|
||||||
|
system: instructions,
|
||||||
|
tools,
|
||||||
|
stopWhen: stepCountIs(1),
|
||||||
|
abortSignal: signal,
|
||||||
|
});
|
||||||
|
const { fullStream } = streamResult;
|
||||||
for await (const event of fullStream) {
|
for await (const event of fullStream) {
|
||||||
// Check abort on every chunk for responsiveness
|
// Check abort on every chunk for responsiveness
|
||||||
signal?.throwIfAborted();
|
signal?.throwIfAborted();
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ import { generateText } from "ai";
|
||||||
import { createProvider } from "../../models/models.js";
|
import { createProvider } from "../../models/models.js";
|
||||||
import { getDefaultModelAndProvider, resolveProviderConfig } from "../../models/defaults.js";
|
import { getDefaultModelAndProvider, resolveProviderConfig } from "../../models/defaults.js";
|
||||||
import { captureLlmUsage } from "../../analytics/usage.js";
|
import { captureLlmUsage } from "../../analytics/usage.js";
|
||||||
import { getCurrentUseCase } from "../../analytics/use_case.js";
|
import { getCurrentUseCase, withUseCase } from "../../analytics/use_case.js";
|
||||||
import { isSignedIn } from "../../account/account.js";
|
import { isSignedIn } from "../../account/account.js";
|
||||||
import { getAccessToken } from "../../auth/tokens.js";
|
import { getAccessToken } from "../../auth/tokens.js";
|
||||||
import { API_URL } from "../../config/env.js";
|
import { API_URL } from "../../config/env.js";
|
||||||
|
|
@ -818,7 +818,12 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
|
||||||
|
|
||||||
const userPrompt = prompt || 'Convert this file to well-structured markdown.';
|
const userPrompt = prompt || 'Convert this file to well-structured markdown.';
|
||||||
|
|
||||||
const response = await generateText({
|
const ctx = getCurrentUseCase();
|
||||||
|
const response = await withUseCase({
|
||||||
|
useCase: ctx?.useCase ?? 'copilot_chat',
|
||||||
|
subUseCase: 'file_parse',
|
||||||
|
...(ctx?.agentName ? { agentName: ctx.agentName } : {}),
|
||||||
|
}, () => generateText({
|
||||||
model,
|
model,
|
||||||
messages: [
|
messages: [
|
||||||
{
|
{
|
||||||
|
|
@ -829,9 +834,8 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
});
|
}));
|
||||||
|
|
||||||
const ctx = getCurrentUseCase();
|
|
||||||
captureLlmUsage({
|
captureLlmUsage({
|
||||||
useCase: ctx?.useCase ?? 'copilot_chat',
|
useCase: ctx?.useCase ?? 'copilot_chat',
|
||||||
subUseCase: 'file_parse',
|
subUseCase: 'file_parse',
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ import type { LanguageModel } from 'ai';
|
||||||
import { events, PrefixLogger } from '@x/shared';
|
import { events, PrefixLogger } from '@x/shared';
|
||||||
import type { RowboatEvent } from '@x/shared/dist/events.js';
|
import type { RowboatEvent } from '@x/shared/dist/events.js';
|
||||||
import { captureLlmUsage } from '../analytics/usage.js';
|
import { captureLlmUsage } from '../analytics/usage.js';
|
||||||
import type { UseCase } from '../analytics/use_case.js';
|
import { withUseCase, type UseCase } from '../analytics/use_case.js';
|
||||||
import type { EventConsumerTarget } from './consumer.js';
|
import type { EventConsumerTarget } from './consumer.js';
|
||||||
|
|
||||||
const log = new PrefixLogger('Events:Routing');
|
const log = new PrefixLogger('Events:Routing');
|
||||||
|
|
@ -89,12 +89,12 @@ export async function routeBatch(
|
||||||
for (let i = 0; i < targets.length; i += BATCH_SIZE) {
|
for (let i = 0; i < targets.length; i += BATCH_SIZE) {
|
||||||
const batch = targets.slice(i, i + BATCH_SIZE);
|
const batch = targets.slice(i, i + BATCH_SIZE);
|
||||||
try {
|
try {
|
||||||
const result = await generateObject({
|
const result = await withUseCase({ useCase: opts.useCase, subUseCase: 'routing' }, () => generateObject({
|
||||||
model,
|
model,
|
||||||
system: systemPrompt,
|
system: systemPrompt,
|
||||||
prompt: buildPrompt(event, batch, opts.entityPlural),
|
prompt: buildPrompt(event, batch, opts.entityPlural),
|
||||||
schema: events.Pass1OutputSchema,
|
schema: events.Pass1OutputSchema,
|
||||||
});
|
}));
|
||||||
captureLlmUsage({
|
captureLlmUsage({
|
||||||
useCase: opts.useCase,
|
useCase: opts.useCase,
|
||||||
subUseCase: 'routing',
|
subUseCase: 'routing',
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import {
|
||||||
resolveProviderConfig,
|
resolveProviderConfig,
|
||||||
} from '../models/defaults.js';
|
} from '../models/defaults.js';
|
||||||
import { captureLlmUsage } from '../analytics/usage.js';
|
import { captureLlmUsage } from '../analytics/usage.js';
|
||||||
|
import { withUseCase } from '../analytics/use_case.js';
|
||||||
import type { GmailThreadSnapshot } from './sync_gmail.js';
|
import type { GmailThreadSnapshot } from './sync_gmail.js';
|
||||||
|
|
||||||
const STYLE_GUIDE_PATH = path.join(WorkDir, 'knowledge', 'Agent Notes', 'style', 'email.md');
|
const STYLE_GUIDE_PATH = path.join(WorkDir, 'knowledge', 'Agent Notes', 'style', 'email.md');
|
||||||
|
|
@ -222,12 +223,12 @@ export async function classifyThread(
|
||||||
? `${SYSTEM_PROMPT}\n\n# Skip the draft\n\nThe user already has their own draft in progress for this thread — DO NOT generate a draftResponse. Always omit the draftResponse field.`
|
? `${SYSTEM_PROMPT}\n\n# Skip the draft\n\nThe user already has their own draft in progress for this thread — DO NOT generate a draftResponse. Always omit the draftResponse field.`
|
||||||
: SYSTEM_PROMPT;
|
: SYSTEM_PROMPT;
|
||||||
|
|
||||||
const result = await generateObject({
|
const result = await withUseCase({ useCase: 'knowledge_sync', subUseCase: 'email_classifier' }, () => generateObject({
|
||||||
model,
|
model,
|
||||||
system: systemPrompt,
|
system: systemPrompt,
|
||||||
prompt: buildPrompt(snapshot, userEmail, styleGuide, calendar),
|
prompt: buildPrompt(snapshot, userEmail, styleGuide, calendar),
|
||||||
schema: ClassificationSchema,
|
schema: ClassificationSchema,
|
||||||
});
|
}));
|
||||||
|
|
||||||
captureLlmUsage({
|
captureLlmUsage({
|
||||||
useCase: 'knowledge_sync',
|
useCase: 'knowledge_sync',
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import { createProvider } from '../models/models.js';
|
||||||
import { inlineTask } from '@x/shared';
|
import { inlineTask } from '@x/shared';
|
||||||
import { extractAgentResponse, waitForRunCompletion } from '../agents/utils.js';
|
import { extractAgentResponse, waitForRunCompletion } from '../agents/utils.js';
|
||||||
import { captureLlmUsage } from '../analytics/usage.js';
|
import { captureLlmUsage } from '../analytics/usage.js';
|
||||||
|
import { withUseCase } from '../analytics/use_case.js';
|
||||||
|
|
||||||
const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds
|
const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds
|
||||||
const INLINE_TASK_AGENT = 'inline_task_agent';
|
const INLINE_TASK_AGENT = 'inline_task_agent';
|
||||||
|
|
@ -664,11 +665,11 @@ Default end time (local): ${localEnd}
|
||||||
Respond with ONLY valid JSON: either a schedule object or null. No other text.`;
|
Respond with ONLY valid JSON: either a schedule object or null. No other text.`;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await generateText({
|
const result = await withUseCase({ useCase: 'knowledge_sync', subUseCase: 'inline_task_classify' }, () => generateText({
|
||||||
model,
|
model,
|
||||||
system: systemPrompt,
|
system: systemPrompt,
|
||||||
prompt: instruction,
|
prompt: instruction,
|
||||||
});
|
}));
|
||||||
|
|
||||||
captureLlmUsage({
|
captureLlmUsage({
|
||||||
useCase: 'knowledge_sync',
|
useCase: 'knowledge_sync',
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import { createProvider } from '../models/models.js';
|
||||||
import { getDefaultModelAndProvider, getMeetingNotesModel, resolveProviderConfig } from '../models/defaults.js';
|
import { getDefaultModelAndProvider, getMeetingNotesModel, resolveProviderConfig } from '../models/defaults.js';
|
||||||
import { WorkDir } from '../config/config.js';
|
import { WorkDir } from '../config/config.js';
|
||||||
import { captureLlmUsage } from '../analytics/usage.js';
|
import { captureLlmUsage } from '../analytics/usage.js';
|
||||||
|
import { withUseCase } from '../analytics/use_case.js';
|
||||||
|
|
||||||
const CALENDAR_SYNC_DIR = path.join(WorkDir, 'calendar_sync');
|
const CALENDAR_SYNC_DIR = path.join(WorkDir, 'calendar_sync');
|
||||||
|
|
||||||
|
|
@ -152,11 +153,11 @@ export async function summarizeMeeting(transcript: string, meetingStartTime?: st
|
||||||
|
|
||||||
const prompt = `Meeting recording started at: ${meetingStartTime || 'unknown'}\n\n${transcript}${calendarContext}`;
|
const prompt = `Meeting recording started at: ${meetingStartTime || 'unknown'}\n\n${transcript}${calendarContext}`;
|
||||||
|
|
||||||
const result = await generateText({
|
const result = await withUseCase({ useCase: 'meeting_note' }, () => generateText({
|
||||||
model,
|
model,
|
||||||
system: SYSTEM_PROMPT,
|
system: SYSTEM_PROMPT,
|
||||||
prompt,
|
prompt,
|
||||||
});
|
}));
|
||||||
|
|
||||||
captureLlmUsage({
|
captureLlmUsage({
|
||||||
useCase: 'meeting_note',
|
useCase: 'meeting_note',
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,17 @@
|
||||||
import { ProviderV2 } from '@ai-sdk/provider';
|
import { ProviderV2 } from '@ai-sdk/provider';
|
||||||
import { createOpenRouter } from '@openrouter/ai-sdk-provider';
|
import { createOpenRouter } from '@openrouter/ai-sdk-provider';
|
||||||
import { getAccessToken } from '../auth/tokens.js';
|
import { getAccessToken } from '../auth/tokens.js';
|
||||||
|
import { getCurrentUseCase } from '../analytics/use_case.js';
|
||||||
import { API_URL } from '../config/env.js';
|
import { API_URL } from '../config/env.js';
|
||||||
|
|
||||||
const authedFetch: typeof fetch = async (input, init) => {
|
const authedFetch: typeof fetch = async (input, init) => {
|
||||||
const token = await getAccessToken();
|
const token = await getAccessToken();
|
||||||
const headers = new Headers(init?.headers);
|
const headers = new Headers(init?.headers);
|
||||||
headers.set('Authorization', `Bearer ${token}`);
|
headers.set('Authorization', `Bearer ${token}`);
|
||||||
|
const ctx = getCurrentUseCase();
|
||||||
|
if (ctx?.useCase) headers.set('x-rowboat-use-case', ctx.useCase);
|
||||||
|
if (ctx?.subUseCase) headers.set('x-rowboat-sub-use-case', ctx.subUseCase);
|
||||||
|
if (ctx?.agentName) headers.set('x-rowboat-agent-name', ctx.agentName);
|
||||||
return fetch(input, { ...init, headers });
|
return fetch(input, { ...init, headers });
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue