From 2b5c28e2e8c1e1c3e4dfaacf935b269e2e6af623 Mon Sep 17 00:00:00 2001 From: Arjun <6592213+arkml@users.noreply.github.com> Date: Thu, 26 Feb 2026 20:46:16 +0530 Subject: [PATCH] threads --- .../packages/core/src/knowledge/sync_slack.ts | 181 ++++++++++++++++-- 1 file changed, 170 insertions(+), 11 deletions(-) diff --git a/apps/x/packages/core/src/knowledge/sync_slack.ts b/apps/x/packages/core/src/knowledge/sync_slack.ts index 4e55e7fd..8e5b2f37 100644 --- a/apps/x/packages/core/src/knowledge/sync_slack.ts +++ b/apps/x/packages/core/src/knowledge/sync_slack.ts @@ -66,8 +66,10 @@ function saveState(state: SyncState): void { interface SlackMessage { ts: string; + thread_ts?: string; author: { user_id: string }; content: string; + replies?: SlackMessage[]; } async function fetchMessages(workspaceUrl: string, oldestTs: string): Promise { @@ -78,6 +80,15 @@ async function fetchMessages(workspaceUrl: string, oldestTs: string): Promise { + const cmd = `agent-slack message list "#general" --workspace ${workspaceUrl} --thread-ts ${threadTs} --max-body-chars -1`; + const { stdout } = await execAsync(cmd, { timeout: 30000 }); + const parsed = JSON.parse(stdout); + const messages: SlackMessage[] = parsed.messages || []; + // First message is the parent — return only replies + return messages.slice(1); +} + async function resolveUser(userId: string, workspaceUrl: string): Promise { const cmd = `agent-slack user get ${userId} --workspace ${workspaceUrl}`; const { stdout } = await execAsync(cmd, { timeout: 10000 }); @@ -109,24 +120,148 @@ function workspaceNameFromUrl(url: string): string { } } +interface RenderedMessage { + ts: string; + author: string; + time: string; + content: string; + replies?: RenderedMessage[]; +} + +function parseExistingMessages(filePath: string): RenderedMessage[] { + if (!fs.existsSync(filePath)) return []; + const raw = fs.readFileSync(filePath, 'utf-8'); + const messages: RenderedMessage[] = []; + // Split on --- separators, then parse each block + const sections = raw.split('\n---\n'); + for (const section of sections) { + // Match top-level: ### Author — time\ncontent + const topMatch = section.match(/^[\s]*### (.+?) \u2014 (\d{4}-\d{2}-\d{2} \d{2}:\d{2} UTC)\n([\s\S]*)$/); + if (!topMatch) continue; + const msg: RenderedMessage = { + ts: '', + author: topMatch[1], + time: topMatch[2], + content: '', + replies: [], + }; + // Check if body contains replies (> **Author** — time) + const bodyLines = topMatch[3]; + const replyPattern = /^> \*\*(.+?)\*\* \u2014 (\d{4}-\d{2}-\d{2} \d{2}:\d{2} UTC)$/; + let currentContent: string[] = []; + let currentReply: RenderedMessage | null = null; + + for (const line of bodyLines.split('\n')) { + const rm = line.match(replyPattern); + if (rm) { + // Save previous reply + if (currentReply) { + currentReply.content = currentContent.join('\n').replace(/^> /gm, '').trimEnd(); + msg.replies!.push(currentReply); + } else { + // Lines before first reply are the parent content + msg.content = currentContent.join('\n').trimEnd(); + } + currentReply = { ts: '', author: rm[1], time: rm[2], content: '' }; + currentContent = []; + } else { + currentContent.push(line); + } + } + // Save last reply or parent content + if (currentReply) { + currentReply.content = currentContent.join('\n').replace(/^> /gm, '').trimEnd(); + msg.replies!.push(currentReply); + } else { + msg.content = currentContent.join('\n').trimEnd(); + } + if (msg.replies!.length === 0) delete msg.replies; + messages.push(msg); + } + return messages; +} + +function renderMessage(msg: SlackMessage, userCache: Record): RenderedMessage { + const rendered: RenderedMessage = { + ts: msg.ts, + author: userCache[msg.author.user_id] || msg.author.user_id, + time: formatTimestamp(msg.ts), + content: msg.content, + }; + if (msg.replies && msg.replies.length > 0) { + rendered.replies = msg.replies.map(r => ({ + ts: r.ts, + author: userCache[r.author.user_id] || r.author.user_id, + time: formatTimestamp(r.ts), + content: r.content, + })); + } + return rendered; +} + +function messageKey(msg: RenderedMessage): string { + return `${msg.time}|${msg.author}|${msg.content}`; +} + function buildMarkdown( workspaceUrl: string, workspaceName: string, - messages: SlackMessage[], + existingMessages: RenderedMessage[], + newMessages: SlackMessage[], userCache: Record, ): string { const displayName = workspaceName || workspaceNameFromUrl(workspaceUrl); const now = new Date().toISOString(); + const newRendered = newMessages.map(m => renderMessage(m, userCache)); + + // Deduplicate and merge: new messages replace existing (to pick up new replies) + const seen = new Map(); + + for (const msg of existingMessages) { + seen.set(messageKey(msg), msg); + } + for (const msg of newRendered) { + const key = messageKey(msg); + const existing = seen.get(key); + if (existing) { + // Merge replies: keep existing + add new + if (msg.replies) { + const existingReplies = existing.replies || []; + const replyKeys = new Set(existingReplies.map(r => messageKey(r))); + for (const r of msg.replies) { + if (!replyKeys.has(messageKey(r))) { + existingReplies.push(r); + } + } + existing.replies = existingReplies; + } + } else { + seen.set(key, msg); + } + } + + const allMessages = Array.from(seen.values()); + let md = `# #general \u2014 ${displayName}\n\n`; md += `**Workspace:** ${workspaceUrl}\n`; md += `**Channel:** #general\n`; md += `**Synced:** ${now}\n\n---\n`; - for (const msg of messages) { - const author = userCache[msg.author.user_id] || msg.author.user_id; - const time = formatTimestamp(msg.ts); - md += `\n### ${author} \u2014 ${time}\n${msg.content}\n\n---\n`; + for (const msg of allMessages) { + md += `\n### ${msg.author} \u2014 ${msg.time}\n${msg.content}\n`; + if (msg.replies && msg.replies.length > 0) { + md += '\n'; + for (const reply of msg.replies) { + md += `> **${reply.author}** \u2014 ${reply.time}\n`; + // Indent reply content with > + for (const line of reply.content.split('\n')) { + md += `> ${line}\n`; + } + md += '>\n'; + } + } + md += '\n---\n'; } return md; @@ -201,10 +336,32 @@ async function performSync(): Promise { await ensureRun(); totalMessages += messages.length; + // Fetch thread replies for messages that are thread parents + for (const msg of messages) { + if (msg.thread_ts && msg.thread_ts === msg.ts) { + try { + const replies = await fetchThreadReplies(workspace.url, msg.ts); + if (replies.length > 0) { + msg.replies = replies; + console.log(`[Slack] Fetched ${replies.length} thread replies for ${msg.ts}`); + } + } catch (err) { + console.error(`[Slack] Error fetching thread ${msg.ts}:`, err); + } + } + } + + // Collect all messages + replies for user resolution + const allMsgs: SlackMessage[] = []; + for (const msg of messages) { + allMsgs.push(msg); + if (msg.replies) allMsgs.push(...msg.replies); + } + // Batch-resolve unknown user IDs (from authors + @mentions in content) const unknownIds = new Set(); const mentionPattern = /<@(U[A-Z0-9]+)>/g; - for (const msg of messages) { + for (const msg of allMsgs) { if (msg.author?.user_id && !state.userCache[msg.author.user_id]) { unknownIds.add(msg.author.user_id); } @@ -226,18 +383,20 @@ async function performSync(): Promise { } } - // Replace @mentions in message content with resolved names - for (const msg of messages) { + // Replace @mentions in all message content with resolved names + for (const msg of allMsgs) { msg.content = msg.content.replace(/<@(U[A-Z0-9]+)>/g, (_: string, id: string) => { return `@${state.userCache[id] || id}`; }); } - // Build and write markdown + // Build and write markdown (append to existing) const wsName = workspaceNameFromUrl(workspace.url); - const md = buildMarkdown(workspace.url, workspace.name || wsName, messages, state.userCache); const filename = `${wsName}_general.md`; - fs.writeFileSync(path.join(SYNC_DIR, filename), md); + const filePath = path.join(SYNC_DIR, filename); + const existingMessages = parseExistingMessages(filePath); + const md = buildMarkdown(workspace.url, workspace.name || wsName, existingMessages, messages, state.userCache); + fs.writeFileSync(filePath, md); console.log(`[Slack] Wrote ${filename} (${messages.length} messages)`); // Update lastSyncTs to highest ts seen