feat: ship Slack as a knowledge source, hardened for production (#596)

* index slack and add to home page

* filter only useful slack messages in home

* feat: bundle agent-slack CLI and route all calls through shared executor

Pins agent-slack@0.9.3, bundles it next to main.cjs (replaces the startup npm install -g), adds a structured-result executor with bundled/global/PATH resolution, a slack:cliStatus IPC probe, and a PATH shim so the Copilot skill keeps working.

* feat: surface Slack failures and add cross-OS auth fallbacks

Classify agent-slack errors (not_authed/rate_limited/network/bad_channel), persist per-source sync status with rate-limit backoff, and expose it via slack:knowledgeStatus. Fix the Settings Enable bounce-back with actionable copy, a browser-paste (parse-curl) fallback, and a Windows quit-Slack-and-import button; add home-feed empty/error states.

* feat: rank Slack home feed deterministically by recency

Drop the per-load LLM ranker (cost/latency/model dependency) in favor of a stronger deterministic filter + recency ordering. The filter now removes system messages, emoji/reaction-only posts, bare greetings/acks, and empty bodies, with a durable-signal escape hatch. Expand tests to one describe per noise class plus ordering/cap/volume coverage.

* fix: hide Slack knowledge Save button once saved

Only show the Save button when the channel list or enabled toggle differs from the last-persisted config, so it disappears after a successful save and reappears when a new channel is entered.

---------

Co-authored-by: Gagancreates <gaganp000999@gmail.com>
This commit is contained in:
arkml 2026-06-18 01:22:27 +05:30 committed by GitHub
parent 2ddec07712
commit 79162ebc69
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 2979 additions and 66 deletions

View file

@ -2,6 +2,7 @@ import { z, ZodType } from "zod";
import * as path from "path";
import * as fs from "fs/promises";
import { executeCommand, executeCommandAbortable } from "./command-executor.js";
import { agentSlackShimEnv } from "../../slack/agent-slack-exec.js";
import { resolveSkill, availableSkills } from "../assistant/skills/index.js";
import { executeTool, listServers, listTools } from "../../mcp/mcp.js";
import container from "../../di/container.js";
@ -740,6 +741,9 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
try {
const rootDir = path.resolve(WorkDir);
const workingDir = cwd ? path.resolve(rootDir, cwd) : rootDir;
// Make `agent-slack` resolvable for skill-authored shell
// commands; the shim forwards to the bundled CLI.
const env = agentSlackShimEnv(path.join(rootDir, 'bin'));
// TODO: Re-enable this check
// const rootPrefix = rootDir.endsWith(path.sep)
@ -758,6 +762,7 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
if (ctx?.signal) {
const { promise, process: proc } = executeCommandAbortable(command, {
cwd: workingDir,
env,
signal: ctx.signal,
onData: (chunk: string) => {
ctx.publish({
@ -788,7 +793,7 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
}
// Fallback to original for backward compatibility
const result = await executeCommand(command, { cwd: workingDir });
const result = await executeCommand(command, { cwd: workingDir, env });
return {
success: result.exitCode === 0,

View file

@ -18,6 +18,9 @@ import { buildKnowledgeIndex, formatIndexForPrompt } from './knowledge_index.js'
import { limitEventItems } from './limit_event_items.js';
import { commitAll } from './version_history.js';
import { getTagDefinitions } from './tag_system.js';
import { knowledgeSourcesRepo } from './sources/repo.js';
import { syncSlackKnowledgeSources } from './sources/sync_slack.js';
import type { KnowledgeSourceConfig } from './sources/types.js';
/**
* Build obsidian-style knowledge graph by running topic extraction
@ -35,12 +38,11 @@ const LEGACY_SUGGESTED_TOPICS_KNOWLEDGE_PATH = path.join(WorkDir, 'knowledge', '
// Configuration for the graph builder service
const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds
const SOURCE_FOLDERS = [
'gmail_sync',
path.join('knowledge', 'Meetings', 'fireflies'),
path.join('knowledge', 'Meetings', 'granola'),
path.join('knowledge', 'Meetings', 'rowboat'),
];
/**
 * Lists the enabled knowledge sources from the repo, leaving out any whose
 * provider is 'voice_memo'.
 */
function getEnabledFileSources(): KnowledgeSourceConfig[] {
    const enabledSources = knowledgeSourcesRepo.listEnabledSources();
    return enabledSources.filter(({ provider }) => provider !== 'voice_memo');
}
// Voice memos are now created directly in knowledge/Voice Memos/<date>/
const VOICE_MEMOS_KNOWLEDGE_DIR = path.join(NOTES_OUTPUT_DIR, 'Voice Memos');
@ -643,6 +645,15 @@ export async function processAllSources(): Promise<void> {
let anyFilesProcessed = false;
try {
const slackFiles = await syncSlackKnowledgeSources();
if (slackFiles.length > 0) {
console.log(`[GraphBuilder] Slack sync wrote ${slackFiles.length} artifact files`);
}
} catch (error) {
console.error('[GraphBuilder] Error syncing Slack knowledge sources:', error);
}
// Process voice memos first (they get moved to knowledge/)
try {
const voiceMemosProcessed = await processVoiceMemosForKnowledge();
@ -654,12 +665,13 @@ export async function processAllSources(): Promise<void> {
}
const state = loadState();
const folderChanges: { folder: string; sourceDir: string; files: string[] }[] = [];
const folderChanges: { source: KnowledgeSourceConfig; sourceDir: string; files: string[] }[] = [];
const countsByFolder: Record<string, number> = {};
const allFiles: string[] = [];
const fileSources = getEnabledFileSources();
for (const folder of SOURCE_FOLDERS) {
const sourceDir = path.join(WorkDir, folder);
for (const source of fileSources) {
const sourceDir = path.join(WorkDir, source.artifactDir);
// Skip if folder doesn't exist
if (!fs.existsSync(sourceDir)) {
@ -671,7 +683,7 @@ export async function processAllSources(): Promise<void> {
let filesToProcess = getFilesToProcess(sourceDir, state);
// For gmail_sync, only process emails that have been labeled AND don't have noise filter tags
if (folder === 'gmail_sync') {
if (source.provider === 'gmail') {
filesToProcess = filesToProcess.filter(filePath => {
try {
const content = fs.readFileSync(filePath, 'utf-8');
@ -690,13 +702,13 @@ export async function processAllSources(): Promise<void> {
}
if (filesToProcess.length > 0) {
console.log(`[GraphBuilder] Found ${filesToProcess.length} new/changed files in ${folder}`);
folderChanges.push({ folder, sourceDir, files: filesToProcess });
countsByFolder[folder] = filesToProcess.length;
console.log(`[GraphBuilder] Found ${filesToProcess.length} new/changed files in ${source.id}`);
folderChanges.push({ source, sourceDir, files: filesToProcess });
countsByFolder[source.id] = filesToProcess.length;
allFiles.push(...filesToProcess);
}
} catch (error) {
console.error(`[GraphBuilder] Error processing ${folder}:`, error);
console.error(`[GraphBuilder] Error processing ${source.id}:`, error);
// Continue with other folders even if one fails
}
}
@ -706,7 +718,7 @@ export async function processAllSources(): Promise<void> {
service: 'graph',
message: 'Syncing knowledge graph',
trigger: 'timer',
config: { sources: SOURCE_FOLDERS },
config: { sources: fileSources.map(source => source.id) },
});
const relativeFiles = allFiles.map(filePath => path.relative(WorkDir, filePath));
@ -770,7 +782,8 @@ export async function processAllSources(): Promise<void> {
*/
export async function init() {
console.log('[GraphBuilder] Starting Knowledge Graph Builder Service...');
console.log(`[GraphBuilder] Monitoring folders: ${SOURCE_FOLDERS.join(', ')}, knowledge/Voice Memos`);
const sourceFolders = getEnabledFileSources().map(source => source.artifactDir);
console.log(`[GraphBuilder] Monitoring folders: ${sourceFolders.join(', ')}, knowledge/Voice Memos`);
console.log(`[GraphBuilder] Will check for new content every ${SYNC_INTERVAL_MS / 1000} seconds`);
// Initial run

View file

@ -30,16 +30,16 @@ tools:
**Current date and time:** ${new Date().toISOString()}
Sources (emails, meetings, voice memos) are processed in roughly chronological order. This means:
Sources (emails, meetings, voice memos, Slack messages, and connected-tool artifacts) are processed in roughly chronological order. This means:
- Earlier sources may reference events that have since occurred later sources will provide updates.
- If a source mentions a future meeting or deadline, it may already be in the past by now. Use the current date above to reason about what is past vs. upcoming.
- Don't treat old commitments as still "open" if later sources or the current date suggest they've likely been resolved.
# Task
You are a memory agent. You are given one or more source files (emails, meeting transcripts, or voice memos) to process. **The files in a request are independent of each other** they are batched together only for efficiency, not because they are related. Process each source file on its own terms (see "Source Scoping" below). For each source file you will:
You are a memory agent. You are given one or more source files (emails, meeting transcripts, voice memos, Slack messages, or other connected-tool artifacts) to process. **The files in a request are independent of each other** they are batched together only for efficiency, not because they are related. Process each source file on its own terms (see "Source Scoping" below). For each source file you will:
1. **Determine source type (meeting or email)**
1. **Determine source type (meeting, email, voice memo, Slack, or connected-tool artifact)**
2. **Evaluate if the source is worth processing**
3. **Search for all existing related notes**
4. **Resolve entities to canonical names**
@ -49,7 +49,7 @@ You are a memory agent. You are given one or more source files (emails, meeting
8. Create new notes or update existing notes
9. **Apply state changes to existing notes**
The core rule: **Both meetings and emails can create notes, but emails require personalized content and a new People/Organization note from an email also requires the user to have replied at least once in the thread (the Email Reply Gate). Emails can always update existing notes regardless.**
The core rule: **Meetings and voice memos can create notes freely. Emails require personalized content and a new People/Organization note from an email also requires the user to have replied at least once in the thread (the Email Reply Gate). Slack and connected-tool artifacts can update existing notes when they carry clear state changes, decisions, commitments, or project facts; they should create new notes only when the artifact clearly identifies a durable person, organization, project, or topic worth tracking.**
# Source Scoping (Batch Isolation) READ FIRST
@ -75,7 +75,7 @@ You have full read access to the existing knowledge directory. Use this extensiv
# Inputs
1. **source_file**: Path to a single file to process (email or meeting transcript)
1. **source_file**: Path to a single file to process (email, meeting transcript, voice memo, Slack message, or connected-tool artifact)
2. **knowledge_folder**: Path to Obsidian vault (read/write access)
3. **user**: Information about the owner of this memory
- name: e.g., "Arj"
@ -170,7 +170,7 @@ ${renderNoteEffectRules()}
# Step 0: Determine Source Type
Read the source file and determine if it's a meeting or email.
Read the source file and determine its source type.
\`\`\`
file-readText({ path: "{source_file}" })
\`\`\`
@ -191,10 +191,22 @@ file-readText({ path: "{source_file}" })
- Has frontmatter \`path:\` field like \`Voice Memos/YYYY-MM-DD/...\`
- Has \`## Transcript\` section
**Slack indicators:**
- YAML frontmatter has \`source: slack\`
- Source file path is under \`knowledge_sources/slack/\`
- Contains fields like \`Workspace:\`, \`Channel:\`, \`Author:\`, \`Thread TS:\`, or a \`## Message\` section
**Connected-tool artifact indicators:**
- YAML frontmatter has \`source:\` set to a provider like \`github\`, \`linear\`, \`jira\`, \`notion\`, etc.
- Source file path is under \`knowledge_sources/<provider>/\`
- Contains issue, PR, task, ticket, comment, status, or project metadata
**Set processing mode:**
- \`source_type = "meeting"\` → Can create new notes
- \`source_type = "email"\` → Can create notes if personalized and relevant
- \`source_type = "voice_memo"\` → Can create new notes (treat like meetings)
- \`source_type = "slack"\` → Prefer updating existing project/person/topic notes; create new notes only for clear durable entities
- \`source_type = "connected_tool"\` → Prefer updating existing project/topic notes; create new notes only for durable projects, organizations, repositories, issues, or initiatives
---
@ -240,6 +252,22 @@ Emails containing calendar invites (\`.ics\` attachments or inline calendar data
## For Meetings and Voice Memos
Always process no filtering needed.
## For Slack Messages
Process Slack messages only when they contain durable knowledge:
- Decisions, approvals, changes in project status, blockers, owners, deadlines, handoffs, or commitments
- Facts about people, organizations, projects, customers, product areas, repositories, issues, or incidents
- Meaningful summaries in long threads
Skip Slack messages that are only acknowledgements, greetings, jokes, reactions, short coordination with no durable outcome, or vague statements that cannot be resolved to a known entity. For ambiguous updates like "x is done", update an existing note only if \`x\` resolves clearly from the message, channel, thread, or existing knowledge index. If it does not resolve clearly, skip rather than inventing a fact.
## For Connected-Tool Artifacts
Process artifacts from GitHub, Linear, Jira, and similar tools when they carry project or work-state changes:
- Issue/PR/task created, assigned, closed, merged, reopened, blocked, or reprioritized
- Status, owner, milestone, deadline, release, incident, customer, or decision changes
- Comments that clarify requirements, decisions, blockers, or commitments
Skip routine metadata churn and duplicated notifications unless they change durable knowledge.
## For Emails Read YAML Frontmatter
Emails have YAML frontmatter with labels prepended by the labeling agent:

View file

@ -0,0 +1,126 @@
import { describe, expect, it } from 'vitest';
import {
filterSlackHomeCandidatesForRelevance,
rankSlackHomeMessages,
SlackHomeRankCandidate,
} from './rank_slack_home.js';
/** Formats an epoch-millisecond value as a Slack-style `<seconds>.000000` timestamp string. */
function slackTs(dateMs: number): string {
    const wholeSeconds = Math.floor(dateMs / 1000);
    return String(wholeSeconds) + '.000000';
}
// Fixed reference instant that the filter tests pass in as "now".
const NOW = Date.parse('2026-06-04T18:00:00Z');

/** Builds a #general candidate sent five minutes before NOW; `id` defaults to the text. */
const recent = (text: string, id = text): SlackHomeRankCandidate => {
    const fiveMinutesMs = 5 * 60 * 1000;
    return { id, channelName: 'general', text, ts: slackTs(NOW - fiveMinutesMs) };
};
/** Runs the relevance filter with the clock pinned to NOW and returns the surviving ids in order. */
function keptIds(candidates: SlackHomeRankCandidate[]): string[] {
    const survivors = filterSlackHomeCandidatesForRelevance(candidates, NOW);
    return survivors.map(({ id }) => id);
}
// Filter suite: one nested describe per noise class. Every case asserts the
// ids that survive, in input order, with the clock pinned to NOW via keptIds.
describe('filterSlackHomeCandidatesForRelevance', () => {
describe('routine standup logistics', () => {
it('drops stale standup logistics but keeps recent ones and durable updates', () => {
// 9h and 12h are past the filter's two-hour expiry for routine logistics;
// 30 minutes is inside it.
const nineHoursAgo = NOW - 9 * 60 * 60 * 1000;
const twelveHoursAgo = NOW - 12 * 60 * 60 * 1000;
const thirtyMinutesAgo = NOW - 30 * 60 * 1000;
const candidates: SlackHomeRankCandidate[] = [
{ id: 'stale-standup-schedule', channelName: 'general', text: 'standup at 4pm possible?', ts: slackTs(nineHoursAgo) },
{ id: 'stale-standup-sick', channelName: 'general', text: 'ill skip todays standup I am having stomach ache and not feeling well', ts: slackTs(twelveHoursAgo) },
// Old, but has no standup keyword and carries durable words ("issue", "fixed").
{ id: 'durable-issue-update', channelName: 'general', text: 'is the icon issue fixed for windows?', ts: slackTs(twelveHoursAgo) },
{ id: 'recent-standup-schedule', channelName: 'general', text: 'standup at 4pm possible?', ts: slackTs(thirtyMinutesAgo) },
];
expect(keptIds(candidates)).toEqual(['durable-issue-update', 'recent-standup-schedule']);
});
});
describe('system / automated messages', () => {
it('drops channel join/leave, topic, rename and call notices', () => {
const candidates = [
recent('Alex has joined the channel', 'join'),
recent('Sam has left the channel', 'leave'),
recent('Alex set the channel topic: Q3 planning', 'topic'),
recent('Sam renamed the channel to design-team', 'rename'),
recent('Alex started a huddle', 'huddle'),
recent('Real question: can someone review my PR?', 'real'),
];
expect(keptIds(candidates)).toEqual(['real']);
});
it('keeps a system-shaped message that carries a durable signal', () => {
// "incident" is a durable-signal word, which overrides the system-message match.
const candidates = [recent('Priya set the channel topic: incident response war room', 'topic-incident')];
expect(keptIds(candidates)).toEqual(['topic-incident']);
});
});
describe('emoji / reaction-only', () => {
it('drops emoji-only, shortcode-only and punctuation-only posts', () => {
const candidates = [
recent('👍', 'thumbs'),
recent('🎉🎉🎉', 'party'),
recent(':tada: :rocket:', 'shortcodes'),
recent('!!!', 'punct'),
// Leading emoji followed by real words must survive.
recent('🚀 shipping the new pricing page today', 'real'),
];
expect(keptIds(candidates)).toEqual(['real']);
});
});
describe('greetings / acknowledgements', () => {
it('drops bare greetings and acks but keeps anything with content', () => {
const candidates = [
recent('thanks!', 'thanks'),
recent('gm', 'gm'),
recent('lgtm', 'lgtm'),
recent('+1', 'plus1'),
recent('sounds good', 'sg'),
recent('ok', 'ok'),
recent('ok, the deploy is blocked on the migration', 'ok-with-content'),
recent('thanks for fixing the outage', 'thanks-durable'),
];
// 'ok-with-content' kept (has content); 'thanks-durable' kept (durable signal).
expect(keptIds(candidates)).toEqual(['ok-with-content', 'thanks-durable']);
});
});
// Whitespace-only text counts as an empty body.
it('drops empty-text candidates', () => {
expect(keptIds([recent(' ', 'blank'), recent('a real message here', 'real')])).toEqual(['real']);
});
});
// Ranker suite: checks newest-first ordering, the output cap, and that the
// noise filter runs before ranking. None of these fixtures mention a
// standup/check-in, so the age-based routine-logistics rule does not apply.
describe('rankSlackHomeMessages (deterministic)', () => {
it('orders surviving candidates newest-first and caps at the limit', async () => {
// Helper: a plain "update <id>" message sent `minutesAgo` minutes before NOW.
const mk = (id: string, minutesAgo: number): SlackHomeRankCandidate => ({
id, channelName: 'general', text: `update ${id}`, ts: slackTs(NOW - minutesAgo * 60 * 1000),
});
const candidates = [mk('old', 50), mk('newest', 1), mk('mid', 20)];
expect(await rankSlackHomeMessages(candidates, 5)).toEqual(['newest', 'mid', 'old']);
expect(await rankSlackHomeMessages(candidates, 2)).toEqual(['newest', 'mid']);
});
it('filters noise before ranking', async () => {
const candidates = [
recent('👍', 'emoji'),
recent('Alex has joined the channel', 'join'),
recent('can you review the pricing proposal?', 'real'),
];
expect(await rankSlackHomeMessages(candidates, 5)).toEqual(['real']);
});
it('handles a high-volume batch: caps output and preserves recency order', async () => {
// 150 messages spaced one minute apart.
const candidates: SlackHomeRankCandidate[] = Array.from({ length: 150 }, (_, i) => ({
id: `m${i}`,
channelName: 'general',
// i=0 is newest; larger i is older.
text: `status update number ${i}`,
ts: slackTs(NOW - i * 60 * 1000),
}));
const ranked = await rankSlackHomeMessages(candidates, 5);
expect(ranked).toEqual(['m0', 'm1', 'm2', 'm3', 'm4']);
});
});

View file

@ -0,0 +1,92 @@
/**
 * One Slack message under consideration for the Home feed.
 */
export type SlackHomeRankCandidate = {
// Identifier the ranker returns for messages that survive filtering.
id: string;
workspaceName?: string;
channelName?: string;
author?: string;
// Message body; the noise filters in this module match against it.
text: string;
// Slack timestamp string; the part before the '.' is read as epoch seconds.
ts: string;
};
// Age (two hours) at or beyond which routine meeting logistics count as expired.
const EXPIRED_ROUTINE_AGE_MS = 2 * 60 * 60 * 1000;
// Mentions of a recurring team ritual: stand-up, daily sync/scrum/standup,
// scrum, or check-in.
const ROUTINE_EVENT_RE = /\b(stand[-\s]?up|daily\s+(sync|scrum|standup)|scrum|check[-\s]?in)\b/i;
// Scheduling / attendance vocabulary: skipping, rescheduling, running late,
// feeling unwell, today/tomorrow, or a clock time such as "at 4pm".
const ROUTINE_LOGISTICS_RE = /\b(skip|skipping|miss|missing|can't|cannot|cant|won't|wont|join|attend|possible|move|reschedule|shift|late|running\s+late|stomach|sick|not\s+feeling|headache|doctor|appointment|today|todays|today's|tomorrow|at\s+\d{1,2}(:\d{2})?\s*(am|pm)?)\b/i;
// Durable signals always win: a message matching any of these is kept even if
// it would otherwise look like noise (a system message, a "done", etc.).
const DURABLE_SIGNAL_RE = /\b(blocker|blocked|decision|decided|owner|deadline|shipped|fixed|done|launched|deployed|merged|bug|issue|incident|outage|customer|contract|pricing|proposal|launch|release|handoff|review|approval|approved)\b/i;
// Slack system / automated messages render as plain narration like
// "<name> has joined the channel". They carry no human content, so drop them.
const SYSTEM_MESSAGE_RE = /\b(has joined the channel|has left the channel|was added to|has been added|set the channel (topic|purpose|description)|cleared the channel (topic|purpose)|renamed the channel|archived the channel|un-?archived the channel|pinned a message|joined the (call|huddle)|started a (call|huddle)|set up a call)\b/i;
// Greetings / acknowledgements with no informational content. Anchored to the
// whole (trimmed) message so "ok" drops but "ok, the deploy is blocked" stays.
const TRIVIAL_RE = /^(hi|hello+|hey+|yo|gm|gn|good\s*(morning|night|evening|afternoon)|morning|thanks?|thank\s*you|ty|thx|tysm|np|no\s*problem|ok(ay)?|k|got\s*it|gotcha|lgtm|\+1|nice|cool|great|awesome|perfect|done|yes+|yep|yup|no+|nope|sure|sounds?\s*good|sg|welcome|congrats?|congratulations)[\s.!?]*$/i;
// Slack-style emoji shortcodes such as ":tada:". Global so a single
// String.replace call strips every occurrence.
const EMOJI_SHORTCODE_RE = /:[a-z0-9_+-]+:/gi;
/**
 * Converts a Slack timestamp string ("<seconds>.<suffix>") to epoch
 * milliseconds, using only the whole-seconds part before the first '.'.
 *
 * @param ts Slack timestamp string.
 * @returns Epoch milliseconds, or null when the seconds part is missing,
 *          blank, or not a finite number.
 */
function slackTsToMs(ts: string): number | null {
    const [secondsPart = ''] = ts.split('.');
    // Number('') and Number('  ') are 0, which would silently date the message
    // to the epoch; treat a blank seconds part as unparseable instead.
    if (secondsPart.trim() === '') return null;
    const seconds = Number(secondsPart);
    if (!Number.isFinite(seconds)) return null;
    return seconds * 1000;
}
/**
 * Returns candidate ids ordered newest-first by numeric `ts`, keeping at most
 * `limit` entries. The input array is left untouched; sorting happens on a copy.
 */
function timeRank(candidates: SlackHomeRankCandidate[], limit: number): string[] {
    const newestFirst = candidates.slice();
    newestFirst.sort((left, right) => Number(right.ts) - Number(left.ts));
    const capped = newestFirst.slice(0, limit);
    return capped.map(({ id }) => id);
}
/**
 * Returns what is left of `text` after removing :shortcodes:, whitespace,
 * punctuation, symbols (which covers unicode emoji), combining marks and
 * invisible format characters. An empty result means the message was
 * emoji/reaction-only.
 *
 * Marks (\p{M}) and format characters (\p{Cf}) are stripped as well because
 * emoji are often followed by a variation selector (U+FE0F) or glued together
 * with a zero-width joiner (U+200D). Neither is a symbol, so without this an
 * emoji-only post such as "❤️" would leave residue and look like real content.
 * Letters and digits are never removed.
 */
function strippedToCore(text: string): string {
    return text
        .replace(EMOJI_SHORTCODE_RE, '')
        .replace(/[\s\p{P}\p{S}\p{M}\p{Cf}]/gu, '')
        .trim();
}
/**
 * True when the candidate is routine-meeting logistics (standup / check-in
 * scheduling or attendance chatter) sent at least EXPIRED_ROUTINE_AGE_MS
 * before `nowMs`. Messages with an unparseable timestamp or a durable signal
 * are never treated as expired.
 */
function isExpiredRoutineLogistics(candidate: SlackHomeRankCandidate, nowMs: number): boolean {
    const sentAtMs = slackTsToMs(candidate.ts);
    const oldEnough = sentAtMs !== null && nowMs - sentAtMs >= EXPIRED_ROUTINE_AGE_MS;
    if (!oldEnough) {
        return false;
    }
    const normalized = candidate.text.replace(/\s+/g, ' ').trim();
    return ROUTINE_EVENT_RE.test(normalized)
        && !DURABLE_SIGNAL_RE.test(normalized)
        && ROUTINE_LOGISTICS_RE.test(normalized);
}
/**
 * True for messages that never belong on Home: empty bodies, Slack system
 * narration, bare greetings/acknowledgements, and emoji/reaction-only posts.
 * A durable signal in a non-empty message overrides every noise class.
 */
function isLowValueNoise(candidate: SlackHomeRankCandidate): boolean {
    const normalized = candidate.text.replace(/\s+/g, ' ').trim();
    if (normalized.length === 0) {
        return true;
    }
    if (DURABLE_SIGNAL_RE.test(normalized)) {
        return false;
    }
    const isSystemOrTrivial = SYSTEM_MESSAGE_RE.test(normalized) || TRIVIAL_RE.test(normalized);
    return isSystemOrTrivial || strippedToCore(normalized).length === 0;
}
/**
 * Keeps only the candidates worth showing on Home, preserving input order.
 *
 * @param candidates Messages to screen.
 * @param nowMs Reference "now" in epoch milliseconds; defaults to the current time.
 * @returns Candidates that are neither expired routine logistics nor low-value noise.
 */
export function filterSlackHomeCandidatesForRelevance(
    candidates: SlackHomeRankCandidate[],
    nowMs = Date.now(),
): SlackHomeRankCandidate[] {
    return candidates.filter(candidate => {
        if (isExpiredRoutineLogistics(candidate, nowMs)) {
            return false;
        }
        return !isLowValueNoise(candidate);
    });
}
/**
 * Deterministic Home feed: drop noise, then order by recency and cap. No LLM
 * call — the filter does the de-noising and recency does the ordering.
 * (Kept async so the IPC caller's contract is unchanged.)
 *
 * @param candidates Messages to screen and rank.
 * @param limit Maximum number of ids to return.
 * @param nowMs Reference "now" in epoch milliseconds, used by the age-based
 *              filter. Optional; defaults to the current time, so existing
 *              two-argument callers behave exactly as before.
 * @returns Ids of the surviving messages, newest first.
 */
export async function rankSlackHomeMessages(
    candidates: SlackHomeRankCandidate[],
    limit: number,
    nowMs: number = Date.now(),
): Promise<string[]> {
    const relevant = filterSlackHomeCandidatesForRelevance(candidates, nowMs);
    return timeRank(relevant, limit);
}

View file

@ -0,0 +1,113 @@
import fs from 'fs';
import path from 'path';
import { WorkDir } from '../../config/config.js';
import {
KnowledgeSourceConfig,
KnowledgeSourcesFile,
type KnowledgeSourcesFile as KnowledgeSourcesFileType,
} from './types.js';
// Location of the persisted knowledge-sources config under WorkDir/config.
const CONFIG_FILE = path.join(WorkDir, 'config', 'knowledge_sources.json');
// Sources added to any config that does not already contain their id
// (see mergeBuiltinSources). All are enabled, file-synced, and have no scopes.
const BUILTIN_SOURCES: KnowledgeSourceConfig[] = [
{
id: 'gmail',
provider: 'gmail',
enabled: true,
artifactDir: 'gmail_sync',
syncMode: 'file',
scopes: [],
},
// The three meeting sources share the 'meeting' provider and differ only in
// id and artifact directory.
{
id: 'fireflies-meetings',
provider: 'meeting',
enabled: true,
artifactDir: path.join('knowledge', 'Meetings', 'fireflies'),
syncMode: 'file',
scopes: [],
},
{
id: 'granola-meetings',
provider: 'meeting',
enabled: true,
artifactDir: path.join('knowledge', 'Meetings', 'granola'),
syncMode: 'file',
scopes: [],
},
{
id: 'rowboat-meetings',
provider: 'meeting',
enabled: true,
artifactDir: path.join('knowledge', 'Meetings', 'rowboat'),
syncMode: 'file',
scopes: [],
},
];
/** Creates the directory that holds CONFIG_FILE, including missing parents. */
function ensureConfigDir(): void {
    const configDir = path.dirname(CONFIG_FILE);
    fs.mkdirSync(configDir, { recursive: true });
}
/**
 * Returns a new config holding the given sources (keyed by id, so a later
 * duplicate id replaces an earlier one in place) followed by every built-in
 * source whose id is not already present.
 */
function mergeBuiltinSources(config: KnowledgeSourcesFileType): KnowledgeSourcesFileType {
    const sourcesById = new Map(config.sources.map(entry => [entry.id, entry]));
    for (const builtin of BUILTIN_SOURCES) {
        if (sourcesById.has(builtin.id)) {
            continue;
        }
        sourcesById.set(builtin.id, builtin);
    }
    return { sources: [...sourcesById.values()] };
}
/** Storage contract for the knowledge-sources configuration. */
export interface IKnowledgeSourcesRepo {
// Returns the current configuration.
getConfig(): KnowledgeSourcesFileType;
// Replaces the stored configuration.
setConfig(config: KnowledgeSourcesFileType): void;
// Returns the configured sources whose `enabled` flag is set.
listEnabledSources(): KnowledgeSourceConfig[];
// Adds the source, or replaces the existing one with the same id, and
// returns the resulting configuration.
upsertSource(source: KnowledgeSourceConfig): KnowledgeSourcesFileType;
}
/**
 * Knowledge-sources repository backed by the JSON file at CONFIG_FILE.
 * All reads and writes are synchronous.
 */
export class FSKnowledgeSourcesRepo implements IKnowledgeSourcesRepo {
    /**
     * Loads the config from disk.
     *
     * - If the file is missing, it is created from the built-in sources.
     * - If any built-in source is missing from the file, it is merged in and
     *   the file is rewritten.
     * - On any failure (unreadable file, invalid JSON, schema mismatch, write
     *   error) the error is logged and the built-ins are returned.
     *
     * The returned `sources` array is always a fresh array, never the
     * module-level BUILTIN_SOURCES itself, so callers such as upsertSource
     * can mutate it without corrupting the built-in list.
     */
    getConfig(): KnowledgeSourcesFileType {
        try {
            if (!fs.existsSync(CONFIG_FILE)) {
                const config = { sources: [...BUILTIN_SOURCES] };
                this.setConfig(config);
                return config;
            }
            const parsed = KnowledgeSourcesFile.parse(JSON.parse(fs.readFileSync(CONFIG_FILE, 'utf-8')));
            const merged = mergeBuiltinSources(parsed);
            // A length difference means at least one built-in was added; persist it.
            if (merged.sources.length !== parsed.sources.length) {
                this.setConfig(merged);
            }
            return merged;
        } catch (error) {
            console.error('[KnowledgeSources] Failed to load config:', error);
            return { sources: [...BUILTIN_SOURCES] };
        }
    }

    /**
     * Merges in the built-ins, validates against the schema, and writes the
     * result to CONFIG_FILE as pretty-printed JSON. Throws if validation or
     * the write fails.
     */
    setConfig(config: KnowledgeSourcesFileType): void {
        const validated = KnowledgeSourcesFile.parse(mergeBuiltinSources(config));
        ensureConfigDir();
        fs.writeFileSync(CONFIG_FILE, JSON.stringify(validated, null, 2), 'utf-8');
    }

    /** Returns the configured sources whose `enabled` flag is truthy. */
    listEnabledSources(): KnowledgeSourceConfig[] {
        return this.getConfig().sources.filter(source => source.enabled);
    }

    /**
     * Validates `source`, then replaces the entry with the same id or appends
     * it, persists the result, and returns the config re-read from disk.
     */
    upsertSource(source: KnowledgeSourceConfig): KnowledgeSourcesFileType {
        const validated = KnowledgeSourceConfig.parse(source);
        const config = this.getConfig();
        const existingIndex = config.sources.findIndex(item => item.id === validated.id);
        if (existingIndex >= 0) {
            config.sources[existingIndex] = validated;
        } else {
            config.sources.push(validated);
        }
        this.setConfig(config);
        return this.getConfig();
    }
}
// Shared file-backed repository instance created at module load.
export const knowledgeSourcesRepo = new FSKnowledgeSourcesRepo();

View file

@ -0,0 +1,182 @@
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import type { KnowledgeSourceConfig } from './types.js';
// WorkDir is resolved when config.js loads, so the env override must be in
// place before sync_slack.js (which imports it) is loaded — hence the
// dynamic imports in beforeAll.
// Fresh temp directory, published through ROWBOAT_WORKDIR for this test file.
const tmpWorkDir = fs.mkdtempSync(path.join(os.tmpdir(), 'slack-sync-test-'));
process.env.ROWBOAT_WORKDIR = tmpWorkDir;
// Two enabled Slack poll sources with a 5-minute interval; they differ only
// in id and in the single channel each is scoped to.
const sourceA: KnowledgeSourceConfig = {
id: 'slack-a',
provider: 'slack',
enabled: true,
artifactDir: 'knowledge_sources/slack',
syncMode: 'poll',
intervalMs: 5 * 60 * 1000,
scopes: [{ type: 'channel', id: 'C-AAA', name: '#alpha' }],
};
const sourceB: KnowledgeSourceConfig = {
...sourceA,
id: 'slack-b',
scopes: [{ type: 'channel', id: 'C-BBB', name: '#beta' }],
};
// Stub the repo so both sources are always reported as configured and enabled.
vi.mock('./repo.js', () => ({
knowledgeSourcesRepo: {
listEnabledSources: vi.fn(() => [sourceA, sourceB]),
getConfig: vi.fn(() => ({ sources: [sourceA, sourceB] })),
},
}));
// Replace the service logger with stubs.
vi.mock('../../services/service_logger.js', () => ({
serviceLogger: {
startRun: vi.fn(async () => ({ service: 'slack', runId: 'test-run', startedAt: Date.now() })),
log: vi.fn(async () => { }),
},
}));
// Replace event creation with a no-op stub.
vi.mock('../../events/producer.js', () => ({
createEvent: vi.fn(async () => { }),
}));
// Keep the real agent-slack-exec exports but swap runAgentSlack for a mock.
vi.mock('../../slack/agent-slack-exec.js', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../slack/agent-slack-exec.js')>();
return { ...actual, runAgentSlack: vi.fn() };
});
type SyncModule = typeof import('./sync_slack.js');
type ExecModule = typeof import('../../slack/agent-slack-exec.js');
// Both handles are assigned in beforeAll via dynamic import.
let sync: SyncModule;
let execMock: ReturnType<typeof vi.mocked<ExecModule['runAgentSlack']>>;
// Sync-state file the tests read back and delete between cases.
const stateFile = path.join(tmpWorkDir, 'slack_knowledge_sync_state.json');
/** Reads the persisted sync-state file and returns its parsed JSON. */
function readState() {
    const raw = fs.readFileSync(stateFile, 'utf-8');
    return JSON.parse(raw);
}
/**
 * Moves a source's persisted lastSyncAt back to `ms` milliseconds before the
 * current time and rewrites the state file, so the source counts as due again.
 */
function rewindSource(sourceId: string, ms: number) {
    const persisted = readState();
    const entry = persisted.sources[sourceId];
    entry.lastSyncAt = new Date(Date.now() - ms).toISOString();
    fs.writeFileSync(stateFile, JSON.stringify(persisted), 'utf-8');
}
// Canned runAgentSlack results: an empty success, a rate-limit failure, and
// an unresolved-channel failure.
const okEmpty = { ok: true as const, stdout: '[]', data: [] };
const rateLimited = {
ok: false as const, kind: 'rate_limited' as const, stderr: 'ratelimited',
message: 'A rate-limit has been reached, you may retry this request in 30 seconds',
};
const badChannel = {
ok: false as const, kind: 'bad_channel' as const, stderr: '',
message: 'Could not resolve channel name: #alpha',
};
// Load the module under test and grab the mocked executor.
beforeAll(async () => {
sync = await import('./sync_slack.js');
const exec = await import('../../slack/agent-slack-exec.js');
execMock = vi.mocked(exec.runAgentSlack);
});
// Each test starts with a clean mock and no persisted sync state.
beforeEach(() => {
execMock.mockReset();
fs.rmSync(stateFile, { force: true });
});
// Remove the temp workdir once the whole file has run.
afterAll(() => {
fs.rmSync(tmpWorkDir, { recursive: true, force: true });
});
// Verifies what syncSlackKnowledgeSources writes to the state file for
// success, per-source errors, and rate limiting with backoff.
describe('syncSlackKnowledgeSources status persistence', () => {
it('records ok status and lastSyncAt per source', async () => {
execMock.mockResolvedValue(okEmpty);
await sync.syncSlackKnowledgeSources();
const state = readState();
for (const id of ['slack-a', 'slack-b']) {
expect(state.sources[id].lastStatus).toBe('ok');
expect(Date.parse(state.sources[id].lastSyncAt)).toBeGreaterThan(Date.now() - 60_000);
expect(state.sources[id].lastError).toBeUndefined();
}
});
it('persists lastError and lets other sources continue past a bad one', async () => {
// Only calls that mention slack-a's channel id fail.
execMock.mockImplementation(async (args: string[]) =>
args.includes('C-AAA') ? badChannel : okEmpty);
await sync.syncSlackKnowledgeSources();
const state = readState();
expect(state.sources['slack-a']).toMatchObject({
lastStatus: 'error',
lastError: { kind: 'bad_channel', message: 'Could not resolve channel name: #alpha' },
});
// slack-b synced despite slack-a failing
expect(state.sources['slack-b'].lastStatus).toBe('ok');
expect(execMock.mock.calls.some(call => call[0].includes('C-BBB'))).toBe(true);
});
it('stops the run on rate limit without touching later sources', async () => {
execMock.mockResolvedValue(rateLimited);
await sync.syncSlackKnowledgeSources();
const state = readState();
expect(state.sources['slack-a'].lastError.kind).toBe('rate_limited');
// No state entry and no executor call for slack-b.
expect(state.sources['slack-b']).toBeUndefined();
expect(execMock.mock.calls.every(call => !call[0].includes('C-BBB'))).toBe(true);
});
it('grows backoff on consecutive rate limits and resets it on success', async () => {
execMock.mockResolvedValue(rateLimited);
await sync.syncSlackKnowledgeSources();
expect(readState().sources['slack-a'].backoffMultiplier).toBe(2);
// Rewind by an hour so the source is due again before the next run.
rewindSource('slack-a', 60 * 60 * 1000);
await sync.syncSlackKnowledgeSources();
expect(readState().sources['slack-a'].backoffMultiplier).toBe(4);
rewindSource('slack-a', 60 * 60 * 1000);
execMock.mockResolvedValue(okEmpty);
await sync.syncSlackKnowledgeSources();
expect(readState().sources['slack-a'].backoffMultiplier).toBeUndefined();
expect(readState().sources['slack-a'].lastStatus).toBe('ok');
});
it('does not re-sync a rate-limited source before its backed-off interval elapses', async () => {
execMock.mockResolvedValue(rateLimited);
// First run rate-limits slack-a and breaks before slack-b.
await sync.syncSlackKnowledgeSources();
execMock.mockClear();
// Second run: slack-a is backed off (not due) but slack-b never ran, so
// it's still due. slack-a must not be retried; slack-b may be.
await sync.syncSlackKnowledgeSources();
expect(execMock.mock.calls.every(call => !call[0].includes('C-AAA'))).toBe(true);
});
});
// Pure-function checks for the interval/backoff arithmetic. The first
// assertion shows that sourceA resolves to a 5-minute base interval.
describe('effectiveIntervalMs', () => {
it('multiplies the base interval by the backoff and caps at 30 minutes', () => {
expect(sync.effectiveIntervalMs(sourceA, undefined)).toBe(5 * 60 * 1000);
expect(sync.effectiveIntervalMs(sourceA, { backoffMultiplier: 2 })).toBe(10 * 60 * 1000);
expect(sync.effectiveIntervalMs(sourceA, { backoffMultiplier: 4 })).toBe(20 * 60 * 1000);
// 5 min x 8 = 40 min, which exceeds the cap and is clamped to 30 min.
expect(sync.effectiveIntervalMs(sourceA, { backoffMultiplier: 8 })).toBe(30 * 60 * 1000);
expect(sync.effectiveIntervalMs(sourceA, { backoffMultiplier: 1024 })).toBe(30 * 60 * 1000);
});
});
// Status reporting: after one run with a failing and a succeeding source,
// the status list must reflect both outcomes and a computed nextDueAt.
describe('getSlackKnowledgeSyncStatus', () => {
it('reports per-source status with nextDueAt from interval + backoff', async () => {
execMock.mockImplementation(async (args: string[]) =>
args.includes('C-AAA') ? badChannel : okEmpty);
await sync.syncSlackKnowledgeSources();
const statuses = sync.getSlackKnowledgeSyncStatus();
const a = statuses.find(s => s.id === 'slack-a');
const b = statuses.find(s => s.id === 'slack-b');
expect(a).toMatchObject({ enabled: true, lastStatus: 'error', lastError: { kind: 'bad_channel' } });
expect(b).toMatchObject({ enabled: true, lastStatus: 'ok' });
// nextDueAt ≈ lastSyncAt + 5 min
// NOTE(review): a numDigits of -3 should allow roughly half a second of
// difference — confirm against the toBeCloseTo documentation.
expect(Date.parse(b!.nextDueAt!)).toBeCloseTo(Date.parse(b!.lastSyncAt!) + 5 * 60 * 1000, -3);
});
});

View file

@ -0,0 +1,479 @@
import fs from 'fs';
import path from 'path';
import { WorkDir } from '../../config/config.js';
import { AgentSlackRunError, runAgentSlack as execAgentSlack } from '../../slack/agent-slack-exec.js';
import type { AgentSlackErrorKind } from '../../slack/agent-slack-exec.js';
import { serviceLogger } from '../../services/service_logger.js';
import { limitEventItems } from '../limit_event_items.js';
import { createEvent } from '../../events/producer.js';
import { knowledgeSourcesRepo } from './repo.js';
import type { KnowledgeArtifact, KnowledgeSourceConfig, KnowledgeSourceScope } from './types.js';
// Value passed as `--limit` to `message list` when the source sets no filters.limit.
const DEFAULT_LIMIT = 100;
// Default per-source sync interval (5 minutes); also the sleep between polling-loop iterations.
const DEFAULT_SYNC_INTERVAL_MS = 5 * 60 * 1000;
// Default overlap (6 hours, in seconds) subtracted from a channel's last seen
// timestamp when computing the `--oldest` bound of the next fetch.
const DEFAULT_RECENT_BACKFILL_SECONDS = 6 * 60 * 60;
// JSON file holding per-source sync status and per-channel cursors.
const STATE_FILE = path.join(WorkDir, 'slack_knowledge_sync_state.json');
// Directory returned by getSlackKnowledgeArtifactRoot().
const ARTIFACT_ROOT = path.join(WorkDir, 'knowledge_sources', 'slack');
/**
 * Persisted status of the most recent sync attempt for one knowledge
 * source. Stored under `sources[<source id>]` in the state file and
 * replaced wholesale after every attempt.
 */
export type SlackSourceSyncState = {
/** Time of the last sync attempt (success or failure). */
lastSyncAt?: string;
/** Outcome of the last attempt. */
lastStatus?: 'ok' | 'error';
/** Classified failure of the last attempt; only set when it failed. */
lastError?: { kind: AgentSlackErrorKind | 'unknown'; message: string };
/** Rate-limit backoff: multiplies the source interval; reset on success. */
backoffMultiplier?: number;
};
/** Shape of the JSON state file read by loadState() and written by saveState(). */
type SlackSyncState = {
/** Time of the most recent attempt across all sources. */
lastSyncAt?: string;
/** Per-source status, keyed by source id. */
sources?: Record<string, SlackSourceSyncState>;
/** Per-channel cursor, keyed by `<source id>:<workspace url or ''>:<scope id>`. */
channels: Record<string, { lastSeenTs?: string }>;
};
/**
 * Loosely-typed message as read from the agent-slack CLI output. Every
 * field is optional because the output is not validated; helper functions
 * pick the first field that is present.
 */
type SlackMessage = {
// Slack timestamp string ("<seconds>.<fraction>"); messages without one are dropped.
ts?: string;
thread_ts?: string;
// Author fields: `username` is preferred over `user` by getMessageAuthor.
user?: string;
username?: string;
// Alternative body fields, read in the order text, body, content by getMessageText.
text?: string;
body?: string;
content?: string;
// Channel identification fallbacks used when the scope has no name.
channel?: string;
channel_id?: string;
channel_name?: string;
// Link fields: `permalink` is preferred over `url`.
permalink?: string;
url?: string;
// Present when the message was edited; edited.ts becomes the artifact version.
edited?: { ts?: string; user?: string };
reply_count?: number;
replies?: SlackMessage[];
};
/** True for non-null, non-array objects (the only shapes the state file may hold). */
function isStateRecord(value: unknown): value is Record<string, unknown> {
    return typeof value === 'object' && value !== null && !Array.isArray(value);
}

/**
 * Read the persisted sync state from STATE_FILE.
 *
 * Never throws: a missing, unreadable, or unparsable file yields a fresh
 * empty state. The parsed JSON is shape-checked before use, because a file
 * that is valid JSON but not the expected shape (for example `null`, an
 * array, or `"channels": null`) would otherwise produce a state that crashes
 * later when `state.channels[key]` is accessed.
 *
 * @returns the stored state with `channels` always present as an object.
 */
function loadState(): SlackSyncState {
    try {
        if (fs.existsSync(STATE_FILE)) {
            const parsed: unknown = JSON.parse(fs.readFileSync(STATE_FILE, 'utf-8'));
            if (!isStateRecord(parsed)) {
                throw new Error('state file does not contain a JSON object');
            }
            const stored = parsed as Partial<SlackSyncState>;
            const state: SlackSyncState = {
                ...stored,
                // Discard a malformed cursor map rather than crash on first access.
                channels: isStateRecord(stored.channels) ? stored.channels : {},
            };
            if (!isStateRecord(state.sources)) {
                delete state.sources;
            }
            return state;
        }
    } catch (error) {
        console.error('[SlackKnowledge] Failed to load state:', error);
    }
    return { channels: {} };
}
/** Serialize the sync state as pretty-printed JSON and overwrite STATE_FILE. */
function saveState(state: SlackSyncState): void {
    const serialized = JSON.stringify(state, null, 2);
    fs.writeFileSync(STATE_FILE, serialized, 'utf-8');
}
/** Upper bound that rate-limit backoff may stretch a source interval to. */
const MAX_SOURCE_SYNC_INTERVAL_MS = 30 * 60 * 1000;
/**
 * Source interval with rate-limit backoff applied.
 *
 * Backoff multiplies the base interval but may not stretch it beyond 30
 * minutes. The cap only limits the backoff: a source whose configured
 * interval is already longer than 30 minutes keeps its configured interval
 * instead of being polled more often than requested.
 *
 * @param source      source config; `intervalMs` falls back to the default poll interval.
 * @param sourceState persisted status; a missing, non-numeric, or <1 multiplier counts as 1.
 * @returns milliseconds that must elapse after the last attempt before the source is due.
 */
export function effectiveIntervalMs(source: KnowledgeSourceConfig, sourceState?: SlackSourceSyncState): number {
    const base = source.intervalMs ?? DEFAULT_SYNC_INTERVAL_MS;
    // The multiplier comes from a JSON file; a NaN here would make the
    // interval NaN and the source would never become due again.
    const stored = Number(sourceState?.backoffMultiplier ?? 1);
    const multiplier = Number.isNaN(stored) ? 1 : Math.max(1, stored);
    const backedOff = Math.min(base * multiplier, MAX_SOURCE_SYNC_INTERVAL_MS);
    // Never go below the configured base interval.
    return Math.max(base, backedOff);
}
/**
 * Decide whether a source should be synced in the current run.
 *
 * A source is due when it has never been attempted, when its stored
 * timestamp cannot be parsed, or when at least its effective interval
 * (base interval with backoff) has elapsed since the last attempt.
 */
function isSourceDue(source: KnowledgeSourceConfig, state: SlackSyncState): boolean {
    const sourceState = state.sources?.[source.id];
    const lastSyncAt = sourceState?.lastSyncAt;
    if (!lastSyncAt) {
        return true;
    }
    const lastSyncMs = Date.parse(lastSyncAt);
    if (!Number.isFinite(lastSyncMs)) {
        return true;
    }
    const elapsedMs = Date.now() - lastSyncMs;
    return elapsedMs >= effectiveIntervalMs(source, sourceState);
}
/**
 * Turn an arbitrary string (workspace URL, channel name, Slack ts) into a
 * single safe path segment.
 *
 * Strips a leading http(s) scheme, collapses runs of path separators,
 * reserved filename characters, `#` and whitespace into one underscore,
 * trims underscores at both ends and limits the result to 120 characters.
 *
 * @returns the sanitized segment, or `'unknown'` when nothing usable remains.
 *          Dot-only results (`.`, `..`) also map to `'unknown'`: dots are not
 *          replaced above, and such a segment would otherwise make
 *          `path.join` resolve to the current or parent directory.
 */
function safeSegment(value: string): string {
    const cleaned = value
        .replace(/^https?:\/\//, '')
        .replace(/[\\/*?:"<>|#\s]+/g, '_')
        .replace(/_+/g, '_')
        .replace(/^_+|_+$/g, '')
        .slice(0, 120);
    if (cleaned === '' || /^\.+$/.test(cleaned)) {
        return 'unknown';
    }
    return cleaned;
}
/**
 * Convert a Slack timestamp ("<seconds>.<fraction>") to an ISO-8601 string.
 * Only the whole-second part is used; when it is not a finite number the
 * current time is returned instead.
 */
function slackTsToDate(ts: string): string {
    const [wholeSeconds] = ts.split('.');
    const epochSeconds = Number(wholeSeconds);
    const moment = Number.isFinite(epochSeconds) ? new Date(epochSeconds * 1000) : new Date();
    return moment.toISOString();
}
/**
 * Move a Slack timestamp back by `seconds`, clamped at zero and formatted
 * with six decimals.
 *
 * @returns the shifted timestamp, or undefined when `ts` is missing, empty,
 *          or not a finite number.
 */
function subtractSlackTs(ts: string | undefined, seconds: number): string | undefined {
    if (!ts) {
        return undefined;
    }
    const numericTs = Number(ts);
    return Number.isFinite(numericTs)
        ? Math.max(0, numericTs - seconds).toFixed(6)
        : undefined;
}
/**
 * Numeric comparator for Slack timestamps. A value that is missing or not
 * a finite number sorts before any valid timestamp; two such values are
 * considered equal.
 *
 * @returns negative when a < b, positive when a > b, 0 when equal.
 */
function compareSlackTs(a: string | undefined, b: string | undefined): number {
    const left = Number(a);
    const right = Number(b);
    const leftValid = Number.isFinite(left);
    const rightValid = Number.isFinite(right);
    if (leftValid && rightValid) {
        return left - right;
    }
    if (leftValid) {
        return 1;
    }
    return rightValid ? -1 : 0;
}
/**
 * Pull the message list out of parsed CLI output.
 *
 * Accepts either a bare array or an object wrapping the array under
 * `messages`, `items`, `results`, or `data` (checked in that order).
 * Anything else yields an empty list. Elements are not validated.
 */
function extractMessages(raw: unknown): SlackMessage[] {
    if (Array.isArray(raw)) {
        return raw as SlackMessage[];
    }
    if (raw === null || typeof raw !== 'object') {
        return [];
    }
    const container = raw as Record<string, unknown>;
    const wrapped = [container.messages, container.items, container.results, container.data]
        .find(candidate => Array.isArray(candidate));
    return (wrapped ?? []) as SlackMessage[];
}
/** Message body: the first present field among `text`, `body`, `content`, else ''. */
function getMessageText(message: SlackMessage): string {
    const { text, body, content } = message;
    if (text != null) {
        return text;
    }
    if (body != null) {
        return body;
    }
    return content ?? '';
}
/** Author label: `username` when present, otherwise `user`, otherwise 'unknown'. */
function getMessageAuthor(message: SlackMessage): string {
    const { username, user } = message;
    if (username != null) {
        return username;
    }
    return user ?? 'unknown';
}
/**
 * Throwing adapter over the shared agent-slack executor for the sync loop.
 * Runs the CLI with a 30 s timeout and a 2 MiB output buffer.
 *
 * @returns the parsed CLI output, or an empty array when it is null/undefined.
 * @throws AgentSlackRunError carrying the classified failure kind and message.
 */
async function runAgentSlack(args: string[]): Promise<unknown> {
    const outcome = await execAgentSlack(args, { timeoutMs: 30_000, maxBuffer: 2 * 1024 * 1024 });
    if (outcome.ok) {
        return outcome.data ?? [];
    }
    throw new AgentSlackRunError(outcome.kind, outcome.message);
}
/**
 * Fetch messages for one channel scope via `agent-slack message list`.
 *
 * @param source supplies optional `filters.limit` and `filters.maxBodyChars`.
 * @param scope  channel to list; its `workspaceUrl` is passed as `--workspace` when set.
 * @param oldest optional lower timestamp bound, passed as `--oldest`.
 * @returns messages that have a `ts` and a non-blank body, sorted oldest first.
 * @throws AgentSlackRunError when the CLI call fails.
 */
async function listMessages(source: KnowledgeSourceConfig, scope: KnowledgeSourceScope, oldest?: string): Promise<SlackMessage[]> {
    const limit = String(source.filters?.limit ?? DEFAULT_LIMIT);
    const maxBodyChars = String(source.filters?.maxBodyChars ?? 4000);
    const args = ['message', 'list', scope.id, '--limit', limit, '--max-body-chars', maxBodyChars];
    if (scope.workspaceUrl) {
        args.push('--workspace', scope.workspaceUrl);
    }
    if (oldest) {
        args.push('--oldest', oldest);
    }
    const fetched = extractMessages(await runAgentSlack(args));
    const usable = fetched.filter(message => Boolean(message.ts) && getMessageText(message).trim().length > 0);
    usable.sort((left, right) => compareSlackTs(left.ts, right.ts));
    return usable;
}
/**
 * Build the knowledge artifact (metadata plus Markdown body) for one message.
 *
 * The Markdown body is a heading, a block of bold metadata lines, and a
 * `## Message` section. Optional lines (thread ts, link) are omitted when
 * absent, but the blank separator lines are kept: previously every empty
 * string was filtered out, which also removed the separators and ran the
 * heading, metadata, and message section together.
 *
 * @returns the artifact, or null when the message has no `ts`.
 */
function artifactForMessage(source: KnowledgeSourceConfig, scope: KnowledgeSourceScope, message: SlackMessage): KnowledgeArtifact | null {
    if (!message.ts) return null;
    const channelName = scope.name ?? message.channel_name ?? message.channel ?? message.channel_id ?? scope.id;
    const workspaceName = scope.workspaceUrl ?? 'Slack';
    // An edit produces a new version for the same external id.
    const version = message.edited?.ts ?? message.ts;
    const url = message.permalink ?? message.url;
    const title = `Slack message in ${channelName}`;
    const occurredAt = slackTsToDate(message.ts);
    const author = getMessageAuthor(message);
    const body = getMessageText(message).trim();
    // null marks an optional line to drop; '' is an intentional blank line.
    const lines: Array<string | null> = [
        `# ${title}`,
        '',
        `**Workspace:** ${workspaceName}`,
        `**Channel:** ${channelName}`,
        `**Author:** ${author}`,
        `**Timestamp:** ${occurredAt}`,
        message.thread_ts ? `**Thread TS:** ${message.thread_ts}` : null,
        url ? `**Link:** ${url}` : null,
        '',
        '## Message',
        '',
        body,
    ];
    const bodyMarkdown = lines.filter((line): line is string => line !== null).join('\n');
    return {
        sourceId: source.id,
        provider: 'slack',
        externalId: `${scope.workspaceUrl ?? 'workspace'}:${scope.id}:${message.ts}`,
        version,
        occurredAt,
        title,
        bodyMarkdown,
        url,
        metadata: {
            workspaceUrl: scope.workspaceUrl,
            channelId: scope.id,
            channelName,
            author,
            ts: message.ts,
            threadTs: message.thread_ts,
            editedTs: message.edited?.ts,
        },
    };
}
/**
 * Write one artifact as `<ts>.md` under
 * `<WorkDir>/<artifactDir>/<workspace>/<channel>/`, creating directories
 * as needed. The file is a `---`-delimited frontmatter block followed by
 * the artifact's Markdown body.
 *
 * The closing `---` is now always followed by a newline. Previously the
 * trailing empty element was filtered out before joining, so the delimiter
 * was fused with the first body line (`---# Slack message in …`) and the
 * frontmatter block was never terminated.
 *
 * @returns the path written, or null when the file already holds identical
 *          content (nothing is written in that case).
 */
function writeArtifact(source: KnowledgeSourceConfig, scope: KnowledgeSourceScope, artifact: KnowledgeArtifact): string | null {
    const workspace = safeSegment(scope.workspaceUrl ?? 'workspace');
    const channel = safeSegment(scope.name ?? scope.id);
    // metadata is an untyped record; coerce so a missing ts cannot throw.
    const ts = safeSegment(String(artifact.metadata.ts ?? ''));
    const dir = path.join(WorkDir, source.artifactDir || path.join('knowledge_sources', 'slack'), workspace, channel);
    fs.mkdirSync(dir, { recursive: true });
    const filePath = path.join(dir, `${ts}.md`);
    const frontmatterLines = [
        '---',
        `source: ${artifact.provider}`,
        `source_id: ${artifact.sourceId}`,
        `external_id: ${JSON.stringify(artifact.externalId)}`,
        `version: ${JSON.stringify(artifact.version)}`,
        `occurred_at: ${JSON.stringify(artifact.occurredAt)}`,
        ...(artifact.url ? [`url: ${JSON.stringify(artifact.url)}`] : []),
        '---',
    ];
    const content = `${frontmatterLines.join('\n')}\n${artifact.bodyMarkdown}\n`;
    if (fs.existsSync(filePath)) {
        try {
            // Unchanged content means nothing new to report for this message.
            if (fs.readFileSync(filePath, 'utf-8') === content) {
                return null;
            }
        } catch {
            // Fall through and rewrite the artifact.
        }
    }
    fs.writeFileSync(filePath, content, 'utf-8');
    return filePath;
}
/**
 * Emit a `slack.synced` event summarising the artifacts written in a run.
 * Does nothing for an empty list. The payload reports the total count and
 * lists at most the first 20 files, as paths relative to WorkDir.
 */
async function publishSlackSyncEvent(files: string[]): Promise<void> {
    const total = files.length;
    if (total === 0) {
        return;
    }
    const listed = files
        .slice(0, 20)
        .map(file => `- ${path.relative(WorkDir, file)}`);
    const payloadLines = [
        '# Slack knowledge sync update',
        '',
        `${total} new/updated message artifact${total === 1 ? '' : 's'}.`,
        '',
        ...listed,
    ];
    await createEvent({
        source: 'slack',
        type: 'slack.synced',
        createdAt: new Date().toISOString(),
        payload: payloadLines.join('\n'),
    });
}
/**
 * Sync every channel scope of one source into artifact files.
 *
 * For each channel the fetch starts a backfill window before the stored
 * cursor, so recently edited messages are seen again. A message is
 * processed when it is newer than the cursor or carries an edit timestamp.
 * `state.channels` is updated in place after each channel. CLI failures
 * propagate as AgentSlackRunError; recording the source status is left to
 * the caller.
 *
 * @returns paths of the artifact files that were created or changed.
 */
async function syncSource(source: KnowledgeSourceConfig, state: SlackSyncState): Promise<string[]> {
    if (source.scopes.length === 0) {
        console.log(`[SlackKnowledge] Source ${source.id} has no channel scopes; skipping`);
        return [];
    }
    const backfillSeconds = Number(source.filters?.recentBackfillSeconds ?? DEFAULT_RECENT_BACKFILL_SECONDS);
    const channelScopes = source.scopes.filter(candidate => candidate.type === 'channel');
    const collected: string[] = [];
    for (const scope of channelScopes) {
        const cursorKey = `${source.id}:${scope.workspaceUrl ?? ''}:${scope.id}`;
        const previousTs = state.channels[cursorKey]?.lastSeenTs;
        const fetchFrom = subtractSlackTs(previousTs, backfillSeconds);
        const messages = await listMessages(source, scope, fetchFrom);
        let latestTs = previousTs;
        for (const message of messages) {
            const isNewer = compareSlackTs(message.ts, previousTs) > 0;
            const isEdited = Boolean(message.edited?.ts);
            if (!isNewer && !isEdited) {
                continue;
            }
            const artifact = artifactForMessage(source, scope, message);
            if (!artifact) {
                continue;
            }
            const savedPath = writeArtifact(source, scope, artifact);
            if (savedPath) {
                collected.push(savedPath);
            }
            if (compareSlackTs(message.ts, latestTs) > 0) {
                latestTs = message.ts;
            }
        }
        state.channels[cursorKey] = { lastSeenTs: latestTs };
    }
    return collected;
}
/**
 * Record the outcome of one sync attempt for `sourceId` in `state`.
 *
 * The source's entry is replaced, not merged: a success stores only the
 * timestamp and 'ok'; a failure stores the error, and a rate-limit failure
 * additionally stores a backoff multiplier that is double the previous one
 * (at least 2, at most 1024). `state.lastSyncAt` is set to the same time.
 */
function recordSourceResult(state: SlackSyncState, sourceId: string, error?: { kind: AgentSlackErrorKind | 'unknown'; message: string }): void {
    const prior = state.sources?.[sourceId];
    const timestamp = new Date().toISOString();
    let entry: SlackSourceSyncState;
    if (!error) {
        entry = { lastSyncAt: timestamp, lastStatus: 'ok' };
    } else {
        entry = { lastSyncAt: timestamp, lastStatus: 'error', lastError: error };
        if (error.kind === 'rate_limited') {
            // The effective interval is capped separately; this clamp only
            // keeps the persisted number within a sane range.
            const doubled = (prior?.backoffMultiplier ?? 1) * 2;
            entry.backoffMultiplier = Math.min(Math.max(2, doubled), 1024);
        }
    }
    state.lastSyncAt = timestamp;
    state.sources = { ...(state.sources ?? {}), [sourceId]: entry };
}
/**
 * Run one sync pass over every enabled Slack source in `poll` mode that is
 * currently due.
 *
 * Sources are processed one after another. A failure in one source is
 * recorded and logged but does not stop the others, except for a rate
 * limit, which ends the pass. State is saved after each source. When any
 * artifact was written, a `changes_identified` log entry and a
 * `slack.synced` event are emitted, followed by a `run_complete` entry.
 *
 * @returns paths of all artifact files created or changed in this pass;
 *          an empty array (and no service-log run) when no source is due.
 */
export async function syncSlackKnowledgeSources(): Promise<string[]> {
const state = loadState();
const sources = knowledgeSourcesRepo
.listEnabledSources()
.filter(source => source.provider === 'slack' && source.syncMode === 'poll')
.filter(source => isSourceDue(source, state));
// Nothing due: return before opening a service-log run.
if (sources.length === 0) return [];
const run = await serviceLogger.startRun({
service: 'slack',
message: 'Syncing Slack knowledge sources',
trigger: 'timer',
});
const writtenFiles: string[] = [];
let hadError = false;
for (const source of sources) {
let rateLimited = false;
try {
const files = await syncSource(source, state);
writtenFiles.push(...files);
recordSourceResult(state, source.id);
} catch (error) {
// One failing source must not abort the others.
hadError = true;
// Anything that is not a classified CLI failure is recorded as 'unknown'.
const kind = error instanceof AgentSlackRunError ? error.kind : 'unknown';
const message = error instanceof Error ? error.message : String(error);
recordSourceResult(state, source.id, { kind, message });
rateLimited = kind === 'rate_limited';
console.error(`[SlackKnowledge] Sync failed for source ${source.id} (${kind}):`, message);
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: `Slack knowledge sync error for source ${source.id} (${kind})`,
error: message,
});
}
// Persist after every source so progress and status survive a crash.
saveState(state);
// Rate limits are per-token, so the remaining sources would hit the
// same wall — end this run; they stay due for the next tick.
if (rateLimited) break;
}
if (writtenFiles.length > 0) {
try {
const relativeFiles = writtenFiles.map(file => path.relative(WorkDir, file));
const limitedFiles = limitEventItems(relativeFiles);
await serviceLogger.log({
type: 'changes_identified',
service: run.service,
runId: run.runId,
level: 'info',
message: `Slack updates: ${writtenFiles.length} message artifact${writtenFiles.length === 1 ? '' : 's'}`,
counts: { messages: writtenFiles.length },
items: limitedFiles.items,
truncated: limitedFiles.truncated,
});
await publishSlackSyncEvent(writtenFiles);
} catch (error) {
// A reporting failure marks the run as errored but the written files are still returned.
hadError = true;
console.error('[SlackKnowledge] Failed to publish sync results:', error);
}
}
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: hadError ? 'error' : 'info',
message: `Slack sync complete: ${writtenFiles.length} artifact${writtenFiles.length === 1 ? '' : 's'}`,
durationMs: Date.now() - run.startedAt,
outcome: hadError ? 'error' : 'ok',
summary: { artifacts: writtenFiles.length },
});
return writtenFiles;
}
/** Default root directory for Slack artifacts (`<WorkDir>/knowledge_sources/slack`). */
export function getSlackKnowledgeArtifactRoot(): string {
return ARTIFACT_ROOT;
}
/** Status of one configured Slack source as returned by getSlackKnowledgeSyncStatus(). */
export type SlackKnowledgeSourceStatus = {
/** Source id from the knowledge-source config. */
id: string;
/** Whether the source is enabled in the config. */
enabled: boolean;
/** Time of the last sync attempt; undefined when the source has never been attempted. */
lastSyncAt?: string;
/** Outcome of the last attempt. */
lastStatus?: 'ok' | 'error';
/** Failure details of the last attempt, when it failed. */
lastError?: { kind: string; message: string };
/** When the source next becomes due, given interval + backoff. */
nextDueAt?: string;
};
/**
 * Per-source sync status for the slack:knowledgeStatus IPC channel.
 *
 * Covers every configured Slack source, enabled or not. `nextDueAt` is the
 * last attempt time plus the effective interval and is left undefined when
 * there is no parsable last attempt time.
 */
export function getSlackKnowledgeSyncStatus(): SlackKnowledgeSourceStatus[] {
    const state = loadState();
    const slackSources = knowledgeSourcesRepo
        .getConfig()
        .sources
        .filter(candidate => candidate.provider === 'slack');
    const statuses: SlackKnowledgeSourceStatus[] = [];
    for (const source of slackSources) {
        const sourceState = state.sources?.[source.id];
        let nextDueAt: string | undefined;
        if (sourceState?.lastSyncAt) {
            const lastMs = Date.parse(sourceState.lastSyncAt);
            if (Number.isFinite(lastMs)) {
                nextDueAt = new Date(lastMs + effectiveIntervalMs(source, sourceState)).toISOString();
            }
        }
        statuses.push({
            id: source.id,
            enabled: source.enabled,
            lastSyncAt: sourceState?.lastSyncAt,
            lastStatus: sourceState?.lastStatus,
            lastError: sourceState?.lastError,
            nextDueAt,
        });
    }
    return statuses;
}
// Resolver of the sleep currently in progress, or null when nothing is sleeping.
let wakeResolve: (() => void) | null = null;
// Set when a trigger arrives while nothing is sleeping (for example while a
// sync pass is running); consumed by the next interruptibleSleep() call.
let wakePending = false;

/**
 * Ask the polling loop to run a sync pass as soon as possible.
 *
 * If the loop is sleeping, the sleep ends immediately. Otherwise the
 * request is remembered so the next sleep returns at once; previously such
 * a trigger was silently dropped and the caller waited a full interval.
 */
export function triggerSync(): void {
    if (wakeResolve) {
        const wake = wakeResolve;
        wakeResolve = null;
        wake();
        return;
    }
    wakePending = true;
}

/**
 * Sleep for `ms` milliseconds, ending early when triggerSync() is called.
 * Resolves immediately when a trigger is already pending.
 */
function interruptibleSleep(ms: number): Promise<void> {
    if (wakePending) {
        wakePending = false;
        return Promise.resolve();
    }
    return new Promise(resolve => {
        const timeout = setTimeout(() => {
            wakeResolve = null;
            resolve();
        }, ms);
        wakeResolve = () => {
            clearTimeout(timeout);
            resolve();
        };
    });
}
/**
 * Start the Slack knowledge polling loop. Never resolves: it alternates a
 * sync pass with an interruptible sleep of the default interval.
 *
 * An exception escaping a sync pass (for example a failure to write the
 * state file or to open a service-log run) is logged and the loop carries
 * on; previously it rejected this promise and polling stopped for good.
 */
export async function init(): Promise<void> {
    console.log(`[SlackKnowledge] Starting Slack knowledge sync. Polling every ${DEFAULT_SYNC_INTERVAL_MS / 1000}s`);
    while (true) {
        try {
            await syncSlackKnowledgeSources();
        } catch (error) {
            console.error('[SlackKnowledge] Sync pass failed; will retry after the next interval:', error);
        }
        await interruptibleSleep(DEFAULT_SYNC_INTERVAL_MS);
    }
}

View file

@ -0,0 +1,49 @@
import { z } from 'zod';
/** Closed set of provider identifiers a knowledge source may declare. */
export const KnowledgeSourceProvider = z.enum([
'gmail',
'meeting',
'voice_memo',
'slack',
'github',
'linear',
]);
/** Union of the provider string literals above. */
export type KnowledgeSourceProvider = z.infer<typeof KnowledgeSourceProvider>;
/**
 * One unit a source covers. `type` is a free-form string and `id` is the
 * provider-side identifier; `name` and `workspaceUrl` are optional.
 */
export const KnowledgeSourceScope = z.object({
type: z.string(),
id: z.string(),
name: z.string().optional(),
workspaceUrl: z.string().optional(),
});
export type KnowledgeSourceScope = z.infer<typeof KnowledgeSourceScope>;
/**
 * Configuration of a single knowledge source.
 *
 * `syncMode` defaults to 'file' and `scopes` to an empty array when
 * omitted. `intervalMs`, when present, must be a positive integer.
 * `filters` is an untyped record, so consumers must coerce its values.
 */
export const KnowledgeSourceConfig = z.object({
id: z.string(),
provider: KnowledgeSourceProvider,
enabled: z.boolean(),
artifactDir: z.string(),
syncMode: z.enum(['file', 'poll', 'event', 'manual']).default('file'),
intervalMs: z.number().int().positive().optional(),
scopes: z.array(KnowledgeSourceScope).default([]),
instructions: z.string().optional(),
filters: z.record(z.string(), z.unknown()).optional(),
});
export type KnowledgeSourceConfig = z.infer<typeof KnowledgeSourceConfig>;
/** Top-level shape of the knowledge-sources config: a list of source configs. */
export const KnowledgeSourcesFile = z.object({
sources: z.array(KnowledgeSourceConfig),
});
export type KnowledgeSourcesFile = z.infer<typeof KnowledgeSourcesFile>;
/** One item produced by a knowledge source, ready to be written out as an artifact. */
export interface KnowledgeArtifact {
/** Id of the source config that produced the artifact. */
sourceId: string;
provider: KnowledgeSourceProvider;
/** Provider-side identity of the item. */
externalId: string;
/** Revision marker for the item. */
version: string;
/** Timestamp string for when the item occurred. */
occurredAt: string;
title: string;
/** Rendered Markdown body. */
bodyMarkdown: string;
url?: string;
/** Provider-specific extra fields; values are untyped. */
metadata: Record<string, unknown>;
}

View file

@ -0,0 +1,190 @@
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import { exec } from 'node:child_process';
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { promisify } from 'node:util';
import { agentSlackShimEnv, classifyAgentSlackStderr, resolveAgentSlackCli, runAgentSlack } from './agent-slack-exec.js';
// Promise-returning exec, used to run a command string through a shell.
const execAsync = promisify(exec);
// Fixture CLI scripts spawned via process.execPath (real node under vitest),
// exercising the same spawn path the app uses.
// All of these are assigned in beforeAll.
let fixtureDir: string;
let jsonCli: string; // prints its argv as JSON
let garbageCli: string; // prints non-JSON text
let sleepCli: string; // stays alive for 60 s
let failingCli: string; // writes to stderr and exits with code 2
let stdinCli: string; // echoes trimmed stdin
/** Write a fixture script into the temp fixture directory and return its path. */
function writeFixture(name: string, code: string): string {
    const target = path.join(fixtureDir, name);
    fs.writeFileSync(target, code, 'utf-8');
    return target;
}
// Create a fresh temp directory and one tiny script per CLI behaviour under test.
beforeAll(() => {
fixtureDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-slack-exec-test-'));
jsonCli = writeFixture('json.cjs', `process.stdout.write(JSON.stringify({ args: process.argv.slice(2) }));`);
stdinCli = writeFixture('stdin.cjs', `let s = ''; process.stdin.on('data', c => s += c); process.stdin.on('end', () => process.stdout.write(s.trim()));`);
garbageCli = writeFixture('garbage.cjs', `process.stdout.write('definitely: not json');`);
sleepCli = writeFixture('sleep.cjs', `setTimeout(() => {}, 60_000);`);
failingCli = writeFixture('fail.cjs', `process.stderr.write('boom'); process.exit(2);`);
});
// Remove the temp directory and everything the tests created inside it.
afterAll(() => {
fs.rmSync(fixtureDir, { recursive: true, force: true });
});
// A path that does not exist, used as a candidate that must be skipped.
const missing = path.join('/nonexistent', 'agent-slack.cjs');
// Resolution order: bundled, then global install, then PATH. Every case
// passes explicit candidates, so the resolver's cache is bypassed.
describe('resolveAgentSlackCli', () => {
it('prefers the bundled bin over global and PATH', () => {
const resolved = resolveAgentSlackCli({
bundledCandidates: [jsonCli],
globalCandidates: [garbageCli],
pathProbe: () => garbageCli,
});
expect(resolved).toEqual({ entry: jsonCli, source: 'bundled' });
});
it('falls back to a global install when the bundled bin is missing', () => {
const resolved = resolveAgentSlackCli({
bundledCandidates: [missing],
globalCandidates: [jsonCli],
pathProbe: () => garbageCli,
});
expect(resolved).toEqual({ entry: jsonCli, source: 'global' });
});
it('falls back to PATH last', () => {
const resolved = resolveAgentSlackCli({
bundledCandidates: [missing],
globalCandidates: [missing],
pathProbe: () => jsonCli,
});
expect(resolved).toEqual({ entry: jsonCli, source: 'path' });
});
it('returns null when nothing is found', () => {
const resolved = resolveAgentSlackCli({
bundledCandidates: [missing],
globalCandidates: [missing],
pathProbe: () => null,
});
expect(resolved).toBeNull();
});
});
// End-to-end checks of the executor against the fixture scripts: success
// paths (JSON, raw stdout, stdin) and each structural failure kind.
describe('runAgentSlack', () => {
// Resolver override that makes `entry` the only candidate.
const via = (entry: string) => ({
bundledCandidates: [entry],
globalCandidates: [],
pathProbe: () => null,
});
it('returns parsed JSON stdout and forwards args', async () => {
const result = await runAgentSlack(['auth', 'whoami'], { resolve: via(jsonCli) });
expect(result).toMatchObject({ ok: true, data: { args: ['auth', 'whoami'] } });
});
it('returns raw stdout when parseJson is false', async () => {
const result = await runAgentSlack([], { resolve: via(garbageCli), parseJson: false });
expect(result.ok).toBe(true);
if (result.ok) expect(result.stdout).toBe('definitely: not json');
});
it('writes opts.input to the child stdin (parse-curl path)', async () => {
const result = await runAgentSlack([], { resolve: via(stdinCli), parseJson: false, input: "curl 'https://team.slack.com'" });
expect(result.ok).toBe(true);
if (result.ok) expect(result.stdout).toBe("curl 'https://team.slack.com'");
});
it('reports not_installed when no binary resolves', async () => {
const result = await runAgentSlack(['--version'], {
resolve: { bundledCandidates: [missing], globalCandidates: [missing], pathProbe: () => null },
});
expect(result).toMatchObject({ ok: false, kind: 'not_installed' });
});
it('reports parse_error on malformed JSON stdout', async () => {
const result = await runAgentSlack([], { resolve: via(garbageCli) });
expect(result).toMatchObject({ ok: false, kind: 'parse_error' });
});
// The fixture sleeps for 60 s; the 300 ms timeout must kill it. The 10 s
// per-test limit leaves headroom for process startup.
it('kills a hung CLI and reports timeout', async () => {
const result = await runAgentSlack([], { resolve: via(sleepCli), timeoutMs: 300 });
expect(result).toMatchObject({ ok: false, kind: 'timeout' });
}, 10_000);
it('classifies stderr on non-zero exit (unrecognized → unknown)', async () => {
const result = await runAgentSlack([], { resolve: via(failingCli) });
expect(result).toMatchObject({ ok: false, kind: 'unknown', stderr: 'boom', message: 'boom' });
});
});
// Table-driven checks of the stderr classifier: one row per sample
// message, plus guards against matching inside longer identifiers.
describe('classifyAgentSlackStderr', () => {
// Fixture corpus: strings marked (captured) are real stderr induced on a
// machine with no Slack auth; the rest are taken verbatim from the
// agent-slack 0.9.3 / @slack/web-api 7.17 sources.
const cases: Array<[string, ReturnType<typeof classifyAgentSlackStderr>]> = [
// not_authed — empty credential store (auth auto-import cascade)
['Firefox extraction is not supported on win32.', 'not_authed'], // (captured)
['Slack Desktop data not found. Checked:\n - C:\\Users\\X\\AppData\\Roaming\\Slack\\Local Storage\\leveldb', 'not_authed'], // (captured)
// not_authed — Slack API codes, both client flavors
['invalid_auth', 'not_authed'],
['token_expired', 'not_authed'],
['An API error occurred: invalid_auth', 'not_authed'],
['account_inactive', 'not_authed'],
// rate_limited
['ratelimited', 'rate_limited'],
['A rate-limit has been reached, you may retry this request in 30 seconds', 'rate_limited'],
['Slack HTTP 429 calling conversations.history', 'rate_limited'],
// network
['A request error occurred: getaddrinfo ENOTFOUND slack.com', 'network'],
['fetch failed', 'network'],
['connect ECONNREFUSED 127.0.0.1:443', 'network'],
['Slack HTTP 503 calling conversations.list', 'network'],
// bad_channel
['channel_not_found', 'bad_channel'],
['An API error occurred: channel_not_found', 'bad_channel'],
['Could not resolve channel name: #nonexistent-channel', 'bad_channel'],
['not_in_channel', 'bad_channel'],
// unknown
['Ambiguous channel name across multiple workspaces. Pass --workspace "<url>"', 'unknown'],
['', 'unknown'],
];
it.each(cases)('%j → %s', (stderr, expected) => {
expect(classifyAgentSlackStderr(stderr)).toBe(expected);
});
it('does not misread substrings of longer identifiers', () => {
// "speedratelimitedness" style false positives guarded by boundaries
expect(classifyAgentSlackStderr('field xratelimitedx in payload')).toBe('unknown');
expect(classifyAgentSlackStderr('saved to channel_not_found_archive.txt')).toBe('unknown');
});
});
// agentSlackShimEnv: the returned environment must put a shim directory
// first on PATH so that `agent-slack` can be run by name.
describe('agentSlackShimEnv', () => {
it('returns the base env unchanged when no CLI resolves', () => {
const base = { PATH: '/usr/bin' };
const env = agentSlackShimEnv(path.join(fixtureDir, 'bin'), base, {
bundledCandidates: [missing], globalCandidates: [missing], pathProbe: () => null,
});
// Identity check: the very same object is returned, not a copy.
expect(env).toBe(base);
});
it('makes `agent-slack` runnable by name through a shell', async () => {
const shimDir = path.join(fixtureDir, 'bin');
const env = agentSlackShimEnv(shimDir, process.env, {
bundledCandidates: [jsonCli], globalCandidates: [], pathProbe: () => null,
});
// The PATH variable's casing varies by platform, so look it up case-insensitively.
const pathKey = Object.keys(env).find(key => key.toUpperCase() === 'PATH') ?? 'PATH';
expect(env[pathKey]!.startsWith(`${shimDir}${path.delimiter}`)).toBe(true);
// Same spawn shape as executeCommand: command string through a shell.
const { stdout } = await execAsync('agent-slack hello world', { env });
expect(JSON.parse(stdout)).toEqual({ args: ['hello', 'world'] });
});
});

View file

@ -0,0 +1,315 @@
import { execFile, execFileSync } from 'node:child_process';
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { promisify } from 'node:util';
const execFileAsync = promisify(execFile);
/**
 * Single shared executor for the agent-slack CLI.
 *
 * Every agent-slack invocation in the app must go through runAgentSlack(),
 * never execFile('agent-slack', ...) directly. Spawning the bare command
 * requires it on PATH (we no longer auto-install it) and on Windows hits the
 * .cmd-shim EINVAL bug. Instead we resolve a JS entry file and spawn it with
 * process.execPath, which works without Node/npm on the user's machine.
 */
/** Where a resolved CLI entry was found, in probe order. */
export type AgentSlackSource = 'bundled' | 'global' | 'path';
/** Result of locating the agent-slack CLI. */
export interface ResolvedAgentSlack {
/** Absolute path to a JS entry file runnable via `node <entry>`. */
entry: string;
/** Which probe produced `entry`. */
source: AgentSlackSource;
}
/** Every failure category an agent-slack run can be reported as. */
export type AgentSlackErrorKind =
// Structural failures (detected without running / from the spawn itself)
| 'not_installed' | 'timeout' | 'parse_error'
// CLI failures classified from stderr (exit code is always 1)
| 'not_authed' | 'rate_limited' | 'network' | 'bad_channel' | 'unknown';
// Classification works on stderr text alone: agent-slack writes
// `err.message` to stderr and exits 1 whatever went wrong. Two Slack client
// flavors are covered — the browser-token client throws bare codes
// ("invalid_auth"), @slack/web-api wraps them ("An API error occurred:
// invalid_auth") — along with the CLI's own messages. Codes are matched with
// word-ish boundaries, mirroring the CLI's own auth-detection regex.
const SLACK_CODE = (codes: string) => new RegExp(`(?:^|[^a-z_])(?:${codes})(?:$|[^a-z_])`, 'i');
const NOT_AUTHED_RE = SLACK_CODE('invalid_auth|token_expired|token_revoked|account_inactive|not_authed');
// With no stored credentials the auth auto-import cascade fails instead,
// e.g. "Slack Desktop data not found." / "Firefox extraction is not supported
// on win32." (real stderr captured on Windows with no Slack installed).
const AUTH_IMPORT_RE = /Slack Desktop data not found|extraction is not supported/i;
const RATE_LIMITED_RE = /(?:^|[^a-z_])ratelimited(?:$|[^a-z_])|A rate-?limit has been reached|Slack HTTP 429/i;
const NETWORK_RE = /A request error occurred|fetch failed|socket hang up|ENOTFOUND|ECONNREFUSED|ECONNRESET|ETIMEDOUT|EAI_AGAIN|EPIPE|Slack HTTP 5\d\d/i;
const BAD_CHANNEL_RE = new RegExp(
    `${SLACK_CODE('channel_not_found|not_in_channel|is_archived').source}|Could not resolve channel name`, 'i');
/**
 * Map agent-slack stderr to a failure kind. Exported for tests.
 *
 * Categories are tried in a fixed order (rate limit, bad channel, auth,
 * network) and the first one with a matching pattern wins; text matching
 * none of them is 'unknown'.
 */
export function classifyAgentSlackStderr(stderr: string): Exclude<AgentSlackErrorKind, 'not_installed' | 'timeout' | 'parse_error'> {
    const orderedRules = [
        ['rate_limited', [RATE_LIMITED_RE]],
        ['bad_channel', [BAD_CHANNEL_RE]],
        ['not_authed', [NOT_AUTHED_RE, AUTH_IMPORT_RE]],
        ['network', [NETWORK_RE]],
    ] as const;
    for (const [kind, patterns] of orderedRules) {
        if (patterns.some(pattern => pattern.test(stderr))) {
            return kind;
        }
    }
    return 'unknown';
}
/**
 * Outcome of one CLI run, discriminated by `ok`. Success carries the raw
 * stdout and the parsed data; failure carries the classified kind, a
 * message, and whatever stderr was captured.
 */
export type AgentSlackResult =
| { ok: true; stdout: string; data: unknown }
| { ok: false; kind: AgentSlackErrorKind; message: string; stderr: string };
/**
 * Error form of a failed agent-slack run, for callers that use exceptions
 * for control flow (the sync loop). `kind` is the classified failure.
 */
export class AgentSlackRunError extends Error {
    public readonly kind: AgentSlackErrorKind;

    constructor(kind: AgentSlackErrorKind, message: string) {
        super(message);
        this.kind = kind;
        this.name = 'AgentSlackRunError';
    }
}
/**
 * Options for locating the CLI. Supplying any of the three probe overrides
 * bypasses the resolver's cache for that call, both for reading and writing.
 */
export interface ResolveOptions {
/** Re-probe even if a previous resolution succeeded. */
refresh?: boolean;
/** Test hooks — override the default probe locations. */
bundledCandidates?: string[];
globalCandidates?: string[];
pathProbe?: () => string | null;
}
/** Per-call options for runAgentSlack(). */
export interface RunAgentSlackOptions {
/** Kill the CLI after this many milliseconds; defaults to 30 000. */
timeoutMs?: number;
/** Maximum captured output in bytes; defaults to 2 MiB. */
maxBuffer?: number;
/** Set false for commands with non-JSON output (e.g. --version). */
parseJson?: boolean;
/** Written to the child's stdin then closed (e.g. `auth parse-curl`). */
input?: string;
/** Test hook — bypass the default resolver. */
resolve?: ResolveOptions;
}
// Fallbacks used when a call does not set timeoutMs / maxBuffer.
const DEFAULT_TIMEOUT_MS = 30_000;
const DEFAULT_MAX_BUFFER = 2 * 1024 * 1024;
// The CLI is bundled by apps/main/bundle.mjs to agent-slack.cjs next to
// main.cjs. At runtime import.meta.url is rewritten by esbuild to point at
// main.cjs, so the sibling lookup works in dev and packaged builds alike.
// (Under vitest/tsc output the sibling doesn't exist and we fall through.)
/** Candidate paths for the bundled CLI: `agent-slack.cjs` beside the current module. */
function defaultBundledCandidates(): string[] {
return [path.join(path.dirname(fileURLToPath(import.meta.url)), 'agent-slack.cjs')];
}
// Location of the CLI's JS bin relative to a global npm prefix directory.
const GLOBAL_BIN_REL = path.join('node_modules', 'agent-slack', 'bin', 'agent-slack.js');
/**
 * Well-known global npm install locations for the CLI entry.
 * Windows: `%APPDATA%\npm` (falling back to `<home>\AppData\Roaming\npm`).
 * Elsewhere: `/usr/local/lib` and `/opt/homebrew/lib`.
 */
function defaultGlobalCandidates(): string[] {
    if (process.platform !== 'win32') {
        return ['/usr/local/lib', '/opt/homebrew/lib'].map(prefix => path.join(prefix, GLOBAL_BIN_REL));
    }
    const roaming = process.env.APPDATA ?? path.join(os.homedir(), 'AppData', 'Roaming');
    return [path.join(roaming, 'npm', GLOBAL_BIN_REL)];
}
/**
 * Translate a PATH hit (symlink, npm .cmd/.ps1/sh shim) into the JS bin
 * behind it.
 *
 * The hit is resolved through symlinks first. A resolved `.js`, `.cjs`, or
 * `.mjs` file is returned as-is; otherwise the global bin is looked up
 * relative to the directory containing the hit.
 *
 * @returns the JS entry path, or null when none is found or the hit cannot be resolved.
 */
function jsEntryFromPathHit(hit: string): string | null {
    try {
        const target = fs.realpathSync(hit);
        const isJsFile = /\.(c|m)?js$/.test(target);
        if (isJsFile) {
            return target;
        }
        // npm places its shims in the directory that holds the global node_modules tree.
        const shimSibling = path.join(path.dirname(target), GLOBAL_BIN_REL);
        return fs.existsSync(shimSibling) ? shimSibling : null;
    } catch {
        // A dangling symlink or unreadable shim counts as no hit.
        return null;
    }
}
/**
 * Look for `agent-slack` on PATH using `where.exe` (Windows) or `which`,
 * with a 5 s limit, and map the first usable hit to its JS entry.
 *
 * @returns the JS entry path, or null when the lookup fails or no hit maps to one.
 */
function defaultPathProbe(): string | null {
    const lookup = process.platform === 'win32' ? 'where.exe' : 'which';
    let hits: string[];
    try {
        const output = execFileSync(lookup, ['agent-slack'], {
            timeout: 5_000,
            encoding: 'utf-8',
            windowsHide: true,
        });
        hits = output.split(/\r?\n/);
    } catch {
        return null;
    }
    for (const rawHit of hits) {
        const hit = rawHit.trim();
        if (hit.length === 0) {
            continue;
        }
        const entry = jsEntryFromPathHit(hit);
        if (entry) {
            return entry;
        }
    }
    return null;
}
// Last successful default-probe resolution; reused until it goes stale.
let cachedResolution: ResolvedAgentSlack | null = null;
/**
 * Locate the agent-slack CLI entry, trying bundled candidates first, then
 * global install candidates, then a PATH probe.
 *
 * Only resolutions made with the default probes are cached, and only
 * successes: test overrides must not leak, and a failed probe should retry
 * on the next call (the user may install meanwhile). A cached entry is
 * checked for existence before it is reused; previously a removed entry
 * kept being returned, so every later run failed until restart.
 *
 * @param opts `refresh` forces a new probe; the candidate/probe overrides
 *             replace the default locations and bypass the cache.
 * @returns the resolved entry and where it was found, or null when nothing is found.
 */
export function resolveAgentSlackCli(opts: ResolveOptions = {}): ResolvedAgentSlack | null {
    const usesDefaultProbes = !opts.bundledCandidates && !opts.globalCandidates && !opts.pathProbe;
    if (cachedResolution && !opts.refresh && usesDefaultProbes) {
        if (fs.existsSync(cachedResolution.entry)) {
            return cachedResolution;
        }
        // The cached entry has disappeared; forget it and probe again.
        cachedResolution = null;
    }
    let resolved: ResolvedAgentSlack | null = null;
    const bundledHit = (opts.bundledCandidates ?? defaultBundledCandidates())
        .find(candidate => fs.existsSync(candidate));
    if (bundledHit !== undefined) {
        resolved = { entry: bundledHit, source: 'bundled' };
    }
    if (!resolved) {
        const globalHit = (opts.globalCandidates ?? defaultGlobalCandidates())
            .find(candidate => fs.existsSync(candidate));
        if (globalHit !== undefined) {
            resolved = { entry: globalHit, source: 'global' };
        }
    }
    if (!resolved) {
        const entry = (opts.pathProbe ?? defaultPathProbe)();
        if (entry) resolved = { entry, source: 'path' };
    }
    if (resolved && usesDefaultProbes) {
        cachedResolution = resolved;
    }
    return resolved;
}
/**
 * Run the resolved agent-slack CLI with `args` and report the outcome as a
 * structured result; child-process failures are returned, not thrown.
 *
 * Failure kinds produced here:
 * - `not_installed`: no CLI could be resolved, or spawning reported ENOENT.
 * - `timeout`: the child was killed (execFile timeout) or ended on SIGTERM.
 * - `parse_error`: the child succeeded but stdout was not valid JSON.
 * - otherwise whatever `classifyAgentSlackStderr` derives from stderr.
 *
 * @param args Arguments passed to the CLI after its entry script.
 * @param opts Optional timeout, buffer size, stdin input, JSON parsing switch
 *             (`parseJson: false` skips parsing) and resolver overrides.
 * @returns `{ ok: true, stdout, data }` on success (`data` is the parsed JSON,
 *          or `undefined` when parsing is disabled or stdout is empty),
 *          otherwise `{ ok: false, kind, message, stderr }`.
 */
export async function runAgentSlack(args: string[], opts: RunAgentSlackOptions = {}): Promise<AgentSlackResult> {
    const resolved = resolveAgentSlackCli(opts.resolve ?? {});
    if (!resolved) {
        return {
            ok: false,
            kind: 'not_installed',
            message: 'agent-slack CLI not found (bundled copy missing and no global install)',
            stderr: '',
        };
    }

    const timeout = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
    let stdout: string;
    try {
        // process.execPath inside Electron's main process is the Electron
        // binary, not node — ELECTRON_RUN_AS_NODE makes it behave as plain
        // node (and is ignored when we already run under real node).
        const promise = execFileAsync(process.execPath, [resolved.entry, ...args], {
            timeout,
            maxBuffer: opts.maxBuffer ?? DEFAULT_MAX_BUFFER,
            encoding: 'utf-8',
            windowsHide: true,
            env: { ...process.env, ELECTRON_RUN_AS_NODE: '1' },
        });
        // promisify(execFile) exposes the ChildProcess as `.child`, letting us
        // feed stdin for commands that read it (e.g. `auth parse-curl`). Close
        // stdin so those commands stop waiting for more input.
        if (opts.input != null) {
            const stdin = promise.child.stdin;
            // If the child exits before draining stdin, the write fails with
            // EPIPE and the stream emits 'error'. Without a listener that
            // would become an uncaught exception in the main process; the
            // actual failure is still reported by the awaited promise below.
            stdin?.on('error', () => {
                /* surfaced via the execFile result instead */
            });
            stdin?.end(opts.input);
        }
        const result = await promise;
        stdout = result.stdout;
    } catch (error) {
        const err = error as NodeJS.ErrnoException & { killed?: boolean; signal?: string; stderr?: string };
        const stderr = typeof err.stderr === 'string' ? err.stderr : '';
        if (err.code === 'ENOENT') {
            return { ok: false, kind: 'not_installed', message: `agent-slack entry vanished: ${resolved.entry}`, stderr };
        }
        if (err.killed || err.signal === 'SIGTERM') {
            return { ok: false, kind: 'timeout', message: `agent-slack timed out after ${timeout}ms`, stderr };
        }
        return { ok: false, kind: classifyAgentSlackStderr(stderr), message: stderr.trim() || err.message || 'agent-slack failed', stderr };
    }

    if (opts.parseJson === false) {
        return { ok: true, stdout, data: undefined };
    }

    // Empty stdout is treated as "no data" rather than a parse failure.
    const trimmed = stdout.trim();
    try {
        return { ok: true, stdout, data: trimmed ? JSON.parse(trimmed) : undefined };
    } catch {
        return {
            ok: false,
            kind: 'parse_error',
            message: `agent-slack returned non-JSON output: ${trimmed.slice(0, 200)}`,
            stderr: '',
        };
    }
}
/** Result of the CLI availability probe: either usable (with version and origin) or not. */
export type AgentSlackCliStatus =
    | { available: false }
    | { available: true; version: string; source: AgentSlackSource };

/**
 * Availability probe backing the slack:cliStatus IPC channel.
 *
 * Forces a fresh resolution of the CLI, then runs `--version` against it.
 *
 * @returns `{ available: true, version, source }` when the CLI resolves and
 *          the version command succeeds; `{ available: false }` otherwise.
 */
export async function getAgentSlackCliStatus(): Promise<AgentSlackCliStatus> {
    const cli = resolveAgentSlackCli({ refresh: true });
    if (!cli) {
        return { available: false };
    }

    // Version output is plain text, so JSON parsing is switched off.
    const versionRun = await runAgentSlack(['--version'], { timeoutMs: 10_000, parseJson: false });
    return versionRun.ok
        ? { available: true, version: versionRun.stdout.trim(), source: cli.source }
        : { available: false };
}
// --- PATH shim for shell consumers (Copilot skill via executeCommand) -------
//
// The Copilot Slack skill runs literal `agent-slack ...` shell commands. Those
// used to rely on the startup `npm install -g` that this module replaced, so
// without help they'd only work on machines with a manual global install.
// We generate a tiny launcher script that forwards to the resolved CLI entry
// and prepend its directory to PATH for executeCommand children.
/** Cache key of the last shim written by this process; `null` before the first write. */
let shimmedFor: string | null = null;

/**
 * Make sure a launcher named `agent-slack` (`agent-slack.cmd` on Windows)
 * exists in `shimDir` and forwards to `entry` via the current executable with
 * ELECTRON_RUN_AS_NODE=1.
 *
 * The work is memoized per (executable, entry, shimDir) combination and is
 * redone if the shim file has since disappeared from disk. The file is only
 * rewritten when its content differs from what is wanted.
 *
 * NOTE(review): the executable and entry paths are embedded inside double
 * quotes in the generated script without escaping — this assumes they contain
 * no quote or shell-expansion characters.
 *
 * @param shimDir Directory that receives the launcher (created if missing).
 * @param entry   Path of the CLI's JS entry the launcher forwards to.
 * @throws Any filesystem error raised while creating the directory or file.
 */
function ensureAgentSlackShim(shimDir: string, entry: string): void {
    const isWindows = process.platform === 'win32';
    const shimPath = path.join(shimDir, isWindows ? 'agent-slack.cmd' : 'agent-slack');

    // Serialize the parts as an array: plain concatenation would let distinct
    // (entry, shimDir) pairs produce the same key and skip a needed write.
    const cacheKey = JSON.stringify([process.execPath, entry, shimDir]);
    // Re-create the shim if it was removed after we memoized it.
    if (shimmedFor === cacheKey && fs.existsSync(shimPath)) return;

    fs.mkdirSync(shimDir, { recursive: true });

    const script = isWindows
        ? `@echo off\r\nset ELECTRON_RUN_AS_NODE=1\r\n"${process.execPath}" "${entry}" %*\r\n`
        : `#!/bin/sh\nELECTRON_RUN_AS_NODE=1 exec "${process.execPath}" "${entry}" "$@"\n`;

    if (!fs.existsSync(shimPath) || fs.readFileSync(shimPath, 'utf-8') !== script) {
        if (isWindows) {
            fs.writeFileSync(shimPath, script, 'utf-8');
        } else {
            fs.writeFileSync(shimPath, script, { encoding: 'utf-8', mode: 0o755 });
        }
    }
    if (!isWindows) {
        // `mode` above only applies when the file is created; make sure an
        // already-existing shim is executable too.
        fs.chmodSync(shimPath, 0o755);
    }

    shimmedFor = cacheKey;
}
/**
 * Environment for shell commands that may invoke `agent-slack` by name.
 * Prepends a shim directory to PATH so the resolved CLI (bundled first) wins
 * over or substitutes for a global npm install. Returns the base env
 * unchanged when no CLI can be resolved or the shim cannot be written.
 *
 * @param shimDir Directory that holds (or will receive) the launcher script.
 * @param base    Environment to extend; defaults to `process.env`. Not mutated.
 * @param resolve Optional resolver overrides forwarded to the CLI lookup.
 * @returns A copy of `base` with `shimDir` at the front of PATH, or `base`
 *          itself when nothing was changed.
 */
export function agentSlackShimEnv(
    shimDir: string,
    base: NodeJS.ProcessEnv = process.env,
    resolve?: ResolveOptions,
): NodeJS.ProcessEnv {
    const resolved = resolveAgentSlackCli(resolve ?? {});
    if (!resolved) return base;

    try {
        ensureAgentSlackShim(shimDir, resolved.entry);
    } catch (error) {
        console.warn('[Slack] Failed to write agent-slack PATH shim:', error);
        return base;
    }

    // Windows env vars are case-insensitive; reuse the existing key ('Path')
    // rather than introducing a duplicate 'PATH'.
    const pathKey = Object.keys(base).find(key => key.toUpperCase() === 'PATH') ?? 'PATH';
    const existing = base[pathKey];
    // Only add the delimiter when there is something to append: a trailing
    // delimiter creates an empty PATH element, which POSIX shells interpret
    // as the current working directory.
    const combined = existing ? `${shimDir}${path.delimiter}${existing}` : shimDir;
    return { ...base, [pathKey]: combined };
}