add text rag source

This commit is contained in:
Ramnique Singh 2025-03-27 22:44:28 +05:30
parent e317c1eb50
commit a56b2d3a90
10 changed files with 532 additions and 9 deletions

View file

@ -1,10 +1,10 @@
import { FileIcon, FilesIcon, GlobeIcon } from "lucide-react";
import { FileIcon, FilesIcon, FileTextIcon, GlobeIcon } from "lucide-react";
export function DataSourceIcon({
type = undefined,
size = "sm",
}: {
type?: "crawl" | "urls" | "files" | undefined;
type?: "crawl" | "urls" | "files" | "text" | undefined;
size?: "sm" | "md";
}) {
const sizeClass = size === "sm" ? "w-4 h-4" : "w-6 h-6";
@ -13,5 +13,6 @@ export function DataSourceIcon({
{type == "crawl" && <GlobeIcon className={sizeClass} />}
{type == "urls" && <GlobeIcon className={sizeClass} />}
{type == "files" && <FilesIcon className={sizeClass} />}
{type == "text" && <FileTextIcon className={sizeClass} />}
</>;
}

View file

@ -1,4 +1,5 @@
import { z } from "zod";
export const DataSource = z.object({
name: z.string(),
projectId: z.string(),
@ -23,8 +24,13 @@ export const DataSource = z.object({
z.object({
type: z.literal('files'),
}),
z.object({
type: z.literal('text'),
})
]),
});export const DataSourceDoc = z.object({
});
export const DataSourceDoc = z.object({
sourceId: z.string(),
name: z.string(),
version: z.number(),
@ -50,8 +56,13 @@ export const DataSource = z.object({
mimeType: z.string(),
s3Key: z.string(),
}),
z.object({
type: z.literal('text'),
content: z.string(),
}),
]),
});
export const EmbeddingDoc = z.object({
content: z.string(),
sourceId: z.string(),
@ -74,5 +85,4 @@ export const EmbeddingRecord = z.object({
title: z.string(),
name: z.string(),
}),
});
});

View file

@ -13,6 +13,7 @@ import { TableLabel, TableValue } from "./shared";
import { ScrapeSource } from "./scrape-source";
import { FilesSource } from "./files-source";
import { getDataSource } from "../../../../actions/datasource_actions";
import { TextSource } from "./text-source";
export function SourcePage({
sourceId,
@ -118,6 +119,10 @@ export function SourcePage({
<DataSourceIcon type="files" />
<div>File upload</div>
</div>}
{source.data.type === 'text' && <div className="flex gap-1 items-center">
<DataSourceIcon type="text" />
<div>Text</div>
</div>}
</TableValue>
</tr>
<tr>
@ -131,6 +136,7 @@ export function SourcePage({
</PageSection>
{source.data.type === 'urls' && <ScrapeSource projectId={projectId} dataSource={source} handleReload={handleReload} />}
{source.data.type === 'files' && <FilesSource projectId={projectId} dataSource={source} handleReload={handleReload} />}
{source.data.type === 'text' && <TextSource projectId={projectId} dataSource={source} handleReload={handleReload} />}
<PageSection title="Danger zone">
<div className="flex flex-col gap-2 items-start">

View file

@ -0,0 +1,128 @@
"use client";
import { PageSection } from "../../../../lib/components/page-section";
import { WithStringId } from "../../../../lib/types/types";
import { DataSource } from "../../../../lib/types/datasource_types";
import { z } from "zod";
import { useState, useEffect } from "react";
import { Textarea } from "@heroui/react";
import { FormStatusButton } from "../../../../lib/components/form-status-button";
import { Spinner } from "@heroui/react";
import { addDocsToDataSource, deleteDocsFromDataSource, listDocsInDataSource } from "../../../../actions/datasource_actions";
export function TextSource({
    projectId,
    dataSource,
    handleReload,
}: {
    projectId: string,
    dataSource: WithStringId<z.infer<typeof DataSource>>,
    handleReload: () => void;
}) {
    // Single-document text editor for a 'text' data source: the source holds
    // exactly one doc whose content is edited and re-uploaded as a whole.
    const [content, setContent] = useState("");
    const [docId, setDocId] = useState<string | null>(null);
    const [isLoading, setIsLoading] = useState(true);
    const [isSaving, setIsSaving] = useState(false);

    useEffect(() => {
        let ignore = false;
        async function fetchContent() {
            setIsLoading(true);
            try {
                const { files } = await listDocsInDataSource({
                    projectId,
                    sourceId: dataSource._id,
                    limit: 1,
                });
                if (!ignore && files.length > 0) {
                    const doc = files[0];
                    if (doc.data.type === 'text') {
                        setContent(doc.data.content);
                        setDocId(doc._id);
                    }
                }
            } catch (error) {
                console.error('Error fetching content:', error);
            } finally {
                // Avoid setting state after unmount / stale effect run.
                if (!ignore) {
                    setIsLoading(false);
                }
            }
        }
        fetchContent();
        return () => {
            ignore = true;
        };
    }, [projectId, dataSource._id]);

    async function handleSubmit(formData: FormData) {
        setIsSaving(true);
        try {
            const newContent = formData.get('content') as string;

            // Delete existing doc if it exists
            if (docId) {
                await deleteDocsFromDataSource({
                    projectId,
                    sourceId: dataSource._id,
                    docIds: [docId],
                });
            }

            // Add new doc
            await addDocsToDataSource({
                projectId,
                sourceId: dataSource._id,
                docData: [{
                    name: 'text',
                    data: {
                        type: 'text',
                        content: newContent,
                    },
                }],
            });

            // Refresh docId so the NEXT save deletes the doc we just created.
            // Without this, docId keeps pointing at the already-deleted doc
            // (the effect above only re-runs when the source id changes).
            const { files } = await listDocsInDataSource({
                projectId,
                sourceId: dataSource._id,
                limit: 1,
            });
            setDocId(files.length > 0 ? files[0]._id : null);

            handleReload();
        } catch (error) {
            // Surface failures instead of leaving an unhandled rejection
            // inside the form action.
            console.error('Error saving content:', error);
        } finally {
            setIsSaving(false);
        }
    }

    if (isLoading) {
        return (
            <PageSection title="Content">
                <div className="flex items-center justify-center gap-2">
                    <Spinner size="sm" />
                    <p>Loading content...</p>
                </div>
            </PageSection>
        );
    }

    return (
        <PageSection title="Content">
            <form action={handleSubmit} className="flex flex-col gap-4">
                <Textarea
                    name="content"
                    label="Text content"
                    labelPlacement="outside"
                    value={content}
                    onValueChange={setContent}
                    minRows={10}
                    maxRows={20}
                    variant="bordered"
                />
                <FormStatusButton
                    props={{
                        type: "submit",
                        children: "Update content",
                        className: "self-start",
                        isLoading: isSaving,
                    }}
                />
            </form>
        </PageSection>
    );
}

View file

@ -60,6 +60,32 @@ export function Form({
router.push(`/projects/${projectId}/sources/${source._id}`);
}
async function createTextDataSource(formData: FormData) {
const source = await createDataSource({
projectId,
name: formData.get('name') as string,
data: {
type: 'text',
},
status: 'pending',
});
const content = formData.get('content') as string;
await addDocsToDataSource({
projectId,
sourceId: source._id,
docData: [{
name: 'text',
data: {
type: 'text',
content,
},
}],
});
router.push(`/projects/${projectId}/sources/${source._id}`);
}
// Mirrors the <select> value into component state.
function handleSourceTypeChange(event: React.ChangeEvent<HTMLSelectElement>) {
    const selectedType = event.target.value;
    setSourceType(selectedType);
}
@ -75,6 +101,12 @@ export function Form({
...(useRagScraping ? [] : ['urls']),
]}
>
<SelectItem
key="text"
startContent={<DataSourceIcon type="text" />}
>
Text
</SelectItem>
<SelectItem
key="urls"
startContent={<DataSourceIcon type="urls" />}
@ -87,7 +119,7 @@ export function Form({
>
Upload files
</SelectItem>
</Select>
</Select>
{sourceType === "urls" && <form
action={createUrlsDataSource}
@ -159,6 +191,39 @@ export function Form({
}}
/>
</form>}
{sourceType === "text" && <form
action={createTextDataSource}
className="flex flex-col gap-4"
>
<Textarea
required
type="text"
name="content"
label="Text content"
labelPlacement="outside"
minRows={10}
maxRows={30}
/>
<div className="self-start">
<Input
required
type="text"
name="name"
labelPlacement="outside"
placeholder="e.g. Product documentation"
variant="bordered"
/>
</div>
<FormStatusButton
props={{
type: "submit",
children: "Add data source",
className: "self-start",
startContent: <PlusIcon className="w-[24px] h-[24px]" />
}}
/>
</form>}
</div>
</div>;
}

View file

@ -86,6 +86,14 @@ export function SourcesList({
<DataSourceIcon type="urls" />
<div>List URLs</div>
</div>}
{source.data.type == 'text' && <div className="flex gap-1 items-center">
<DataSourceIcon type="text" />
<div>Text</div>
</div>}
{source.data.type == 'files' && <div className="flex gap-1 items-center">
<DataSourceIcon type="files" />
<div>Files</div>
</div>}
</td>
<td className="py-4">
<SelfUpdatingSourceStatus sourceId={source._id} projectId={projectId} initialStatus={source.status} compact={true} />

View file

@ -0,0 +1,285 @@
import '../lib/loadenv';
import { RecursiveCharacterTextSplitter } from "@langchain/textsplitters";
import { z } from 'zod';
import { dataSourceDocsCollection, dataSourcesCollection } from '../lib/mongodb';
import { EmbeddingRecord, DataSourceDoc, DataSource } from "../lib/types/datasource_types";
import { WithId } from 'mongodb';
import { embedMany } from 'ai';
import { embeddingModel } from '../lib/embedding';
import { qdrantClient } from '../lib/qdrant';
import { PrefixLogger } from "../lib/utils";
import crypto from 'crypto';
// Splits raw document text into overlapping chunks for embedding.
// Separators are tried in order: paragraph breaks, line breaks,
// sentence boundaries, then character-level as a last resort.
const splitter = new RecursiveCharacterTextSplitter({
    separators: ['\n\n', '\n', '. ', '.', ''],
    chunkSize: 1024,
    chunkOverlap: 20,
});
// Millisecond time units used for polling intervals and retry windows below.
const second = 1000;
const minute = 60 * second;
const hour = 60 * minute;
// Processes one pending text doc: chunk -> embed -> upsert vectors into
// Qdrant -> mark the doc record ready. Throws on non-'text' docs so the
// caller can record the failure.
async function runProcessPipeline(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>, doc: WithId<z.infer<typeof DataSourceDoc>>): Promise<void> {
    const logger = _logger
        .child(doc._id.toString())
        .child(doc.name);

    if (doc.data.type !== 'text') {
        throw new Error("Invalid data source type");
    }

    // Chunk the raw text.
    logger.log("Splitting into chunks");
    const chunks = await splitter.createDocuments([doc.data.content]);
    const chunkTexts = chunks.map((chunk) => chunk.pageContent);

    // Embed all chunks in a single batch call.
    logger.log("Generating embeddings");
    const { embeddings } = await embedMany({
        model: embeddingModel,
        values: chunkTexts
    });

    // Build one point per chunk; the payload carries enough context
    // (project / source / doc ids) for scoped deletion later.
    logger.log("Storing embeddings in Qdrant");
    const points: z.infer<typeof EmbeddingRecord>[] = [];
    for (let i = 0; i < embeddings.length; i++) {
        points.push({
            id: crypto.randomUUID(),
            vector: embeddings[i],
            payload: {
                projectId: job.projectId,
                sourceId: job._id.toString(),
                docId: doc._id.toString(),
                content: chunkTexts[i],
                title: doc.name,
                name: doc.name,
            },
        });
    }
    await qdrantClient.upsert("embeddings", { points });

    // Persist the content and flip the doc to ready; matching on `version`
    // acts as an optimistic lock against concurrent updates.
    logger.log("Storing content in doc record");
    await dataSourceDocsCollection.updateOne({
        _id: doc._id,
        version: doc.version,
    }, {
        $set: {
            content: doc.data.content,
            status: "ready",
            lastUpdatedAt: new Date().toISOString(),
        }
    });
}
// Deletes one doc: removes its vectors from Qdrant (scoped by project,
// source, and doc id), then drops the doc record from MongoDB.
async function runDeletionPipeline(_logger: PrefixLogger, job: WithId<z.infer<typeof DataSource>>, doc: WithId<z.infer<typeof DataSourceDoc>>): Promise<void> {
    const logger = _logger
        .child(doc._id.toString())
        .child(doc.name);

    // Delete embeddings from qdrant
    logger.log("Deleting embeddings from Qdrant");
    const scope: Array<{ key: string, value: string }> = [
        { key: "projectId", value: job.projectId },
        { key: "sourceId", value: job._id.toString() },
        { key: "docId", value: doc._id.toString() },
    ];
    await qdrantClient.delete("embeddings", {
        filter: {
            must: scope.map(({ key, value }) => ({ key, match: { value } })),
        },
    });

    // Delete docs from db
    logger.log("Deleting doc from db");
    await dataSourceDocsCollection.deleteOne({ _id: doc._id });
}
// Worker loop: repeatedly claims the next 'text' data-source job from
// MongoDB (deletions first, then pending/errored processing jobs),
// runs the per-doc pipelines, and records the outcome on the job record.
(async () => {
    while (true) {
        console.log("Polling for job...")
        const now = Date.now();
        let job: WithId<z.infer<typeof DataSource>> | null = null;

        // First try to claim a job that needs deleting. Filter on
        // data.type so this worker only claims TEXT sources — without it,
        // deletion jobs belonging to the urls/files workers would be
        // claimed here and then fail the type check below, incorrectly
        // marking them as errored.
        job = await dataSourcesCollection.findOneAndUpdate({
            status: "deleted",
            'data.type': "text",
            $or: [
                { attempts: { $exists: false } },
                { attempts: { $lte: 3 } }
            ]
        }, { $set: { lastAttemptAt: new Date().toISOString() }, $inc: { attempts: 1 } }, { returnDocument: "after", sort: { createdAt: 1 } });

        // Otherwise claim a processing job. Claiming atomically bumps
        // `attempts` and `lastAttemptAt` so crashed runs are retried with
        // a bounded number of attempts.
        if (job === null) {
            job = await dataSourcesCollection.findOneAndUpdate(
                {
                    $and: [
                        { 'data.type': { $eq: "text" } },
                        {
                            $or: [
                                // if the job has never been attempted
                                {
                                    status: "pending",
                                    attempts: 0,
                                },
                                // if the job was attempted but wasn't completed in the last hour
                                {
                                    status: "pending",
                                    lastAttemptAt: { $lt: new Date(now - 1 * hour).toISOString() },
                                },
                                // if the job errored out but hasn't been retried 3 times yet
                                {
                                    status: "error",
                                    attempts: { $lt: 3 },
                                },
                                // if the job errored out but hasn't been retried in the last hour
                                {
                                    status: "error",
                                    lastAttemptAt: { $lt: new Date(now - 1 * hour).toISOString() },
                                },
                            ]
                        }
                    ]
                },
                {
                    $set: {
                        status: "pending",
                        lastAttemptAt: new Date().toISOString(),
                    },
                    $inc: {
                        attempts: 1
                    },
                },
                { returnDocument: "after", sort: { createdAt: 1 } }
            );
        }

        if (job === null) {
            // if no doc found, sleep for a bit and start again
            await new Promise(resolve => setTimeout(resolve, 5 * second));
            continue;
        }

        const logger = new PrefixLogger(`${job._id.toString()}-${job.version}`);
        logger.log(`Starting job ${job._id}. Type: ${job.data.type}. Status: ${job.status}`);

        let errors = false;
        try {
            if (job.data.type !== 'text') {
                throw new Error("Invalid data source type");
            }

            if (job.status === "deleted") {
                // delete all embeddings for this source
                logger.log("Deleting embeddings from Qdrant");
                await qdrantClient.delete("embeddings", {
                    filter: {
                        must: [
                            { key: "projectId", match: { value: job.projectId } },
                            { key: "sourceId", match: { value: job._id.toString() } },
                        ],
                    },
                });

                // delete all docs for this source
                logger.log("Deleting docs from db");
                await dataSourceDocsCollection.deleteMany({
                    sourceId: job._id.toString(),
                });

                // delete the source record from db
                logger.log("Deleting source record from db");
                await dataSourcesCollection.deleteOne({
                    _id: job._id,
                });
                logger.log("Job deleted");
                continue;
            }

            // fetch docs that need updating
            const pendingDocs = await dataSourceDocsCollection.find({
                sourceId: job._id.toString(),
                status: { $in: ["pending", "error"] },
            }).toArray();
            logger.log(`Found ${pendingDocs.length} docs to process`);

            // Process each doc independently; a failure marks that doc
            // errored but lets the rest of the batch continue.
            for (const doc of pendingDocs) {
                try {
                    await runProcessPipeline(logger, job, doc);
                } catch (e: any) {
                    errors = true;
                    logger.log("Error processing doc:", e);
                    await dataSourceDocsCollection.updateOne({
                        _id: doc._id,
                        version: doc.version,
                    }, {
                        $set: {
                            status: "error",
                            error: e.message,
                        }
                    });
                }
            }

            // fetch docs that need to be deleted
            const deletedDocs = await dataSourceDocsCollection.find({
                sourceId: job._id.toString(),
                status: "deleted",
            }).toArray();
            logger.log(`Found ${deletedDocs.length} docs to delete`);

            for (const doc of deletedDocs) {
                try {
                    await runDeletionPipeline(logger, job, doc);
                } catch (e: any) {
                    errors = true;
                    logger.log("Error deleting doc:", e);
                    await dataSourceDocsCollection.updateOne({
                        _id: doc._id,
                        version: doc.version,
                    }, {
                        $set: {
                            status: "error",
                            error: e.message,
                        }
                    });
                }
            }
        } catch (e) {
            // Job-level failure: flag the job errored so the retry rules
            // above pick it up again.
            logger.log("Error processing job; will retry:", e);
            await dataSourcesCollection.updateOne({ _id: job._id, version: job.version }, { $set: { status: "error" } });
            continue;
        }

        // mark job as complete
        logger.log("Marking job as completed...");
        await dataSourcesCollection.updateOne({
            _id: job._id,
            version: job.version,
        }, {
            $set: {
                status: errors ? "error" : "ready",
                ...(errors ? { error: "There were some errors processing this job" } : {}),
            }
        });
    }
})();

View file

@ -11,7 +11,8 @@
"setupQdrant": "tsx app/scripts/setup_qdrant.ts",
"deleteQdrant": "tsx app/scripts/delete_qdrant.ts",
"ragUrlsWorker": "tsx app/scripts/rag_urls_worker.ts",
"ragFilesWorker": "tsx app/scripts/rag_files_worker.ts"
"ragFilesWorker": "tsx app/scripts/rag_files_worker.ts",
"ragTextWorker": "tsx app/scripts/rag_text_worker.ts"
},
"dependencies": {
"@ai-sdk/openai": "^0.0.37",

View file

@ -15,7 +15,10 @@ data_sources_collection = db['sources']
data_source_docs_collection = db['source_docs']
qdrant_client = QdrantClient(url=os.environ.get("QDRANT_URL"))
qdrant_client = QdrantClient(
url=os.environ.get("QDRANT_URL"),
api_key=os.environ.get("QDRANT_API_KEY") or None
)
# Initialize OpenAI client
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
@ -65,7 +68,7 @@ async def call_rag_tool(
# Create embedding for the query
embed_result = await embed(model=embedding_model, value=query)
print(embed_result)
# print(embed_result)
# Fetch all active data sources for this project
sources = await data_sources_collection.find({
"projectId": project_id,

View file

@ -49,6 +49,9 @@ services:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- API_KEY=${AGENTS_API_KEY}
- REDIS_URL=redis://redis:6379
- MONGODB_URI=${MONGODB_CONNECTION_STRING}
- QDRANT_URL=${QDRANT_URL}
- QDRANT_API_KEY=${QDRANT_API_KEY}
restart: unless-stopped
copilot:
@ -136,6 +139,19 @@ services:
- QDRANT_API_KEY=${QDRANT_API_KEY}
restart: unless-stopped
rag_text_worker:
build:
context: ./apps/rowboat
dockerfile: scripts.Dockerfile
command: ["npm", "run", "ragTextWorker"]
profiles: [ "rag_text_worker" ]
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- MONGODB_CONNECTION_STRING=${MONGODB_CONNECTION_STRING}
- QDRANT_URL=${QDRANT_URL}
- QDRANT_API_KEY=${QDRANT_API_KEY}
restart: unless-stopped
chat_widget:
build:
context: ./apps/chat_widget