diff --git a/internal/application/orchestrator/orchestrator.go b/internal/application/orchestrator/orchestrator.go index 096c1f6..af50d49 100644 --- a/internal/application/orchestrator/orchestrator.go +++ b/internal/application/orchestrator/orchestrator.go @@ -79,17 +79,18 @@ type HeartbeatResult struct { // Orchestrator runs the DIP heartbeat pipeline. type Orchestrator struct { - mu sync.RWMutex - config Config - peerReg *peer.Registry - store memory.FactStore - synapseStore synapse.SynapseStore // v3.4: Module 9 - alertBus *alert.Bus - running bool - cycle int - history []HeartbeatResult - lastSync time.Time - lastFactCount int + mu sync.RWMutex + config Config + peerReg *peer.Registry + store memory.FactStore + synapseStore synapse.SynapseStore // v3.4: Module 9 + alertBus *alert.Bus + running bool + cycle int + history []HeartbeatResult + lastSync time.Time + lastFactCount int + lastApoptosisWritten time.Time // debounce: when a WATCHDOG_RECOVERY marker was last stored successfully; prevents a flood of them } // New creates a new orchestrator. @@ -415,16 +416,32 @@ func (o *Orchestrator) stabilityCheck(ctx context.Context, result *HeartbeatResu if normalizedEntropy >= o.config.EntropyThreshold { result.ApoptosisTriggered = true - currentHash := memory.CompiledGenomeHash() - recoveryMarker := memory.NewFact( - fmt.Sprintf("[WATCHDOG_RECOVERY] genome_hash=%s entropy=%.4f cycle=%d", - currentHash, normalizedEntropy, result.Cycle), - memory.LevelProject, - "recovery", - "watchdog", - ) - recoveryMarker.Source = "watchdog" - _ = o.store.Add(ctx, recoveryMarker) + + // Debounce: write a recovery marker at most once per 24h, measured from the last successful write (o.lastApoptosisWritten). + // Without this guard, every heartbeat cycle (~5 min) that has high + // entropy writes a new record, flooding the DB with thousands of + // identical WATCHDOG_RECOVERY entries and causing context deadline + // exceeded on the next server startup. NOTE(review): the timestamp looks process-local, so the window presumably restarts with the process — confirm. 
+ o.mu.RLock() + lastWritten := o.lastApoptosisWritten + o.mu.RUnlock() + + if time.Since(lastWritten) >= 24*time.Hour { + currentHash := memory.CompiledGenomeHash() + recoveryMarker := memory.NewFact( + fmt.Sprintf("[WATCHDOG_RECOVERY] genome_hash=%s entropy=%.4f cycle=%d", + currentHash, normalizedEntropy, result.Cycle), + memory.LevelProject, + "recovery", + "watchdog", + ) + recoveryMarker.Source = "watchdog" + if err := o.store.Add(ctx, recoveryMarker); err == nil { + o.mu.Lock() + o.lastApoptosisWritten = time.Now() + o.mu.Unlock() + } + } } return genomeOK, normalizedEntropy @@ -596,11 +613,26 @@ func (o *Orchestrator) memoryHygiene(ctx context.Context, result *HeartbeatResul } } - // Step 2: Archive facts that have been stale for a while. + // Step 2: Purge aged watchdog recovery markers (domain=recovery, source=watchdog). + // These are purely diagnostic breadcrumbs — they accumulate forever and bloat + // the DB. Only the latest is ever useful, so delete any older than 24h (best effort: a failed list or delete is skipped and not added to result.Errors). + watchdogCutoff := time.Now().Add(-24 * time.Hour) + wdFacts, wdErr := o.store.ListByLevel(ctx, memory.LevelProject) + if wdErr == nil { + for _, f := range wdFacts { + if f.Domain == "recovery" && f.Source == "watchdog" && f.CreatedAt.Before(watchdogCutoff) { + if delErr := o.store.Delete(ctx, f.ID); delErr == nil { + archived++ // reuse counter — reported as "archived" in log + } + } + } + } + + // Step 3: Archive facts that have been stale for a while. staleFacts, err := o.store.GetStale(ctx, false) // exclude already-archived if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("hygiene get-stale: %v", err)) - return expired, 0 + return expired, archived } staleThreshold := time.Now().Add(-24 * time.Hour) // Archive if stale > 24h. 
for _, f := range staleFacts { diff --git a/internal/infrastructure/sqlite/soc_repo.go b/internal/infrastructure/sqlite/soc_repo.go index 98583dc..0396316 100644 --- a/internal/infrastructure/sqlite/soc_repo.go +++ b/internal/infrastructure/sqlite/soc_repo.go @@ -76,24 +76,23 @@ func (r *SOCRepo) migrate() error { hostname TEXT NOT NULL DEFAULT '', version TEXT NOT NULL DEFAULT '' )`, - // Indexes for common queries. + // Indexes for non-tenant columns only — safe on any schema version. + // tenant_id indexes are applied after ALTER TABLE migrations below. `CREATE INDEX IF NOT EXISTS idx_soc_events_timestamp ON soc_events(timestamp)`, `CREATE INDEX IF NOT EXISTS idx_soc_events_severity ON soc_events(severity)`, `CREATE INDEX IF NOT EXISTS idx_soc_events_category ON soc_events(category)`, `CREATE INDEX IF NOT EXISTS idx_soc_events_sensor ON soc_events(sensor_id)`, `CREATE INDEX IF NOT EXISTS idx_soc_events_content_hash ON soc_events(content_hash)`, - `CREATE INDEX IF NOT EXISTS idx_soc_events_tenant ON soc_events(tenant_id)`, `CREATE INDEX IF NOT EXISTS idx_soc_incidents_status ON soc_incidents(status)`, - `CREATE INDEX IF NOT EXISTS idx_soc_incidents_tenant ON soc_incidents(tenant_id)`, `CREATE INDEX IF NOT EXISTS idx_soc_sensors_status ON soc_sensors(status)`, - `CREATE INDEX IF NOT EXISTS idx_soc_sensors_tenant ON soc_sensors(tenant_id)`, } for _, ddl := range tables { if _, err := r.db.Exec(ddl); err != nil { return fmt.Errorf("exec %q: %w", ddl[:40], err) } } - // Migration: add columns (safe to re-run — ignore "already exists" errors) + // Migration: add columns, then create the tenant_id indexes once the ALTER TABLE statements that add tenant_id have run. + // All steps ignore errors — safe to re-run on any schema version, but a genuine failure is dropped silently too. 
migrations := []string{ `ALTER TABLE soc_incidents ADD COLUMN assigned_to TEXT NOT NULL DEFAULT ''`, `ALTER TABLE soc_incidents ADD COLUMN notes_json TEXT NOT NULL DEFAULT '[]'`, @@ -101,9 +100,13 @@ func (r *SOCRepo) migrate() error { `ALTER TABLE soc_events ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''`, `ALTER TABLE soc_incidents ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''`, `ALTER TABLE soc_sensors ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''`, + // tenant_id indexes — must come AFTER the ALTER TABLE statements above. + `CREATE INDEX IF NOT EXISTS idx_soc_events_tenant ON soc_events(tenant_id)`, + `CREATE INDEX IF NOT EXISTS idx_soc_incidents_tenant ON soc_incidents(tenant_id)`, + `CREATE INDEX IF NOT EXISTS idx_soc_sensors_tenant ON soc_sensors(tenant_id)`, } for _, m := range migrations { - r.db.Exec(m) // Ignore errors (column already exists) + r.db.Exec(m) // Ignore errors (e.g. column already exists; the CREATE INDEX statements use IF NOT EXISTS) } return nil }