Merge pull request #25 from rowboatlabs/rag

rag v2
This commit is contained in:
Ramnique Singh 2025-02-09 22:47:02 +05:30 committed by GitHub
commit e6ae7c965f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 4556 additions and 2372 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,499 @@
'use server';
import { SimulationData, EmbeddingDoc, GetInformationToolResult, AgenticAPIChatRequest, convertFromAgenticAPIChatMessages, WebpageCrawlResponse, Workflow, WorkflowAgent, CopilotAPIRequest, CopilotAPIResponse, CopilotMessage, CopilotWorkflow, convertToCopilotWorkflow, convertToCopilotApiMessage, convertToCopilotMessage, CopilotAssistantMessage, CopilotChatContext, convertToCopilotApiChatContext, WorkflowTool, WorkflowPrompt, EmbeddingRecord } from "../lib/types";
import { generateObject, generateText, embed } from "ai";
import { dataSourceDocsCollection, dataSourcesCollection, embeddingsCollection, webpagesCollection } from "@/app/lib/mongodb";
import { z } from 'zod';
import { openai } from "@ai-sdk/openai";
import FirecrawlApp, { ScrapeResponse } from '@mendable/firecrawl-js';
import { embeddingModel } from "../lib/embedding";
import { apiV1 } from "rowboat-shared";
import { zodToJsonSchema } from 'zod-to-json-schema';
import { Claims, getSession } from "@auth0/nextjs-auth0";
import { callClientToolWebhook, getAgenticApiResponse } from "../lib/utils";
import { assert } from "node:console";
import { check_query_limit } from "../lib/rate_limiting";
import { QueryLimitError } from "../lib/client_utils";
import { projectAuthCheck } from "./project_actions";
import { qdrantClient } from "../lib/qdrant";
import { ObjectId } from "mongodb";
// Module-level Firecrawl client used by scrapeWebpage below.
// Falls back to an empty API key when FIRECRAWL_API_KEY is not set.
const crawler = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY || '' });
/**
 * Resolves the claims of the currently signed-in user.
 *
 * @returns The `user` claims object taken from the session.
 * @throws Error('User not authenticated') when there is no session or the
 *         session carries no user.
 */
export async function authCheck(): Promise<Claims> {
    const session = await getSession();
    const claims = session?.user;
    if (claims) {
        return claims;
    }
    throw new Error('User not authenticated');
}
/**
 * Returns the title and markdown content of a web page.
 *
 * A copy stored in `webpagesCollection` is reused when its `lastUpdatedAt`
 * is at most 24 hours old; otherwise the page is scraped through Firecrawl
 * and the stored copy is upserted (keyed by the URL itself).
 *
 * NOTE(review): unlike the other exported actions in this file, this one
 * performs no auth check — confirm that is intentional.
 *
 * @param url Address of the page; also used as the document `_id`.
 * @returns `{ title, content }`; either is an empty string when the scrape
 *          result did not provide it.
 */
export async function scrapeWebpage(url: string): Promise<z.infer<typeof WebpageCrawlResponse>> {
    const ONE_DAY_MS = 1000 * 60 * 60 * 24;
    const freshnessCutoff = new Date(Date.now() - ONE_DAY_MS).toISOString();

    // serve from the stored copy when it is recent enough
    const cached = await webpagesCollection.findOne({
        _id: url,
        lastUpdatedAt: { $gte: freshnessCutoff },
    });
    if (cached) {
        return { title: cached.title, content: cached.contentSimple };
    }

    // cache miss: scrape with firecrawl
    const scraped = await crawler.scrapeUrl(url, {
        formats: ['markdown'],
        onlyMainContent: true,
    }) as ScrapeResponse;
    const title = scraped.metadata?.title || '';
    const content = scraped.markdown || '';

    // store (or refresh) the copy for subsequent calls
    await webpagesCollection.updateOne(
        { _id: url },
        {
            $set: {
                title,
                contentSimple: content,
                lastUpdatedAt: new Date().toISOString(),
            },
        },
        { upsert: true },
    );

    return { title, content };
}
/**
 * Forwards a chat request to the agentic API on behalf of a project member.
 *
 * @param projectId Project the caller must be a member of.
 * @param request   Request payload passed to `getAgenticApiResponse` as-is.
 * @returns The converted chat messages, the returned state, and the raw
 *          request/response pair.
 * @throws QueryLimitError when `check_query_limit` reports the limit is hit.
 */
export async function getAssistantResponse(
    projectId: string,
    request: z.infer<typeof AgenticAPIChatRequest>,
): Promise<{
    messages: z.infer<typeof apiV1.ChatMessage>[],
    state: unknown,
    rawRequest: unknown,
    rawResponse: unknown,
}> {
    await projectAuthCheck(projectId);

    const withinLimit = await check_query_limit(projectId);
    if (!withinLimit) {
        throw new QueryLimitError();
    }

    const { messages, state, rawAPIResponse } = await getAgenticApiResponse(request);
    return {
        messages: convertFromAgenticAPIChatMessages(messages),
        state,
        rawRequest: request,
        rawResponse: rawAPIResponse,
    };
}
/**
 * Sends the conversation and current workflow to the copilot API and returns
 * the assistant's reply after pruning invalid config changes.
 *
 * For every `action` part in the reply, each proposed field in
 * `config_changes` is merged into a known-good placeholder object and parsed
 * with the schema matching its `config_type` (tool / agent / prompt). Fields
 * that fail are deleted from the reply; unknown config types get an `error`.
 *
 * @param projectId               Project the caller must be a member of.
 * @param messages                Copilot conversation so far.
 * @param current_workflow_config Workflow being edited.
 * @param context                 Optional chat context, or null.
 * @returns The assistant message plus the raw request and raw JSON response.
 * @throws QueryLimitError when the project's query limit is exhausted.
 * @throws Error when the HTTP call fails or the API reports an error.
 */
export async function getCopilotResponse(
    projectId: string,
    messages: z.infer<typeof CopilotMessage>[],
    current_workflow_config: z.infer<typeof Workflow>,
    context: z.infer<typeof CopilotChatContext> | null,
): Promise<{
    message: z.infer<typeof CopilotAssistantMessage>,
    rawRequest: unknown,
    rawResponse: unknown,
}> {
    await projectAuthCheck(projectId);
    const withinLimit = await check_query_limit(projectId);
    if (!withinLimit) {
        throw new QueryLimitError();
    }

    // Removes every entry of action.config_changes that makes `baseline`
    // fail `schema` when merged into it.
    const pruneInvalidFields = (
        schema: z.ZodTypeAny,
        baseline: object,
        action: { config_type: string, name: string, config_changes: Record<string, unknown> },
    ): void => {
        // Object.entries is a snapshot, so deleting while looping is safe
        for (const [field, proposed] of Object.entries(action.config_changes)) {
            const outcome = schema.safeParse({ ...baseline, [field]: proposed });
            if (outcome.success) {
                continue;
            }
            console.log(`discarding field ${field} from ${action.config_type}: ${action.name}`, outcome.error.message);
            delete action.config_changes[field];
        }
    };

    // build the request body
    const request: z.infer<typeof CopilotAPIRequest> = {
        messages: messages.map(convertToCopilotApiMessage),
        workflow_schema: JSON.stringify(zodToJsonSchema(CopilotWorkflow)),
        current_workflow_config: JSON.stringify(convertToCopilotWorkflow(current_workflow_config)),
        context: context ? convertToCopilotApiChatContext(context) : null,
    };
    console.log(`copilot request`, JSON.stringify(request, null, 2));

    // invoke the copilot service
    const response = await fetch(process.env.COPILOT_API_URL + '/chat', {
        method: 'POST',
        body: JSON.stringify(request),
        headers: {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${process.env.COPILOT_API_KEY || 'test'}`,
        },
    });
    if (!response.ok) {
        console.error('Failed to call copilot api', response);
        throw new Error(`Failed to call copilot api: ${response.statusText}`);
    }

    const json: z.infer<typeof CopilotAPIResponse> = await response.json();
    console.log(`copilot response`, JSON.stringify(json, null, 2));
    if ('error' in json) {
        throw new Error(`Failed to call copilot api: ${json.error}`);
    }

    // strip a leading ```json fence and a trailing ``` fence, if present
    const unfenced = json.response.replace(/^```json\n/, '').replace(/\n```$/, '');
    const msg = convertToCopilotMessage({
        role: 'assistant',
        content: unfenced,
    });

    // `assert` is imported from node:console in this file, so a failure here
    // is only logged; the role check below is the effective guard.
    assert(msg.role === 'assistant');
    if (msg.role === 'assistant') {
        for (const part of msg.content.response) {
            if (part.type !== 'action') {
                continue;
            }
            switch (part.content.config_type) {
                case 'tool': {
                    const placeholderTool = {
                        name: 'test',
                        description: 'test',
                        parameters: {
                            type: 'object',
                            properties: {},
                            required: [],
                        },
                    } as z.infer<typeof WorkflowTool>;
                    pruneInvalidFields(WorkflowTool, placeholderTool, part.content);
                    break;
                }
                case 'agent': {
                    const placeholderAgent = {
                        name: 'test',
                        description: 'test',
                        type: 'conversation',
                        instructions: 'test',
                        prompts: [],
                        tools: [],
                        model: 'gpt-4o',
                        ragReturnType: 'chunks',
                        ragK: 10,
                        connectedAgents: [],
                        controlType: 'retain',
                    } as z.infer<typeof WorkflowAgent>;
                    pruneInvalidFields(WorkflowAgent, placeholderAgent, part.content);
                    break;
                }
                case 'prompt': {
                    const placeholderPrompt = {
                        name: 'test',
                        type: 'base_prompt',
                        prompt: "test",
                    } as z.infer<typeof WorkflowPrompt>;
                    pruneInvalidFields(WorkflowPrompt, placeholderPrompt, part.content);
                    break;
                }
                default: {
                    part.content.error = `Unknown config type: ${part.content.config_type}`;
                    break;
                }
            }
        }
    }

    return {
        message: msg as z.infer<typeof CopilotAssistantMessage>,
        rawRequest: request,
        rawResponse: json,
    };
}
/**
 * Asks the model to invent a plausible return value for a pending tool call.
 *
 * The prompt is assembled with a template literal rather than chained
 * `String.replace(placeholder, value)` calls: with a string replacement,
 * sequences such as `$&`, `$$` or `$'` inside the tool ID or the serialized
 * chat would be interpreted as replacement patterns and corrupt the prompt.
 *
 * @param toolId    ID of the tool call the assistant requested.
 * @param projectId Project the caller must be a member of.
 * @param messages  Chat so far; only role, content and (for assistant
 *                  messages) tool_calls are forwarded to the model.
 * @returns JSON string of the generated object (`{ result: ... }`).
 * @throws QueryLimitError when the project's query limit is exhausted.
 */
export async function suggestToolResponse(toolId: string, projectId: string, messages: z.infer<typeof apiV1.ChatMessage>[]): Promise<string> {
    await projectAuthCheck(projectId);
    if (!await check_query_limit(projectId)) {
        throw new QueryLimitError();
    }

    // reduce each message to the fields the model needs
    const transcript = JSON.stringify(messages.map((m) => {
        const { role, content } = m;
        const tool_calls = ('tool_calls' in m && m.role === 'assistant') ? m.tool_calls : undefined;
        return {
            role,
            content,
            tool_calls,
        };
    }));

    const prompt = `
# Your Specific Task:
Here is a chat between a user and a customer support assistant.
The assistant has requested a tool call with ID ${toolId}.
Your job is to come up with an example of the data that the tool call should return.
The current date is ${new Date().toISOString()}.
CONVERSATION:
${transcript}
`;

    const { object } = await generateObject({
        model: openai("gpt-4o"),
        prompt: prompt,
        schema: z.object({
            result: z.any(),
        }),
    });
    return JSON.stringify(object);
}
/**
 * Retrieval step for RAG: embeds the query, searches the "embeddings" Qdrant
 * collection restricted to the project's active, requested data sources, and
 * returns either the matching chunks or the content of their parent docs.
 *
 * @param projectId  Project the caller must be a member of.
 * @param query      Text to embed and search for.
 * @param sourceIds  Data source ids the search may use.
 * @param returnType 'chunks' returns the stored chunk content; any other
 *                   value replaces it with the document's content from MongoDB.
 * @param k          Maximum number of points requested from Qdrant.
 * @returns `{ results }`, empty when none of `sourceIds` is usable.
 */
export async function getInformationTool(
    projectId: string,
    query: string,
    sourceIds: string[],
    returnType: z.infer<typeof WorkflowAgent>['ragReturnType'],
    k: number,
): Promise<z.infer<typeof GetInformationToolResult>> {
    await projectAuthCheck(projectId);

    // embed the question
    const { embedding } = await embed({
        model: embeddingModel,
        value: query,
    });

    // keep only requested sources that belong to this project and are active
    const projectSources = await dataSourcesCollection.find({
        projectId,
        active: true,
    }).toArray();
    const validSourceIds: string[] = [];
    for (const source of projectSources) {
        const id = source._id.toString();
        if (sourceIds.includes(id) && source.active) {
            validSourceIds.push(id);
        }
    }
    if (validSourceIds.length === 0) {
        return { results: [] };
    }

    // vector search in qdrant
    const { points } = await qdrantClient.query("embeddings", {
        query: embedding,
        filter: {
            must: [
                { key: "projectId", match: { value: projectId } },
                { key: "sourceId", match: { any: validSourceIds } },
            ],
        },
        limit: k,
        with_payload: true,
    });
    const chunks = points.map((point) => {
        const payload = point.payload as z.infer<typeof EmbeddingRecord>['payload'];
        return {
            title: payload.title,
            name: payload.name,
            content: payload.content,
            docId: payload.docId,
            sourceId: payload.sourceId,
        };
    });
    if (returnType === 'chunks') {
        return { results: chunks };
    }

    // otherwise swap chunk text for the parent document's content
    const docs = await dataSourceDocsCollection.find({
        _id: { $in: chunks.map(chunk => new ObjectId(chunk.docId)) },
    }).toArray();
    const expanded = chunks.map((chunk) => {
        const parent = docs.find(d => d._id.toString() === chunk.docId);
        return {
            ...chunk,
            content: parent?.content || '',
        };
    });
    return { results: expanded };
}
/**
 * Generates the next simulated *user* turn for a test conversation.
 *
 * The system prompt is chosen from the simulation data (help article,
 * scenario, or previous chat). Chat roles are flipped before calling the
 * model so that it plays the user while the real assistant's replies appear
 * as the other party.
 *
 * Placeholders are filled in a single pass with a replacer function. This
 * avoids two problems of chained `replace(placeholder, value)` calls: `$&`,
 * `$$`, `$'` etc. in the inserted text being treated as replacement patterns,
 * and an inserted value that itself contains a later placeholder being
 * expanded again.
 *
 * @param projectId      Project the caller must be a member of.
 * @param messages       Conversation so far; only user/assistant messages are used.
 * @param simulationData Article, scenario or previous-chat description.
 * @returns The generated user turn, with a trailing ". EXIT" reduced to ".".
 * @throws QueryLimitError when the project's query limit is exhausted.
 */
export async function simulateUserResponse(
    projectId: string,
    messages: z.infer<typeof apiV1.ChatMessage>[],
    simulationData: z.infer<typeof SimulationData>
): Promise<string> {
    await projectAuthCheck(projectId);
    if (!await check_query_limit(projectId)) {
        throw new QueryLimitError();
    }
    const articlePrompt = `
# Your Specific Task:
## Context:
Here is a help article:
Content:
<START_ARTICLE_CONTENT>
Title: {{title}}
{{content}}
<END_ARTICLE_CONTENT>
## Task definition:
Pretend to be a user reaching out to customer support. Chat with the
customer support assistant, assuming your issue or query is from this article.
Ask follow-up questions and make it real-world like. Don't do dummy
conversations. Your conversation should be a maximum of 5 user turns.
As output, simply provide your (user) turn of conversation.
After you are done with the chat, keep replying with a single word EXIT
in all capitals.
`;
    const scenarioPrompt = `
# Your Specific Task:
## Context:
Here is a scenario:
Scenario:
<START_SCENARIO>
{{scenario}}
<END_SCENARIO>
## Task definition:
Pretend to be a user reaching out to customer support. Chat with the
customer support assistant, assuming your issue is based on this scenario.
Ask follow-up questions and make it real-world like. Don't do dummy
conversations. Your conversation should be a maximum of 5 user turns.
As output, simply provide your (user) turn of conversation.
After you are done with the chat, keep replying with a single word EXIT
in all capitals.
`;
    const previousChatPrompt = `
# Your Specific Task:
## Context:
Here is a chat between a user and a customer support assistant:
Chat:
<PREVIOUS_CHAT>
{{messages}}
<END_PREVIOUS_CHAT>
## Task definition:
Pretend to be a user reaching out to customer support. Chat with the
customer support assistant, assuming your issue based on this previous chat.
Ask follow-up questions and make it real-world like. Don't do dummy
conversations. Your conversation should be a maximum of 5 user turns.
As output, simply provide your (user) turn of conversation.
After you are done with the chat, keep replying with a single word EXIT
in all capitals.
`;

    // Fills {{name}} placeholders from `values`; unknown placeholders are left as-is.
    const fillTemplate = (template: string, values: Record<string, string>): string => {
        const lookup = new Map(Object.entries(values));
        return template.replace(
            /\{\{(\w+)\}\}/g,
            (placeholder: string, key: string) => lookup.get(key) ?? placeholder,
        );
    };

    // flip assistant / user roles; only those two roles are forwarded
    const flippedMessages: { role: 'user' | 'assistant', content: string }[] = messages
        .filter(m => m.role === 'assistant' || m.role === 'user')
        .map(m => ({
            role: m.role === 'assistant' ? 'user' : 'assistant',
            content: m.content || '',
        }));

    // pick the system prompt; if several kinds of data are present the later
    // check wins, as before
    let prompt = '';
    if ('articleUrl' in simulationData) {
        prompt = fillTemplate(articlePrompt, {
            title: simulationData.articleTitle || '',
            content: simulationData.articleContent || '',
        });
    }
    if ('scenario' in simulationData) {
        prompt = fillTemplate(scenarioPrompt, {
            scenario: simulationData.scenario,
        });
    }
    if ('chatMessages' in simulationData) {
        prompt = fillTemplate(previousChatPrompt, {
            messages: simulationData.chatMessages,
        });
    }

    const { text } = await generateText({
        model: openai("gpt-4o"),
        system: prompt,
        messages: flippedMessages,
    });
    return text.replace(/\. EXIT$/, '.');
}
/**
 * Runs a client-side tool call through the project's webhook.
 *
 * @param toolCall  The tool call emitted by the assistant.
 * @param messages  Conversation passed along to the webhook helper.
 * @param projectId Project the caller must be a member of.
 * @returns Whatever `callClientToolWebhook` resolves to.
 */
export async function executeClientTool(
    toolCall: z.infer<typeof apiV1.AssistantMessageWithToolCalls>['tool_calls'][number],
    messages: z.infer<typeof apiV1.ChatMessage>[],
    projectId: string,
): Promise<unknown> {
    await projectAuthCheck(projectId);
    const webhookOutput = await callClientToolWebhook(toolCall, messages, projectId);
    return webhookOutput;
}

View file

@ -0,0 +1,342 @@
'use server';
import { redirect } from "next/navigation";
import { ObjectId, WithId } from "mongodb";
import { dataSourcesCollection, dataSourceDocsCollection } from "@/app/lib/mongodb";
import { z } from 'zod';
import { GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { projectAuthCheck } from "./project_actions";
import { DataSource, DataSourceDoc, WithStringId } from "../lib/types";
import { uploadsS3Client } from "../lib/uploads_s3_client";
/**
 * Loads one data source of a project.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Hex string of the data source's ObjectId.
 * @returns The data source with `_id` converted to a string.
 * @throws Error('Invalid data source') when no source with that id exists
 *         in the project.
 */
export async function getDataSource(projectId: string, sourceId: string): Promise<WithStringId<z.infer<typeof DataSource>>> {
    await projectAuthCheck(projectId);

    const match = await dataSourcesCollection.findOne({
        _id: new ObjectId(sourceId),
        projectId,
    });
    if (!match) {
        throw new Error('Invalid data source');
    }

    const { _id: objectId, ...fields } = match;
    return { ...fields, _id: objectId.toString() };
}
/**
 * Lists every data source of a project whose status is not 'deleted'.
 *
 * @param projectId Project the caller must be a member of.
 * @returns The sources with `_id` converted to a string.
 */
export async function listDataSources(projectId: string): Promise<WithStringId<z.infer<typeof DataSource>>[]> {
    await projectAuthCheck(projectId);

    const visibleSources = await dataSourcesCollection
        .find({ projectId, status: { $ne: 'deleted' } })
        .toArray();

    return visibleSources.map((source) => {
        const stringId = source._id.toString();
        return { ...source, _id: stringId };
    });
}
/**
 * Creates a new, active data source (version 1, zero attempts) in a project.
 *
 * @param projectId Project the caller must be a member of.
 * @param name      Display name of the source.
 * @param data      Source-type specific payload.
 * @param status    Initial status; defaults to 'pending'.
 * @returns The stored record with `_id` converted to a string.
 */
export async function createDataSource({
    projectId,
    name,
    data,
    status = 'pending',
}: {
    projectId: string,
    name: string,
    data: z.infer<typeof DataSource>['data'],
    status?: 'pending' | 'ready',
}): Promise<WithStringId<z.infer<typeof DataSource>>> {
    await projectAuthCheck(projectId);

    const record: z.infer<typeof DataSource> = {
        projectId,
        active: true,
        name,
        createdAt: new Date().toISOString(),
        attempts: 0,
        status,
        version: 1,
        data,
    };
    await dataSourcesCollection.insertOne(record);

    // relies on insertOne having set `_id` on the object it was given
    const { _id: assignedId, ...fields } = record as WithId<z.infer<typeof DataSource>>;
    return { ...fields, _id: assignedId.toString() };
}
/**
 * Re-queues a 'urls' data source for crawling: all of its docs and the source
 * itself go back to 'pending' with zero attempts, and the source's version is
 * incremented.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source to recrawl.
 * @throws Error('Invalid data source type') when the source is not of type 'urls'.
 */
export async function recrawlWebDataSource(projectId: string, sourceId: string) {
    await projectAuthCheck(projectId);

    const { data } = await getDataSource(projectId, sourceId);
    if (data.type !== 'urls') {
        throw new Error('Invalid data source type');
    }

    // reset every doc of this source
    await dataSourceDocsCollection.updateMany(
        { sourceId },
        {
            $set: {
                status: 'pending',
                lastUpdatedAt: new Date().toISOString(),
                attempts: 0,
            },
        },
    );

    // reset the source itself and bump its version
    await dataSourcesCollection.updateOne(
        { _id: new ObjectId(sourceId) },
        {
            $set: {
                status: 'pending',
                lastUpdatedAt: new Date().toISOString(),
                attempts: 0,
            },
            $inc: { version: 1 },
        },
    );
}
/**
 * Soft-deletes a data source by setting its status to 'deleted' (attempts
 * reset, version incremented), then redirects to the project's sources page.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source; must belong to the project.
 */
export async function deleteDataSource(projectId: string, sourceId: string) {
    await projectAuthCheck(projectId);
    // throws if the source does not exist in this project
    await getDataSource(projectId, sourceId);

    const filter = { _id: new ObjectId(sourceId) };
    await dataSourcesCollection.updateOne(filter, {
        $set: {
            status: 'deleted',
            lastUpdatedAt: new Date().toISOString(),
            attempts: 0,
        },
        $inc: { version: 1 },
    });

    redirect(`/projects/${projectId}/sources`);
}
/**
 * Sets the `active` flag of a data source.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source; must belong to the project.
 * @param active    New value of the flag.
 */
export async function toggleDataSource(projectId: string, sourceId: string, active: boolean) {
    await projectAuthCheck(projectId);
    // throws if the source does not exist in this project
    await getDataSource(projectId, sourceId);

    const filter = { _id: new ObjectId(sourceId), projectId };
    await dataSourcesCollection.updateOne(filter, { $set: { active } });
}
/**
 * Adds documents to a data source and re-queues the source for processing.
 *
 * Each doc is stored as 'pending' at version 1. When a doc carries an `_id`
 * it is used as the document's ObjectId; otherwise the id is left for the
 * database to assign. Afterwards the source is set back to 'pending' with
 * zero attempts and its version is incremented.
 *
 * An empty `docData` is a no-op: `insertMany` rejects an empty batch, and
 * there is nothing to re-process in that case.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source; must belong to the project.
 * @param docData   Documents to add.
 */
export async function addDocsToDataSource({
    projectId,
    sourceId,
    docData,
}: {
    projectId: string,
    sourceId: string,
    docData: {
        _id?: string,
        name: string,
        data: z.infer<typeof DataSourceDoc>['data']
    }[]
}): Promise<void> {
    await projectAuthCheck(projectId);
    await getDataSource(projectId, sourceId);

    // nothing to insert: avoid the driver's empty-batch error and leave the
    // source's status/version untouched
    if (docData.length === 0) {
        return;
    }

    await dataSourceDocsCollection.insertMany(docData.map(doc => {
        const record: z.infer<typeof DataSourceDoc> = {
            sourceId,
            name: doc.name,
            status: 'pending',
            createdAt: new Date().toISOString(),
            data: doc.data,
            version: 1,
        };
        if (!doc._id) {
            return record;
        }
        // caller supplied an id: store the doc under it
        const recordWithId = record as WithId<z.infer<typeof DataSourceDoc>>;
        recordWithId._id = new ObjectId(doc._id);
        return recordWithId;
    }));

    // re-queue the source so the new docs get picked up
    await dataSourcesCollection.updateOne(
        { _id: new ObjectId(sourceId) },
        {
            $set: {
                status: 'pending',
                attempts: 0,
                lastUpdatedAt: new Date().toISOString(),
            },
            $inc: {
                version: 1,
            },
        }
    );
}
/**
 * Returns one page of the non-deleted docs of a data source plus their total
 * count.
 *
 * `page` and `limit` arrive from the client, so they are normalised first:
 * non-finite values fall back to the defaults and both are forced to be
 * integers >= 1 (a negative skip is rejected by MongoDB and a limit of 0
 * means "no limit"). Docs are sorted by `_id` so that consecutive pages are
 * stable and do not overlap.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source; must belong to the project.
 * @param page      1-based page number (default 1).
 * @param limit     Page size (default 10).
 * @returns `{ files, total }` with each doc's `_id` converted to a string.
 */
export async function listDocsInDataSource({
    projectId,
    sourceId,
    page = 1,
    limit = 10,
}: {
    projectId: string,
    sourceId: string,
    page?: number,
    limit?: number,
}): Promise<{
    files: WithStringId<z.infer<typeof DataSourceDoc>>[],
    total: number
}> {
    await projectAuthCheck(projectId);
    await getDataSource(projectId, sourceId);

    const safePage = Number.isFinite(page) ? Math.max(1, Math.floor(page)) : 1;
    const safeLimit = Number.isFinite(limit) ? Math.max(1, Math.floor(limit)) : 10;

    // the count and the page query are independent, so run them together
    const [total, docs] = await Promise.all([
        dataSourceDocsCollection.countDocuments({
            sourceId,
            status: { $ne: 'deleted' },
        }),
        dataSourceDocsCollection.find({
            sourceId,
            status: { $ne: 'deleted' },
        })
            .sort({ _id: 1 })
            .skip((safePage - 1) * safeLimit)
            .limit(safeLimit)
            .toArray(),
    ]);

    return {
        files: docs.map(f => ({ ...f, _id: f._id.toString() })),
        total
    };
}
/**
 * Soft-deletes docs of a data source and re-queues the source.
 *
 * Matching docs get status "deleted" and a version bump; the source is then
 * set back to 'pending' with zero attempts and its version is incremented.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source; must belong to the project.
 * @param docIds    Hex ObjectId strings of the docs to mark as deleted.
 */
export async function deleteDocsFromDataSource({
    projectId,
    sourceId,
    docIds,
}: {
    projectId: string,
    sourceId: string,
    docIds: string[],
}): Promise<void> {
    await projectAuthCheck(projectId);
    await getDataSource(projectId, sourceId);

    // flag the docs; only docs of this source are touched
    const targetIds = docIds.map(id => new ObjectId(id));
    await dataSourceDocsCollection.updateMany(
        { sourceId, _id: { $in: targetIds } },
        {
            $set: {
                status: "deleted",
                lastUpdatedAt: new Date().toISOString(),
            },
            $inc: { version: 1 },
        },
    );

    // put the source back into the pending state
    await dataSourcesCollection.updateOne(
        { _id: new ObjectId(sourceId) },
        {
            $set: {
                status: 'pending',
                attempts: 0,
                lastUpdatedAt: new Date().toISOString(),
            },
            $inc: { version: 1 },
        },
    );
}
/**
 * Creates a presigned S3 download URL, valid for 60 seconds, for a file doc.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source; must belong to the project.
 * @param fileId    Hex ObjectId string of the doc.
 * @returns The presigned GET URL.
 * @throws Error('File not found') when the doc is missing from the source or
 *         is not of type 'file'.
 */
export async function getDownloadUrlForFile(
    projectId: string,
    sourceId: string,
    fileId: string
): Promise<string> {
    await projectAuthCheck(projectId);
    await getDataSource(projectId, sourceId);

    // look up the doc to learn its s3 key
    const doc = await dataSourceDocsCollection.findOne({
        sourceId,
        _id: new ObjectId(fileId),
        'data.type': 'file',
    });
    if (!doc || doc.data.type !== 'file') {
        throw new Error('File not found');
    }

    const getObject = new GetObjectCommand({
        Bucket: process.env.UPLOADS_S3_BUCKET,
        Key: doc.data.s3Key,
    });
    const signedUrl = await getSignedUrl(uploadsS3Client, getObject, { expiresIn: 60 }); // 1 minute
    return signedUrl;
}
/**
 * Issues one presigned S3 upload URL (valid for 10 minutes) per file for a
 * 'files' data source.
 *
 * Every file gets a freshly generated ObjectId and is stored under
 * `datasources/files/<first 2 chars of projectId>/<projectId>/<sourceId>/<fileId>/<file name>`.
 *
 * @param projectId Project the caller must be a member of.
 * @param sourceId  Id of the data source; must be of type 'files'.
 * @param files     Name, MIME type and size of each file to upload.
 * @returns For each file, in input order: its id, upload URL and S3 key.
 * @throws Error('Invalid files data source') for any other source type.
 */
export async function getUploadUrlsForFilesDataSource(
    projectId: string,
    sourceId: string,
    files: { name: string; type: string; size: number }[]
): Promise<{
    fileId: string,
    presignedUrl: string,
    s3Key: string,
}[]> {
    await projectAuthCheck(projectId);
    const { data } = await getDataSource(projectId, sourceId);
    if (data.type !== 'files') {
        throw new Error('Invalid files data source');
    }

    // part of the key shared by all files of this request
    const keyRoot = `datasources/files/${projectId.slice(0, 2)}/${projectId}/${sourceId}`;

    const urls: {
        fileId: string,
        presignedUrl: string,
        s3Key: string,
    }[] = [];
    for (const { name, type } of files) {
        const fileId = new ObjectId().toString();
        const s3Key = `${keyRoot}/${fileId}/${name}`;
        const putObject = new PutObjectCommand({
            Bucket: process.env.UPLOADS_S3_BUCKET,
            Key: s3Key,
            ContentType: type,
        });
        const presignedUrl = await getSignedUrl(uploadsS3Client, putObject, { expiresIn: 10 * 60 }); // 10 minutes
        urls.push({ fileId, presignedUrl, s3Key });
    }
    return urls;
}

View file

@ -0,0 +1,209 @@
'use server';
import { redirect } from "next/navigation";
import { ObjectId } from "mongodb";
import { dataSourcesCollection, embeddingsCollection, projectsCollection, agentWorkflowsCollection, scenariosCollection, projectMembersCollection, apiKeysCollection, dataSourceDocsCollection } from "@/app/lib/mongodb";
import { z } from 'zod';
import crypto from 'crypto';
import { revalidatePath } from "next/cache";
import { templates } from "../lib/project_templates";
import { authCheck } from "./actions";
import { ApiKey, WithStringId, Project } from "../lib/types";
/**
 * Ensures the signed-in user is a member of the given project.
 *
 * @param projectId Project to check membership for.
 * @throws Error from `authCheck` when nobody is signed in.
 * @throws Error('User not a member of project') when no membership record
 *         exists for the user's `sub` and the project.
 */
export async function projectAuthCheck(projectId: string) {
    const { sub: userId } = await authCheck();
    const membership = await projectMembersCollection.findOne({ projectId, userId });
    if (membership) {
        return;
    }
    throw new Error('User not a member of project');
}
/**
 * Creates a project for the signed-in user from a template, together with
 * its first workflow ("Version 1") and the user's membership, then redirects
 * to the project's workflow page.
 *
 * The template is resolved *before* anything is written. Previously an
 * unknown or missing `template` field only failed after the project document
 * had been inserted, leaving behind a project with no workflow and no
 * membership that still counted towards the user's project limit.
 *
 * @param formData Form with `name` and `template` fields.
 * @throws Error when the user is at MAX_PROJECTS_PER_USER (if that is > 0).
 * @throws Error when `template` does not name a known template.
 */
export async function createProject(formData: FormData) {
    const user = await authCheck();

    // enforce the per-user project limit, when one is configured
    const projectsLimit = Number(process.env.MAX_PROJECTS_PER_USER) || 0;
    if (projectsLimit > 0) {
        const count = await projectsCollection.countDocuments({
            createdByUserId: user.sub,
        });
        if (count >= projectsLimit) {
            throw new Error('You have reached your project limit. Please upgrade your plan.');
        }
    }

    const name = formData.get('name') as string;

    // validate the template up front; own-property check so that inherited
    // keys such as "constructor" are not accepted
    const templateKey = formData.get('template');
    if (typeof templateKey !== 'string' || !Object.prototype.hasOwnProperty.call(templates, templateKey)) {
        throw new Error(`Unknown project template: ${String(templateKey)}`);
    }
    const { agents, prompts, tools, startAgent } = templates[templateKey];

    const projectId = crypto.randomUUID();
    const chatClientId = crypto.randomBytes(16).toString('base64url');
    const secret = crypto.randomBytes(32).toString('hex');

    // create project
    await projectsCollection.insertOne({
        _id: projectId,
        name: name,
        createdAt: (new Date()).toISOString(),
        lastUpdatedAt: (new Date()).toISOString(),
        createdByUserId: user.sub,
        chatClientId,
        secret,
        nextWorkflowNumber: 1,
    });

    // add first workflow version
    await agentWorkflowsCollection.insertOne({
        _id: new ObjectId(),
        projectId,
        agents,
        prompts,
        tools,
        startAgent,
        createdAt: (new Date()).toISOString(),
        lastUpdatedAt: (new Date()).toISOString(),
        name: `Version 1`,
    });

    // add user to project
    await projectMembersCollection.insertOne({
        userId: user.sub,
        projectId: projectId,
        createdAt: (new Date()).toISOString(),
        lastUpdatedAt: (new Date()).toISOString(),
    });

    redirect(`/projects/${projectId}/workflow`);
}
/**
 * Fetches the project document.
 *
 * @param projectId Project the caller must be a member of.
 * @returns The project as stored.
 * @throws Error('Project config not found') when the document is missing.
 */
export async function getProjectConfig(projectId: string): Promise<WithStringId<z.infer<typeof Project>>> {
    await projectAuthCheck(projectId);

    const config = await projectsCollection.findOne({ _id: projectId });
    if (config) {
        return config;
    }
    throw new Error('Project config not found');
}
/**
 * Lists every project the signed-in user has a membership record for.
 *
 * @returns The matching project documents.
 */
export async function listProjects(): Promise<z.infer<typeof Project>[]> {
    const { sub: userId } = await authCheck();

    const memberships = await projectMembersCollection.find({ userId }).toArray();
    const memberOf = memberships.map(({ projectId }) => projectId);

    const projects = await projectsCollection
        .find({ _id: { $in: memberOf } })
        .toArray();
    return projects;
}
/**
 * Replaces the project's secret with 32 fresh random bytes (hex encoded).
 *
 * @param projectId Project the caller must be a member of.
 * @returns The new secret.
 */
export async function rotateSecret(projectId: string): Promise<string> {
    await projectAuthCheck(projectId);

    const freshSecret = crypto.randomBytes(32).toString('hex');
    await projectsCollection.updateOne(
        { _id: projectId },
        { $set: { secret: freshSecret } },
    );
    return freshSecret;
}
/**
 * Stores the project's webhook URL. The value is written as given; no
 * validation of the URL happens here.
 *
 * @param projectId Project the caller must be a member of.
 * @param url       New webhook URL.
 */
export async function updateWebhookUrl(projectId: string, url: string) {
    await projectAuthCheck(projectId);

    const filter = { _id: projectId };
    await projectsCollection.updateOne(filter, { $set: { webhookUrl: url } });
}
/**
 * Creates a new API key (32 random bytes, hex encoded) for a project.
 * A project may hold at most 3 keys.
 *
 * @param projectId Project the caller must be a member of.
 * @returns The stored key record with `_id` converted to a string.
 * @throws Error('Maximum number of API keys reached') when 3 keys exist.
 */
export async function createApiKey(projectId: string): Promise<WithStringId<z.infer<typeof ApiKey>>> {
    await projectAuthCheck(projectId);

    // enforce the per-project cap
    const existing = await apiKeysCollection.countDocuments({ projectId });
    if (existing >= 3) {
        throw new Error('Maximum number of API keys reached');
    }

    const record: z.infer<typeof ApiKey> = {
        projectId,
        key: crypto.randomBytes(32).toString('hex'),
        createdAt: new Date().toISOString(),
    };
    await apiKeysCollection.insertOne(record);

    // relies on insertOne having set `_id` on the object it was given
    const { _id: assignedId, ...fields } = record as WithStringId<z.infer<typeof ApiKey>>;
    return { ...fields, _id: assignedId.toString() };
}
/**
 * Deletes one API key of a project.
 *
 * @param projectId Project the caller must be a member of.
 * @param id        Hex ObjectId string of the key record.
 */
export async function deleteApiKey(projectId: string, id: string) {
    await projectAuthCheck(projectId);

    // the projectId in the filter keeps keys of other projects out of reach
    const filter = { projectId, _id: new ObjectId(id) };
    await apiKeysCollection.deleteOne(filter);
}
/**
 * Lists all API keys of a project.
 *
 * @param projectId Project the caller must be a member of.
 * @returns The key records with `_id` converted to a string.
 */
export async function listApiKeys(projectId: string): Promise<WithStringId<z.infer<typeof ApiKey>>[]> {
    await projectAuthCheck(projectId);

    const records = await apiKeysCollection.find({ projectId }).toArray();
    return records.map((record) => {
        const stringId = record._id.toString();
        return { ...record, _id: stringId };
    });
}
/**
 * Renames a project and revalidates the project's layout path.
 *
 * @param projectId Project the caller must be a member of.
 * @param name      New project name.
 */
export async function updateProjectName(projectId: string, name: string) {
    await projectAuthCheck(projectId);

    const filter = { _id: projectId };
    await projectsCollection.updateOne(filter, { $set: { name } });

    revalidatePath(`/projects/${projectId}`, 'layout');
}
/**
 * Permanently deletes a project and the MongoDB records that belong to it:
 * API keys, embeddings, data source docs, data sources, memberships,
 * workflows, scenarios and finally the project document. Redirects to
 * `/projects` afterwards.
 *
 * Data source docs are now removed as well; previously they were left behind
 * once their parent data sources had been deleted.
 *
 * NOTE(review): vectors stored in Qdrant and uploaded files in S3 are not
 * removed here — confirm whether another process cleans them up.
 *
 * @param projectId Project the caller must be a member of.
 */
export async function deleteProject(projectId: string) {
    await projectAuthCheck(projectId);

    // delete api keys
    await apiKeysCollection.deleteMany({
        projectId,
    });

    // collect the project's data sources; embeddings and docs reference them
    // by the string form of the id
    const sources = await dataSourcesCollection.find({
        projectId,
    }, {
        projection: {
            _id: true,
        }
    }).toArray();
    const sourceObjectIds = sources.map(s => s._id);
    const sourceIdStrings = sourceObjectIds.map(id => id.toString());

    // delete embeddings
    await embeddingsCollection.deleteMany({
        sourceId: { $in: sourceIdStrings },
    });

    // delete data source docs
    await dataSourceDocsCollection.deleteMany({
        sourceId: { $in: sourceIdStrings },
    });

    // delete data sources
    await dataSourcesCollection.deleteMany({
        _id: {
            $in: sourceObjectIds,
        }
    });

    // delete project members
    await projectMembersCollection.deleteMany({
        projectId,
    });

    // delete workflows
    await agentWorkflowsCollection.deleteMany({
        projectId,
    });

    // delete scenarios
    await scenariosCollection.deleteMany({
        projectId,
    });

    // delete project
    await projectsCollection.deleteOne({
        _id: projectId,
    });

    redirect('/projects');
}

View file

@ -0,0 +1,51 @@
'use server';
import { ObjectId } from "mongodb";
import { scenariosCollection } from "@/app/lib/mongodb";
import { z } from 'zod';
import { Scenario, WithStringId } from "../lib/types";
import { projectAuthCheck } from "./project_actions";
/**
 * Lists all scenarios of a project.
 *
 * @param projectId Project the caller must be a member of.
 * @returns The scenarios with `_id` converted to a string.
 */
export async function getScenarios(projectId: string): Promise<WithStringId<z.infer<typeof Scenario>>[]> {
    await projectAuthCheck(projectId);

    const stored = await scenariosCollection.find({ projectId }).toArray();
    return stored.map((scenario) => {
        const stringId = scenario._id.toString();
        return { ...scenario, _id: stringId };
    });
}
/**
 * Creates a scenario in a project.
 *
 * @param projectId   Project the caller must be a member of.
 * @param name        Scenario name.
 * @param description Scenario description.
 * @returns The id of the inserted scenario, as a string.
 */
export async function createScenario(projectId: string, name: string, description: string): Promise<string> {
    await projectAuthCheck(projectId);

    // one timestamp is used for both the created and updated fields
    const timestamp = new Date().toISOString();
    const { insertedId } = await scenariosCollection.insertOne({
        projectId,
        name,
        description,
        lastUpdatedAt: timestamp,
        createdAt: timestamp,
    });
    return insertedId.toString();
}
/**
 * Updates the name and description of a scenario and refreshes its
 * `lastUpdatedAt`.
 *
 * @param projectId   Project the caller must be a member of.
 * @param scenarioId  Hex ObjectId string of the scenario.
 * @param name        New name.
 * @param description New description.
 */
export async function updateScenario(projectId: string, scenarioId: string, name: string, description: string) {
    await projectAuthCheck(projectId);

    // the projectId in the filter keeps scenarios of other projects out of reach
    const filter = { _id: new ObjectId(scenarioId), projectId };
    await scenariosCollection.updateOne(filter, {
        $set: {
            name,
            description,
            lastUpdatedAt: new Date().toISOString(),
        },
    });
}
/**
 * Deletes a scenario from a project.
 *
 * @param projectId  Project the caller must be a member of.
 * @param scenarioId Hex ObjectId string of the scenario.
 */
export async function deleteScenario(projectId: string, scenarioId: string) {
    await projectAuthCheck(projectId);

    // the projectId in the filter keeps scenarios of other projects out of reach
    const filter = { _id: new ObjectId(scenarioId), projectId };
    await scenariosCollection.deleteOne(filter);
}

View file

@ -0,0 +1,240 @@
'use server';
import { ObjectId, WithId } from "mongodb";
import { projectsCollection, agentWorkflowsCollection } from "@/app/lib/mongodb";
import { z } from 'zod';
import { templates } from "../lib/project_templates";
import { projectAuthCheck } from "./project_actions";
import { Workflow, WithStringId } from "../lib/types";
/**
 * Creates a new workflow from the 'default' template.
 *
 * The project's `nextWorkflowNumber` counter is incremented and the value
 * after the increment is used in the workflow's name ("Version N").
 *
 * @param projectId Project the caller must be a member of.
 * @returns The new workflow with its inserted id as a string.
 * @throws Error('Project not found') when the project document is missing.
 */
export async function createWorkflow(projectId: string): Promise<WithStringId<z.infer<typeof Workflow>>> {
    await projectAuthCheck(projectId);

    // bump the counter and read back the updated project
    const project = await projectsCollection.findOneAndUpdate(
        { _id: projectId },
        { $inc: { nextWorkflowNumber: 1 } },
        { returnDocument: 'after' },
    );
    if (!project) {
        throw new Error('Project not found');
    }
    const versionNumber = project.nextWorkflowNumber;

    // build and store the workflow
    const template = templates['default'];
    const workflow = {
        agents: template.agents,
        prompts: template.prompts,
        tools: template.tools,
        startAgent: template.startAgent,
        projectId,
        createdAt: new Date().toISOString(),
        lastUpdatedAt: new Date().toISOString(),
        name: `Version ${versionNumber}`,
    };
    const { insertedId } = await agentWorkflowsCollection.insertOne(workflow);

    // drop any `_id` present on the object and return the string id instead
    const { _id, ...fields } = workflow as WithId<z.infer<typeof Workflow>>;
    return { ...fields, _id: insertedId.toString() };
}
/**
 * Duplicates a workflow within its project. The copy gets a new id, fresh
 * timestamps and the name "Copy of <original name>".
 *
 * @param projectId  Project the caller must be a member of.
 * @param workflowId Hex ObjectId string of the workflow to copy.
 * @returns The copy with its inserted id as a string.
 * @throws Error('Workflow not found') when the workflow is not in the project.
 */
export async function cloneWorkflow(projectId: string, workflowId: string): Promise<WithStringId<z.infer<typeof Workflow>>> {
    await projectAuthCheck(projectId);

    const original = await agentWorkflowsCollection.findOne({
        _id: new ObjectId(workflowId),
        projectId,
    });
    if (!original) {
        throw new Error('Workflow not found');
    }

    // same content, new identity
    const copy = {
        ...original,
        _id: new ObjectId(),
        name: `Copy of ${original.name || 'Unnamed workflow'}`,
        createdAt: new Date().toISOString(),
        lastUpdatedAt: new Date().toISOString(),
    };
    const { insertedId } = await agentWorkflowsCollection.insertOne(copy);

    const { _id, ...fields } = copy as WithId<z.infer<typeof Workflow>>;
    return { ...fields, _id: insertedId.toString() };
}
/**
 * Renames a workflow and refreshes its `lastUpdatedAt`.
 *
 * @param projectId  Project the caller must be a member of.
 * @param workflowId Hex ObjectId string of the workflow.
 * @param name       New name.
 */
export async function renameWorkflow(projectId: string, workflowId: string, name: string) {
    await projectAuthCheck(projectId);

    const filter = { _id: new ObjectId(workflowId), projectId };
    await agentWorkflowsCollection.updateOne(filter, {
        $set: {
            name,
            lastUpdatedAt: new Date().toISOString(),
        },
    });
}
/**
 * Saves the editable content of a draft workflow.
 *
 * The incoming `workflow` object comes from the client, so its identity
 * fields are never written: `_id`, `name`, `projectId` and `createdAt` are
 * stripped before the update. Writing a client-supplied `projectId` would let
 * a member of one project move a workflow into another project. The update
 * filter is also scoped to `projectId`.
 *
 * @param projectId  Project the caller must be a member of.
 * @param workflowId Hex ObjectId string of the workflow to update.
 * @param workflow   New workflow content.
 * @throws Error('Workflow not found') when the workflow is not in the project.
 * @throws Error('Cannot save published workflow') when it is the project's
 *         published workflow.
 */
export async function saveWorkflow(projectId: string, workflowId: string, workflow: z.infer<typeof Workflow>) {
    await projectAuthCheck(projectId);

    // check if workflow exists
    const existingWorkflow = await agentWorkflowsCollection.findOne({
        _id: new ObjectId(workflowId),
        projectId,
    });
    if (!existingWorkflow) {
        throw new Error('Workflow not found');
    }

    // ensure that this is not the published workflow for this project
    const publishedWorkflowId = await fetchPublishedWorkflowId(projectId);
    if (publishedWorkflowId && publishedWorkflowId === workflowId) {
        throw new Error('Cannot save published workflow');
    }

    // update the workflow content only; identity fields stay as stored
    const {
        _id,
        name,
        projectId: _clientProjectId,
        createdAt: _clientCreatedAt,
        ...rest
    } = workflow as WithId<z.infer<typeof Workflow>>;
    await agentWorkflowsCollection.updateOne({
        _id: new ObjectId(workflowId),
        projectId,
    }, {
        $set: {
            ...rest,
            lastUpdatedAt: new Date().toISOString(),
        },
    });
}
/**
 * Marks a workflow as the project's published workflow.
 *
 * @param projectId  Project the caller must be a member of.
 * @param workflowId Hex ObjectId string of the workflow to publish.
 * @throws Error('Workflow not found') when the workflow is not in the project.
 */
export async function publishWorkflow(projectId: string, workflowId: string) {
    await projectAuthCheck(projectId);

    // the workflow has to exist within this project
    const candidate = await agentWorkflowsCollection.findOne({
        _id: new ObjectId(workflowId),
        projectId,
    });
    if (!candidate) {
        throw new Error('Workflow not found');
    }

    // record it on the project
    await projectsCollection.updateOne(
        { _id: projectId },
        { $set: { publishedWorkflowId: workflowId } },
    );
}
/**
 * Looks up the id of the project's published workflow.
 *
 * @param projectId Project the caller must be a member of.
 * @returns The id, or null when the project is missing or has none set.
 */
export async function fetchPublishedWorkflowId(projectId: string): Promise<string | null> {
    await projectAuthCheck(projectId);

    const project = await projectsCollection.findOne({ _id: projectId });
    if (project && project.publishedWorkflowId) {
        return project.publishedWorkflowId;
    }
    return null;
}
/**
 * Loads a single workflow of a project.
 *
 * @param projectId - Project that owns the workflow (auth-checked first).
 * @param workflowId - Hex string id of the workflow; parsed into an ObjectId.
 * @returns The workflow document with its `_id` converted to a string.
 * @throws Error 'Workflow not found' when no workflow matches id + project.
 */
export async function fetchWorkflow(projectId: string, workflowId: string): Promise<WithStringId<z.infer<typeof Workflow>>> {
    await projectAuthCheck(projectId);

    // fetch workflow
    const found = await agentWorkflowsCollection.findOne({
        _id: new ObjectId(workflowId),
        projectId,
    });
    if (!found) {
        throw new Error('Workflow not found');
    }

    const { _id: objectId, ...fields } = found;
    return { ...fields, _id: objectId.toString() };
}
/**
 * Lists workflow summaries of a project, newest `lastUpdatedAt` first.
 *
 * The published workflow (if any) is pinned to the top of page 1 and is
 * excluded from the paginated query on every page. The published id is
 * resolved on every page, not only on page 1, so that `skip` is applied to
 * the same set of documents throughout; otherwise later pages could repeat
 * or drop rows. As a consequence `publishedWorkflowId` is reported on
 * every page.
 *
 * Only `_id`, `name`, `description`, `createdAt` and `lastUpdatedAt` are
 * projected, so the returned objects are summaries rather than full workflows.
 *
 * @param projectId - Project whose workflows are listed (auth-checked first).
 * @param page - 1-based page number.
 * @param limit - Page size for the non-published workflows.
 * @returns `workflows` (page 1 may hold `limit + 1` entries because of the
 *          pinned published workflow), `total` (count of all workflows in the
 *          project, including the published one) and `publishedWorkflowId`.
 */
export async function listWorkflows(
    projectId: string,
    page: number = 1,
    limit: number = 10
): Promise<{
    workflows: (WithStringId<z.infer<typeof Workflow>>)[];
    total: number;
    publishedWorkflowId: string | null;
}> {
    await projectAuthCheck(projectId);

    // fetch total count
    const total = await agentWorkflowsCollection.countDocuments({ projectId });

    // summary fields shared by both queries below
    const summaryProjection = {
        _id: 1,
        name: 1,
        description: 1,
        createdAt: 1,
        lastUpdatedAt: 1,
    };

    // resolve the published workflow id for every page so the exclusion below is stable
    const publishedWorkflowId = await fetchPublishedWorkflowId(projectId);

    // the published workflow itself is only shown (pinned) on the first page
    let publishedWorkflow: WithId<z.infer<typeof Workflow>> | null = null;
    if (page === 1 && publishedWorkflowId) {
        publishedWorkflow = await agentWorkflowsCollection.findOne({
            _id: new ObjectId(publishedWorkflowId),
            projectId,
        }, {
            projection: summaryProjection,
        });
    }

    // fetch the remaining workflows with pagination
    const pagedWorkflows: WithId<z.infer<typeof Workflow>>[] = await agentWorkflowsCollection.find(
        {
            projectId,
            ...(publishedWorkflowId ? {
                _id: {
                    $ne: new ObjectId(publishedWorkflowId)
                }
            } : {}),
        },
        {
            sort: { lastUpdatedAt: -1 },
            projection: summaryProjection,
            skip: (page - 1) * limit,
            limit: limit,
        }
    ).toArray();

    const workflows = publishedWorkflow
        ? [publishedWorkflow, ...pagedWorkflows]
        : pagedWorkflows;

    // return workflows with string ids
    return {
        workflows: workflows.map((w) => {
            const { _id, ...rest } = w;
            return {
                ...rest,
                _id: _id.toString(),
            };
        }),
        total,
        publishedWorkflowId,
    };
}

View file

@ -1,16 +1,17 @@
import { FileIcon, FilesIcon, GlobeIcon } from "lucide-react";
/**
 * Renders an icon for a data source.
 *
 * `size` selects the size classes ("sm" gives "w-4 h-4", anything else
 * "w-6 h-6"); `type` selects which icon expressions render, with an omitted
 * type falling into the `type === undefined` branches.
 *
 * NOTE(review): this span declares `type?:` twice and carries both inline
 * <svg> branches and lucide-icon branches for the same conditions. It looks
 * like the removed and the added lines of a change shown together — confirm
 * which set is live before editing.
 */
export function DataSourceIcon({
type = undefined,
size = "sm",
}: {
type?: "crawl" | "urls" | undefined;
type?: "crawl" | "urls" | "files" | undefined;
size?: "sm" | "md";
}) {
const sizeClass = size === "sm" ? "w-4 h-4" : "w-6 h-6";
return <>
{type === undefined && <svg className={sizeClass} aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="none" viewBox="0 0 24 24">
<path stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" strokeWidth="1" d="M19 6c0 1.657-3.134 3-7 3S5 7.657 5 6m14 0c0-1.657-3.134-3-7-3S5 4.343 5 6m14 0v6M5 6v6m0 0c0 1.657 3.134 3 7 3s7-1.343 7-3M5 12v6c0 1.657 3.134 3 7 3s7-1.343 7-3v-6" />
</svg>}
{type == "crawl" && <svg className={`${sizeClass} lucide lucide-globe`} xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="1" strokeLinecap="round" strokeLinejoin="round"><circle cx="12" cy="12" r="10" /><path d="M12 2a14.5 14.5 0 0 0 0 20 14.5 14.5 0 0 0 0-20" /><path d="M2 12h20" /></svg>}
{type == "urls" && <svg className={`${sizeClass} lucide lucide-globe`} xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="1" strokeLinecap="round" strokeLinejoin="round"><circle cx="12" cy="12" r="10" /><path d="M12 2a14.5 14.5 0 0 0 0 20 14.5 14.5 0 0 0 0-20" /><path d="M2 12h20" /></svg>}
{type === undefined && <FileIcon className={sizeClass} />}
{type == "crawl" && <GlobeIcon className={sizeClass} />}
{type == "urls" && <GlobeIcon className={sizeClass} />}
{type == "files" && <FilesIcon className={sizeClass} />}
</>;
}

View file

@ -1,11 +1,12 @@
import { MongoClient } from "mongodb";
import { PlaygroundChat, DataSource, EmbeddingDoc, Project, Webpage, ChatClientId, Workflow, Scenario, ProjectMember, ApiKey } from "./types";
import { PlaygroundChat, DataSource, EmbeddingDoc, Project, Webpage, ChatClientId, Workflow, Scenario, ProjectMember, ApiKey, DataSourceDoc } from "./types";
import { z } from 'zod';
// Module-level MongoDB client. Uses MONGODB_CONNECTION_STRING when it is set and
// non-empty, otherwise falls back to a MongoDB instance on localhost:27017.
const client = new MongoClient(process.env["MONGODB_CONNECTION_STRING"] || "mongodb://localhost:27017");
// Handle to the "rowboat" database; every collection below is opened on it.
export const db = client.db("rowboat");
// Typed collection handles. Each document type is inferred from the zod schema
// of the same name imported from ./types; the string argument is the
// collection name in MongoDB.
export const dataSourcesCollection = db.collection<z.infer<typeof DataSource>>("sources");
export const dataSourceDocsCollection = db.collection<z.infer<typeof DataSourceDoc>>("source_docs");
export const embeddingsCollection = db.collection<z.infer<typeof EmbeddingDoc>>("embeddings");
export const projectsCollection = db.collection<z.infer<typeof Project>>("projects");
export const projectMembersCollection = db.collection<z.infer<typeof ProjectMember>>("project_members");

View file

@ -0,0 +1,7 @@
import {QdrantClient} from '@qdrant/js-client-rest';
// Qdrant client configured from the environment: QDRANT_URL is always passed
// through, and the API key is attached only when QDRANT_API_KEY is non-empty.
const qdrantOptions: { url?: string; apiKey?: string } = {
    url: process.env.QDRANT_URL,
};
if (process.env.QDRANT_API_KEY) {
    qdrantOptions.apiKey = process.env.QDRANT_API_KEY;
}
export const qdrantClient = new QdrantClient(qdrantOptions);

View file

@ -54,30 +54,58 @@ export const DataSource = z.object({
name: z.string(),
projectId: z.string(),
active: z.boolean().default(true),
status: z.union([z.literal('new'), z.literal('processing'), z.literal('completed'), z.literal('error')]),
detailedStatus: z.string().optional(),
status: z.union([
z.literal('pending'),
z.literal('ready'),
z.literal('error'),
z.literal('deleted'),
]),
version: z.number(),
error: z.string().optional(),
attempts: z.number().default(0).optional(),
createdAt: z.string().datetime(),
lastUpdatedAt: z.string().datetime().optional(),
attempts: z.number(),
lastAttemptAt: z.string().datetime().optional(),
pendingRefresh: z.boolean().default(false).optional(),
data: z.discriminatedUnion('type', [
z.object({
type: z.literal('crawl'),
startUrl: z.string(),
limit: z.number(),
firecrawlId: z.string().optional(),
oxylabsId: z.string().optional(),
crawledUrls: z.string().optional(),
type: z.literal('urls'),
}),
z.object({
type: z.literal('urls'),
urls: z.array(z.string()),
scrapedUrls: z.string().optional(),
missingUrls: z.string().optional(),
type: z.literal('files'),
}),
]),
});
/**
 * Schema for a single document that belongs to a data source: either a URL
 * or an uploaded file, distinguished by `data.type`.
 */
export const DataSourceDoc = z.object({
// Id of the data source this document belongs to, as a string.
sourceId: z.string(),
// Display name of the document.
name: z.string(),
// Numeric version — presumably tracks the owning source's version; confirm against the writer.
version: z.number(),
// Processing state; exactly one of the four literals below.
status: z.union([
z.literal('pending'),
z.literal('ready'),
z.literal('error'),
z.literal('deleted'),
]),
// Optional text content of the document.
content: z.string().optional(),
// ISO-8601 datetime strings.
createdAt: z.string().datetime(),
lastUpdatedAt: z.string().datetime().optional(),
// Optional error message.
error: z.string().optional(),
// Type-specific payload, discriminated on `type`.
data: z.discriminatedUnion('type', [
// A web page referenced by URL.
z.object({
type: z.literal('url'),
url: z.string(),
}),
// An uploaded file: name, size, MIME type and its S3 object key.
z.object({
type: z.literal('file'),
name: z.string(),
size: z.number(),
mimeType: z.string(),
s3Key: z.string(),
}),
]),
})
export const EmbeddingDoc = z.object({
content: z.string(),
sourceId: z.string(),
@ -118,9 +146,10 @@ export const ApiKey = z.object({
// Schema for one entry of a get-information tool result.
// NOTE(review): this definition sits inside a diff hunk, so the field list may
// mix pre-change and post-change lines — confirm the live set of fields.
export const GetInformationToolResultItem = z.object({
title: z.string(),
name: z.string(),
content: z.string(),
url: z.string(),
score: z.number().optional(),
docId: z.string(),
sourceId: z.string(),
});
export const GetInformationToolResult = z.object({
@ -152,6 +181,19 @@ export const AgenticAPIChatMessage = z.object({
]).optional(),
});
/**
 * Schema for one embedding record: an id, the embedding vector and a payload
 * describing where the embedded text came from.
 * NOTE(review): presumably the shape of a point stored in the vector store —
 * confirm against the code that writes these records.
 */
export const EmbeddingRecord = z.object({
// Record id; must be a UUID string.
id: z.string().uuid(),
// Embedding vector as a plain array of numbers.
vector: z.array(z.number()),
// Metadata stored next to the vector.
payload: z.object({
// String ids of the owning project, data source and document.
projectId: z.string(),
sourceId: z.string(),
docId: z.string(),
// The text associated with this vector.
content: z.string(),
// Title and name strings carried with the record.
title: z.string(),
name: z.string(),
}),
});
export const WorkflowAgent = z.object({
name: z.string(),
type: z.union([

View file

@ -0,0 +1,9 @@
import { S3Client } from "@aws-sdk/client-s3";
// Static credentials are used only when both environment variables are set.
// Falling back to empty strings would make the SDK sign requests with blank
// keys; omitting `credentials` lets the SDK resolve credentials through its
// default provider chain instead.
const uploadsAccessKeyId = process.env.AWS_ACCESS_KEY_ID;
const uploadsSecretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;

/**
 * S3 client used for file uploads. The region comes from UPLOADS_AWS_REGION
 * and defaults to 'us-east-1'.
 */
export const uploadsS3Client = new S3Client({
    region: process.env.UPLOADS_AWS_REGION || 'us-east-1',
    ...(uploadsAccessKeyId && uploadsSecretAccessKey
        ? {
            credentials: {
                accessKeyId: uploadsAccessKeyId,
                secretAccessKey: uploadsSecretAccessKey,
            },
        }
        : {}),
});

View file

@ -3,7 +3,7 @@
import { Metadata } from "next";
import { Spinner, Textarea, Button, Dropdown, DropdownMenu, DropdownItem, DropdownTrigger, Modal, ModalContent, ModalHeader, ModalBody, ModalFooter, Input, useDisclosure, Divider } from "@nextui-org/react";
import { ReactNode, useEffect, useState, useCallback } from "react";
import { getProjectConfig, updateProjectName, updateWebhookUrl, createApiKey, deleteApiKey, listApiKeys, deleteProject, rotateSecret } from "@/app/actions";
import { getProjectConfig, updateProjectName, updateWebhookUrl, createApiKey, deleteApiKey, listApiKeys, deleteProject, rotateSecret } from "@/app/actions/project_actions";
import { CopyButton } from "@/app/lib/components/copy-button";
import { EditableField } from "@/app/lib/components/editable-field";
import { EyeIcon, EyeOffIcon, CopyIcon, MoreVerticalIcon, PlusIcon, EllipsisVerticalIcon } from "lucide-react";

View file

@ -4,7 +4,7 @@ import Link from "next/link";
import { useEffect, useState } from "react";
import clsx from "clsx";
import Menu from "./menu";
import { getProjectConfig } from "@/app/actions";
import { getProjectConfig } from "@/app/actions/project_actions";
import { ChevronsLeftIcon, ChevronsRightIcon, FolderOpenIcon, PanelLeftCloseIcon, PanelLeftOpenIcon } from "lucide-react";
export function Nav({

View file

@ -1,5 +1,5 @@
'use client';
import { getAssistantResponse, simulateUserResponse } from "@/app/actions";
import { getAssistantResponse, simulateUserResponse } from "@/app/actions/actions";
import { useEffect, useState } from "react";
import { Messages } from "./messages";
import z from "zod";

View file

@ -3,7 +3,7 @@ import { Button, Spinner, Textarea } from "@nextui-org/react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import z from "zod";
import { GetInformationToolResult, WebpageCrawlResponse, Workflow, WorkflowTool } from "@/app/lib/types";
import { executeClientTool, getInformationTool, scrapeWebpage, suggestToolResponse } from "@/app/actions";
import { executeClientTool, getInformationTool, scrapeWebpage, suggestToolResponse } from "@/app/actions/actions";
import MarkdownContent from "@/app/lib/components/markdown-content";
import Link from "next/link";
import { apiV1 } from "rowboat-shared";
@ -293,14 +293,15 @@ function GetInformationToolCall({
{typedResult && typedResult.results.length === 0 && <div>No matches found.</div>}
{typedResult && typedResult.results.length > 0 && <ul className="list-disc ml-6">
{typedResult.results.map((result, index) => {
return <li key={'' + index}>
<Link target="_blank" className="underline" href={result.url}>
{result.url}
</Link>
return <li key={'' + index} className="mb-2">
<ExpandableContent
label={result.title || result.name}
content={result.content}
expanded={false}
/>
</li>
})}
</ul>
}
</ul>}
</div>}
</div>
</div>

View file

@ -2,7 +2,7 @@
import { Button, Dropdown, DropdownItem, DropdownMenu, DropdownTrigger, Input, Spinner, Textarea } from "@nextui-org/react";
import { useState, useEffect } from "react";
import { getScenarios, createScenario, updateScenario, deleteScenario } from "@/app/actions";
import { getScenarios, createScenario, updateScenario, deleteScenario } from "@/app/actions/scenario_actions";
import { Scenario, WithStringId } from "@/app/lib/types";
import { z } from "zod";
import { EditableField } from "@/app/lib/components/editable-field";

View file

@ -3,7 +3,7 @@ import { Input, Textarea } from "@nextui-org/react";
import { FormStatusButton } from "@/app/lib/components/FormStatusButton";
import { SimulationData } from "@/app/lib/types";
import { z } from "zod";
import { scrapeWebpage } from "@/app/actions";
import { scrapeWebpage } from "@/app/actions/actions";
import { ScenarioList } from "./scenario-list";
export function SimulateURLOption({

View file

@ -1,6 +1,6 @@
'use client';
import { deleteDataSource } from "@/app/actions";
import { deleteDataSource } from "@/app/actions/datasource_actions";
import { FormStatusButton } from "@/app/lib/components/FormStatusButton";
export function DeleteSource({

View file

@ -0,0 +1,290 @@
"use client";
import { PageSection } from "@/app/lib/components/PageSection";
import { DataSource, DataSourceDoc, WithStringId } from "@/app/lib/types";
import { z } from "zod";
import { useCallback, useEffect, useState } from "react";
import { useDropzone } from "react-dropzone";
import { deleteDocsFromDataSource, getUploadUrlsForFilesDataSource, addDocsToDataSource, getDownloadUrlForFile, listDocsInDataSource } from "@/app/actions/datasource_actions";
import { RelativeTime } from "@primer/react";
import { Pagination, Spinner } from "@nextui-org/react";
import { DownloadIcon } from "lucide-react";
/**
 * One row of the uploaded-files list: file name, a download button, upload
 * time and size, and a delete button.
 *
 * Renders nothing for documents whose `data.type` is not 'file'.
 * Download asks the server for a URL and opens it in a new tab; failures are
 * logged to the console. Delete is delegated to `onDelete` with the doc id.
 */
function FileListItem({
    projectId,
    sourceId,
    file,
    onDelete,
}: {
    projectId: string,
    sourceId: string,
    file: WithStringId<z.infer<typeof DataSourceDoc>>,
    onDelete: (fileId: string) => Promise<void>;
}) {
    const [isDeleting, setIsDeleting] = useState(false);
    const [isDownloading, setIsDownloading] = useState(false);

    async function handleDeleteClick() {
        setIsDeleting(true);
        try {
            await onDelete(file._id);
        } finally {
            // always re-enable the button, even when the delete rejects
            setIsDeleting(false);
        }
    }

    async function handleDownloadClick() {
        setIsDownloading(true);
        try {
            const downloadUrl = await getDownloadUrlForFile(projectId, sourceId, file._id);
            window.open(downloadUrl, '_blank');
        } catch (error) {
            console.error('Download failed:', error);
            // TODO: Add error handling
        } finally {
            setIsDownloading(false);
        }
    }

    // hooks are declared above, so bailing out here keeps hook order stable
    if (file.data.type !== 'file') {
        return null;
    }

    const deleteButtonClass = isDeleting ? 'text-gray-400' : 'text-red-600 hover:text-red-800';
    const downloadControl = isDownloading
        ? <Spinner size="sm" />
        : (
            <button
                onClick={handleDownloadClick}
                className="shrink-0 text-gray-500 hover:text-gray-700"
            >
                <DownloadIcon className="w-4 h-4" />
            </button>
        );

    return (
        <div className="flex items-center justify-between p-3 bg-gray-50 rounded">
            <div>
                <div className="flex items-center gap-2">
                    <p className="font-medium">{file.name}</p>
                    <div className="shrink-0">
                        {downloadControl}
                    </div>
                </div>
                <p className="text-sm text-gray-500">
                    uploaded <RelativeTime date={new Date(file.createdAt)} /> - {formatFileSize(file.data.size)}
                </p>
            </div>
            <div className="flex gap-2 items-center">
                <button
                    onClick={handleDeleteClick}
                    disabled={isDeleting}
                    className={deleteButtonClass}
                >
                    {isDeleting ? <Spinner size="sm" /> : 'Delete'}
                </button>
            </div>
        </div>
    );
}
// Number of files requested per page; also used to derive the page count.
const FILES_PAGE_SIZE = 10;

/**
 * Paginated list of the files in a data source.
 *
 * Fetches one page of docs whenever the project, source or page changes and
 * renders a `FileListItem` per doc, with a pager when there is more than one
 * page. Fetch errors are logged to the console and leave the previous list
 * in place.
 *
 * `handleReload` is accepted for interface compatibility but is not used by
 * this component.
 */
function PaginatedFileList({
    projectId,
    sourceId,
    onDelete,
}: {
    projectId: string,
    sourceId: string,
    handleReload: () => void;
    onDelete: (fileId: string) => Promise<void>;
}) {
    const [files, setFiles] = useState<WithStringId<z.infer<typeof DataSourceDoc>>[]>([]);
    const [page, setPage] = useState(1);
    const [total, setTotal] = useState(0);
    const [loading, setLoading] = useState(false);

    const totalPages = Math.ceil(total / FILES_PAGE_SIZE);

    useEffect(() => {
        // set by the cleanup once this request has been superseded or the component unmounted
        let ignore = false;

        async function fetchFiles() {
            setLoading(true);
            try {
                const { files, total } = await listDocsInDataSource({
                    projectId,
                    sourceId,
                    page,
                    limit: FILES_PAGE_SIZE,
                });
                if (!ignore) {
                    setFiles(files);
                    setTotal(total);
                }
            } catch (error) {
                console.error('Error fetching files:', error);
            } finally {
                // A superseded request must not clear the spinner: a newer
                // request is still in flight and owns the loading state.
                if (!ignore) {
                    setLoading(false);
                }
            }
        }

        fetchFiles();

        return () => {
            ignore = true;
        };
    }, [projectId, sourceId, page]);

    return (
        <div className="mt-6">
            <h3 className="text-lg font-semibold mb-3">Uploaded Files</h3>
            {loading && <div className="flex items-center justify-center gap-2">
                <Spinner size="sm" />
                <p>Loading list...</p>
            </div>}
            {!loading && files.length === 0 && <div className="flex items-center justify-center gap-2">
                <p>No files uploaded yet</p>
            </div>}
            {!loading && files.length > 0 && <div className="space-y-2">
                {files.map(file => (
                    <FileListItem
                        key={file._id}
                        file={file}
                        projectId={projectId}
                        sourceId={sourceId}
                        onDelete={onDelete}
                    />
                ))}
                {totalPages > 1 && <Pagination
                    total={totalPages}
                    page={page}
                    onChange={setPage}
                />}
            </div>}
        </div>
    )
}
/**
 * Upload area plus file list for a "files" data source.
 *
 * Dropping files requests one presigned upload URL per file, PUTs every file
 * to its URL in parallel, and only after all uploads succeed registers the
 * files as docs of the data source. The parent is then asked to reload and
 * the file list is remounted (via its `key`) so it refetches.
 *
 * Upload errors are logged to the console; nothing is registered when any
 * upload fails.
 */
export function FilesSource({
    projectId,
    dataSource,
    handleReload,
}: {
    projectId: string,
    dataSource: WithStringId<z.infer<typeof DataSource>>,
    handleReload: () => void;
}) {
    const [uploading, setUploading] = useState(false);
    // bumped to force PaginatedFileList to remount and refetch
    const [fileListKey, setFileListKey] = useState(0);

    const onDrop = useCallback(async (acceptedFiles: File[]) => {
        // a drop where every file was rejected has nothing to upload
        if (acceptedFiles.length === 0) {
            return;
        }

        setUploading(true);
        try {
            const urls = await getUploadUrlsForFilesDataSource(projectId, dataSource._id, acceptedFiles.map(file => ({
                name: file.name,
                type: file.type,
                size: file.size,
            })));

            // Upload files in parallel
            await Promise.all(acceptedFiles.map(async (file, index) => {
                const response = await fetch(urls[index].presignedUrl, {
                    method: 'PUT',
                    body: file,
                    headers: {
                        'Content-Type': file.type,
                    },
                });
                // fetch resolves on HTTP error statuses too; without this check
                // a rejected upload would still be recorded as a document
                if (!response.ok) {
                    throw new Error(`Upload of ${file.name} failed with status ${response.status}`);
                }
            }));

            // After successful uploads, update the database with file information
            await addDocsToDataSource({
                projectId,
                sourceId: dataSource._id,
                docData: acceptedFiles.map((file, index) => ({
                    _id: urls[index].fileId,
                    name: file.name,
                    data: {
                        type: 'file',
                        name: file.name,
                        size: file.size,
                        mimeType: file.type,
                        s3Key: urls[index].s3Key,
                    },
                })),
            });

            handleReload();
            setFileListKey(prev => prev + 1);
        } catch (error) {
            console.error('Upload failed:', error);
            // TODO: Add error handling
        } finally {
            setUploading(false);
        }
    }, [projectId, dataSource._id, handleReload]);

    const { getRootProps, getInputProps, isDragActive } = useDropzone({
        onDrop,
        disabled: uploading,
        accept: {
            'application/pdf': ['.pdf'],
            'text/plain': ['.txt'],
            'application/msword': ['.doc'],
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': ['.docx'],
        },
    });

    const handleDelete = async (docId: string) => {
        await deleteDocsFromDataSource({
            projectId,
            sourceId: dataSource._id,
            docIds: [docId],
        });
        handleReload();
        // functional update: this closure may hold a stale counter value
        setFileListKey(prev => prev + 1);
    };

    const dropzoneClass = `border-2 border-dashed rounded-lg p-8 text-center cursor-pointer ${isDragActive ? 'border-blue-500 bg-blue-50' : 'border-gray-300'}`;

    return (
        <PageSection title="Upload files">
            <div
                {...getRootProps()}
                className={dropzoneClass}
            >
                <input {...getInputProps()} />
                {uploading ? (
                    <div className="flex items-center justify-center gap-2">
                        <Spinner size="sm" />
                        <p>Uploading files...</p>
                    </div>
                ) : isDragActive ? (
                    <p>Drop the files here...</p>
                ) : (
                    <div>
                        <p>Drag and drop files here, or click to select files</p>
                        <p className="text-sm text-gray-500">
                            Supported file types: PDF, TXT, DOC, DOCX
                        </p>
                    </div>
                )}
            </div>

            <PaginatedFileList
                key={fileListKey}
                projectId={projectId}
                sourceId={dataSource._id}
                handleReload={handleReload}
                onDelete={handleDelete}
            />
        </PageSection>
    );
}
/**
 * Formats a byte count as a short human-readable string using 1024-based
 * units, e.g. 1536 -> "1.5 KB". Values are rounded to at most two decimals.
 *
 * The unit is clamped to the available table, so sizes of 1 TiB and above are
 * expressed in GB and values below one byte stay in Bytes. Zero, negative and
 * non-finite inputs are reported as "0 Bytes".
 *
 * @param bytes - Size in bytes.
 * @returns The formatted size, e.g. "0 Bytes", "500 Bytes", "1 KB".
 */
function formatFileSize(bytes: number): string {
    if (!Number.isFinite(bytes) || bytes <= 0) return '0 Bytes';
    const k = 1024;
    const sizes = ['Bytes', 'KB', 'MB', 'GB'];
    const rawIndex = Math.floor(Math.log(bytes) / Math.log(k));
    // keep the index inside the unit table (rawIndex is < 0 below 1 byte and > 3 from 1 TiB up)
    const i = Math.min(Math.max(rawIndex, 0), sizes.length - 1);
    return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
}

View file

@ -1,9 +1,4 @@
import { notFound } from "next/navigation";
import { dataSourcesCollection } from "@/app/lib/mongodb";
import { ObjectId } from "mongodb";
import { Metadata } from "next";
import { SourcePage } from "./source-page";
import { getDataSource } from "@/app/actions";
export default async function Page({
params,

View file

@ -0,0 +1,273 @@
"use client";
import { PageSection } from "@/app/lib/components/PageSection";
import { DataSource, DataSourceDoc, WithStringId } from "@/app/lib/types";
import { z } from "zod";
import { Recrawl } from "./web-recrawl";
import { deleteDocsFromDataSource, listDocsInDataSource, recrawlWebDataSource, addDocsToDataSource } from "@/app/actions/datasource_actions";
import { useState, useEffect } from "react";
import { Spinner } from "@nextui-org/react";
import { Pagination } from "@nextui-org/react";
import { ExternalLinkIcon } from "lucide-react";
import { Textarea } from "@nextui-org/react";
import { FormStatusButton } from "@/app/lib/components/FormStatusButton";
import { PlusIcon } from "lucide-react";
/**
 * One row of the URL list: the doc name, a link that opens the URL in a new
 * tab, and a delete button.
 *
 * Renders nothing for documents whose `data.type` is not 'url'. Deletion is
 * delegated to `onDelete` with the doc id; the button is disabled while the
 * delete is pending.
 */
function UrlListItem({
    file,
    onDelete,
}: {
    file: WithStringId<z.infer<typeof DataSourceDoc>>,
    onDelete: (fileId: string) => Promise<void>;
}) {
    const [isDeleting, setIsDeleting] = useState(false);

    async function handleDeleteClick() {
        setIsDeleting(true);
        try {
            await onDelete(file._id);
        } finally {
            // always re-enable the button, even when the delete rejects
            setIsDeleting(false);
        }
    }

    // the hook is declared above, so bailing out here keeps hook order stable
    if (file.data.type !== 'url') {
        return null;
    }

    const deleteButtonClass = isDeleting ? 'text-gray-400' : 'text-red-600 hover:text-red-800';

    return (
        <div className="flex items-center justify-between p-3 bg-gray-50 rounded">
            <div>
                <div className="flex items-center gap-2">
                    <p className="font-medium">{file.name}</p>
                    <div className="shrink-0">
                        <a href={file.data.url} target="_blank" rel="noopener noreferrer">
                            <ExternalLinkIcon className="w-4 h-4" />
                        </a>
                    </div>
                </div>
            </div>
            <div className="flex gap-2 items-center">
                <button
                    onClick={handleDeleteClick}
                    disabled={isDeleting}
                    className={deleteButtonClass}
                >
                    {isDeleting ? <Spinner size="sm" /> : 'Delete'}
                </button>
            </div>
        </div>
    );
}
// Number of URLs requested per page; also used to derive the page count.
const URLS_PAGE_SIZE = 10;

/**
 * Paginated list of the URL docs in a data source.
 *
 * Fetches one page of docs whenever the project, source or page changes and
 * renders a `UrlListItem` per doc, with a pager when there is more than one
 * page. Fetch errors are logged to the console and leave the previous list
 * in place.
 */
function UrlList({
    projectId,
    sourceId,
    onDelete,
}: {
    projectId: string,
    sourceId: string,
    onDelete: (fileId: string) => Promise<void>,
}) {
    const [files, setFiles] = useState<WithStringId<z.infer<typeof DataSourceDoc>>[]>([]);
    const [loading, setLoading] = useState(true);
    const [page, setPage] = useState(1);
    const [total, setTotal] = useState(0);

    const totalPages = Math.ceil(total / URLS_PAGE_SIZE);

    useEffect(() => {
        // set by the cleanup once this request has been superseded or the component unmounted
        let ignore = false;

        async function fetchFiles() {
            setLoading(true);
            try {
                const { files, total } = await listDocsInDataSource({
                    projectId,
                    sourceId,
                    page,
                    limit: URLS_PAGE_SIZE,
                });
                if (!ignore) {
                    setFiles(files);
                    setTotal(total);
                }
            } catch (error) {
                console.error('Error fetching files:', error);
            } finally {
                // A superseded request must not clear the spinner: a newer
                // request is still in flight and owns the loading state.
                if (!ignore) {
                    setLoading(false);
                }
            }
        }

        fetchFiles();

        return () => {
            ignore = true;
        };
    }, [projectId, sourceId, page]);

    return (
        <div className="mt-6">
            <h3 className="text-lg font-semibold mb-3">URLs</h3>
            {loading && <div className="flex items-center justify-center gap-2">
                <Spinner size="sm" />
                <p>Loading list...</p>
            </div>}
            {!loading && files.length === 0 && <div className="flex items-center justify-center gap-2">
                <p>No URLs added yet</p>
            </div>}
            {!loading && files.length > 0 && <div className="space-y-2">
                {files.map(file => (
                    <UrlListItem
                        key={file._id}
                        file={file}
                        onDelete={onDelete}
                    />
                ))}
                {totalPages > 1 && <Pagination
                    total={totalPages}
                    page={page}
                    onChange={setPage}
                />}
            </div>}
        </div>
    )
}
/**
 * "Add more URLs" control for a URL data source.
 *
 * Starts collapsed as a single button. Once expanded it shows a textarea that
 * takes one URL per line; on submit the lines are trimmed, blank lines are
 * dropped, only the first 100 URLs are kept, and each is added to the data
 * source as a doc of type 'url'. After a successful add `onAdd` is called and
 * the form collapses again. If the add rejects, the form stays open and the
 * rejection propagates to the caller of the form action.
 */
function AddUrls({
    projectId,
    sourceId,
    onAdd,
}: {
    projectId: string,
    sourceId: string,
    onAdd: () => void,
}) {
    const [isAdding, setIsAdding] = useState(false);
    const [showForm, setShowForm] = useState(false);

    const handleSubmit = async (formData: FormData) => {
        setIsAdding(true);
        try {
            const rawUrls = formData.get('urls') as string;

            // one URL per line: trim each line and drop the empty ones
            const cleanedUrls: string[] = [];
            for (const line of rawUrls.split('\n')) {
                const trimmed = line.trim();
                if (trimmed.length > 0) {
                    cleanedUrls.push(trimmed);
                }
            }

            await addDocsToDataSource({
                projectId,
                sourceId,
                // only the first 100 URLs are submitted
                docData: cleanedUrls.slice(0, 100).map(url => ({
                    name: url,
                    data: {
                        type: 'url',
                        url,
                    },
                })),
            });
            onAdd();
            setShowForm(false); // Hide form after successful submission
        } finally {
            setIsAdding(false);
        }
    };

    if (!showForm) {
        return (
            <div>
                <FormStatusButton
                    props={{
                        onClick: () => setShowForm(true),
                        children: "Add more URLs",
                        className: "self-start",
                        startContent: <PlusIcon className="w-[24px] h-[24px]" />,
                    }}
                />
            </div>
        );
    }

    return (
        <div>
            <div className="space-y-4">
                <form action={handleSubmit} className="flex flex-col gap-4">
                    <Textarea
                        required
                        type="text"
                        name="urls"
                        label="Add more URLs (one per line)"
                        minRows={5}
                        maxRows={10}
                        labelPlacement="outside"
                        placeholder="https://example.com"
                        variant="bordered"
                    />
                    <div className="flex gap-2">
                        <FormStatusButton
                            props={{
                                type: "submit",
                                children: "Add URLs",
                                className: "self-start",
                                startContent: <PlusIcon className="w-[24px] h-[24px]" />,
                                isLoading: isAdding,
                            }}
                        />
                        <button
                            type="button"
                            onClick={() => setShowForm(false)}
                            className="text-gray-500 hover:text-gray-700"
                        >
                            Cancel
                        </button>
                    </div>
                </form>
            </div>
        </div>
    );
}
/**
 * Detail sections for a "urls" data source: a form to add URLs, the list of
 * indexed URLs, and — when the source status is 'ready' or 'error' — a
 * refresh control that re-scrapes the URLs.
 *
 * After every add, delete or refresh the parent is asked to reload and
 * `fileListKey` is bumped; the key is applied to the URL list so the list
 * remounts and refetches its first page.
 */
export function ScrapeSource({
    projectId,
    dataSource,
    handleReload,
}: {
    projectId: string,
    dataSource: WithStringId<z.infer<typeof DataSource>>,
    handleReload: () => void;
}) {
    // bumped to force UrlList to remount and refetch
    const [fileListKey, setFileListKey] = useState(0);

    function refreshList() {
        handleReload();
        setFileListKey(prev => prev + 1);
    }

    async function handleRefresh() {
        await recrawlWebDataSource(projectId, dataSource._id);
        refreshList();
    }

    async function handleDelete(docId: string) {
        await deleteDocsFromDataSource({
            projectId,
            sourceId: dataSource._id,
            docIds: [docId],
        });
        refreshList();
    }

    return <>
        <PageSection title="Add URLs">
            <AddUrls
                projectId={projectId}
                sourceId={dataSource._id}
                onAdd={refreshList}
            />
        </PageSection>
        <PageSection title="Index details">
            <UrlList
                key={fileListKey}
                projectId={projectId}
                sourceId={dataSource._id}
                onDelete={handleDelete}
            />
        </PageSection>
        {(dataSource.status === 'ready' || dataSource.status === 'error') && <PageSection title="Refresh">
            <div className="flex flex-col gap-2 items-start">
                <p>Scrape the URLs again to fetch updated content:</p>
                <Recrawl projectId={projectId} sourceId={dataSource._id} handleRefresh={handleRefresh} />
            </div>
        </PageSection>}
    </>;
}

View file

@ -0,0 +1,13 @@
/**
 * Shows a block of URLs (already joined into one string) inside a bordered,
 * scrollable, non-wrapping <pre>.
 */
export function UrlList({ urls }: { urls: string }) {
    const boxClass = "max-w-[450px] border p-1 border-gray-300 rounded overflow-auto min-h-7 max-h-52 text-nowrap";
    return (
        <pre className={boxClass}>
            {urls}
        </pre>
    );
}
/**
 * Header cell (<th>) for a label in a details table.
 *
 * @param children - Cell content.
 * @param className - Optional extra classes appended to the base classes.
 */
export function TableLabel({ children, className }: { children: React.ReactNode, className?: string }) {
    // Drop an absent className instead of interpolating it, which would put
    // the literal text "undefined" into the class attribute.
    const classes = ['font-medium text-gray-800 text-left align-top pr-4 py-4', className]
        .filter(Boolean)
        .join(' ');
    return <th className={classes}>{children}</th>;
}
/**
 * Data cell (<td>) for a value in a details table.
 *
 * @param children - Cell content.
 * @param className - Optional extra classes appended to the base classes.
 */
export function TableValue({ children, className }: { children: React.ReactNode, className?: string }) {
    // Drop an absent className instead of interpolating it, which would put
    // the literal text "undefined" into the class attribute.
    const classes = ['align-top py-4', className]
        .filter(Boolean)
        .join(' ');
    return <td className={classes}>{children}</td>;
}

View file

@ -1,29 +1,17 @@
'use client';
import { DataSource } from "@/app/lib/types";
import { DataSource, WithStringId } from "@/app/lib/types";
import { PageSection } from "@/app/lib/components/PageSection";
import { ToggleSource } from "../toggle-source";
import { Link, Spinner } from "@nextui-org/react";
import { Spinner } from "@nextui-org/react";
import { SourceStatus } from "../source-status";
import { DeleteSource } from "./delete";
import { Recrawl } from "./web-recrawl";
import { useSearchParams } from "next/navigation";
import { useEffect, useState } from "react";
import { getDataSource, recrawlWebDataSource } from "@/app/actions";
import { DataSourceIcon } from "@/app/lib/components/datasource-icon";
import { z } from "zod";
function UrlList({ urls }: { urls: string }) {
return <pre className="max-w-[450px] border p-1 border-gray-300 rounded overflow-auto min-h-7 max-h-52 text-nowrap">
{urls}
</pre>;
}
function TableLabel({ children, className }: { children: React.ReactNode, className?: string }) {
return <th className={`font-medium text-gray-800 text-left align-top pr-4 py-4 ${className}`}>{children}</th>;
}
function TableValue({ children, className }: { children: React.ReactNode, className?: string }) {
return <td className={`align-top py-4 ${className}`}>{children}</td>;
}
import { TableLabel, TableValue } from "./shared";
import { ScrapeSource } from "./scrape-source";
import { FilesSource } from "./files-source";
import { getDataSource } from "@/app/actions/datasource_actions";
export function SourcePage({
sourceId,
@ -32,16 +20,25 @@ export function SourcePage({
sourceId: string;
projectId: string;
}) {
const searchParams = useSearchParams();
const [source, setSource] = useState<z.infer<typeof DataSource> | null>(null);
const [source, setSource] = useState<WithStringId<z.infer<typeof DataSource>> | null>(null);
const [isLoading, setIsLoading] = useState(true);
// fetch source daat first time
async function handleReload() {
setIsLoading(true);
const updatedSource = await getDataSource(projectId, sourceId);
setSource(updatedSource);
setIsLoading(false);
}
// fetch source data first time
useEffect(() => {
let ignore = false;
async function fetchSource() {
setIsLoading(true);
const source = await getDataSource(projectId, sourceId);
if (!ignore) {
setSource(source);
setIsLoading(false);
}
}
fetchSource();
@ -59,7 +56,7 @@ export function SourcePage({
if (!source) {
return;
}
if (source.status !== 'processing' && source.status !== 'new') {
if (source.status !== 'pending') {
return;
}
@ -83,13 +80,9 @@ export function SourcePage({
};
}, [source, projectId, sourceId]);
async function handleRefresh() {
await recrawlWebDataSource(projectId, sourceId);
const updatedSource = await getDataSource(projectId, sourceId);
setSource(updatedSource);
}
if (!source) {
if (!source || isLoading) {
return <div className="flex items-center gap-2">
<Spinner size="sm" />
<div>Loading...</div>
@ -116,14 +109,14 @@ export function SourcePage({
<tr>
<TableLabel>Type:</TableLabel>
<TableValue>
{source.data.type === 'crawl' && <div className="flex gap-1 items-center">
<DataSourceIcon type="crawl" />
<div>Crawl URLs</div>
</div>}
{source.data.type === 'urls' && <div className="flex gap-1 items-center">
<DataSourceIcon type="urls" />
<div>Specify URLs</div>
</div>}
{source.data.type === 'files' && <div className="flex gap-1 items-center">
<DataSourceIcon type="files" />
<div>File upload</div>
</div>}
</TableValue>
</tr>
<tr>
@ -132,77 +125,12 @@ export function SourcePage({
<SourceStatus status={source.status} projectId={projectId} />
</TableValue>
</tr>
{source.data.type === 'urls' && source.data.missingUrls && <tr>
<TableLabel className="text-red-500">Errors:</TableLabel>
<TableValue>
<div>Some URLs could not be scraped. See the list below.</div>
</TableValue>
</tr>}
</tbody>
</table>
</PageSection>
{source.data.type === 'crawl' && <PageSection title="Crawl details">
<table className="table-auto">
<tbody>
<tr>
<TableLabel>Starting URL:</TableLabel>
<TableValue>
<Link
href={source.data.startUrl}
target="_blank"
showAnchorIcon
color="foreground"
underline="always"
>
{source.data.startUrl}
</Link>
</TableValue>
</tr>
<tr>
<TableLabel>Limit:</TableLabel>
<TableValue>
{source.data.limit} pages
</TableValue>
</tr>
{source.data.crawledUrls && <tr>
<TableLabel>Crawled URLs:</TableLabel>
<TableValue>
<UrlList urls={source.data.crawledUrls} />
</TableValue>
</tr>}
</tbody>
</table>
</PageSection>}
{source.data.type === 'urls' && <PageSection title="Index details">
<table className="table-auto">
<tbody>
<tr>
<TableLabel>Input URLs:</TableLabel>
<TableValue>
<UrlList urls={source.data.urls.join('\n')} />
</TableValue>
</tr>
{source.data.scrapedUrls && <tr>
<TableLabel>Scraped URLs:</TableLabel>
<TableValue>
<UrlList urls={source.data.scrapedUrls} />
</TableValue>
</tr>}
{source.data.missingUrls && <tr>
<TableLabel className="text-red-500">The following URLs could not be scraped:</TableLabel>
<TableValue>
<UrlList urls={source.data.missingUrls} />
</TableValue>
</tr>}
</tbody>
</table>
</PageSection>}
{(source.status === 'completed' || source.status === 'error') && (source.data.type === 'crawl' || source.data.type === 'urls') && <PageSection title="Refresh">
<div className="flex flex-col gap-2 items-start">
<p>{source.data.type === 'crawl' ? 'Crawl' : 'Scrape'} the URLs again to fetch updated content:</p>
<Recrawl projectId={projectId} sourceId={sourceId} handleRefresh={handleRefresh} />
</div>
</PageSection>}
{source.data.type === 'urls' && <ScrapeSource projectId={projectId} dataSource={source} handleReload={handleReload} />}
{source.data.type === 'files' && <FilesSource projectId={projectId} dataSource={source} handleReload={handleReload} />}
<PageSection title="Danger zone">
<div className="flex flex-col gap-2 items-start">
<p>Delete this data source:</p>

View file

@ -1,7 +1,6 @@
'use client';
import { recrawlWebDataSource } from "@/app/actions";
import { FormStatusButton } from "@/app/lib/components/FormStatusButton";
import { RefreshCwIcon } from "lucide-react";
export function Recrawl({
projectId,
@ -16,9 +15,7 @@ export function Recrawl({
<FormStatusButton
props={{
type: "submit",
startContent: <svg className="w-6 h-6" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="none" viewBox="0 0 24 24">
<path stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" strokeWidth="1" d="M17.651 7.65a7.131 7.131 0 0 0-12.68 3.15M18.001 4v4h-4m-7.652 8.35a7.13 7.13 0 0 0 12.68-3.15M6 20v-4h4" />
</svg>,
startContent: <RefreshCwIcon />,
children: "Refresh",
}}
/>

View file

@ -1,9 +1,11 @@
'use client';
import { Input, Select, SelectItem, Textarea } from "@nextui-org/react"
import { useState } from "react";
import { createCrawlDataSource, createUrlsDataSource } from "@/app/actions";
import { createDataSource, addDocsToDataSource } from "@/app/actions/datasource_actions";
import { FormStatusButton } from "@/app/lib/components/FormStatusButton";
import { DataSourceIcon } from "@/app/lib/components/datasource-icon";
import { PlusIcon } from "lucide-react";
import { useRouter } from "next/navigation";
export function Form({
projectId
@ -11,9 +13,62 @@ export function Form({
projectId: string;
}) {
const [sourceType, setSourceType] = useState("");
const router = useRouter();
// const createCrawlDataSourceWithProjectId = createCrawlDataSource.bind(null, projectId);
const createUrlsDataSourceWithProjectId = createUrlsDataSource.bind(null, projectId);
// async function createCrawlDataSource(formData: FormData) {
// const source = await createDataSource({
// projectId,
// name: formData.get('name') as string,
// data: {
// type: 'crawl',
// startUrl: formData.get('startUrl') as string,
// limit: parseInt(formData.get('limit') as string),
// },
// status: 'queued',
// });
// router.push(`/projects/${projectId}/sources/${source._id}`);
// }
/**
 * Form action for the "Scrape URLs" source type.
 *
 * Creates a `urls` data source in `pending` status, attaches one `url`
 * document per non-empty line of the submitted `urls` field (only the first
 * 100 are kept), and then navigates to the new source's page.
 */
async function createUrlsDataSource(formData: FormData) {
    const created = await createDataSource({
        projectId,
        name: formData.get('name') as string,
        data: {
            type: 'urls',
        },
        status: 'pending',
    });

    // One URL per line; whitespace-only lines are dropped and collection
    // stops once 100 URLs have been gathered.
    const rawUrls = formData.get('urls') as string;
    const urlsToAdd: string[] = [];
    for (const line of rawUrls.split('\n')) {
        const candidate = line.trim();
        if (candidate.length === 0) {
            continue;
        }
        if (urlsToAdd.length === 100) {
            break;
        }
        urlsToAdd.push(candidate);
    }

    await addDocsToDataSource({
        projectId,
        sourceId: created._id,
        docData: urlsToAdd.map((url) => ({
            name: url,
            data: {
                type: 'url',
                url,
            },
        })),
    });

    const sourcePage = `/projects/${projectId}/sources/${created._id}`;
    router.push(sourcePage);
}
/**
 * Form action for the "Upload files" source type.
 *
 * Creates an empty `files` data source that is immediately `ready` and
 * navigates to its page.
 */
async function createFilesDataSource(formData: FormData) {
    const sourceName = formData.get('name') as string;
    const created = await createDataSource({
        projectId,
        name: sourceName,
        data: {
            type: 'files',
        },
        status: 'ready',
    });
    const sourcePage = `/projects/${projectId}/sources/${created._id}`;
    router.push(sourcePage);
}
function handleSourceTypeChange(event: React.ChangeEvent<HTMLSelectElement>) {
setSourceType(event.target.value);
@ -40,7 +95,14 @@ export function Form({
>
Scrape URLs
</SelectItem>
</Select>
<SelectItem
key="files"
value="files"
startContent={<DataSourceIcon type="files" />}
>
Upload files
</SelectItem>
</Select>
{/* {sourceType === "crawl" && <form
action={createCrawlDataSourceWithProjectId}
@ -99,7 +161,7 @@ export function Form({
</form>} */}
{sourceType === "urls" && <form
action={createUrlsDataSourceWithProjectId}
action={createUrlsDataSource}
className="flex flex-col gap-4"
>
<Textarea
@ -136,9 +198,35 @@ export function Form({
type: "submit",
children: "Add data source",
className: "self-start",
startContent: <svg className="w-[24px] h-[24px]" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="none" viewBox="0 0 24 24">
<path stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" strokeWidth="1" d="M5 12h14m-7 7V5" />
</svg>,
startContent: <PlusIcon className="w-[24px] h-[24px]" />
}}
/>
</form>}
{sourceType === "files" && <form
action={createFilesDataSource}
className="flex flex-col gap-4"
>
<div className="self-start">
<Input
required
type="text"
name="name"
label="Name this data source"
labelPlacement="outside"
placeholder="e.g. Documentation files"
variant="bordered"
/>
</div>
<div className="text-sm">
<p>You will be able to upload files in the next step</p>
</div>
<FormStatusButton
props={{
type: "submit",
children: "Add data source",
className: "self-start",
startContent: <PlusIcon className="w-[24px] h-[24px]" />
}}
/>
</form>}

View file

@ -1,5 +1,5 @@
'use client';
import { getUpdatedSourceStatus } from "@/app/actions";
import { getDataSource } from "@/app/actions/datasource_actions";
import { DataSource } from "@/app/lib/types";
import { useEffect, useState } from "react";
import { z } from 'zod';
@ -19,33 +19,29 @@ export function SelfUpdatingSourceStatus({
const [status, setStatus] = useState(initialStatus);
useEffect(() => {
console.log("in effect i'm here")
let unmounted = false;
if (status !== 'processing' && status !== 'new') {
return;
let ignore = false;
let timeoutId: NodeJS.Timeout | null = null;
async function check() {
if (ignore) {
return;
}
const source = await getDataSource(projectId, sourceId);
setStatus(source.status);
timeoutId = setTimeout(check, 15 * 1000);
}
function check() {
if (unmounted) {
return;
}
if (status !== 'processing' && status !== 'new') {
return;
}
console.log("i'm here")
getUpdatedSourceStatus(projectId, sourceId)
.then((updatedStatus) => {
console.log("updatedStatus", updatedStatus)
setStatus(updatedStatus);
setTimeout(check, 15 * 1000);
});
if (status == 'pending') {
timeoutId = setTimeout(check, 15 * 1000);
}
setTimeout(check, 15 * 1000);
return () => {
unmounted = true;
ignore = true;
if (timeoutId) {
clearTimeout(timeoutId);
}
};
});
}, [status, projectId, sourceId]);
return <SourceStatus status={status} compact={compact} projectId={projectId} />;
}

View file

@ -24,7 +24,7 @@ export function SourceStatus({
There was an unexpected error while processing this resource.
</div>}
</div>}
{status == 'processing' && <div className="flex flex-col gap-1 items-start">
{status == 'pending' && <div className="flex flex-col gap-1 items-start">
<div className="flex gap-1 items-center">
<Spinner size="sm" />
<div className="text-gray-400">
@ -35,20 +35,7 @@ export function SourceStatus({
This source is being processed. This may take a few minutes.
</div>}
</div>}
{status == 'new' && <div className="flex flex-col gap-1 items-start">
<div className="flex gap-1 items-center">
<svg className="w-[24px] h-[24px]" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="none" viewBox="0 0 24 24">
<path stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" strokeWidth="1" d="M12 8v4l3 3m6-3a9 9 0 1 1-18 0 9 9 0 0 1 18 0Z" />
</svg>
<div>
Queued
</div>
</div>
{!compact && <div className="text-sm text-gray-400">
This source is waiting to be processed.
</div>}
</div>}
{status === 'completed' && <div className="flex flex-col gap-1 items-start">
{status === 'ready' && <div className="flex flex-col gap-1 items-start">
<div className="flex gap-1 items-center">
<svg className="w-6 h-6" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path fillRule="evenodd" d="M2 12C2 6.477 6.477 2 12 2s10 4.477 10 10-4.477 10-10 10S2 17.523 2 12Zm13.707-1.293a1 1 0 0 0-1.414-1.414L11 12.586l-1.793-1.793a1 1 0 0 0-1.414 1.414l2.5 2.5a1 1 0 0 0 1.414 0l4-4Z" clipRule="evenodd" />

View file

@ -7,7 +7,7 @@ import { DataSourceIcon } from "@/app/lib/components/datasource-icon";
import { useEffect, useState } from "react";
import { DataSource, WithStringId } from "@/app/lib/types";
import { z } from "zod";
import { listSources } from "@/app/actions";
import { listDataSources } from "@/app/actions/datasource_actions";
export function SourcesList({
projectId,
@ -22,7 +22,7 @@ export function SourcesList({
async function fetchSources() {
setLoading(true);
const sources = await listSources(projectId);
const sources = await listDataSources(projectId);
if (!ignore) {
setSources(sources);
setLoading(false);
@ -81,13 +81,9 @@ export function SourcesList({
</Link>
</td>
<td className="py-4">
{source.data.type == 'crawl' && <div className="flex gap-1 items-center">
<DataSourceIcon type="crawl" />
<div>Crawl URLs</div>
</div>}
{source.data.type == 'urls' && <div className="flex gap-1 items-center">
<DataSourceIcon type="urls" />
<div>Specify URLs</div>
<div>List URLs</div>
</div>}
</td>
<td className="py-4">

View file

@ -1,5 +1,5 @@
'use client';
import { toggleDataSource } from "@/app/actions";
import { toggleDataSource } from "@/app/actions/datasource_actions";
import { Spinner } from "@nextui-org/react";
import { Switch } from "@nextui-org/react";
import { useState } from "react";
@ -39,6 +39,6 @@ export function ToggleSource({
</Switch>
{loading && <Spinner size="sm" />}
</div>
{!compact && !isActive && <p className="text-sm text-red-800">This data source will not be used in chats.</p>}
{!compact && !isActive && <p className="text-sm text-red-800">This data source will not be used for RAG.</p>}
</div>;
}

View file

@ -5,7 +5,8 @@ import { useCallback, useEffect, useState } from "react";
import { WorkflowEditor } from "./workflow_editor";
import { WorkflowSelector } from "./workflow_selector";
import { Spinner } from "@nextui-org/react";
import { cloneWorkflow, createWorkflow, fetchPublishedWorkflowId, fetchWorkflow, listSources } from "@/app/actions";
import { cloneWorkflow, createWorkflow, fetchPublishedWorkflowId, fetchWorkflow } from "@/app/actions/workflow_actions";
import { listDataSources } from "@/app/actions/datasource_actions";
export function App({
projectId,
@ -23,7 +24,7 @@ export function App({
setLoading(true);
const workflow = await fetchWorkflow(projectId, workflowId);
const publishedWorkflowId = await fetchPublishedWorkflowId(projectId);
const dataSources = await listSources(projectId);
const dataSources = await listDataSources(projectId);
// Store the selected workflow ID in local storage
localStorage.setItem(`lastWorkflowId_${projectId}`, workflowId);
setWorkflow(workflow);
@ -43,7 +44,7 @@ export function App({
setLoading(true);
const workflow = await createWorkflow(projectId);
const publishedWorkflowId = await fetchPublishedWorkflowId(projectId);
const dataSources = await listSources(projectId);
const dataSources = await listDataSources(projectId);
// Store the selected workflow ID in local storage
localStorage.setItem(`lastWorkflowId_${projectId}`, workflow._id);
setWorkflow(workflow);
@ -56,7 +57,7 @@ export function App({
setLoading(true);
const workflow = await cloneWorkflow(projectId, workflowId);
const publishedWorkflowId = await fetchPublishedWorkflowId(projectId);
const dataSources = await listSources(projectId);
const dataSources = await listDataSources(projectId);
// Store the selected workflow ID in local storage
localStorage.setItem(`lastWorkflowId_${projectId}`, workflow._id);
setWorkflow(workflow);

View file

@ -4,7 +4,7 @@ import { ActionButton, Pane } from "./pane";
import { useEffect, useRef, useState, createContext, useContext, useCallback } from "react";
import { CopilotAssistantMessage, CopilotMessage, CopilotUserMessage, Workflow, CopilotChatContext, CopilotAssistantMessageActionPart } from "@/app/lib/types";
import { z } from "zod";
import { getCopilotResponse } from "@/app/actions";
import { getCopilotResponse } from "@/app/actions/actions";
import { Action } from "./copilot_actions";
import clsx from "clsx";
import { Action as WorkflowDispatch } from "./workflow_editor";

View file

@ -18,7 +18,7 @@ import {
} from "@/components/ui/resizable"
import { Copilot } from "./copilot";
import { apiV1 } from "rowboat-shared";
import { publishWorkflow, renameWorkflow, saveWorkflow } from "@/app/actions";
import { publishWorkflow, renameWorkflow, saveWorkflow } from "@/app/actions/workflow_actions";
import { PublishedBadge } from "./published_badge";
import { BackIcon, HamburgerIcon, WorkflowIcon } from "@/app/lib/components/icons";
import { CopyIcon, Layers2Icon, RadioIcon, RedoIcon, UndoIcon } from "lucide-react";

View file

@ -4,7 +4,7 @@ import { z } from "zod";
import { useEffect, useState, useCallback } from "react";
import { PublishedBadge } from "./published_badge";
import { RelativeTime } from "@primer/react";
import { listWorkflows } from "@/app/actions";
import { listWorkflows } from "@/app/actions/workflow_actions";
import { Button, Divider, Pagination } from "@nextui-org/react";
import { WorkflowIcon } from "@/app/lib/components/icons";
import { PlusIcon } from "lucide-react";

View file

@ -6,7 +6,7 @@ import { Project } from "../lib/types";
import { default as NextLink } from "next/link";
import { useEffect, useState } from "react";
import { z } from "zod";
import { listProjects } from "../actions";
import { listProjects } from "../actions/project_actions";
import { useRouter } from 'next/navigation';
export default function App() {

View file

@ -1,6 +1,6 @@
'use client';
import { cn, Input } from "@nextui-org/react";
import { createProject } from "@/app/actions";
import { createProject } from "@/app/actions/project_actions";
import { templates } from "@/app/lib/project_templates";
import { WorkflowTemplate } from "@/app/lib/types";
import { FormStatusButton } from "@/app/lib/components/FormStatusButton";

View file

@ -1,994 +0,0 @@
import '../lib/loadenv';
import FirecrawlApp, { CrawlStatusResponse, ErrorResponse, FirecrawlDocument } from '@mendable/firecrawl-js';
import { RecursiveCharacterTextSplitter } from "@langchain/textsplitters";
import { z } from 'zod';
import { Document } from '@langchain/core/documents';
import * as fs from 'fs/promises';
import { dataSourcesCollection, embeddingsCollection, webpagesCollection } from '../lib/mongodb';
import { DataSource, EmbeddingDoc } from '../lib/types';
import { WithId } from 'mongodb';
import assert from 'assert';
import { embedMany, generateText } from 'ai';
import { embeddingModel } from '../lib/embedding';
import { openai } from '@ai-sdk/openai';
import { WriteStream } from 'fs';
import * as cheerio from 'cheerio';
import { ObjectId } from 'mongodb';
import path from 'path';
// Oxylabs credentials, read from the environment at module load.
// They may be undefined if the variables are not set.
const oxylabsUsername = process.env.OXYLABS_USERNAME;
const oxylabsPassword = process.env.OXYLABS_PASSWORD;
// Firecrawl SDK client used by the URL scraper.
const firecrawl = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY });
// HTTP Basic auth header for direct Oxylabs REST calls.
// NOTE(review): when the env vars are unset the template literal encodes the
// literal text "undefined:undefined" rather than failing.
const oxylabsHttpAuth = {
'Authorization': 'Basic ' + Buffer.from(`${oxylabsUsername}:${oxylabsPassword}`).toString('base64'),
}
// Bearer auth header for direct Firecrawl REST calls (crawl start / status / paging).
const firecrawlHttpAuth = {
'Authorization': `Bearer ${process.env.FIRECRAWL_API_KEY}`,
}
// Normalised page record written to the intermediate JSONL files
// (crawled.jsonl / rewritten.jsonl) that the pipeline stages pass along.
type Webpage = {
title: string,
url: string,
markdown: string,
html: string,
}
// Text splitter used by the chunking stage: 1024-size chunks with an overlap
// of 20, trying the listed separators in order.
const splitter = new RecursiveCharacterTextSplitter({
separators: ['\n\n', '\n', '. ', '.', ''],
chunkSize: 1024,
chunkOverlap: 20,
});
// Shape of one line in the Oxylabs JSONL output (url + raw page content).
type OxylabsDocument = {
url: string,
content: string,
}
// Time units expressed in milliseconds.
const second = 1000;
const minute = 60 * second;
const hour = 60 * minute;
const day = 24 * hour;
// How long to wait between status polls of a remote crawl job.
const firecrawlStatusPollInterval = 60 * second;
const oxylabsStatusPollInterval = 60 * second;
/**
 * Console logger that tags every message with a bracketed prefix and can be
 * chained: a child logger prepends its own prefix and delegates to its
 * parent, so a message logged three levels deep is printed as
 * `<ISO timestamp> [root] [child] [grandchild] ...args`.
 *
 * Only the root logger (the one without a parent) writes to `console.log`
 * and stamps the message with the current time.
 */
class PrefixLogger {
    private readonly prefix: string;
    private readonly parent: PrefixLogger | null;

    /**
     * @param prefix Text shown inside square brackets for this logger.
     * @param parent Logger to delegate to, or `null` for a root logger.
     */
    constructor(prefix: string, parent: PrefixLogger | null = null) {
        this.prefix = prefix;
        this.parent = parent;
    }

    /**
     * Logs `args` preceded by this logger's prefix (and every ancestor's).
     *
     * @param args Values forwarded unchanged to `console.log`.
     */
    log(...args: unknown[]): void {
        const tag = '[' + this.prefix + ']';
        if (this.parent) {
            this.parent.log(tag, ...args);
            return;
        }
        // The timestamp is only needed at the root, where the line is emitted.
        console.log(new Date().toISOString(), tag, ...args);
    }

    /**
     * Creates a logger whose output is nested under this one.
     *
     * @param childPrefix Prefix of the new child logger.
     * @returns A new logger that delegates to `this`.
     */
    child(childPrefix: string): PrefixLogger {
        return new PrefixLogger(childPrefix, this);
    }
}
/*
const source: z.infer<typeof SourceSchema> = {
_id: new ObjectId(),
url: "https://www.example.com",
type: "web",
status: 'processing',
createdAt: new Date().toISOString(),
};
*/
/**
 * Runs `fn` until it resolves, re-invoking it immediately after each
 * rejection.
 *
 * @param fn Async operation to attempt.
 * @param maxAttempts Number of failed attempts after which the most recent
 *   error is rethrown. `fn` is always invoked at least once.
 * @returns The first successful result of `fn`.
 * @throws Whatever `fn` threw on the final failed attempt.
 */
async function retryable<T>(fn: () => Promise<T>, maxAttempts: number = 3): Promise<T> {
    for (let failures = 0; ;) {
        try {
            return await fn();
        } catch (err) {
            failures += 1;
            if (failures >= maxAttempts) {
                throw err;
            }
        }
    }
}
/**
 * Feeds items to `processBatch` in groups of `batchSize`.
 *
 * Input comes either from the in-memory `input` array or from a JSONL file
 * at `inputFilePath` (one JSON value per line; blank lines are skipped).
 * When `outputFilePath` is given, the file is truncated and every element
 * returned by `processBatch` is appended to it as one JSON line.
 *
 * @param opts.batchSize Number of items that triggers a `processBatch` call;
 *   a final, smaller batch is processed for any leftover items.
 * @param opts.processBatch Handler invoked with each batch, in input order.
 * @throws Any error from opening/reading/writing the files, from
 *   `JSON.parse`, or from `processBatch`. Files are closed in every case.
 */
async function batchMode<InputType, OutputType>(opts: {
    batchSize: number,
} & ({
    outputFilePath: string,
    processBatch: (batch: InputType[]) => Promise<OutputType[]>
} | {
    processBatch: (batch: InputType[]) => Promise<void>
}) & ({
    input: InputType[],
} | {
    inputFilePath: string,
})) {
    let inFile: fs.FileHandle | null = null;
    let outFile: fs.FileHandle | null = null;
    let ws: WriteStream | null = null;
    let batch: InputType[] = [];

    // Hands the pending batch to the caller and persists whatever it returns.
    async function flushBatch(): Promise<void> {
        const processed = await opts.processBatch(batch);
        if (ws && Array.isArray(processed)) {
            for (const doc of processed) {
                ws.write(JSON.stringify(doc) + '\n');
            }
        }
        batch = [];
    }

    // Queues one item and flushes as soon as the batch is full.
    async function enqueue(item: InputType): Promise<void> {
        batch.push(item);
        if (batch.length < opts.batchSize) {
            return;
        }
        await flushBatch();
    }

    try {
        // Files are opened inside the try so that a failure opening the
        // second one still closes the first.
        if ('inputFilePath' in opts) {
            inFile = await fs.open(opts.inputFilePath || '', 'r');
        }
        if ('outputFilePath' in opts) {
            outFile = await fs.open(opts.outputFilePath, 'w');
            ws = outFile.createWriteStream();
        }

        if ('input' in opts) {
            for (const doc of opts.input) {
                await enqueue(doc);
            }
        } else {
            assert(inFile);
            for await (const line of inFile.readLines()) {
                // tolerate blank lines in the JSONL input
                if (line.trim() === '') {
                    continue;
                }
                await enqueue(JSON.parse(line) as InputType);
            }
        }

        // if there are any leftover documents
        if (batch.length > 0) {
            await flushBatch();
        }
    } finally {
        try {
            // Wait until every queued write has reached the file before the
            // handle is closed, so callers can read the output right away.
            if (ws && !ws.destroyed) {
                const stream = ws;
                await new Promise<void>((resolve, reject) => {
                    stream.once('error', reject);
                    stream.once('finish', () => resolve());
                    stream.end();
                });
            }
        } finally {
            if (outFile) {
                await outFile.close();
            }
            if (inFile) {
                await inFile.close();
            }
        }
    }
}
/**
 * Scrapes the job's URLs through the Oxylabs realtime API.
 *
 * Currently disabled: the function throws right after creating its child
 * logger, so the scraping code below the `throw` never runs. It is kept so
 * the integration can be switched back on.
 *
 * @throws Error Always ("OxyLabs scraping is disabled for now").
 */
async function scrapeUsingOxylabs(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource> & { data: { type: 'urls' } }>) {
    const logger = _logger.child('scrapeUsingOxylabs');

    // disable this for now
    throw new Error("OxyLabs scraping is disabled for now");

    // Fetches a single URL (up to 3 attempts); resolves to the first result
    // entry, or null when the page could not be scraped.
    const scrapeOne = async (url: string) => {
        try {
            logger.log("Scraping URL", url);
            const response = await retryable(async () => {
                const payload = JSON.stringify({
                    'source': 'universal',
                    'url': url,
                    'context': [
                        { 'key': 'follow_redirects', 'value': true }
                    ]
                });
                const res = await fetch('https://realtime.oxylabs.io/v1/queries', {
                    method: 'POST',
                    body: payload,
                    headers: {
                        'Content-Type': 'application/json',
                        ...oxylabsHttpAuth,
                    }
                });
                if (!res.ok) {
                    throw new Error(`Unable to scrape URL: ${url} with status: ${res.status} and text: ${res.statusText}, body: ${await res.text()}`);
                }
                return res;
            }, 3);

            const parsed: {
                "results": {
                    "url": string,
                    "content": string,
                    "status_code": number,
                }[],
            } = await response.json();

            const [result] = parsed.results;
            if (!result) {
                throw new Error("No results found for URL: " + url);
            }
            if (result.status_code !== 200) {
                throw new Error("Non-200 status code for URL: " + url);
            }
            return result;
        } catch (e) {
            logger.log("Error scraping URL: " + url, e);
            return null;
        }
    };

    await batchMode({
        input: job.data.urls,
        outputFilePath: 'crawled-oxylabs.jsonl',
        batchSize: 5,
        processBatch: async (batch: string[]) => {
            const results = await Promise.all(batch.map((url) => scrapeOne(url)));
            return results.filter(r => r !== null);
        }
    });
}
/**
 * Scrapes each URL of a `urls` data source with the Firecrawl SDK and writes
 * the successful responses, one JSON document per line, to
 * `crawled-firecrawl.jsonl`.
 *
 * URLs are processed one at a time (batch size 1). Each URL is attempted up
 * to 3 times; a URL that still fails is logged and left out of the output.
 */
async function scrapeUsingFirecrawl(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource> & { data: { type: 'urls' } }>) {
    const logger = _logger.child('scrapeUsingFirecrawl');

    // Scrapes one URL; resolves to null instead of throwing on failure.
    const scrapeOne = async (url: string) => {
        try {
            logger.log("Scraping URL", url);
            return await retryable(async () => {
                const scrapeResult = await firecrawl.scrapeUrl(url, {
                    formats: ['html', 'markdown'],
                    onlyMainContent: true,
                    excludeTags: ['script', 'style', 'noscript', 'img',]
                });
                if (!scrapeResult.success) {
                    throw new Error("Unable to scrape URL: " + url);
                }
                return scrapeResult;
            }, 3);
        } catch (e) {
            logger.log("Error scraping URL: " + url, e);
            return null;
        }
    };

    await batchMode({
        input: job.data.urls,
        outputFilePath: 'crawled-firecrawl.jsonl',
        // how many firecrawl requests to make at a time
        batchSize: 1,
        processBatch: async (batch: string[]): Promise<FirecrawlDocument[]> => {
            const results = await Promise.all(batch.map((url) => scrapeOne(url)));
            return results.filter(r => r !== null);
        }
    });
}
/**
 * Crawls `job.data.startUrl` with Firecrawl and writes every page that was
 * fetched with HTTP 200 to `crawled-firecrawl.jsonl` (one JSON document per
 * line).
 *
 * Steps:
 *  1. Truncate the output file.
 *  2. Reuse `job.data.firecrawlId` if present; otherwise start a new crawl
 *     and persist its id on the data source so a restart can resume it.
 *  3. Poll the crawl status every `firecrawlStatusPollInterval` until it is
 *     `completed`.
 *  4. Write the first page of results, then follow the `next` links until
 *     all result pages have been written.
 *
 * @throws Error If the crawl cannot be started, a status/result request
 *   keeps failing after 3 attempts, or the crawl ends as failed/cancelled.
 */
async function crawlUsingFirecrawl(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>> & { data: { type: 'crawl' } }) {
    const logger = _logger.child('crawlUsingFirecrawl');

    // empty the output file before starting
    await fs.writeFile('crawled-firecrawl.jsonl', '');

    // check if we have an existing firecrawl ID
    // if not, start a new crawl job
    let firecrawlId = job.data.firecrawlId;
    if (!firecrawlId) {
        logger.log('Starting firecrawl crawl...');

        // start crawl
        const result = await retryable(async () => {
            const response = await fetch('https://api.firecrawl.dev/v1/crawl', {
                method: 'POST',
                signal: AbortSignal.timeout(1 * minute),
                headers: {
                    'Content-Type': 'application/json',
                    ...firecrawlHttpAuth,
                },
                body: JSON.stringify({
                    url: job.data.startUrl,
                    limit: job.data.limit,
                    maxDepth: 2,
                    scrapeOptions: {
                        formats: ['html', 'markdown'],
                        onlyMainContent: true,
                    }
                }),
            });
            if (!response.ok) {
                throw new Error("Unable to call /crawl API: " + response.statusText);
            }
            return response;
        }, 3);

        const parsed = await result.json();
        if (!parsed.success) {
            throw new Error("Unable to start crawl: parsed.success = false");
        }
        const crawlId = parsed.id;
        logger.log("Firecrawl job started with ID", crawlId);
        firecrawlId = crawlId;

        // remember the crawl id so a restarted worker can pick it up
        await dataSourcesCollection.updateOne({
            _id: job._id,
        }, {
            $set: {
                'data.firecrawlId': firecrawlId,
            }
        });
    } else {
        logger.log("Using existing firecrawl job with ID", firecrawlId);
    }

    // wait for crawl job to complete
    let resp: CrawlStatusResponse;
    while (true) {
        // wait for 60s
        await new Promise(resolve => setTimeout(resolve, firecrawlStatusPollInterval));

        // check status
        resp = await retryable(async (): Promise<CrawlStatusResponse> => {
            logger.log("Polling firecrawl status...")
            const result = await fetch(`https://api.firecrawl.dev/v1/crawl/${firecrawlId}`, {
                signal: AbortSignal.timeout(1 * minute),
                headers: {
                    ...firecrawlHttpAuth,
                }
            });
            if (!result.ok) {
                throw new Error("Unable to fetch crawl status: " + result.statusText);
            }
            const parsed = await result.json();
            if (!parsed.success) {
                throw new Error("Unable to fetch crawl status: " + parsed.error);
            }
            return parsed;
        }, 3);

        const status: string = resp.status;
        if (status === 'completed') {
            break;
        }
        // A crawl that ended unsuccessfully will never become 'completed';
        // stop polling instead of looping forever.
        // NOTE(review): 'failed' / 'cancelled' are taken from the Firecrawl
        // crawl-status documentation — confirm against the API version in use.
        if (status === 'failed' || status === 'cancelled') {
            throw new Error(`Firecrawl crawl ${firecrawlId} ended with status: ${status}`);
        }
    }

    // open a file and append data line by line
    logger.log("First page collected from firecrawl: ", resp.data.length);
    const file = await fs.open('crawled-firecrawl.jsonl', 'w');
    const ws = file.createWriteStream();

    // Persists only the documents that were fetched with HTTP 200.
    const writeOkDocs = (docs: CrawlStatusResponse['data']) => {
        for (const doc of docs) {
            if (doc && doc.metadata?.statusCode === 200) {
                ws.write(JSON.stringify(doc) + '\n');
            }
        }
    };

    try {
        writeOkDocs(resp.data);

        let nextUrl = resp.next;
        while (nextUrl) {
            const parsed = await retryable(async () => {
                // fetch next page from firecrawl and pass on the firecrawl api key
                // as a bearer token
                assert(nextUrl);
                const result = await fetch(nextUrl, {
                    signal: AbortSignal.timeout(1 * minute),
                    headers: {
                        Authorization: `Bearer ${process.env["FIRECRAWL_API_KEY"]}`,
                    }
                });
                if (!result.ok) {
                    throw new Error("Unable to fetch next page from firecrawl: " + result.statusText);
                }
                return await result.json();
            }, 3);

            logger.log("Next page collected from firecrawl: ", parsed.data.length);
            writeOkDocs(parsed.data);
            nextUrl = parsed.next;
        }
    } finally {
        ws.close();
        await file.close();
    }
}
/**
 * Crawls `job.data.startUrl` with the Oxylabs Web Crawler and writes the
 * aggregated result chunks to `crawled-oxylabs.jsonl`.
 *
 * Currently disabled: the function truncates `crawled-oxylabs.jsonl` and then
 * throws unconditionally, so everything after the `throw` is unreachable. The
 * remaining code is kept for when the integration is re-enabled; it would
 * start (or resume via `job.data.oxylabsId`) a crawl job, poll until the
 * `job_results_aggregated` event is `done`, and download every aggregate
 * chunk into the output file.
 *
 * @throws Error Always ("OxyLabs crawling is disabled for now").
 */
async function crawlUsingOxylabs(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>> & { data: { type: 'crawl' } }) {
const logger = _logger.child('crawlUsingOxyLabs');
// empty the output file before starting
// (this still runs while the crawler is disabled, so the file exists and is empty)
await fs.writeFile('crawled-oxylabs.jsonl', '');
// disable this for now
throw new Error("OxyLabs crawling is disabled for now");
// check if we have an existing oxylabs ID
// if not, start a new crawl job
let oxylabsId = job.data.oxylabsId;
if (!oxylabsId) {
oxylabsId = await retryable(async () => {
// if url ends with a slash, remove it
let url = job.data.startUrl;
if (job.data.startUrl.endsWith('/')) {
url = url.slice(0, -1);
}
// create a regex for the starting url
// that matches any subpath
// NOTE(review): the URL is passed to RegExp as-is, so characters such as
// '.' or '?' keep their regex meaning — confirm this is intended before
// re-enabling.
const baseRegex = (new RegExp(url)).toString().slice(1, -1);
const subpathRegex = (new RegExp(`${url}/.*`)).toString().slice(1, -1);
/*
const escapedOrigin = url.origin.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
const escapedPathname = url.pathname.replace(/[.*+?^${}()|[\]\\]/g, '\\$&').replace(/\/$/, '');
const regex = new RegExp(`${escapedOrigin}${escapedPathname}(/.*)?`);
*/
logger.log(`Starting crawl for ${url}`);
// Initiate a new Web Crawler job
const response = await fetch('https://ect.oxylabs.io/v1/jobs', {
method: 'POST',
headers: {
...oxylabsHttpAuth,
'Content-Type': 'application/json',
},
body: JSON.stringify({
url: url.toString(),
filters: {
crawl: [baseRegex, subpathRegex],
process: [baseRegex, subpathRegex],
max_depth: 2,
max_urls: job.data.limit,
},
scrape_params: {
source: "universal",
user_agent_type: "desktop",
render: "html",
},
output: {
type_: "html", // Changed from "sitemap" to "html"
aggregate_chunk_size_bytes: 100 * 1024 * 1024, // 100 MB
},
context: {
follow_redirects: true,
},
}),
});
if (!response.ok) {
const errorBody = await response.text();
throw new Error(`HTTP error! status: ${response.status}, ${response.statusText}, body: ${errorBody}`);
}
const jobData = await response.json();
const jobId = jobData.id;
logger.log(`Crawl job initiated. Job ID: ${jobId}`);
return jobId;
}, 3);
// persist the job id on the data source so a later run can reuse it
await dataSourcesCollection.updateOne({
_id: job._id,
}, {
$set: {
'data.oxylabsId': oxylabsId,
}
});
} else {
logger.log("Using existing oxylabs job with ID", oxylabsId);
}
// Poll for job completion
// NOTE(review): the loop only exits on a "done" aggregation event; there is
// no exit for a job that fails.
while (true) {
await new Promise(resolve => setTimeout(resolve, oxylabsStatusPollInterval));
logger.log(`Checking job status...`);
const jobStatusResponse = await retryable(async () => {
const response = await fetch(`https://ect.oxylabs.io/v1/jobs/${oxylabsId}`, {
headers: oxylabsHttpAuth,
});
if (!response.ok) {
throw new Error(`Unable to fetch job status: ${response.statusText}`);
}
return response;
}, 3);
const jobStatus = await jobStatusResponse.json();
const jobCompleted = jobStatus.events.some((event: { event: string; status: string }) =>
event.event === "job_results_aggregated" && event.status === "done"
);
if (jobCompleted) {
break;
}
}
logger.log('Crawl job completed successfully');
// Get the list of aggregate result chunks
logger.log('Fetching aggregate result chunks...');
const aggregateResponse = await retryable(async () => {
const response = await fetch(`https://ect.oxylabs.io/v1/jobs/${oxylabsId}/aggregate`, {
headers: oxylabsHttpAuth,
});
if (!response.ok) {
throw new Error(`Unable to fetch aggregate results: ${response.statusText}`);
}
return response;
}, 3);
const aggregateData = await aggregateResponse.json();
logger.log('Aggregate chunks response:', JSON.stringify(aggregateData));
// Download and write JSONL content to file
const file = await fs.open('crawled-oxylabs.jsonl', 'w');
const ws = file.createWriteStream();
try {
for (const chunk of aggregateData.chunk_urls) {
logger.log("Fetching chunk", chunk.href);
// During development, i found that oxylabs returns http URLs
// Convert chunk href URL to https if it's http
const secureChunkUrl = new URL(chunk.href);
if (secureChunkUrl.protocol === 'http:') {
secureChunkUrl.protocol = 'https:';
}
const chunkResponse = await retryable(async () => {
const response = await fetch(secureChunkUrl.toString(), {
headers: oxylabsHttpAuth,
});
if (!response.ok) {
throw new Error(`Failed to fetch chunk: ${response.status} ${response.statusText}`);
}
if (!response.body) {
throw new Error("No body in chunk response");
}
return response;
}, 3);
const chunkContent = await chunkResponse.text();
ws.write(chunkContent);
// make sure the next chunk starts on its own line
if (!chunkContent.endsWith('\n')) {
ws.write('\n');
}
logger.log("Wrote chunk to file", chunk.href);
}
} catch (e) {
throw e;
} finally {
ws.close();
await file.close();
}
}
/**
 * Converts the Firecrawl output (`crawled-firecrawl.jsonl`) into the
 * pipeline's common `Webpage` format and writes it to `crawled.jsonl`.
 *
 * Blank input lines are skipped. Every source URL is also collected into the
 * returned set; a document without `metadata.sourceURL` contributes the
 * empty string.
 *
 * The Oxylabs half of the merge is switched off. When enabled it read
 * `crawled-oxylabs.jsonl`, skipped URLs already present in the set, took the
 * page title from the HTML `<title>` element via cheerio, and emitted a
 * `Webpage` with empty markdown and the raw HTML. The Oxylabs counter is
 * therefore always 0.
 *
 * @returns The set of URLs written to `crawled.jsonl`.
 */
async function mergeFirecrawlAndOxylabs(_logger: PrefixLogger): Promise<Set<string>> {
    const logger = _logger.child('mergeFirecrawlAndOxylabs');
    const urlSet = new Set<string>();
    const outputFile = await fs.open('crawled.jsonl', 'w');
    const outputStream = outputFile.createWriteStream();
    let firecrawlCount = 0;
    const oxylabsCount = 0;

    try {
        // Read Firecrawl JSONL file
        const firecrawlFile = await fs.open('crawled-firecrawl.jsonl', 'r');
        try {
            for await (const line of firecrawlFile.readLines()) {
                // if line is empty, skip it
                if (line.trim() === '') {
                    continue;
                }
                const fcDoc: FirecrawlDocument = JSON.parse(line);
                const sourceUrl = fcDoc.metadata?.sourceURL || '';
                urlSet.add(sourceUrl);

                const webpage: Webpage = {
                    url: sourceUrl,
                    markdown: fcDoc.markdown || '',
                    title: fcDoc.metadata?.title || '',
                    html: fcDoc.html || '',
                };
                outputStream.write(JSON.stringify(webpage) + '\n');
                firecrawlCount += 1;
            }
        } finally {
            await firecrawlFile.close();
        }
    } finally {
        outputStream.end();
        await outputFile.close();
    }

    logger.log(`Merged Firecrawl and OxyLabs data. Total unique URLs: ${urlSet.size}`);
    logger.log(`URLs crawled by Firecrawl: ${firecrawlCount}`);
    logger.log(`URLs crawled by OxyLabs: ${oxylabsCount}`);
    return urlSet;
}
/**
 * Upserts the rewritten pages from `rewritten.jsonl` into the webpages
 * collection, 100 pages per bulk write.
 *
 * Each page is keyed by its URL (`_id`); an existing document gets its
 * title, markdown content (`contentSimple`) and `lastUpdatedAt` replaced,
 * and a missing one is inserted.
 */
async function saveWebpagesToMongodb(logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>) {
    await batchMode({
        inputFilePath: 'rewritten.jsonl',
        batchSize: 100,
        processBatch: async (batch: Webpage[]) => {
            // One upsert per page, so re-crawled pages overwrite the
            // previously stored copy instead of duplicating it.
            const bulkWriteOps = batch.map((page) => ({
                updateOne: {
                    filter: { _id: page.url },
                    update: {
                        $set: {
                            title: page.title,
                            contentSimple: page.markdown,
                            lastUpdatedAt: new Date().toISOString(),
                        }
                    },
                    upsert: true,
                }
            }));

            if (bulkWriteOps.length > 0) {
                await webpagesCollection.bulkWrite(bulkWriteOps);
                logger.log("Saved webpage contents to mongo", batch.length);
            }
        }
    });
}
/**
 * No-op stand-in for the rewrite stage: copies `crawled.jsonl` to
 * `rewritten.jsonl` unchanged, 10 pages per batch.
 */
async function dummyRewrite(_logger: PrefixLogger) {
    const logger = _logger.child('dummyRewrite');

    // Returns every batch exactly as it was read.
    const passThrough = async (batch: Webpage[]): Promise<Webpage[]> => batch;

    await batchMode({
        inputFilePath: 'crawled.jsonl',
        outputFilePath: 'rewritten.jsonl',
        batchSize: 10,
        processBatch: passThrough,
    });
}
/**
 * Rewrite stage: turns each crawled page's HTML into clean Markdown using an
 * LLM and writes the result to `rewritten.jsonl`.
 *
 * For every page read from `crawled.jsonl` (10 per batch):
 *  1. The HTML is slimmed down with cheerio — non-content tags, comments,
 *     most attributes and empty elements are removed — and stored back on
 *     the page's `html` field.
 *  2. The page title and slimmed HTML are inserted into the prompt and sent
 *     to `gpt-4o`; the reply replaces the page's `markdown`.
 *
 * If the model call fails for a page, the error is logged and the page is
 * passed through with its original markdown.
 */
async function rewrite(_logger: PrefixLogger) {
    const logger = _logger.child('rewrite');

    const prompt = `
Rewrite the below html article as Markdown by removing all extra content that does not belong in the main help content for the topic. Extra content can include extraneous links, generic website text, any content about related articles, etc.
Tip: Such content will generally be placed at the start and / or at the end of the article. You can identify the topic from the article's URL and/or Title.
Strictly do not make any other changes to the article.
<START_ARTICLE_CONTENT>
Title: {{title}}
{{content}}
<END_ARTICLE_CONTENT>
`;

    // Fills the placeholders in a single pass with a replacer function, so
    // that `$&`, `$'`, `$$` etc. inside the page text are inserted literally
    // and a title containing "{{content}}" is not expanded a second time.
    const buildPrompt = (doc: Webpage): string => {
        const fields: Record<string, string> = {
            title: doc.title,
            content: doc.html,
        };
        return prompt.replace(/\{\{(title|content)\}\}/g, (_match, key: string) => fields[key]);
    };

    const tagsToStrip = [
        "aside",
        "audio",
        "button",
        "canvas",
        "embed",
        "footer",
        "form",
        "header",
        "iframe",
        "img",
        "input",
        "link",
        "meta",
        "nav",
        "noscript",
        "object",
        "script",
        "select",
        "style",
        "svg",
        "textarea",
        "video"
    ];
    const attrsToKeep = ['href', 'src', 'alt', 'title'];

    await batchMode({
        inputFilePath: 'crawled.jsonl',
        outputFilePath: 'rewritten.jsonl',
        batchSize: 10,
        processBatch: async (batch: Webpage[]): Promise<Webpage[]> => {
            // use cheerio to strip extraneous tags and attributes
            batch.forEach((doc) => {
                const $ = cheerio.load(doc.html);
                tagsToStrip.forEach((tag) => {
                    $(tag).remove();
                });

                // Remove comments
                $('*').contents().filter(function () {
                    return this.type === 'comment';
                }).remove();

                // Remove most attributes, but keep some for semantic meaning
                $('*').each(function () {
                    const attrs = $(this).attr();
                    for (const attr in attrs) {
                        if (!attrsToKeep.includes(attr)) {
                            $(this).removeAttr(attr);
                        }
                    }
                });

                // Remove empty elements
                $('*').filter(function () {
                    return $(this).text().trim() === '' && $(this).children().length === 0;
                }).remove();

                doc.html = $.html();
            });

            const rewritten = await Promise.all(batch.map(async (doc) => {
                try {
                    const now = Date.now();
                    const { text } = await generateText({
                        model: openai('gpt-4o'),
                        prompt: buildPrompt(doc),
                    });
                    // log the time taken (in s) to rewrite the text
                    logger.log("\tCompleted rewrite", doc.url, (Date.now() - now) / 1000, "s");
                    return {
                        ...doc,
                        markdown: text,
                    };
                } catch (e) {
                    // Best effort: keep the page as crawled, but leave a trace
                    // of the failure instead of dropping it silently.
                    logger.log("\tRewrite failed, keeping original markdown", doc.url, e);
                    return doc;
                }
            }));

            logger.log("Rewrote batch of documents", batch.length);
            return rewritten;
        }
    });
}
/**
 * Chunking stage: reads `rewritten.jsonl`, splits each page's markdown with
 * the module-level `splitter`, tags every chunk with its source URL, title
 * and the job id, and writes the chunks to `chunked.jsonl`.
 *
 * @param logger Logger used to report per-batch progress.
 * @param job    Data source being processed; its `_id` becomes `metadata.sourceId`.
 */
async function chunk(logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>) {
    await batchMode({
        inputFilePath: 'rewritten.jsonl',
        outputFilePath: 'chunked.jsonl',
        batchSize: 1000,
        processBatch: async (batch: Webpage[]): Promise<Document[]> => {
            const results: Document[] = [];
            // `batch` is a plain array, so a regular for...of is sufficient;
            // only the splitter call itself is asynchronous.
            for (const doc of batch) {
                const splits = await splitter.createDocuments([doc.markdown]);
                for (const split of splits) {
                    split.metadata.sourceURL = doc.url;
                    split.metadata.title = doc.title;
                    split.metadata.sourceId = job._id.toString();
                }
                results.push(...splits);
            }
            logger.log("Chunked batch of documents", batch.length);
            return results;
        }
    });
}
/**
 * Embedding stage: reads chunks from `chunked.jsonl` in batches of 200,
 * embeds their text with `embeddingModel`, and writes one embedding record
 * per chunk to `embeddings.jsonl`.
 */
async function embeddings(logger: PrefixLogger) {
    const embedBatch = async (docs: Document[]): Promise<z.infer<typeof EmbeddingDoc>[]> => {
        const texts = docs.map((doc) => doc.pageContent);
        // Vectors come back in the same order as `texts`; they are paired by index below.
        const { embeddings: vectors } = await embedMany({
            model: embeddingModel,
            values: texts,
        });
        logger.log("Embedded batch of documents", docs.length);
        return docs.map((doc, idx) => {
            const { sourceId, sourceURL, title } = doc.metadata;
            return {
                sourceId: sourceId as string,
                content: doc.pageContent,
                metadata: {
                    sourceURL: sourceURL as string,
                    title: title as string,
                },
                embeddings: vectors[idx],
            };
        });
    };
    await batchMode({
        inputFilePath: 'chunked.jsonl',
        outputFilePath: 'embeddings.jsonl',
        batchSize: 200,
        processBatch: embedBatch,
    });
}
/**
 * Storage stage: removes every embedding previously stored for this job's
 * source id, then inserts the records from `embeddings.jsonl` into
 * `embeddingsCollection` in batches of 100.
 */
async function mongodb(logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>) {
    logger.log("Deleting old embeddings...");
    const sourceId = job._id.toString();
    await embeddingsCollection.deleteMany({ sourceId });
    const insertBatch = async (records: z.infer<typeof EmbeddingDoc>[]) => {
        await embeddingsCollection.insertMany(records);
        logger.log("Inserted batch of documents", records.length);
    };
    await batchMode({
        inputFilePath: 'embeddings.jsonl',
        batchSize: 100,
        processBatch: insertBatch,
    });
}
// Crawl worker loop: claims one data source job at a time from MongoDB,
// crawls / scrapes it with Firecrawl and OxyLabs, then runs the
// rewrite -> save webpages -> chunk -> embed -> store stages.
// The loop never exits; it sleeps 5 seconds whenever no job is claimable.
(async () => {
while (true) {
console.log("Polling for job...")
const now = Date.now();
// Claim the oldest eligible job (sorted by createdAt ascending) with a single
// findOneAndUpdate. Eligible means type "crawl" or "urls" AND one of:
//   - status "new"
//   - status "error" with fewer than 3 attempts
//   - status "error" whose lastAttemptAt is older than 5 minutes
//   - status "processing" whose lastAttemptAt is older than 12 hours
// NOTE(review): the two "error" clauses are OR-ed, so an errored job becomes
// eligible again every 5 minutes regardless of its attempt count — confirm intended.
const job = await dataSourcesCollection.findOneAndUpdate(
{
$and: [
{ 'data.type': { $in: ["crawl", "urls"] } },
{
$or: [
{ status: "new" },
{
status: "error",
attempts: { $lt: 3 },
},
{
status: "error",
lastAttemptAt: { $lt: new Date(now - 5 * minute).toISOString() },
},
{
status: "processing",
lastAttemptAt: { $lt: new Date(now - 12 * hour).toISOString() },
}
]
}
]
},
// Claiming marks the job as processing, stamps the attempt time and bumps the attempt counter.
{
$set: {
status: "processing",
lastAttemptAt: new Date().toISOString(),
},
$inc: {
attempts: 1
},
},
{ returnDocument: "after", sort: { createdAt: 1 } }
);
if (job === null) {
// if no doc found, sleep for a bit and start again
await new Promise(resolve => setTimeout(resolve, 5 * second));
continue;
}
// pick a job as a test from db
// const job = await dataSourcesCollection.findOne({
// _id: new ObjectId("6715e9218a128eae83550cc9"),
// });
// assert(job !== null);
// All log lines for this job are prefixed with the job id.
const logger = new PrefixLogger(job._id.toString());
logger.log(`Starting job ${job._id}. Type: ${job.data.type}`);
try {
// Settled results of the two providers; assigned in one of the branches below.
let firecrawlResult;
let oxylabsResult;
if (job.data.type === "crawl") {
// Run the crawl using firecrawl and oxylabs in parallel
// If both fail, throw an error; if one fails, log the error and continue
logger.log("Starting Firecrawl and OxyLabs crawls in parallel...");
[firecrawlResult, oxylabsResult] = await Promise.allSettled([
crawlUsingFirecrawl(logger, job as WithId<z.infer<typeof DataSource>> & { data: { type: 'crawl' } }),
crawlUsingOxylabs(logger, job as WithId<z.infer<typeof DataSource>> & { data: { type: 'crawl' } }),
]);
} else if (job.data.type === "urls") {
// scrape the urls using firecrawl and oxylabs in parallel
logger.log("Starting Firecrawl and OxyLabs scrapes in parallel...");
[firecrawlResult, oxylabsResult] = await Promise.allSettled([
scrapeUsingFirecrawl(logger, job as WithId<z.infer<typeof DataSource>> & { data: { type: 'urls' } }),
scrapeUsingOxylabs(logger, job as WithId<z.infer<typeof DataSource>> & { data: { type: 'urls' } }),
]);
}
// NOTE(review): the import of `assert` is outside this view; if it is the
// console-style assert it only logs and does not throw — confirm.
assert(firecrawlResult !== undefined);
assert(oxylabsResult !== undefined);
// Only a failure of BOTH providers fails the job; a single failure is logged and ignored.
if (firecrawlResult.status === 'rejected' && oxylabsResult.status === 'rejected') {
logger.log('Both Firecrawl and OxyLabs jobs failed', {
firecrawlError: firecrawlResult.reason,
oxylabsError: oxylabsResult.reason
});
throw new Error('Both Firecrawl and OxyLabs jobs failed');
}
if (firecrawlResult.status === 'rejected') {
logger.log('Firecrawl job failed, but OxyLabs succeeded:', {
error: firecrawlResult.reason
});
}
if (oxylabsResult.status === 'rejected') {
logger.log('OxyLabs job failed, but Firecrawl succeeded:', {
error: oxylabsResult.reason
});
}
// merge the firecrawl and oxylabs results
// (presumably yields a Set of URLs — it is used with Array.from and .has below; confirm)
const crawledUrls = await mergeFirecrawlAndOxylabs(logger);
// update the job with the crawled urls
// (stored as one newline-separated string; the field name depends on the job type)
if (job.data.type === "crawl") {
await dataSourcesCollection.updateOne({ _id: job._id }, { $set: { 'data.crawledUrls': Array.from(crawledUrls).join('\n') } });
} else if (job.data.type === "urls") {
await dataSourcesCollection.updateOne({ _id: job._id }, { $set: { 'data.scrapedUrls': Array.from(crawledUrls).join('\n') } });
}
// rewrite the merged results as simplified html and markdown
// (the LLM rewrite is currently disabled; dummyRewrite is called in its place)
// await rewrite(logger);
await dummyRewrite(logger);
await saveWebpagesToMongodb(logger, job);
await chunk(logger, job);
await embeddings(logger);
await mongodb(logger, job);
// if this is a scrape urls job, compare the input urls with the scraped urls
// if there are any urls that were not scraped, set a missingUrls field on the job
if (job.data.type === "urls") {
const missingUrls = job.data.urls.filter((url: string) => !crawledUrls.has(url));
if (missingUrls.length > 0) {
await dataSourcesCollection.updateOne({ _id: job._id }, { $set: { 'data.missingUrls': missingUrls.join('\n') } });
} else {
// Nothing missing: reset the field to null.
await dataSourcesCollection.updateOne({ _id: job._id }, { $set: { 'data.missingUrls': null } });
}
}
} catch (e) {
// Any failure marks the job "error" so the claim query above can pick it up again.
logger.log("Error processing job; will retry:", e);
await dataSourcesCollection.updateOne({ _id: job._id }, { $set: { status: "error" } });
continue;
}
// mark job as complete
logger.log("Marking job as completed...");
await dataSourcesCollection.updateOne({ _id: job._id }, { $set: { status: "completed" } });
// break;
}
})();

View file

@ -0,0 +1,9 @@
import '../lib/loadenv';
import { qdrantClient } from '../lib/qdrant';
// One-off script: drops the 'embeddings' collection from Qdrant, then prints
// the list of collections reported by the server afterwards.
(async () => {
    await qdrantClient.deleteCollection('embeddings');
    const listing = await qdrantClient.getCollections();
    console.log(listing.collections);
})();

View file

@ -0,0 +1,339 @@
import '../lib/loadenv';
import FirecrawlApp from '@mendable/firecrawl-js';
import { RecursiveCharacterTextSplitter } from "@langchain/textsplitters";
import { z } from 'zod';
import { dataSourceDocsCollection, dataSourcesCollection } from '../lib/mongodb';
import { DataSource, DataSourceDoc, EmbeddingRecord } from '../lib/types';
import { WithId } from 'mongodb';
import { embedMany } from 'ai';
import { embeddingModel } from '../lib/embedding';
import { qdrantClient } from '../lib/qdrant';
import { PrefixLogger } from './shared';
import { GoogleGenerativeAI } from "@google/generative-ai";
import { GetObjectCommand } from "@aws-sdk/client-s3";
import { uploadsS3Client } from '../lib/uploads_s3_client';
// Time units, expressed in milliseconds.
const second = 1000;
const minute = 60 * second;
const hour = 60 * minute;
const day = 24 * hour;
// Text splitter shared by all documents: chunk size 1024 with an overlap of 20,
// trying paragraph, line and sentence separators before falling back to ''.
const splitter = new RecursiveCharacterTextSplitter({
    chunkSize: 1024,
    chunkOverlap: 20,
    separators: ['\n\n', '\n', '. ', '.', ''],
});
// Google Gemini client; falls back to an empty API key when GOOGLE_API_KEY is unset.
const genAI = new GoogleGenerativeAI(process.env.GOOGLE_API_KEY || '');
/**
 * Downloads an object from the uploads S3 bucket and buffers it in memory.
 *
 * @param s3Key Key of the object inside the bucket named by `UPLOADS_S3_BUCKET`.
 * @returns The complete object body as a single Buffer.
 * @throws Error when the response carries no body; errors raised by the S3
 *         client propagate unchanged.
 */
async function getFileContent(s3Key: string): Promise<Buffer> {
    const command = new GetObjectCommand({
        Bucket: process.env.UPLOADS_S3_BUCKET,
        Key: s3Key,
    });
    const response = await uploadsS3Client.send(command);
    // Fail with a descriptive message instead of the opaque
    // "undefined is not async iterable" TypeError the loop would raise.
    if (!response.Body) {
        throw new Error(`S3 returned an empty body for key: ${s3Key}`);
    }
    const chunks: Buffer[] = [];
    // The body is consumed as an async-iterable stream; the SDK's declared
    // type does not expose that directly, hence the cast.
    for await (const chunk of response.Body as any) {
        chunks.push(chunk);
    }
    return Buffer.concat(chunks);
}
/**
 * Calls `fn` until it resolves, re-invoking it immediately after a rejection.
 *
 * @param fn          Async operation to run.
 * @param maxAttempts Number of failures tolerated before giving up (default 3).
 *                    `fn` is always invoked at least once.
 * @returns The value `fn` resolved with.
 * @throws The error from the last failed attempt once the limit is reached.
 */
async function retryable<T>(fn: () => Promise<T>, maxAttempts: number = 3): Promise<T> {
    let failures = 0;
    for (;;) {
        try {
            return await fn();
        } catch (err) {
            failures += 1;
            if (failures >= maxAttempts) {
                throw err;
            }
        }
    }
}
/**
 * Processes one uploaded file: downloads it from S3, extracts markdown with
 * Gemini, splits it into chunks, embeds the chunks, stores them in Qdrant and
 * finally marks the doc record "ready" with the extracted content.
 *
 * The doc's existing Qdrant points are removed before new ones are written,
 * so re-processing a doc (for example after an earlier failed attempt) does
 * not leave duplicate embeddings behind.
 *
 * @param _logger Parent logger; a child scoped to the doc id and name is used.
 * @param job     Data source the doc belongs to.
 * @param doc     Doc record to process; must have `data.type === 'file'`.
 * @throws Error("Invalid data source type") for non-file docs; errors from
 *         S3, Gemini, the embedding model, Qdrant or MongoDB propagate.
 */
async function runProcessPipeline(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>, doc: WithId<z.infer<typeof DataSourceDoc>>): Promise<void> {
    const logger = _logger
        .child(doc._id.toString())
        .child(doc.name);
    // Get file content from S3
    logger.log("Fetching file from S3");
    if (doc.data.type !== 'file') {
        throw new Error("Invalid data source type");
    }
    const fileData = await getFileContent(doc.data.s3Key);
    // Use Gemini to extract text content
    logger.log("Extracting content using Gemini");
    const model = genAI.getGenerativeModel({ model: "gemini-2.0-flash-001" });
    const prompt = "Extract and return only the text content from this document in markdown format. Exclude any formatting instructions or additional commentary.";
    const result = await model.generateContent([
        {
            inlineData: {
                data: fileData.toString('base64'),
                mimeType: doc.data.mimeType
            }
        },
        prompt
    ]);
    const markdown = result.response.text();
    // split into chunks
    logger.log("Splitting into chunks");
    const splits = await splitter.createDocuments([markdown]);
    // Point ids are random, so an upsert never overwrites points written by a
    // previous attempt; clear them explicitly to keep re-processing idempotent.
    logger.log("Deleting stale embeddings from Qdrant");
    await qdrantClient.delete("embeddings", {
        filter: {
            must: [
                { key: "projectId", match: { value: job.projectId } },
                { key: "sourceId", match: { value: job._id.toString() } },
                { key: "docId", match: { value: doc._id.toString() } },
            ],
        },
    });
    if (splits.length > 0) {
        // generate embeddings
        logger.log("Generating embeddings");
        const { embeddings } = await embedMany({
            model: embeddingModel,
            values: splits.map((split) => split.pageContent)
        });
        // store embeddings in qdrant
        logger.log("Storing embeddings in Qdrant");
        const points: z.infer<typeof EmbeddingRecord>[] = embeddings.map((embedding, i) => ({
            id: crypto.randomUUID(),
            vector: embedding,
            payload: {
                projectId: job.projectId,
                sourceId: job._id.toString(),
                docId: doc._id.toString(),
                content: splits[i].pageContent,
                title: doc.name,
                name: doc.name,
            },
        }));
        await qdrantClient.upsert("embeddings", {
            points,
        });
    } else {
        // Nothing to embed: avoid sending empty requests to the embedding
        // model and to Qdrant, and still record the (empty) content below.
        logger.log("No content extracted; skipping embeddings");
    }
    // store content in doc record
    logger.log("Storing content in doc record");
    await dataSourceDocsCollection.updateOne({
        _id: doc._id,
        version: doc.version,
    }, {
        $set: {
            content: markdown,
            status: "ready",
            lastUpdatedAt: new Date().toISOString(),
        }
    });
}
/**
 * Removes a single doc: first its points in the Qdrant "embeddings"
 * collection (matched on project id, source id and doc id), then the doc
 * record itself.
 *
 * @param _logger Parent logger; a child scoped to the doc id and name is used.
 * @param job     Data source the doc belongs to.
 * @param doc     Doc record to delete.
 */
async function runDeletionPipeline(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>, doc: WithId<z.infer<typeof DataSourceDoc>>): Promise<void> {
    const docId = doc._id.toString();
    const logger = _logger.child(docId).child(doc.name);

    // Delete embeddings from qdrant
    logger.log("Deleting embeddings from Qdrant");
    const must = [
        { key: "projectId", match: { value: job.projectId } },
        { key: "sourceId", match: { value: job._id.toString() } },
        { key: "docId", match: { value: docId } },
    ];
    await qdrantClient.delete("embeddings", { filter: { must } });

    // Delete docs from db
    logger.log("Deleting doc from db");
    await dataSourceDocsCollection.deleteOne({ _id: doc._id });
}
// Files worker loop: repeatedly claims one "files" data source from MongoDB
// and either purges it (status "deleted") or processes its pending docs and
// removes its deleted docs. Sleeps 5 seconds when nothing is claimable.
(async () => {
    while (true) {
        console.log("Polling for job...")
        const now = Date.now();
        let job: WithId<z.infer<typeof DataSource>> | null = null;
        // first try to find a job that needs deleting.
        // Only claim sources of this worker's own type: claiming another
        // worker's source would hit the type check below, and the error
        // handler would then overwrite its "deleted" status with "error".
        job = await dataSourcesCollection.findOneAndUpdate({
            status: "deleted",
            'data.type': { $eq: "files" },
            $or: [
                { attempts: { $exists: false } },
                { attempts: { $lte: 3 } }
            ]
        }, { $set: { lastAttemptAt: new Date().toISOString() }, $inc: { attempts: 1 } }, { returnDocument: "after", sort: { createdAt: 1 } });
        if (job === null) {
            job = await dataSourcesCollection.findOneAndUpdate(
                {
                    $and: [
                        { 'data.type': { $eq: "files" } },
                        {
                            $or: [
                                // if the job has never been attempted
                                {
                                    status: "pending",
                                    attempts: 0,
                                },
                                // if the job was attempted but wasn't completed in the last hour
                                {
                                    status: "pending",
                                    lastAttemptAt: { $lt: new Date(now - 1 * hour).toISOString() },
                                },
                                // if the job errored out but hasn't been retried 3 times yet
                                {
                                    status: "error",
                                    attempts: { $lt: 3 },
                                },
                                // if the job errored out but hasn't been retried in the last hour
                                {
                                    status: "error",
                                    lastAttemptAt: { $lt: new Date(now - 1 * hour).toISOString() },
                                },
                            ]
                        }
                    ]
                },
                {
                    $set: {
                        status: "pending",
                        lastAttemptAt: new Date().toISOString(),
                    },
                    $inc: {
                        attempts: 1
                    },
                },
                { returnDocument: "after", sort: { createdAt: 1 } }
            );
        }
        if (job === null) {
            // if no doc found, sleep for a bit and start again
            await new Promise(resolve => setTimeout(resolve, 5 * second));
            continue;
        }
        const logger = new PrefixLogger(`${job._id.toString()}-${job.version}`);
        logger.log(`Starting job ${job._id}. Type: ${job.data.type}. Status: ${job.status}`);
        // Set when any individual doc fails; the job is then finished as "error".
        let errors = false;
        try {
            if (job.data.type !== 'files') {
                throw new Error("Invalid data source type");
            }
            if (job.status === "deleted") {
                // delete all embeddings for this source
                logger.log("Deleting embeddings from Qdrant");
                await qdrantClient.delete("embeddings", {
                    filter: {
                        must: [
                            { key: "projectId", match: { value: job.projectId } },
                            { key: "sourceId", match: { value: job._id.toString() } },
                        ],
                    },
                });
                // delete all docs for this source
                logger.log("Deleting docs from db");
                await dataSourceDocsCollection.deleteMany({
                    sourceId: job._id.toString(),
                });
                // delete the source record from db
                logger.log("Deleting source record from db");
                await dataSourcesCollection.deleteOne({
                    _id: job._id,
                });
                logger.log("Job deleted");
                continue;
            }
            // fetch docs that need updating
            const pendingDocs = await dataSourceDocsCollection.find({
                sourceId: job._id.toString(),
                status: { $in: ["pending", "error"] },
            }).toArray();
            logger.log(`Found ${pendingDocs.length} docs to process`);
            // for each doc
            for (const doc of pendingDocs) {
                try {
                    await runProcessPipeline(logger, job, doc);
                } catch (e: unknown) {
                    errors = true;
                    logger.log("Error processing doc:", e);
                    await dataSourceDocsCollection.updateOne({
                        _id: doc._id,
                        version: doc.version,
                    }, {
                        $set: {
                            status: "error",
                            error: e instanceof Error ? e.message : String(e),
                        }
                    });
                }
            }
            // fetch docs that need to be deleted
            const deletedDocs = await dataSourceDocsCollection.find({
                sourceId: job._id.toString(),
                status: "deleted",
            }).toArray();
            logger.log(`Found ${deletedDocs.length} docs to delete`);
            for (const doc of deletedDocs) {
                try {
                    await runDeletionPipeline(logger, job, doc);
                } catch (e: unknown) {
                    errors = true;
                    logger.log("Error deleting doc:", e);
                    // Record the failure but keep the doc's "deleted" status:
                    // switching it to "error" would make the next run re-process
                    // (and re-embed) a doc the user asked to delete.
                    await dataSourceDocsCollection.updateOne({
                        _id: doc._id,
                        version: doc.version,
                    }, {
                        $set: {
                            error: e instanceof Error ? e.message : String(e),
                        }
                    });
                }
            }
        } catch (e) {
            logger.log("Error processing job; will retry:", e);
            await dataSourcesCollection.updateOne({ _id: job._id, version: job.version }, { $set: { status: "error" } });
            continue;
        }
        // mark job as complete
        logger.log("Marking job as completed...");
        await dataSourcesCollection.updateOne({
            _id: job._id,
            version: job.version,
        }, {
            $set: {
                status: errors ? "error" : "ready",
                ...(errors ? { error: "There were some errors processing this job" } : {}),
            }
        });
    }
})();

View file

@ -0,0 +1,339 @@
import '../lib/loadenv';
import FirecrawlApp from '@mendable/firecrawl-js';
import { RecursiveCharacterTextSplitter } from "@langchain/textsplitters";
import { z } from 'zod';
import { dataSourceDocsCollection, dataSourcesCollection } from '../lib/mongodb';
import { DataSource, DataSourceDoc, EmbeddingRecord } from '../lib/types';
import { WithId } from 'mongodb';
import { embedMany } from 'ai';
import { embeddingModel } from '../lib/embedding';
import { qdrantClient } from '../lib/qdrant';
import { PrefixLogger } from './shared';
// Time units, expressed in milliseconds.
const second = 1000;
const minute = 60 * second;
const hour = 60 * minute;
const day = 24 * hour;
const firecrawlStatusPollInterval = 60 * second;

// Firecrawl SDK client and the matching bearer header, both keyed by FIRECRAWL_API_KEY.
const firecrawl = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY });
const firecrawlHttpAuth = {
    'Authorization': `Bearer ${process.env.FIRECRAWL_API_KEY}`,
}

// Shape of a fetched page.
type Webpage = {
    title: string,
    url: string,
    markdown: string,
    html: string,
}

// Text splitter shared by all documents: chunk size 1024 with an overlap of 20,
// trying paragraph, line and sentence separators before falling back to ''.
const splitter = new RecursiveCharacterTextSplitter({
    chunkSize: 1024,
    chunkOverlap: 20,
    separators: ['\n\n', '\n', '. ', '.', ''],
});
/*
const source: z.infer<typeof SourceSchema> = {
_id: new ObjectId(),
url: "https://www.example.com",
type: "web",
status: 'processing',
createdAt: new Date().toISOString(),
};
*/
/**
 * Calls `fn` until it resolves, re-invoking it immediately after a rejection.
 *
 * @param fn          Async operation to run.
 * @param maxAttempts Number of failures tolerated before giving up (default 3).
 *                    `fn` is always invoked at least once.
 * @returns The value `fn` resolved with.
 * @throws The error from the last failed attempt once the limit is reached.
 */
async function retryable<T>(fn: () => Promise<T>, maxAttempts: number = 3): Promise<T> {
    let failures = 0;
    for (;;) {
        try {
            return await fn();
        } catch (err) {
            failures += 1;
            if (failures >= maxAttempts) {
                throw err;
            }
        }
    }
}
/**
 * Processes one URL doc: scrapes it with Firecrawl (up to 3 attempts), splits
 * the markdown into chunks, embeds the chunks, stores them in Qdrant and
 * finally marks the doc record "ready" with the scraped markdown.
 *
 * The doc's existing Qdrant points are removed before new ones are written,
 * so re-processing a doc (for example after an earlier failed attempt) does
 * not leave duplicate embeddings behind.
 *
 * @param _logger Parent logger; a child scoped to the doc id and name is used.
 * @param job     Data source the doc belongs to.
 * @param doc     Doc record to process; must have `data.type === 'url'`.
 * @throws Error("Invalid data source type") for non-url docs; scrape,
 *         embedding, Qdrant and MongoDB errors propagate.
 */
async function runScrapePipeline(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>, doc: WithId<z.infer<typeof DataSourceDoc>>): Promise<void> {
    const logger = _logger
        .child(doc._id.toString())
        .child(doc.name);
    // scrape the url using firecrawl
    logger.log("Scraping using Firecrawl");
    // Validate once, outside the retry loop: a wrong doc type is deterministic
    // and retrying it cannot succeed.
    if (doc.data.type !== 'url') {
        throw new Error("Invalid data source type");
    }
    const url = doc.data.url;
    const scrapeResult = await retryable(async () => {
        const response = await firecrawl.scrapeUrl(url, {
            formats: ['markdown'],
            onlyMainContent: true,
            excludeTags: ['script', 'style', 'noscript', 'img',]
        });
        if (!response.success) {
            throw new Error("Unable to scrape URL: " + url);
        }
        return response;
    }, 3); // Retry up to 3 times
    // split into chunks
    logger.log("Splitting into chunks");
    const splits = await splitter.createDocuments([scrapeResult.markdown || '']);
    // Point ids are random, so an upsert never overwrites points written by a
    // previous attempt; clear them explicitly to keep re-processing idempotent.
    logger.log("Deleting stale embeddings from Qdrant");
    await qdrantClient.delete("embeddings", {
        filter: {
            must: [
                { key: "projectId", match: { value: job.projectId } },
                { key: "sourceId", match: { value: job._id.toString() } },
                { key: "docId", match: { value: doc._id.toString() } },
            ],
        },
    });
    if (splits.length > 0) {
        // generate embeddings
        logger.log("Generating embeddings");
        const { embeddings } = await embedMany({
            model: embeddingModel,
            values: splits.map((split) => split.pageContent)
        });
        // store embeddings in qdrant
        logger.log("Storing embeddings in Qdrant");
        const points: z.infer<typeof EmbeddingRecord>[] = embeddings.map((embedding, i) => ({
            id: crypto.randomUUID(),
            vector: embedding,
            payload: {
                projectId: job.projectId,
                sourceId: job._id.toString(),
                docId: doc._id.toString(),
                content: splits[i].pageContent,
                title: scrapeResult.metadata?.title || '',
                name: doc.name,
            },
        }));
        await qdrantClient.upsert("embeddings", {
            points,
        });
    } else {
        // Nothing to embed: avoid sending empty requests to the embedding
        // model and to Qdrant, and still record the scrape result below.
        logger.log("No content scraped; skipping embeddings");
    }
    // store scraped markdown in doc record
    logger.log("Storing scraped markdown in doc record");
    await dataSourceDocsCollection.updateOne({
        _id: doc._id,
        version: doc.version,
    }, {
        $set: {
            content: scrapeResult.markdown,
            status: "ready",
            lastUpdatedAt: new Date().toISOString(),
        }
    });
}
/**
 * Removes a single doc: first its points in the Qdrant "embeddings"
 * collection (matched on project id, source id and doc id), then the doc
 * record itself.
 *
 * @param _logger Parent logger; a child scoped to the doc id and name is used.
 * @param job     Data source the doc belongs to.
 * @param doc     Doc record to delete.
 */
async function runDeletionPipeline(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>, doc: WithId<z.infer<typeof DataSourceDoc>>): Promise<void> {
    const docId = doc._id.toString();
    const logger = _logger.child(docId).child(doc.name);

    // Delete embeddings from qdrant
    logger.log("Deleting embeddings from Qdrant");
    const must = [
        { key: "projectId", match: { value: job.projectId } },
        { key: "sourceId", match: { value: job._id.toString() } },
        { key: "docId", match: { value: docId } },
    ];
    await qdrantClient.delete("embeddings", { filter: { must } });

    // Delete docs from db
    logger.log("Deleting doc from db");
    await dataSourceDocsCollection.deleteOne({ _id: doc._id });
}
// URLs worker loop: repeatedly claims one "urls" data source from MongoDB
// and either purges it (status "deleted") or scrapes its pending docs and
// removes its deleted docs. Sleeps 5 seconds when nothing is claimable.
(async () => {
    while (true) {
        console.log("Polling for job...")
        const now = Date.now();
        let job: WithId<z.infer<typeof DataSource>> | null = null;
        // first try to find a job that needs deleting.
        // Only claim sources of this worker's own type: claiming another
        // worker's source would hit the type check below, and the error
        // handler would then overwrite its "deleted" status with "error".
        job = await dataSourcesCollection.findOneAndUpdate({
            status: "deleted",
            'data.type': { $eq: "urls" },
            $or: [
                { attempts: { $exists: false } },
                { attempts: { $lte: 3 } }
            ]
        }, { $set: { lastAttemptAt: new Date().toISOString() }, $inc: { attempts: 1 } }, { returnDocument: "after", sort: { createdAt: 1 } });
        if (job === null) {
            job = await dataSourcesCollection.findOneAndUpdate(
                {
                    $and: [
                        { 'data.type': { $eq: "urls" } },
                        {
                            $or: [
                                // if the job has never been attempted
                                {
                                    status: "pending",
                                    attempts: 0,
                                },
                                // if the job was attempted but wasn't completed in the last hour
                                {
                                    status: "pending",
                                    lastAttemptAt: { $lt: new Date(now - 1 * hour).toISOString() },
                                },
                                // if the job errored out but hasn't been retried 3 times yet
                                {
                                    status: "error",
                                    attempts: { $lt: 3 },
                                },
                                // if the job errored out but hasn't been retried in the last hour
                                {
                                    status: "error",
                                    lastAttemptAt: { $lt: new Date(now - 1 * hour).toISOString() },
                                },
                            ]
                        }
                    ]
                },
                {
                    $set: {
                        status: "pending",
                        lastAttemptAt: new Date().toISOString(),
                    },
                    $inc: {
                        attempts: 1
                    },
                },
                { returnDocument: "after", sort: { createdAt: 1 } }
            );
        }
        if (job === null) {
            // if no doc found, sleep for a bit and start again
            await new Promise(resolve => setTimeout(resolve, 5 * second));
            continue;
        }
        const logger = new PrefixLogger(`${job._id.toString()}-${job.version}`);
        logger.log(`Starting job ${job._id}. Type: ${job.data.type}. Status: ${job.status}`);
        // Set when any individual doc fails; the job is then finished as "error".
        let errors = false;
        try {
            if (job.data.type !== 'urls') {
                throw new Error("Invalid data source type");
            }
            if (job.status === "deleted") {
                // delete all embeddings for this source
                logger.log("Deleting embeddings from Qdrant");
                await qdrantClient.delete("embeddings", {
                    filter: {
                        must: [
                            { key: "projectId", match: { value: job.projectId } },
                            { key: "sourceId", match: { value: job._id.toString() } },
                        ],
                    },
                });
                // delete all docs for this source
                logger.log("Deleting docs from db");
                await dataSourceDocsCollection.deleteMany({
                    sourceId: job._id.toString(),
                });
                // delete the source record from db
                logger.log("Deleting source record from db");
                await dataSourcesCollection.deleteOne({
                    _id: job._id,
                });
                logger.log("Job deleted");
                continue;
            }
            // fetch docs that need updating
            const pendingDocs = await dataSourceDocsCollection.find({
                sourceId: job._id.toString(),
                status: { $in: ["pending", "error"] },
            }).toArray();
            logger.log(`Found ${pendingDocs.length} docs to process`);
            // for each doc
            for (const doc of pendingDocs) {
                try {
                    await runScrapePipeline(logger, job, doc);
                } catch (e: unknown) {
                    errors = true;
                    logger.log("Error processing doc:", e);
                    await dataSourceDocsCollection.updateOne({
                        _id: doc._id,
                        version: doc.version,
                    }, {
                        $set: {
                            status: "error",
                            error: e instanceof Error ? e.message : String(e),
                        }
                    });
                }
            }
            // fetch docs that need to be deleted
            const deletedDocs = await dataSourceDocsCollection.find({
                sourceId: job._id.toString(),
                status: "deleted",
            }).toArray();
            logger.log(`Found ${deletedDocs.length} docs to delete`);
            for (const doc of deletedDocs) {
                try {
                    await runDeletionPipeline(logger, job, doc);
                } catch (e: unknown) {
                    errors = true;
                    logger.log("Error deleting doc:", e);
                    // Record the failure but keep the doc's "deleted" status:
                    // switching it to "error" would make the next run re-scrape
                    // (and re-embed) a doc the user asked to delete.
                    await dataSourceDocsCollection.updateOne({
                        _id: doc._id,
                        version: doc.version,
                    }, {
                        $set: {
                            error: e instanceof Error ? e.message : String(e),
                        }
                    });
                }
            }
        } catch (e) {
            logger.log("Error processing job; will retry:", e);
            await dataSourcesCollection.updateOne({ _id: job._id, version: job.version }, { $set: { status: "error" } });
            continue;
        }
        // mark job as complete
        logger.log("Marking job as completed...");
        await dataSourcesCollection.updateOne({
            _id: job._id,
            version: job.version,
        }, {
            $set: {
                status: errors ? "error" : "ready",
                ...(errors ? { error: "There were some errors processing this job" } : {}),
            }
        });
    }
})();

View file

@ -0,0 +1,14 @@
import '../lib/loadenv';
import { qdrantClient } from '../lib/qdrant';
// One-off script: creates the 'embeddings' collection in Qdrant
// (1536-dimensional vectors, 'Dot' distance), then prints the list of
// collections reported by the server.
(async () => {
    await qdrantClient.createCollection('embeddings', {
        vectors: { size: 1536, distance: 'Dot' },
    });
    const listing = await qdrantClient.getCollections();
    console.log(listing.collections);
})();

View file

@ -0,0 +1,26 @@
/**
 * console.log wrapper that tags every message with a bracketed prefix.
 *
 * Loggers can be chained through `child()`: a child prepends its own
 * `[prefix]` and delegates to its parent, so the root logger ends up printing
 * `<ISO timestamp> [root] [child] ...args`.
 */
export class PrefixLogger {
    constructor(
        private prefix: string,
        private parent: PrefixLogger | null = null,
    ) {}

    /** Logs `args` preceded by this logger's prefix and those of all its ancestors. */
    log(...args: any[]) {
        const tag = `[${this.prefix}]`;
        if (!this.parent) {
            // Root of the chain: the timestamp is added once, here.
            console.log(new Date().toISOString(), tag, ...args);
            return;
        }
        this.parent.log(tag, ...args);
    }

    /** Creates a logger whose output is nested under this one. */
    child(childPrefix: string): PrefixLogger {
        return new PrefixLogger(childPrefix, this);
    }
}

File diff suppressed because it is too large Load diff

View file

@ -7,12 +7,17 @@
"build": "next build",
"start": "next start",
"lint": "next lint",
"crawlUrls": "tsx app/scripts/crawlUrls.ts",
"oxytest": "tsx app/scripts/oxytest.ts"
"setupQdrant": "tsx app/scripts/setup_qdrant.ts",
"deleteQdrant": "tsx app/scripts/delete_qdrant.ts",
"ragUrlsWorker": "tsx app/scripts/rag_urls_worker.ts",
"ragFilesWorker": "tsx app/scripts/rag_files_worker.ts"
},
"dependencies": {
"@ai-sdk/openai": "^0.0.37",
"@auth0/nextjs-auth0": "^3.5.0",
"@aws-sdk/client-s3": "^3.743.0",
"@aws-sdk/s3-request-presigner": "^3.743.0",
"@google/generative-ai": "^0.21.0",
"@langchain/core": "^0.3.7",
"@langchain/textsplitters": "^0.1.0",
"@mendable/firecrawl-js": "^1.0.3",
@ -20,6 +25,7 @@
"@nextui-org/system": "^2.2.5",
"@nextui-org/theme": "^2.2.9",
"@primer/react": "^36.27.0",
"@qdrant/js-client-rest": "^1.13.0",
"ai": "^3.3.28",
"cheerio": "^1.0.0",
"class-variance-authority": "^0.7.1",
@ -35,6 +41,7 @@
"react": "^18.3.1",
"react-diff-viewer-continued": "^3.4.0",
"react-dom": "^18.3.1",
"react-dropzone": "^14.3.5",
"react-markdown": "^9.0.1",
"react-resizable-panels": "^2.1.7",
"redis": "^4.7.0",