Add conversations and turns foundation + DDD (#188)

- Store conversations and turns for:
   - playground chat
   - api
 - New DDD code organisation with container dependency injection
 - sdk update
 - streaming api support
This commit is contained in:
Ramnique Singh 2025-08-05 14:40:48 +05:30 committed by GitHub
parent 659b23ae2b
commit 51a33ab2df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1474 additions and 525 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "rowboat"
version = "4.0.0"
version = "5.0.0"
authors = [
{ name = "Ramnique Singh", email = "ramnique@rowboatlabs.com" },
]

View file

@ -1,4 +1,4 @@
from .client import Client, StatefulChat
from .client import Client
from .schema import (
ApiMessage,
UserMessage,
@ -8,21 +8,4 @@ from .schema import (
ToolMessage,
ApiRequest,
ApiResponse
)
__version__ = "0.1.0"
__all__ = [
"Client",
"StatefulChat",
# Message types
"ApiMessage",
"UserMessage",
"SystemMessage",
"AssistantMessage",
"AssistantMessageWithToolCalls",
"ToolMessage",
# Request/Response types
"ApiRequest",
"ApiResponse",
]
)

View file

@ -1,36 +1,30 @@
from typing import Dict, List, Optional, Any, Union
from typing import Dict, List, Optional
import requests
from .schema import (
ApiRequest,
ApiResponse,
ApiMessage,
UserMessage,
AssistantMessage,
AssistantMessageWithToolCalls
)
class Client:
def __init__(self, host: str, project_id: str, api_key: str) -> None:
self.base_url: str = f'{host}/api/v1/{project_id}/chat'
def __init__(self, host: str, projectId: str, apiKey: str) -> None:
self.base_url: str = f'{host}/api/v1/{projectId}/chat'
self.headers: Dict[str, str] = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
'Authorization': f'Bearer {apiKey}'
}
def _call_api(
self,
messages: List[ApiMessage],
state: Optional[Dict[str, Any]] = None,
workflow_id: Optional[str] = None,
test_profile_id: Optional[str] = None,
mock_tools: Optional[Dict[str, str]] = None
conversationId: Optional[str] = None,
mockTools: Optional[Dict[str, str]] = None
) -> ApiResponse:
request = ApiRequest(
messages=messages,
state=state,
workflowId=workflow_id,
testProfileId=test_profile_id,
mockTools=mock_tools
conversationId=conversationId,
mockTools=mockTools
)
json_data = request.model_dump()
response = requests.post(self.base_url, headers=self.headers, json=json_data)
@ -38,86 +32,23 @@ class Client:
if not response.status_code == 200:
raise ValueError(f"Error: {response.status_code} - {response.text}")
response_data = ApiResponse.model_validate(response.json())
if not response_data.messages:
raise ValueError("No response")
last_message = response_data.messages[-1]
if not isinstance(last_message, (AssistantMessage, AssistantMessageWithToolCalls)):
raise ValueError("Last message was not an assistant message")
return ApiResponse.model_validate(response.json())
return response_data
def chat(
def run_turn(
self,
messages: List[ApiMessage],
state: Optional[Dict[str, Any]] = None,
workflow_id: Optional[str] = None,
test_profile_id: Optional[str] = None,
mock_tools: Optional[Dict[str, str]] = None,
conversationId: Optional[str] = None,
mockTools: Optional[Dict[str, str]] = None,
) -> ApiResponse:
"""Stateless chat method that handles a single conversation turn"""
# call api
response_data = self._call_api(
return self._call_api(
messages=messages,
state=state,
workflow_id=workflow_id,
test_profile_id=test_profile_id,
mock_tools=mock_tools,
conversationId=conversationId,
mockTools=mockTools,
)
if not response_data.messages[-1].responseType == 'external':
raise ValueError("Last message was not an external message")
return response_data
class StatefulChat:
"""Maintains conversation state across multiple turns"""
def __init__(
self,
client: Client,
workflow_id: Optional[str] = None,
test_profile_id: Optional[str] = None,
mock_tools: Optional[Dict[str, str]] = None,
) -> None:
self.client = client
self.messages: List[ApiMessage] = []
self.state: Optional[Dict[str, Any]] = None
self.workflow_id = workflow_id
self.test_profile_id = test_profile_id
self.mock_tools = mock_tools
def run(self, message: Union[str]) -> str:
"""Handle a single user turn in the conversation"""
# Process the message
user_msg = UserMessage(role='user', content=message)
self.messages.append(user_msg)
# Get response using the client's chat method
response_data = self.client.chat(
messages=self.messages,
state=self.state,
workflow_id=self.workflow_id,
test_profile_id=self.test_profile_id,
mock_tools=self.mock_tools,
)
# Update internal state
self.messages.extend(response_data.messages)
self.state = response_data.state
# Return only the final message content
last_message = self.messages[-1]
return last_message.content
def weather_lookup_tool(city_name: str) -> str:
return f"The weather in {city_name} is 22°C."
if __name__ == "__main__":
host: str = "<HOST>"
@ -125,13 +56,18 @@ if __name__ == "__main__":
api_key: str = "<API_KEY>"
client = Client(host, project_id, api_key)
result = client.chat(
result = client.run_turn(
messages=[
UserMessage(role='user', content="Hello")
UserMessage(role='user', content="list my github repos")
]
)
print(result.messages[-1].content)
print(result.turn.output[-1].content)
print(result.conversationId)
chat_session = StatefulChat(client)
resp = chat_session.run("Hello")
print(resp)
result = client.run_turn(
messages=[
UserMessage(role='user', content="how many did you find?")
],
conversationId=result.conversationId
)
print(result.turn.output[-1].content)

View file

@ -1,4 +1,4 @@
from typing import List, Optional, Union, Any, Literal, Dict
from typing import List, Optional, Union, Literal, Dict
from pydantic import BaseModel
class SystemMessage(BaseModel):
@ -44,13 +44,15 @@ ApiMessage = Union[
ToolMessage
]
class Turn(BaseModel):
id: str
output: List[ApiMessage]
class ApiRequest(BaseModel):
conversationId: Optional[str] = None
messages: List[ApiMessage]
state: Any
workflowId: Optional[str] = None
testProfileId: Optional[str] = None
mockTools: Optional[Dict[str, str]] = None
class ApiResponse(BaseModel):
messages: List[ApiMessage]
state: Optional[Any] = None
conversationId: str
turn: Turn

View file

@ -1,38 +0,0 @@
'use server';
import { z } from 'zod';
import { getAgenticResponseStreamId } from "../lib/utils";
import { check_query_limit } from "../lib/rate_limiting";
import { QueryLimitError } from "../lib/client_utils";
import { projectAuthCheck } from "./project_actions";
import { authorizeUserAction } from "./billing_actions";
import { Workflow } from "../lib/types/workflow_types";
import { Message } from "@/app/lib/types/types";
export async function getAssistantResponseStreamId(
projectId: string,
workflow: z.infer<typeof Workflow>,
messages: z.infer<typeof Message>[],
): Promise<{ streamId: string } | { billingError: string }> {
await projectAuthCheck(projectId);
if (!await check_query_limit(projectId)) {
throw new QueryLimitError();
}
// Check billing authorization
const agentModels = workflow.agents.reduce((acc, agent) => {
acc.push(agent.model);
return acc;
}, [] as string[]);
const { success, error } = await authorizeUserAction({
type: 'agent_response',
data: {
agentModels,
},
});
if (!success) {
return { billingError: error || 'Billing error' };
}
const response = await getAgenticResponseStreamId(projectId, workflow, messages);
return response;
}

View file

@ -8,7 +8,7 @@ import {
import { DataSource } from "../lib/types/datasource_types";
import { z } from 'zod';
import { check_query_limit } from "../lib/rate_limiting";
import { QueryLimitError } from "../lib/client_utils";
import { QueryLimitError } from "@/src/entities/errors/common";
import { projectAuthCheck } from "./project_actions";
import { redisClient } from "../lib/redis";
import { authorizeUserAction, logUsage } from "./billing_actions";

View file

@ -0,0 +1,54 @@
'use server';
import { z } from 'zod';
import { Workflow } from "../lib/types/workflow_types";
import { Message } from "@/app/lib/types/types";
import { authCheck } from './auth_actions';
import { container } from '@/di/container';
import { Conversation } from '@/src/entities/models/conversation';
import { ICreatePlaygroundConversationController } from '@/src/interface-adapters/controllers/conversations/create-playground-conversation.controller';
import { ICreateCachedTurnController } from '@/src/interface-adapters/controllers/conversations/create-cached-turn.controller';
export async function createConversation({
projectId,
workflow,
isLiveWorkflow,
}: {
projectId: string;
workflow: z.infer<typeof Workflow>;
isLiveWorkflow: boolean;
}): Promise<z.infer<typeof Conversation>> {
const user = await authCheck();
const controller = container.resolve<ICreatePlaygroundConversationController>("createPlaygroundConversationController");
return await controller.execute({
userId: user._id,
projectId,
workflow,
isLiveWorkflow,
});
}
export async function createCachedTurn({
conversationId,
messages,
}: {
conversationId: string;
messages: z.infer<typeof Message>[];
}): Promise<{ key: string }> {
const user = await authCheck();
const createCachedTurnController = container.resolve<ICreateCachedTurnController>("createCachedTurnController");
const { key } = await createCachedTurnController.execute({
caller: "user",
userId: user._id,
conversationId,
input: {
messages,
},
});
return {
key,
};
}

View file

@ -1,67 +1,41 @@
import { getCustomerIdForProject, logUsage } from "@/app/lib/billing";
import { USE_BILLING } from "@/app/lib/feature_flags";
import { redisClient } from "@/app/lib/redis";
import { streamResponse } from "@/app/lib/agents";
import { ZStreamAgentResponsePayload } from "@/app/lib/types/types";
import { container } from "@/di/container";
import { IRunCachedTurnController } from "@/src/interface-adapters/controllers/conversations/run-cached-turn.controller";
import { requireAuth } from "@/app/lib/auth";
export async function GET(request: Request, props: { params: Promise<{ streamId: string }> }) {
const params = await props.params;
// get the payload from redis
const payload = await redisClient.get(`chat-stream-${params.streamId}`);
if (!payload) {
return new Response("Stream not found", { status: 404 });
}
// parse the payload
const { projectId, workflow, messages } = ZStreamAgentResponsePayload.parse(JSON.parse(payload));
console.log('payload', payload);
// fetch billing customer id
let billingCustomerId: string | null = null;
if (USE_BILLING) {
billingCustomerId = await getCustomerIdForProject(projectId);
}
const encoder = new TextEncoder();
let messageCount = 0;
const stream = new ReadableStream({
async start(controller) {
try {
// Iterate over the generator
for await (const event of streamResponse(projectId, workflow, messages)) {
// Check if this is a message event (has role property)
if ('role' in event) {
if (event.role === 'assistant') {
messageCount++;
const params = await props.params;
// get user data
const user = await requireAuth();
const runCachedTurnController = container.resolve<IRunCachedTurnController>("runCachedTurnController");
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
try {
// Iterate over the generator
for await (const event of runCachedTurnController.execute({
caller: "user",
userId: user._id,
cachedTurnKey: params.streamId,
})) {
controller.enqueue(encoder.encode(`event: message\ndata: ${JSON.stringify(event)}\n\n`));
}
controller.close();
} catch (error) {
console.error('Error processing stream:', error);
controller.error(error);
}
controller.enqueue(encoder.encode(`event: message\ndata: ${JSON.stringify(event)}\n\n`));
} else {
controller.enqueue(encoder.encode(`event: done\ndata: ${JSON.stringify(event)}\n\n`));
}
}
controller.close();
// Log billing usage
if (USE_BILLING && billingCustomerId) {
await logUsage(billingCustomerId, {
type: "agent_messages",
amount: messageCount,
});
}
} catch (error) {
console.error('Error processing stream:', error);
controller.error(error);
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
}

View file

@ -1,14 +1,10 @@
import { NextRequest } from "next/server";
import { projectsCollection } from "../../../../lib/mongodb";
import { z } from "zod";
import { ObjectId } from "mongodb";
import { authCheck } from "../../utils";
import { ApiRequest, ApiResponse } from "../../../../lib/types/types";
import { check_query_limit } from "../../../../lib/rate_limiting";
import { ApiResponse } from "@/app/lib/types/api_types";
import { ApiRequest } from "@/app/lib/types/api_types";
import { PrefixLogger } from "../../../../lib/utils";
import { authorize, getCustomerIdForProject, logUsage } from "@/app/lib/billing";
import { USE_BILLING } from "@/app/lib/feature_flags";
import { getResponse } from "@/app/lib/agents";
import { container } from "@/di/container";
import { IRunTurnController } from "@/src/interface-adapters/controllers/conversations/run-turn.controller";
// get next turn / agent response
export async function POST(
@ -19,91 +15,69 @@ export async function POST(
const requestId = crypto.randomUUID();
const logger = new PrefixLogger(`${requestId}`);
logger.log(`Got chat request for project ${projectId}`);
// parse and validate the request body
let data;
try {
const body = await req.json();
data = ApiRequest.parse(body);
} catch (e) {
logger.log(`Invalid JSON in request body: ${e}`);
return Response.json({ error: "Invalid request" }, { status: 400 });
}
const { conversationId, messages, mockTools, stream } = data;
// check query limit
if (!await check_query_limit(projectId)) {
logger.log(`Query limit exceeded for project ${projectId}`);
return Response.json({ error: "Query limit exceeded" }, { status: 429 });
const runTurnController = container.resolve<IRunTurnController>("runTurnController");
// get assistant response
const response = await runTurnController.execute({
caller: "api",
apiKey: req.headers.get("Authorization")?.split(" ")[1],
projectId,
input: {
messages,
mockTools,
},
conversationId: conversationId || undefined,
stream: Boolean(stream),
});
// if streaming is requested, return SSE stream
if (stream && 'stream' in response) {
const encoder = new TextEncoder();
const readableStream = new ReadableStream({
async start(controller) {
try {
// Iterate over the generator
for await (const event of response.stream) {
controller.enqueue(encoder.encode(`event: message\ndata: ${JSON.stringify(event)}\n\n`));
}
controller.close();
} catch (error) {
logger.log(`Error processing stream: ${error}`);
controller.error(error);
}
},
});
return new Response(readableStream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
}
return await authCheck(projectId, req, async () => {
// fetch billing customer id
let billingCustomerId: string | null = null;
if (USE_BILLING) {
billingCustomerId = await getCustomerIdForProject(projectId);
}
// non-streaming response (existing behavior)
if (!('turn' in response)) {
logger.log(`No turn data found in response`);
return Response.json({ error: "No turn data found in response" }, { status: 500 });
}
// parse and validate the request body
let body;
try {
body = await req.json();
} catch (e) {
logger.log(`Invalid JSON in request body: ${e}`);
return Response.json({ error: "Invalid JSON in request body" }, { status: 400 });
}
logger.log(`Request json: ${JSON.stringify(body, null, 2)}`);
const result = ApiRequest.safeParse(body);
if (!result.success) {
logger.log(`Invalid request body: ${result.error.message}`);
return Response.json({ error: `Invalid request body: ${result.error.message}` }, { status: 400 });
}
const reqMessages = result.data.messages;
const mockToolOverrides = result.data.mockTools;
// fetch published workflow id
const project = await projectsCollection.findOne({
_id: projectId,
});
if (!project) {
logger.log(`Project ${projectId} not found`);
return Response.json({ error: "Project not found" }, { status: 404 });
}
// fetch workflow
const workflow = project.liveWorkflow;
if (!workflow) {
logger.log(`Workflow not found for project ${projectId}`);
return Response.json({ error: "Workflow not found" }, { status: 404 });
}
// override mock instructions
if (mockToolOverrides) {
workflow.mockTools = mockToolOverrides;
}
// check billing authorization
if (USE_BILLING && billingCustomerId) {
const agentModels = workflow.agents.reduce((acc, agent) => {
acc.push(agent.model);
return acc;
}, [] as string[]);
const response = await authorize(billingCustomerId, {
type: 'agent_response',
data: {
agentModels,
},
});
if (!response.success) {
return Response.json({ error: response.error || 'Billing error' }, { status: 402 });
}
}
// get assistant response
const { messages } = await getResponse(projectId, workflow, reqMessages);
// log billing usage
if (USE_BILLING && billingCustomerId) {
const agentMessageCount = messages.filter(m => m.role === 'assistant').length;
await logUsage(billingCustomerId, {
type: 'agent_messages',
amount: agentMessageCount,
});
}
const responseBody: z.infer<typeof ApiResponse> = {
messages,
};
return Response.json(responseBody);
});
const responseBody: z.infer<typeof ApiResponse> = {
conversationId: response.conversationId,
turn: response.turn,
};
return Response.json(responseBody);
}

View file

@ -23,13 +23,6 @@ const GUEST_BILLING_CUSTOMER = {
updatedAt: new Date().toISOString(),
};
export class BillingError extends Error {
constructor(message: string) {
super(message);
this.name = 'BillingError';
}
}
export async function getCustomerIdForProject(projectId: string): Promise<string> {
const project = await projectsCollection.findOne({ _id: projectId });
if (!project) {

View file

@ -1,13 +1,6 @@
import { WorkflowTool, WorkflowAgent, WorkflowPrompt } from "./types/workflow_types";
import { z } from "zod";
export class QueryLimitError extends Error {
constructor(message: string = 'Query limit exceeded') {
super(message);
this.name = 'QueryLimitError';
}
}
export function validateConfigChanges(configType: string, configChanges: Record<string, unknown>, name: string) {
let testObject: any;
let schema: z.ZodType<any>;

View file

@ -0,0 +1,14 @@
import { Message } from "./types";
import { Turn } from "@/src/entities/models/turn";
import { z } from "zod";
export const ApiRequest = z.object({
messages: z.array(Message),
conversationId: z.string().nullable().optional(),
mockTools: z.record(z.string(), z.string()).nullable().optional(),
stream: z.boolean().optional().nullable().default(false),
});export const ApiResponse = z.object({
turn: Turn,
conversationId: z.string().optional(),
});

View file

@ -1,24 +1,28 @@
import { z } from "zod";
import { Workflow, WorkflowTool } from "./workflow_types";
import { WorkflowTool } from "./workflow_types";
export const SystemMessage = z.object({
export const BaseMessage = z.object({
timestamp: z.string().datetime().optional(),
});
export const SystemMessage = BaseMessage.extend({
role: z.literal("system"),
content: z.string(),
});
export const UserMessage = z.object({
export const UserMessage = BaseMessage.extend({
role: z.literal("user"),
content: z.string(),
});
export const AssistantMessage = z.object({
export const AssistantMessage = BaseMessage.extend({
role: z.literal("assistant"),
content: z.string(),
agentName: z.string().nullable(),
responseType: z.enum(['internal', 'external']),
});
export const AssistantMessageWithToolCalls = z.object({
export const AssistantMessageWithToolCalls = BaseMessage.extend({
role: z.literal("assistant"),
content: z.null(),
toolCalls: z.array(z.object({
@ -32,7 +36,7 @@ export const AssistantMessageWithToolCalls = z.object({
agentName: z.string().nullable(),
});
export const ToolMessage = z.object({
export const ToolMessage = BaseMessage.extend({
role: z.literal("tool"),
content: z.string(),
toolCallId: z.string(),
@ -143,18 +147,6 @@ export const ChatClientId = z.object({
export type WithStringId<T> = T & { _id: string };
export const ApiRequest = z.object({
messages: z.array(Message),
state: z.unknown(),
testProfileId: z.string().nullable().optional(),
mockTools: z.record(z.string(), z.string()).nullable().optional(),
});
export const ApiResponse = z.object({
messages: z.array(Message),
state: z.unknown(),
});
// Helper function to convert MCP server tool to WorkflowTool
export function convertMcpServerToolToWorkflowTool(
mcpTool: z.infer<typeof McpServerTool>,
@ -194,9 +186,4 @@ export function convertMcpServerToolToWorkflowTool(
};
return converted;
}
export const ZStreamAgentResponsePayload = z.object({
projectId: z.string(),
workflow: Workflow,
messages: z.array(Message),
});
}

View file

@ -1,35 +1,7 @@
import { z } from "zod";
import { generateObject } from "ai";
import { openai } from "@ai-sdk/openai";
import { redisClient } from "./redis";
import { Workflow, WorkflowTool } from "./types/workflow_types";
import { Message, ZStreamAgentResponsePayload } from "./types/types";
export async function getAgenticResponseStreamId(
projectId: string,
workflow: z.infer<typeof Workflow>,
messages: z.infer<typeof Message>[],
): Promise<{
streamId: string,
}> {
const payload: z.infer<typeof ZStreamAgentResponsePayload> = {
projectId,
workflow,
messages,
}
// serialize the request
const serialized = JSON.stringify(payload);
// create a uuid for the stream
const streamId = crypto.randomUUID();
// store payload in redis
await redisClient.set(`chat-stream-${streamId}`, serialized, 'EX', 60 * 10); // expire in 10 minutes
return {
streamId,
};
}
import { Message } from "./types/types";
// create a PrefixLogger class that wraps console.log with a prefix
// and allows chaining with a parent logger

View file

@ -16,6 +16,7 @@ export function App({
messageSubscriber,
onPanelClick,
triggerCopilotChat,
isLiveWorkflow,
}: {
hidden?: boolean;
projectId: string;
@ -23,6 +24,7 @@ export function App({
messageSubscriber?: (messages: z.infer<typeof Message>[]) => void;
onPanelClick?: () => void;
triggerCopilotChat?: (message: string) => void;
isLiveWorkflow: boolean;
}) {
const [counter, setCounter] = useState<number>(0);
const [showDebugMessages, setShowDebugMessages] = useState<boolean>(true);
@ -118,6 +120,7 @@ export function App({
onCopyClick={(fn) => { getCopyContentRef.current = fn; }}
showDebugMessages={showDebugMessages}
triggerCopilotChat={triggerCopilotChat}
isLiveWorkflow={isLiveWorkflow}
/>
</div>
</Panel>

View file

@ -1,8 +1,8 @@
'use client';
import { useEffect, useRef, useState, useCallback } from "react";
import { getAssistantResponseStreamId } from "@/app/actions/actions";
import { createCachedTurn, createConversation } from "@/app/actions/playground-chat.actions";
import { Messages } from "./messages";
import z from "zod";
import { z } from "zod";
import { Message, ToolMessage } from "@/app/lib/types/types";
import { Workflow } from "@/app/lib/types/workflow_types";
import { ComposeBoxPlayground } from "@/components/common/compose-box-playground";
@ -11,6 +11,7 @@ import { BillingUpgradeModal } from "@/components/common/billing-upgrade-modal";
import { ChevronDownIcon } from "@heroicons/react/24/outline";
import { FeedbackModal } from "./feedback-modal";
import { FIX_WORKFLOW_PROMPT, FIX_WORKFLOW_PROMPT_WITH_FEEDBACK, EXPLAIN_WORKFLOW_PROMPT_ASSISTANT, EXPLAIN_WORKFLOW_PROMPT_TOOL, EXPLAIN_WORKFLOW_PROMPT_TRANSITION } from "../copilot-prompts";
import { TurnEvent } from "@/src/entities/models/turn";
export function Chat({
projectId,
@ -20,6 +21,7 @@ export function Chat({
showDebugMessages = true,
showJsonMode = false,
triggerCopilotChat,
isLiveWorkflow,
}: {
projectId: string;
workflow: z.infer<typeof Workflow>;
@ -28,10 +30,12 @@ export function Chat({
showDebugMessages?: boolean;
showJsonMode?: boolean;
triggerCopilotChat?: (message: string) => void;
isLiveWorkflow: boolean;
}) {
const conversationId = useRef<string | null>(null);
const [messages, setMessages] = useState<z.infer<typeof Message>[]>([]);
const [loadingAssistantResponse, setLoadingAssistantResponse] = useState<boolean>(false);
const [fetchResponseError, setFetchResponseError] = useState<string | null>(null);
const [loading, setLoading] = useState<boolean>(false);
const [error, setError] = useState<string | null>(null);
const [billingError, setBillingError] = useState<string | null>(null);
const [lastAgenticRequest, setLastAgenticRequest] = useState<unknown | null>(null);
const [lastAgenticResponse, setLastAgenticResponse] = useState<unknown | null>(null);
@ -142,7 +146,7 @@ export function Chat({
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
setLoadingAssistantResponse(false);
setLoading(false);
}
}, []);
@ -152,7 +156,7 @@ export function Chat({
content: prompt,
}];
setMessages(updatedMessages);
setFetchResponseError(null);
setError(null);
setIsLastInteracted(true);
}
@ -165,7 +169,7 @@ export function Chat({
} else {
setShowUnreadBubble(true);
}
}, [optimisticMessages, loadingAssistantResponse, autoScroll]);
}, [optimisticMessages, loading, autoScroll]);
// Expose copy function to parent
useEffect(() => {
@ -190,148 +194,175 @@ export function Chat({
}
}, [messages, messageSubscriber]);
// get assistant response
// get agent response
useEffect(() => {
let ignore = false;
let eventSource: EventSource | null = null;
let msgs: z.infer<typeof Message>[] = [];
async function process() {
setLoadingAssistantResponse(true);
setFetchResponseError(null);
// Reset request/response state before making new request
setLastAgenticRequest(null);
setLastAgenticResponse(null);
let streamId: string | null = null;
try {
const response = await getAssistantResponseStreamId(
projectId,
workflow,
messages,
);
// first, if there is no conversation id, create it
if (!conversationId.current) {
const response = await createConversation({
projectId,
workflow,
isLiveWorkflow,
});
conversationId.current = response.id;
}
// set up a cached turn
const response = await createCachedTurn({
conversationId: conversationId.current,
messages: messages.slice(-1), // only send the last message
});
if (ignore) {
return;
}
if ('billingError' in response) {
setBillingError(response.billingError);
setFetchResponseError(response.billingError);
setLoadingAssistantResponse(false);
console.log('returning from getAssistantResponseStreamId due to billing error');
return;
}
streamId = response.streamId;
} catch (err) {
if (!ignore) {
setFetchResponseError(`Failed to get assistant response: ${err instanceof Error ? err.message : 'Unknown error'}`);
setLoadingAssistantResponse(false);
}
}
// if ('billingError' in response) {
// setBillingError(response.billingError);
// setError(response.billingError);
// setLoading(false);
// console.log('returning from createRun due to billing error');
// return;
// }
if (ignore || !streamId) {
return;
}
// stream events
eventSource = new EventSource(`/api/stream-response/${response.key}`);
eventSourceRef.current = eventSource;
console.log(`chat.tsx: got streamid: ${streamId}`);
eventSource = new EventSource(`/api/stream-response/${streamId}`);
eventSourceRef.current = eventSource;
// handle events
eventSource.addEventListener("message", (event) => {
console.log(`chat.tsx: got message: ${JSON.stringify(event.data)}`);
if (ignore) {
return;
}
eventSource.addEventListener("message", (event) => {
console.log(`chat.tsx: got message: ${event.data}`);
if (ignore) {
return;
}
try {
const data = JSON.parse(event.data);
const turnEvent = TurnEvent.parse(data);
console.log(`chat.tsx: got event: ${turnEvent}`);
try {
const data = JSON.parse(event.data);
const parsedMsg = Message.parse(data);
msgs.push(parsedMsg);
// Update optimistic messages immediately for real-time streaming UX
setOptimisticMessages(prev => [...prev, parsedMsg]);
} catch (err) {
console.error('Failed to parse SSE message:', err);
setFetchResponseError(`Failed to parse SSE message: ${err instanceof Error ? err.message : 'Unknown error'}`);
// Rollback to last known good state on parsing errors
setOptimisticMessages(messages);
}
});
switch (turnEvent.type) {
case "message": {
// Handle regular message events
const generatedMessage = turnEvent.data;
// Update optimistic messages immediately for real-time streaming UX
setOptimisticMessages(prev => [...prev, generatedMessage]);
break;
}
case "done": {
// Handle completion event
if (eventSource) {
eventSource.close();
eventSourceRef.current = null;
}
eventSource.addEventListener('done', (event) => {
console.log(`chat.tsx: got done event: ${event.data}`);
if (eventSource) {
eventSource.close();
eventSourceRef.current = null;
}
// Combine state and collected messages in the response
setLastAgenticResponse({
turn: turnEvent.turn,
messages: turnEvent.turn.output,
});
const parsed = JSON.parse(event.data);
// Commit all streamed messages atomically to the source of truth
setMessages([...messages, ...turnEvent.turn.output]);
setLoading(false);
break;
}
case "error": {
// Handle error event
if (eventSource) {
eventSource.close();
eventSourceRef.current = null;
}
// Combine state and collected messages in the response
setLastAgenticResponse({
...parsed,
messages: msgs
console.error('Turn Error:', turnEvent.error);
if (!ignore) {
setLoading(false);
setError('Error: ' + turnEvent.error);
// Rollback to last known good state on stream errors
setOptimisticMessages(messages);
// check if billing error
if (turnEvent.isBillingError) {
setBillingError(turnEvent.error);
}
}
break;
}
}
} catch (err) {
console.error('Failed to parse SSE message:', err);
setError(`Failed to parse SSE message: ${err instanceof Error ? err.message : 'Unknown error'}`);
// Rollback to last known good state on parsing errors
setOptimisticMessages(messages);
}
});
// Commit all streamed messages atomically to the source of truth
setMessages([...messages, ...msgs]);
setLoadingAssistantResponse(false);
});
eventSource.addEventListener('stream_error', (event) => {
console.log(`chat.tsx: got stream_error event: ${event.data}`);
if (eventSource) {
eventSource.close();
eventSourceRef.current = null;
}
console.error('SSE Error:', event);
if (!ignore) {
setLoading(false);
setError('Error: ' + JSON.parse(event.data).error);
// Rollback to last known good state on stream errors
setOptimisticMessages(messages);
}
});
eventSource.addEventListener('stream_error', (event) => {
console.log(`chat.tsx: got stream_error event: ${event.data}`);
if (eventSource) {
eventSource.close();
eventSourceRef.current = null;
}
console.error('SSE Error:', event);
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
if (!ignore) {
setLoading(false);
setError('Stream connection failed');
// Rollback to last known good state on connection errors
setOptimisticMessages(messages);
}
};
} catch (err) {
if (!ignore) {
setLoadingAssistantResponse(false);
setFetchResponseError('Error: ' + JSON.parse(event.data).error);
// Rollback to last known good state on stream errors
setOptimisticMessages(messages);
setError(`Failed to create run: ${err instanceof Error ? err.message : 'Unknown error'}`);
setLoading(false);
}
});
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
if (!ignore) {
setLoadingAssistantResponse(false);
setFetchResponseError('Stream connection failed');
// Rollback to last known good state on connection errors
setOptimisticMessages(messages);
}
};
}
// if last message is not a user message, return
if (messages.length > 0) {
const last = messages[messages.length - 1];
if (last.role !== 'user') {
return;
}
}
// if there is an error, return
if (fetchResponseError) {
// if there are no messages yet, return
if (messages.length === 0) {
return;
}
console.log(`executing response process: fetchresponseerr: ${fetchResponseError}`);
// if last message is not a user message, return
const last = messages[messages.length - 1];
if (last.role !== 'user') {
return;
}
// if there is an error, return
if (error) {
return;
}
console.log(`chat.tsx: fetching agent response`);
setLoading(true);
setError(null);
process();
return () => {
ignore = true;
if (eventSource) {
eventSource.close();
eventSourceRef.current = null;
}
};
}, [
conversationId,
messages,
projectId,
workflow,
fetchResponseError,
isLiveWorkflow,
error,
]);
return (
@ -349,9 +380,17 @@ export function Chat({
>
<Messages
projectId={projectId}
messages={optimisticMessages}
messages={[
{
role: 'assistant',
content: 'Hi, how can I help you today?',
agentName: 'assistant',
responseType: 'external',
},
...optimisticMessages,
]}
toolCallResults={toolCallResults}
loadingAssistantResponse={loadingAssistantResponse}
loadingAssistantResponse={loading}
workflow={workflow}
showDebugMessages={showDebugMessages}
showJsonMode={showJsonMode}
@ -403,15 +442,15 @@ export function Chat({
</Button>
</div>
)}
{fetchResponseError && (
{error && (
<div className="mb-4 p-3 bg-red-50 dark:bg-red-900/20 border border-red-200 dark:border-red-800
rounded-lg flex gap-2 justify-between items-center">
<p className="text-red-600 dark:text-red-400 text-sm">{fetchResponseError}</p>
<p className="text-red-600 dark:text-red-400 text-sm">{error}</p>
<Button
size="sm"
color="danger"
onPress={() => {
setFetchResponseError(null);
setError(null);
setBillingError(null);
}}
>
@ -423,7 +462,7 @@ export function Chat({
<ComposeBoxPlayground
handleUserMessage={handleUserMessage}
messages={messages.filter(msg => msg.content !== undefined) as any}
loading={loadingAssistantResponse}
loading={loading}
shouldAutoFocus={isLastInteracted}
onFocus={() => setIsLastInteracted(true)}
onCancel={handleStop}

View file

@ -1160,6 +1160,7 @@ export function WorkflowEditor({
messageSubscriber={updateChatMessages}
onPanelClick={handlePlaygroundClick}
triggerCopilotChat={triggerCopilotChat}
isLiveWorkflow={isLive}
/>
{state.present.selection?.type === "agent" && <AgentConfig
key={`agent-${state.present.workflow.agents.findIndex(agent => agent.name === state.present.selection!.name)}`}

View file

@ -16,7 +16,8 @@ import crypto from 'crypto';
import path from 'path';
import { createOpenAI } from '@ai-sdk/openai';
import { USE_BILLING, USE_GEMINI_FILE_PARSING } from '../lib/feature_flags';
import { authorize, BillingError, getCustomerIdForProject, logUsage } from '../lib/billing';
import { authorize, getCustomerIdForProject, logUsage } from '../lib/billing';
import { BillingError } from '@/src/entities/errors/common';
const FILE_PARSING_PROVIDER_API_KEY = process.env.FILE_PARSING_PROVIDER_API_KEY || process.env.OPENAI_API_KEY || '';
const FILE_PARSING_PROVIDER_BASE_URL = process.env.FILE_PARSING_PROVIDER_BASE_URL || undefined;

View file

@ -10,7 +10,8 @@ import { qdrantClient } from '../lib/qdrant';
import { PrefixLogger } from "../lib/utils";
import crypto from 'crypto';
import { USE_BILLING } from '../lib/feature_flags';
import { authorize, BillingError, getCustomerIdForProject, logUsage } from '../lib/billing';
import { authorize, getCustomerIdForProject, logUsage } from '../lib/billing';
import { BillingError } from '@/src/entities/errors/common';
const splitter = new RecursiveCharacterTextSplitter({
separators: ['\n\n', '\n', '. ', '.', ''],

View file

@ -11,7 +11,8 @@ import { qdrantClient } from '../lib/qdrant';
import { PrefixLogger } from "../lib/utils";
import crypto from 'crypto';
import { USE_BILLING } from '../lib/feature_flags';
import { authorize, BillingError, getCustomerIdForProject, logUsage } from '../lib/billing';
import { authorize, getCustomerIdForProject, logUsage } from '../lib/billing';
import { BillingError } from '@/src/entities/errors/common';
const firecrawl = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY });

View file

@ -0,0 +1,36 @@
import { RunConversationTurnUseCase } from "@/src/application/use-cases/conversations/run-conversation-turn.use-case";
import { MongoDBConversationsRepository } from "@/src/infrastructure/repositories/mongodb.conversations.repository";
import { RunCachedTurnController } from "@/src/interface-adapters/controllers/conversations/run-cached-turn.controller";
import { asClass, createContainer, InjectionMode } from "awilix";
import { CreatePlaygroundConversationController } from "@/src/interface-adapters/controllers/conversations/create-playground-conversation.controller";
import { CreateConversationUseCase } from "@/src/application/use-cases/conversations/create-conversation.use-case";
import { RedisCacheService } from "@/src/infrastructure/services/redis.cache.service";
import { CreateCachedTurnUseCase } from "@/src/application/use-cases/conversations/create-cached-turn.use-case";
import { FetchCachedTurnUseCase } from "@/src/application/use-cases/conversations/fetch-cached-turn.use-case";
import { CreateCachedTurnController } from "@/src/interface-adapters/controllers/conversations/create-cached-turn.controller";
import { RunTurnController } from "@/src/interface-adapters/controllers/conversations/run-turn.controller";
export const container = createContainer({
injectionMode: InjectionMode.PROXY,
strict: true,
});
container.register({
// services
// ---
cacheService: asClass(RedisCacheService).singleton(),
// conversations
// ---
conversationsRepository: asClass(MongoDBConversationsRepository).singleton(),
createConversationUseCase: asClass(CreateConversationUseCase).singleton(),
createCachedTurnUseCase: asClass(CreateCachedTurnUseCase).singleton(),
fetchCachedTurnUseCase: asClass(FetchCachedTurnUseCase).singleton(),
runConversationTurnUseCase: asClass(RunConversationTurnUseCase).singleton(),
createPlaygroundConversationController: asClass(CreatePlaygroundConversationController).singleton(),
createCachedTurnController: asClass(CreateCachedTurnController).singleton(),
runCachedTurnController: asClass(RunCachedTurnController).singleton(),
runTurnController: asClass(RunTurnController).singleton(),
});

View file

@ -1,6 +1,9 @@
/** @type {import('next').NextConfig} */
const nextConfig = {
output: 'standalone',
serverExternalPackages: [
'awilix',
],
};
export default nextConfig;

View file

@ -31,6 +31,7 @@
"@primer/react": "^37.27.0",
"@qdrant/js-client-rest": "^1.13.0",
"ai": "^4.3.13",
"awilix": "^12.0.5",
"cheerio": "^1.0.0",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
@ -45,6 +46,7 @@
"lucide-react": "^0.465.0",
"mermaid": "^11.9.0",
"mongodb": "^6.8.0",
"nanoid": "^5.1.5",
"next": "15.3.4",
"openai": "^4.67.2",
"quill": "^2.0.3",
@ -123,6 +125,24 @@
"zod": "^3.23.8"
}
},
"node_modules/@ai-sdk/provider-utils/node_modules/nanoid": {
"version": "3.3.11",
"resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.11.tgz",
"integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/ai"
}
],
"license": "MIT",
"bin": {
"nanoid": "bin/nanoid.cjs"
},
"engines": {
"node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1"
}
},
"node_modules/@ai-sdk/react": {
"version": "1.2.11",
"resolved": "https://registry.npmjs.org/@ai-sdk/react/-/react-1.2.11.tgz",
@ -4792,7 +4812,6 @@
"version": "2.1.5",
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
"integrity": "sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==",
"dev": true,
"dependencies": {
"@nodelib/fs.stat": "2.0.5",
"run-parallel": "^1.1.9"
@ -4805,7 +4824,6 @@
"version": "2.0.5",
"resolved": "https://registry.npmjs.org/@nodelib/fs.stat/-/fs.stat-2.0.5.tgz",
"integrity": "sha512-RkhPPp2zrqDAQA/2jNhnztcPAlv64XdhIp7a7454A5ovI7Bukxgt7MX7udwAu3zg1DcpPU0rz3VV1SeaqvY4+A==",
"dev": true,
"engines": {
"node": ">= 8"
}
@ -4814,7 +4832,6 @@
"version": "1.2.8",
"resolved": "https://registry.npmjs.org/@nodelib/fs.walk/-/fs.walk-1.2.8.tgz",
"integrity": "sha512-oGB+UxlgWcgQkgwo8GcEGwemoTFt3FIO9ababBmaGwXIoBKZ+GTy0pP185beGg7Llih/NSHSV2XAs1lnznocSg==",
"dev": true,
"dependencies": {
"@nodelib/fs.scandir": "2.1.5",
"fastq": "^1.6.0"
@ -8800,6 +8817,19 @@
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/awilix": {
"version": "12.0.5",
"resolved": "https://registry.npmjs.org/awilix/-/awilix-12.0.5.tgz",
"integrity": "sha512-Qf/V/hRo6DK0FoBKJ9QiObasRxHAhcNi0mV6kW2JMawxS3zq6Un+VsZmVAZDUfvB+MjTEiJ2tUJUl4cr0JiUAw==",
"license": "MIT",
"dependencies": {
"camel-case": "^4.1.2",
"fast-glob": "^3.3.3"
},
"engines": {
"node": ">=16.3.0"
}
},
"node_modules/axe-core": {
"version": "4.10.3",
"resolved": "https://registry.npmjs.org/axe-core/-/axe-core-4.10.3.tgz",
@ -8952,7 +8982,6 @@
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz",
"integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==",
"dev": true,
"dependencies": {
"fill-range": "^7.1.1"
},
@ -9079,6 +9108,16 @@
"node": ">=6"
}
},
"node_modules/camel-case": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/camel-case/-/camel-case-4.1.2.tgz",
"integrity": "sha512-gxGWBrTT1JuMx6R+o5PTXMmUnhnVzLQ9SNutD4YqKtI6ap897t3tKECYla6gCWEkplXnlNybEkZg9GEGxKFCgw==",
"license": "MIT",
"dependencies": {
"pascal-case": "^3.1.2",
"tslib": "^2.0.3"
}
},
"node_modules/camelize": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/camelize/-/camelize-1.0.1.tgz",
@ -11315,16 +11354,16 @@
"integrity": "sha512-VxPP4NqbUjj6MaAOafWeUn2cXWLcCtljklUtZf0Ind4XQ+QPtmA0b18zZy0jIQx+ExRVCR/ZQpBmik5lXshNsw=="
},
"node_modules/fast-glob": {
"version": "3.3.2",
"resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.3.2.tgz",
"integrity": "sha512-oX2ruAFQwf/Orj8m737Y5adxDQO0LAB7/S5MnxCdTNDd4p6BsyIVsv9JQsATbTSq8KHRpLwIHbVlUNatxd+1Ow==",
"dev": true,
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.3.3.tgz",
"integrity": "sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg==",
"license": "MIT",
"dependencies": {
"@nodelib/fs.stat": "^2.0.2",
"@nodelib/fs.walk": "^1.2.3",
"glob-parent": "^5.1.2",
"merge2": "^1.3.0",
"micromatch": "^4.0.4"
"micromatch": "^4.0.8"
},
"engines": {
"node": ">=8.6.0"
@ -11334,7 +11373,6 @@
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/glob-parent/-/glob-parent-5.1.2.tgz",
"integrity": "sha512-AOIgSQCepiJYwP3ARnGx+5VnTu2HBYdzbGP45eLw1vr3zB3vZLeyed1sC9hnbcOc9/SrMyM5RPQrkGz4aS9Zow==",
"dev": true,
"dependencies": {
"is-glob": "^4.0.1"
},
@ -11378,7 +11416,6 @@
"version": "1.17.1",
"resolved": "https://registry.npmjs.org/fastq/-/fastq-1.17.1.tgz",
"integrity": "sha512-sRVD3lWVIXWg6By68ZN7vho9a1pQcN/WBFaAAsDDFzlJjvoGx0P8z7V1t72grFJfJhu3YPZBuu25f7Kaw2jN1w==",
"dev": true,
"dependencies": {
"reusify": "^1.0.4"
}
@ -11410,7 +11447,6 @@
"version": "7.1.1",
"resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz",
"integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==",
"dev": true,
"dependencies": {
"to-regex-range": "^5.0.1"
},
@ -12367,7 +12403,6 @@
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/is-extglob/-/is-extglob-2.1.1.tgz",
"integrity": "sha512-SbKbANkN603Vi4jEZv49LeVJMn4yGwsbzZworEoyEiutsN3nJYdbO36zfhGJ6QEDpOZIFkDtnq5JRxmvl3jsoQ==",
"dev": true,
"engines": {
"node": ">=0.10.0"
}
@ -12411,7 +12446,6 @@
"version": "4.0.3",
"resolved": "https://registry.npmjs.org/is-glob/-/is-glob-4.0.3.tgz",
"integrity": "sha512-xelSayHH36ZgE7ZWhli7pW34hNbNl8Ojv5KVmkJD4hBdD3th8Tfk9vYasLM+mXWOZhFkgZfxhLSnrwRr4elSSg==",
"dev": true,
"dependencies": {
"is-extglob": "^2.1.1"
},
@ -12458,7 +12492,6 @@
"version": "7.0.0",
"resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz",
"integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==",
"dev": true,
"engines": {
"node": ">=0.12.0"
}
@ -13403,6 +13436,15 @@
"loose-envify": "cli.js"
}
},
"node_modules/lower-case": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/lower-case/-/lower-case-2.0.2.tgz",
"integrity": "sha512-7fm3l3NAF9WfN6W3JOmf5drwpVqX78JtoGJ3A6W0a6ZnldM41w2fV5D490psKFTpMds8TJse/eHLFFsNHHjHgg==",
"license": "MIT",
"dependencies": {
"tslib": "^2.0.3"
}
},
"node_modules/lucide-react": {
"version": "0.465.0",
"resolved": "https://registry.npmjs.org/lucide-react/-/lucide-react-0.465.0.tgz",
@ -13766,7 +13808,6 @@
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/merge2/-/merge2-1.4.1.tgz",
"integrity": "sha512-8q7VEgMJW4J8tcfVPy8g09NcQwZdbwFEqhe/WZkoIzjn/3TGDwtOCYtXGxA3O8tPzpczCCDgv+P2P5y00ZJOOg==",
"dev": true,
"engines": {
"node": ">= 8"
}
@ -14392,7 +14433,6 @@
"version": "4.0.8",
"resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.8.tgz",
"integrity": "sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==",
"dev": true,
"dependencies": {
"braces": "^3.0.3",
"picomatch": "^2.3.1"
@ -14592,20 +14632,21 @@
}
},
"node_modules/nanoid": {
"version": "3.3.11",
"resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.11.tgz",
"integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==",
"version": "5.1.5",
"resolved": "https://registry.npmjs.org/nanoid/-/nanoid-5.1.5.tgz",
"integrity": "sha512-Ir/+ZpE9fDsNH0hQ3C68uyThDXzYcim2EqcZ8zn8Chtt1iylPT9xXJB0kPCnqzgcEGikO9RxSrh63MsmVCU7Fw==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/ai"
}
],
"license": "MIT",
"bin": {
"nanoid": "bin/nanoid.cjs"
"nanoid": "bin/nanoid.js"
},
"engines": {
"node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1"
"node": "^18 || >=20"
}
},
"node_modules/natural-compare": {
@ -15037,6 +15078,24 @@
"url": "https://opencollective.com/libvips"
}
},
"node_modules/next/node_modules/nanoid": {
"version": "3.3.11",
"resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.11.tgz",
"integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/ai"
}
],
"license": "MIT",
"bin": {
"nanoid": "bin/nanoid.cjs"
},
"engines": {
"node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1"
}
},
"node_modules/next/node_modules/postcss": {
"version": "8.4.31",
"resolved": "https://registry.npmjs.org/postcss/-/postcss-8.4.31.tgz",
@ -15106,6 +15165,16 @@
"@img/sharp-win32-x64": "0.34.2"
}
},
"node_modules/no-case": {
"version": "3.0.4",
"resolved": "https://registry.npmjs.org/no-case/-/no-case-3.0.4.tgz",
"integrity": "sha512-fgAN3jGAh+RoxUGZHTSOLJIqUc2wmoBwGR4tbpNAKmmovFoWq0OdRkb0VkldReO2a2iBT/OEulG9XSUc10r3zg==",
"license": "MIT",
"dependencies": {
"lower-case": "^2.0.2",
"tslib": "^2.0.3"
}
},
"node_modules/node-domexception": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz",
@ -15584,6 +15653,16 @@
"node": ">= 0.8"
}
},
"node_modules/pascal-case": {
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/pascal-case/-/pascal-case-3.1.2.tgz",
"integrity": "sha512-uWlGT3YSnK9x3BQJaOdcZwrnV6hPpd8jFH1/ucpiLRPh/2zCVJKS19E4GvYHvaCcACn3foXZ0cLB9Wrx1KGe5g==",
"license": "MIT",
"dependencies": {
"no-case": "^3.0.4",
"tslib": "^2.0.3"
}
},
"node_modules/path-data-parser": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/path-data-parser/-/path-data-parser-0.1.0.tgz",
@ -15739,6 +15818,25 @@
"resolved": "https://registry.npmjs.org/postcss-value-parser/-/postcss-value-parser-4.2.0.tgz",
"integrity": "sha512-1NNCs6uurfkVbeXG4S8JFT9t19m45ICnif8zWLd5oPSZ50QnwMfK+H3jv408d4jw/7Bttv5axS5IiHoLaVNHeQ=="
},
"node_modules/postcss/node_modules/nanoid": {
"version": "3.3.11",
"resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.11.tgz",
"integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==",
"dev": true,
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/ai"
}
],
"license": "MIT",
"bin": {
"nanoid": "bin/nanoid.cjs"
},
"engines": {
"node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1"
}
},
"node_modules/prelude-ls": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/prelude-ls/-/prelude-ls-1.2.1.tgz",
@ -15835,7 +15933,6 @@
"version": "1.2.3",
"resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.3.tgz",
"integrity": "sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A==",
"dev": true,
"funding": [
{
"type": "github",
@ -16237,7 +16334,6 @@
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz",
"integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==",
"dev": true,
"engines": {
"iojs": ">=1.0.0",
"node": ">=0.10.0"
@ -16322,7 +16418,6 @@
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz",
"integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==",
"dev": true,
"funding": [
{
"type": "github",
@ -17220,7 +17315,6 @@
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/to-regex-range/-/to-regex-range-5.0.1.tgz",
"integrity": "sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==",
"dev": true,
"dependencies": {
"is-number": "^7.0.0"
},

View file

@ -12,7 +12,8 @@
"deleteQdrant": "tsx app/scripts/delete_qdrant.ts",
"ragUrlsWorker": "tsx app/scripts/rag_urls_worker.ts",
"ragFilesWorker": "tsx app/scripts/rag_files_worker.ts",
"ragTextWorker": "tsx app/scripts/rag_text_worker.ts"
"ragTextWorker": "tsx app/scripts/rag_text_worker.ts",
"worker": "tsx app/scripts/worker.ts"
},
"dependencies": {
"@ai-sdk/openai": "^1.3.21",
@ -38,6 +39,7 @@
"@primer/react": "^37.27.0",
"@qdrant/js-client-rest": "^1.13.0",
"ai": "^4.3.13",
"awilix": "^12.0.5",
"cheerio": "^1.0.0",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
@ -52,6 +54,7 @@
"lucide-react": "^0.465.0",
"mermaid": "^11.9.0",
"mongodb": "^6.8.0",
"nanoid": "^5.1.5",
"next": "15.3.4",
"openai": "^4.67.2",
"quill": "^2.0.3",

View file

@ -0,0 +1,27 @@
import { z } from "zod";
import { Conversation } from "@/src/entities/models/conversation";
import { Turn } from "@/src/entities/models/turn";
export const CreateConversationData = Conversation.pick({
projectId: true,
workflow: true,
isLiveWorkflow: true,
});
export const AddTurnData = Turn.omit({
id: true,
createdAt: true,
updatedAt: true,
});
export interface IConversationsRepository {
// create a new conversation
createConversation(data: z.infer<typeof CreateConversationData>): Promise<z.infer<typeof Conversation>>;
// get conversation
getConversation(id: string): Promise<z.infer<typeof Conversation> | null>;
// add turn data to conversation
// returns the created turn
addTurn(conversationId: string, data: z.infer<typeof AddTurnData>): Promise<z.infer<typeof Turn>>;
}

View file

@ -0,0 +1,34 @@
/**
* Interface defining the contract for cache service implementations.
*
* This interface provides methods for storing, retrieving, and deleting cached data
* with support for time-to-live (TTL) expiration. Implementations can use various
* caching backends such as Redis, in-memory storage, or other cache providers.
*/
export interface ICacheService {
/**
* Retrieves a value from the cache by its key.
*
* @param key - The unique identifier for the cached item
* @returns A promise that resolves to the cached value as a string, or null if the key doesn't exist or has expired
*/
get(key: string): Promise<string | null>;
/**
* Stores a value in the cache with a specified time-to-live.
*
* @param key - The unique identifier for the cached item
* @param value - The value to cache (will be stored as a string)
* @param ttl - Time-to-live in seconds. If not provided, the item will be cached indefinitely.
* @returns A promise that resolves when the value has been successfully stored
*/
set(key: string, value: string, ttl?: number): Promise<void>;
/**
* Removes a cached item by its key.
*
* @param key - The unique identifier of the cached item to remove
* @returns A promise that resolves to true if the item was successfully deleted, false if the key didn't exist
*/
delete(key: string): Promise<boolean>;
}

View file

@ -0,0 +1,97 @@
import { BadRequestError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common';
import { check_query_limit } from "@/app/lib/rate_limiting";
import { QueryLimitError } from "@/src/entities/errors/common";
import { apiKeysCollection, projectMembersCollection } from "@/app/lib/mongodb";
import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface";
import { z } from "zod";
import { nanoid } from 'nanoid';
import { ICacheService } from '@/src/application/services/cache.service.interface';
import { CachedTurnRequest, Turn } from '@/src/entities/models/turn';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
conversationId: z.string(),
input: Turn.shape.input,
});
export interface ICreateCachedTurnUseCase {
execute(data: z.infer<typeof inputSchema>): Promise<{ key: string }>;
}
export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase {
private readonly cacheService: ICacheService;
private readonly conversationsRepository: IConversationsRepository;
constructor({
cacheService,
conversationsRepository,
}: {
cacheService: ICacheService,
conversationsRepository: IConversationsRepository,
}) {
this.cacheService = cacheService;
this.conversationsRepository = conversationsRepository;
}
async execute(data: z.infer<typeof inputSchema>): Promise<{ key: string }> {
// fetch conversation
const conversation = await this.conversationsRepository.getConversation(data.conversationId);
if (!conversation) {
throw new NotFoundError('Conversation not found');
}
// extract projectid from conversation
const { projectId } = conversation;
// check query limit for project
if (!await check_query_limit(projectId)) {
throw new QueryLimitError('Query limit exceeded');
}
// if caller is a user, ensure they are a member of project
if (data.caller === "user") {
if (!data.userId) {
throw new BadRequestError('User ID is required');
}
const membership = await projectMembersCollection.findOne({
projectId,
userId: data.userId,
});
if (!membership) {
throw new NotAuthorizedError('User not a member of project');
}
} else {
if (!data.apiKey) {
throw new BadRequestError('API key is required');
}
// check if api key is valid
// while also updating last used timestamp
const result = await apiKeysCollection.findOneAndUpdate(
{
projectId,
key: data.apiKey,
},
{ $set: { lastUsedAt: new Date().toISOString() } }
);
if (!result) {
throw new NotAuthorizedError('Invalid API key');
}
}
// create cache entry
const key = nanoid();
const payload: z.infer<typeof CachedTurnRequest> = {
conversationId: data.conversationId,
input: data.input,
};
// store payload in cache
await this.cacheService.set(`turn-${key}`, JSON.stringify(payload), 60 * 10); // expire in 10 minutes
return {
key,
}
}
}

View file

@ -0,0 +1,96 @@
import { BadRequestError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common';
import { check_query_limit } from "@/app/lib/rate_limiting";
import { QueryLimitError } from "@/src/entities/errors/common";
import { apiKeysCollection, projectMembersCollection, projectsCollection } from "@/app/lib/mongodb";
import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface";
import { z } from "zod";
import { Conversation } from "@/src/entities/models/conversation";
import { Workflow } from "@/app/lib/types/workflow_types";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
workflow: Workflow.optional(),
isLiveWorkflow: z.boolean().optional(),
});
export interface ICreateConversationUseCase {
execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof Conversation>>;
}
export class CreateConversationUseCase implements ICreateConversationUseCase {
private readonly conversationsRepository: IConversationsRepository;
constructor({
conversationsRepository,
}: {
conversationsRepository: IConversationsRepository,
}) {
this.conversationsRepository = conversationsRepository;
}
async execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof Conversation>> {
const { caller, userId, apiKey, projectId } = data;
let isLiveWorkflow = Boolean(data.isLiveWorkflow);
let workflow = data.workflow;
// check query limit for project
if (!await check_query_limit(projectId)) {
throw new QueryLimitError('Query limit exceeded');
}
// if caller is a user, ensure they are a member of project
if (caller === "user") {
if (!userId) {
throw new BadRequestError('User ID is required');
}
const membership = await projectMembersCollection.findOne({
projectId,
userId,
});
if (!membership) {
throw new NotAuthorizedError('User not a member of project');
}
} else {
if (!apiKey) {
throw new BadRequestError('API key is required');
}
// check if api key is valid
// while also updating last used timestamp
const result = await apiKeysCollection.findOneAndUpdate(
{
projectId,
key: apiKey,
},
{ $set: { lastUsedAt: new Date().toISOString() } }
);
if (!result) {
throw new NotAuthorizedError('Invalid API key');
}
}
// if workflow is not provided, fetch workflow
if (!workflow) {
const project = await projectsCollection.findOne({
_id: projectId,
});
if (!project) {
throw new NotFoundError('Project not found');
}
if (!project.liveWorkflow) {
throw new BadRequestError('Project does not have a live workflow');
}
workflow = project.liveWorkflow;
isLiveWorkflow = true;
}
// create conversation
return await this.conversationsRepository.createConversation({
projectId,
workflow,
isLiveWorkflow,
});
}
}

View file

@ -0,0 +1,96 @@
import { BadRequestError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common';
import { check_query_limit } from "@/app/lib/rate_limiting";
import { QueryLimitError } from "@/src/entities/errors/common";
import { apiKeysCollection, projectMembersCollection } from "@/app/lib/mongodb";
import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface";
import { z } from "zod";
import { ICacheService } from '@/src/application/services/cache.service.interface';
import { CachedTurnRequest, Turn } from '@/src/entities/models/turn';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
key: z.string(),
});
export interface IFetchCachedTurnUseCase {
execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof CachedTurnRequest>>;
}
export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase {
private readonly cacheService: ICacheService;
private readonly conversationsRepository: IConversationsRepository;
constructor({
cacheService,
conversationsRepository,
}: {
cacheService: ICacheService,
conversationsRepository: IConversationsRepository,
}) {
this.cacheService = cacheService;
this.conversationsRepository = conversationsRepository;
}
async execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof CachedTurnRequest>> {
// fetch cached turn
const payload = await this.cacheService.get(`turn-${data.key}`);
if (!payload) {
throw new NotFoundError('Cached turn not found');
}
// parse cached turn
const cachedTurn = CachedTurnRequest.parse(JSON.parse(payload));
// fetch conversation
const conversation = await this.conversationsRepository.getConversation(cachedTurn.conversationId);
if (!conversation) {
throw new NotFoundError('Conversation not found');
}
// extract projectid from conversation
const { projectId } = conversation;
// check query limit for project
if (!await check_query_limit(projectId)) {
throw new QueryLimitError('Query limit exceeded');
}
// if caller is a user, ensure they are a member of project
if (data.caller === "user") {
if (!data.userId) {
throw new BadRequestError('User ID is required');
}
const membership = await projectMembersCollection.findOne({
projectId,
userId: data.userId,
});
if (!membership) {
throw new NotAuthorizedError('User not a member of project');
}
} else {
if (!data.apiKey) {
throw new BadRequestError('API key is required');
}
// check if api key is valid
// while also updating last used timestamp
const result = await apiKeysCollection.findOneAndUpdate(
{
projectId,
key: data.apiKey,
},
{ $set: { lastUsedAt: new Date().toISOString() } }
);
if (!result) {
throw new NotAuthorizedError('Invalid API key');
}
}
// delete from cache
await this.cacheService.delete(`turn-${data.key}`);
// return cached turn
return cachedTurn;
}
}

View file

@ -0,0 +1,162 @@
import { Turn, TurnEvent } from "@/src/entities/models/turn";
import { USE_BILLING } from "@/app/lib/feature_flags";
import { authorize, getCustomerIdForProject } from "@/app/lib/billing";
import { BadRequestError, BillingError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common';
import { check_query_limit } from "@/app/lib/rate_limiting";
import { QueryLimitError } from "@/src/entities/errors/common";
import { apiKeysCollection, projectMembersCollection } from "@/app/lib/mongodb";
import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface";
import { streamResponse } from "@/app/lib/agents";
import { z } from "zod";
import { Message } from "@/app/lib/types/types";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
conversationId: z.string(),
trigger: Turn.shape.trigger,
input: Turn.shape.input,
});
export interface IRunConversationTurnUseCase {
execute(data: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof TurnEvent>, void, unknown>;
}
export class RunConversationTurnUseCase implements IRunConversationTurnUseCase {
private readonly conversationsRepository: IConversationsRepository;
constructor({
conversationsRepository,
}: {
conversationsRepository: IConversationsRepository,
}) {
this.conversationsRepository = conversationsRepository;
}
async *execute(data: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof TurnEvent>, void, unknown> {
// fetch conversation
const conversation = await this.conversationsRepository.getConversation(data.conversationId);
if (!conversation) {
throw new NotFoundError('Conversation not found');
}
// extract projectid from conversation
const { id: conversationId, projectId } = conversation;
// check query limit for project
if (!await check_query_limit(projectId)) {
throw new QueryLimitError('Query limit exceeded');
}
// if caller is a user, ensure they are a member of project
if (data.caller === "user") {
if (!data.userId) {
throw new BadRequestError('User ID is required');
}
const membership = await projectMembersCollection.findOne({
projectId,
userId: data.userId,
});
if (!membership) {
throw new NotAuthorizedError('User not a member of project');
}
} else {
if (!data.apiKey) {
throw new BadRequestError('API key is required');
}
// check if api key is valid
// while also updating last used timestamp
const result = await apiKeysCollection.findOneAndUpdate(
{
projectId,
key: data.apiKey,
},
{ $set: { lastUsedAt: new Date().toISOString() } }
);
if (!result) {
throw new NotAuthorizedError('Invalid API key');
}
}
// Check billing auth
if (USE_BILLING) {
// get billing customer id for project
const customerId = await getCustomerIdForProject(projectId);
const agentModels = conversation.workflow.agents.reduce((acc, agent) => {
acc.push(agent.model);
return acc;
}, [] as string[]);
const response = await authorize(customerId, {
type: 'agent_response',
data: {
agentModels,
},
});
if (!response.success) {
yield {
type: "error",
error: response.error || 'Billing error',
isBillingError: true,
};
return;
}
}
// set timestamps where missing
data.input.messages.forEach(msg => {
if (!msg.timestamp) {
msg.timestamp = new Date().toISOString();
}
});
// fetch previous conversation turns and pull message history
const previousMessages = conversation.turns?.flatMap(t => [
...t.input.messages,
...t.output,
]);
const inputMessages = [
...previousMessages || [],
...data.input.messages,
]
// override mock tools if requested
if (data.input.mockTools) {
conversation.workflow.mockTools = data.input.mockTools;
}
// call agents runtime and handle generated messages
const outputMessages: z.infer<typeof Message>[] = [];
for await (const event of streamResponse(projectId, conversation.workflow, inputMessages)) {
// handle msg events
if ("role" in event) {
// collect generated message
const msg = {
...event,
timestamp: new Date().toISOString(),
};
outputMessages.push(msg);
// yield event
yield {
type: "message",
data: msg,
};
} else {
// save turn data
const turn = await this.conversationsRepository.addTurn(data.conversationId, {
trigger: data.trigger,
input: data.input,
output: outputMessages,
});
// yield event
yield {
type: "done",
turn,
conversationId,
}
}
}
}
}

View file

@ -0,0 +1,29 @@
export class BillingError extends Error {
constructor(message?: string, options?: ErrorOptions) {
super(message, options);
}
}
export class QueryLimitError extends Error {
constructor(message?: string, options?: ErrorOptions) {
super(message, options);
}
}
export class BadRequestError extends Error {
constructor(message?: string, options?: ErrorOptions) {
super(message, options);
}
}
export class NotFoundError extends Error {
constructor(message?: string, options?: ErrorOptions) {
super(message, options);
}
}
export class NotAuthorizedError extends Error {
constructor(message?: string, options?: ErrorOptions) {
super(message, options);
}
}

View file

@ -0,0 +1,13 @@
import { z } from "zod";
import { Turn } from "./turn";
import { Workflow } from "@/app/lib/types/workflow_types";
export const Conversation = z.object({
id: z.string(),
projectId: z.string(),
workflow: Workflow,
isLiveWorkflow: z.boolean(),
turns: z.array(Turn).optional(),
createdAt: z.string().datetime(),
updatedAt: z.string().datetime().optional(),
});

View file

@ -0,0 +1,41 @@
import { Message } from "@/app/lib/types/types";
import { z } from "zod";
export const Turn = z.object({
id: z.string(),
trigger: z.enum([
"chat",
"api",
]),
input: z.object({
messages: z.array(Message),
mockTools: z.record(z.string(), z.string()).nullable().optional(),
}),
output: z.array(Message),
error: z.string().optional(),
isBillingError: z.boolean().optional(),
createdAt: z.string().datetime(),
updatedAt: z.string().datetime().optional(),
});
export const CachedTurnRequest = z.object({
conversationId: z.string(),
input: Turn.shape.input,
});
export const TurnEvent = z.discriminatedUnion("type", [
z.object({
type: z.literal("message"),
data: Message,
}),
z.object({
type: z.literal("error"),
error: z.string(),
isBillingError: z.boolean().optional(),
}),
z.object({
type: z.literal("done"),
conversationId: z.string(),
turn: Turn,
}),
]);

View file

@ -0,0 +1,76 @@
import { z } from "zod";
import { db } from "@/app/lib/mongodb";
import { ObjectId } from "mongodb";
import { AddTurnData, CreateConversationData, IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface";
import { Conversation } from "@/src/entities/models/conversation";
import { nanoid } from "nanoid";
import { Turn } from "@/src/entities/models/turn";
const DocSchema = Conversation
.omit({
id: true,
});
export class MongoDBConversationsRepository implements IConversationsRepository {
private readonly collection = db.collection<z.infer<typeof DocSchema>>("conversations");
async createConversation(data: z.infer<typeof CreateConversationData>): Promise<z.infer<typeof Conversation>> {
const now = new Date();
const _id = new ObjectId();
const doc = {
...data,
createdAt: now.toISOString(),
}
await this.collection.insertOne({
...doc,
_id,
});
return {
...data,
...doc,
id: _id.toString(),
};
}
async getConversation(id: string): Promise<z.infer<typeof Conversation> | null> {
const result = await this.collection.findOne({
_id: new ObjectId(id),
});
if (!result) {
return null;
}
const { _id, ...rest } = result;
return {
...rest,
id,
};
}
async addTurn(conversationId: string, data: z.infer<typeof AddTurnData>): Promise<z.infer<typeof Turn>> {
// create turn object from data
const turn: z.infer<typeof Turn> = {
...data,
id: nanoid(),
createdAt: new Date().toISOString(),
};
await this.collection.updateOne({
_id: new ObjectId(conversationId),
}, {
$push: {
turns: turn,
},
$set: {
updatedAt: new Date().toISOString(),
},
});
return turn;
}
}

View file

@ -0,0 +1,20 @@
import { ICacheService } from "@/src/application/services/cache.service.interface";
import { redisClient } from "@/app/lib/redis";
export class RedisCacheService implements ICacheService {
async get(key: string): Promise<string | null> {
return await redisClient.get(key);
}
async set(key: string, value: string, ttl?: number): Promise<void> {
if (ttl) {
await redisClient.set(key, value, 'EX', ttl);
} else {
await redisClient.set(key, value);
}
}
async delete(key: string): Promise<boolean> {
return await redisClient.del(key) > 0;
}
}

View file

@ -0,0 +1,38 @@
import { Turn } from "@/src/entities/models/turn";
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { ICreateCachedTurnUseCase } from "@/src/application/use-cases/conversations/create-cached-turn.use-case";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
conversationId: z.string(),
input: Turn.shape.input,
});
export interface ICreateCachedTurnController {
execute(request: z.infer<typeof inputSchema>): Promise<{ key: string }>;
}
export class CreateCachedTurnController implements ICreateCachedTurnController {
private readonly createCachedTurnUseCase: ICreateCachedTurnUseCase;
constructor({
createCachedTurnUseCase,
}: {
createCachedTurnUseCase: ICreateCachedTurnUseCase,
}) {
this.createCachedTurnUseCase = createCachedTurnUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<{ key: string }> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
return await this.createCachedTurnUseCase.execute(result.data);
}
}

View file

@ -0,0 +1,46 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { ICreateConversationUseCase } from "@/src/application/use-cases/conversations/create-conversation.use-case";
import { Conversation } from "@/src/entities/models/conversation";
import { Workflow } from "@/app/lib/types/workflow_types";
const inputSchema = z.object({
userId: z.string(),
projectId: z.string(),
workflow: Workflow,
isLiveWorkflow: z.boolean(),
});
export interface ICreatePlaygroundConversationController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof Conversation>>;
}
export class CreatePlaygroundConversationController implements ICreatePlaygroundConversationController {
private readonly createConversationUseCase: ICreateConversationUseCase;
constructor({
createConversationUseCase,
}: {
createConversationUseCase: ICreateConversationUseCase,
}) {
this.createConversationUseCase = createConversationUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof Conversation>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { userId, projectId, workflow, isLiveWorkflow } = result.data;
// execute use case
return await this.createConversationUseCase.execute({
caller: "user",
userId,
projectId,
workflow,
isLiveWorkflow,
});
}
}

View file

@ -0,0 +1,55 @@
import { TurnEvent } from "@/src/entities/models/turn";
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IRunConversationTurnUseCase } from "@/src/application/use-cases/conversations/run-conversation-turn.use-case";
import { IFetchCachedTurnUseCase } from "@/src/application/use-cases/conversations/fetch-cached-turn.use-case";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
cachedTurnKey: z.string(),
});
export interface IRunCachedTurnController {
execute(request: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof TurnEvent>, void, unknown>;
}
export class RunCachedTurnController implements IRunCachedTurnController {
private readonly fetchCachedTurnUseCase: IFetchCachedTurnUseCase;
private readonly runConversationTurnUseCase: IRunConversationTurnUseCase;
constructor({
fetchCachedTurnUseCase,
runConversationTurnUseCase,
}: {
fetchCachedTurnUseCase: IFetchCachedTurnUseCase,
runConversationTurnUseCase: IRunConversationTurnUseCase,
}) {
this.fetchCachedTurnUseCase = fetchCachedTurnUseCase;
this.runConversationTurnUseCase = runConversationTurnUseCase;
}
async *execute(request: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof TurnEvent>, void, unknown> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
// fetch the turn
const cachedTurn = await this.fetchCachedTurnUseCase.execute({
...result.data,
key: result.data.cachedTurnKey,
});
// run the turn
yield *this.runConversationTurnUseCase.execute({
caller: result.data.caller,
userId: result.data.userId,
conversationId: cachedTurn.conversationId,
trigger: result.data.caller === "user" ? "chat" : "api",
input: cachedTurn.input,
});
}
}

View file

@ -0,0 +1,93 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { ICreateConversationUseCase } from "@/src/application/use-cases/conversations/create-conversation.use-case";
import { Turn, TurnEvent } from "@/src/entities/models/turn";
import { IRunConversationTurnUseCase } from "@/src/application/use-cases/conversations/run-conversation-turn.use-case";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
conversationId: z.string().optional(),
input: Turn.shape.input,
stream: z.boolean(),
});
type outputSchema = {
conversationId: string;
} & ({
turn: z.infer<typeof Turn>;
} | {
stream: AsyncGenerator<z.infer<typeof TurnEvent>, void, unknown>;
});
export interface IRunTurnController {
execute(request: z.infer<typeof inputSchema>): Promise<outputSchema>;
}
export class RunTurnController implements IRunTurnController {
private readonly createConversationUseCase: ICreateConversationUseCase;
private readonly runConversationTurnUseCase: IRunConversationTurnUseCase;
constructor({
createConversationUseCase,
runConversationTurnUseCase,
}: {
createConversationUseCase: ICreateConversationUseCase,
runConversationTurnUseCase: IRunConversationTurnUseCase,
}) {
this.createConversationUseCase = createConversationUseCase;
this.runConversationTurnUseCase = runConversationTurnUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<outputSchema> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, projectId, input } = result.data;
let conversationId = result.data.conversationId;
// if conversationId is not provided, create conversation
if (!conversationId) {
const conversation = await this.createConversationUseCase.execute({
caller,
userId,
apiKey,
projectId,
});
conversationId = conversation.id;
}
// setup stream
const stream = this.runConversationTurnUseCase.execute({
caller,
userId,
apiKey,
conversationId,
trigger: caller === "user" ? "chat" : "api",
input,
});
// if streaming output request, return stream
if (result.data.stream) {
return {
conversationId,
stream,
};
}
// otherwise, return turn data
for await (const event of stream) {
if (event.type === "done") {
return {
conversationId,
turn: event.turn,
};
}
}
throw new Error('No turn data found');
}
}