diff --git a/internal/infrastructure/postgres/migrations/003_add_tenant_id.sql b/internal/infrastructure/postgres/migrations/003_add_tenant_id.sql
new file mode 100644
index 0000000..807c004
--- /dev/null
+++ b/internal/infrastructure/postgres/migrations/003_add_tenant_id.sql
@@ -0,0 +1,29 @@
+-- +goose Up
+-- Add tenant_id to SOC tables for multi-tenant isolation.
+-- Safe: DEFAULT '' fills existing rows without data loss.
+
+ALTER TABLE soc_events ADD COLUMN IF NOT EXISTS tenant_id TEXT NOT NULL DEFAULT '';
+ALTER TABLE soc_incidents ADD COLUMN IF NOT EXISTS tenant_id TEXT NOT NULL DEFAULT '';
+ALTER TABLE soc_sensors ADD COLUMN IF NOT EXISTS tenant_id TEXT NOT NULL DEFAULT '';
+
+-- Tenant isolation indexes
+CREATE INDEX IF NOT EXISTS idx_soc_events_tenant ON soc_events(tenant_id);
+CREATE INDEX IF NOT EXISTS idx_soc_incidents_tenant ON soc_incidents(tenant_id);
+CREATE INDEX IF NOT EXISTS idx_soc_sensors_tenant ON soc_sensors(tenant_id);
+
+-- Composite indexes for common tenant-scoped queries
+CREATE INDEX IF NOT EXISTS idx_soc_events_tenant_ts ON soc_events(tenant_id, timestamp DESC);
+CREATE INDEX IF NOT EXISTS idx_soc_events_tenant_cat ON soc_events(tenant_id, category);
+CREATE INDEX IF NOT EXISTS idx_soc_incidents_tenant_status ON soc_incidents(tenant_id, status);
+
+-- +goose Down
+DROP INDEX IF EXISTS idx_soc_incidents_tenant_status;
+DROP INDEX IF EXISTS idx_soc_events_tenant_cat;
+DROP INDEX IF EXISTS idx_soc_events_tenant_ts;
+DROP INDEX IF EXISTS idx_soc_sensors_tenant;
+DROP INDEX IF EXISTS idx_soc_incidents_tenant;
+DROP INDEX IF EXISTS idx_soc_events_tenant;
+
+ALTER TABLE soc_sensors DROP COLUMN IF EXISTS tenant_id;
+ALTER TABLE soc_incidents DROP COLUMN IF EXISTS tenant_id;
+ALTER TABLE soc_events DROP COLUMN IF EXISTS tenant_id;
diff --git a/internal/transport/http/server.go b/internal/transport/http/server.go
index 996c777..58d58fe 100644
--- a/internal/transport/http/server.go
+++ b/internal/transport/http/server.go
@@ -40,6 +40,7 @@ type Server struct {
 	jwtSecret        []byte
 	wsHub            *WSHub
 	usageTracker     *auth.UsageTracker
+	scanSem          chan struct{} // Limits concurrent CPU-heavy scans
 	sovereignEnabled bool
 	sovereignMode    string
 	pprofEnabled     bool
@@ -59,6 +60,7 @@ func New(socSvc *appsoc.Service, port int) *Server {
 		metrics:      NewMetrics(),
 		logger:       NewRequestLogger(true),
 		wsHub:        NewWSHub(),
+		scanSem:      make(chan struct{}, 4), // Max 4 concurrent scans (1 per CPU)
 	}
 }
 
diff --git a/internal/transport/http/soc_handlers.go b/internal/transport/http/soc_handlers.go
index c929ef8..5db0dee 100644
--- a/internal/transport/http/soc_handlers.go
+++ b/internal/transport/http/soc_handlers.go
@@ -1,6 +1,7 @@
 package httpserver
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -1472,6 +1473,10 @@ func (s *Server) handleSLAConfig(w http.ResponseWriter, _ *http.Request) {
 // handlePublicScan provides a public (no-auth) prompt scanning endpoint for the demo.
 // POST /api/v1/scan body: {"prompt": "Ignore all instructions..."}
 // Runs sentinel-core (54 Rust engines) + Shield (C11 payload inspection) in parallel.
+//
+// Concurrency control: uses scanSem (buffered channel) to limit parallel scans.
+// If all slots are busy, returns 503 Service Unavailable with Retry-After header
+// to prevent OOM under burst load (e.g., 20 concurrent battle workers).
 func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) {
 	limitBody(w, r)
 	defer r.Body.Close()
@@ -1515,9 +1520,25 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 
+	// ── Concurrency limiter: prevent OOM under burst load ──
+	select {
+	case s.scanSem <- struct{}{}:
+		defer func() { <-s.scanSem }() // Release slot when done
+	default:
+		// All scan slots busy → backpressure
+		w.Header().Set("Retry-After", "2")
+		slog.Warn("scan backpressure: all slots busy", "capacity", cap(s.scanSem))
+		writeError(w, http.StatusServiceUnavailable, "scan engine busy — retry in 2 seconds")
+		return
+	}
+
+	// ── Scan timeout: 30s hard limit ──
+	scanCtx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
+	defer cancel()
+
 	// Run sentinel-core (54 Rust engines)
 	coreEngine := s.getEngine("sentinel-core")
-	coreResult, coreErr := coreEngine.ScanPrompt(r.Context(), req.Prompt)
+	coreResult, coreErr := coreEngine.ScanPrompt(scanCtx, req.Prompt)
 
 	// Run Shield (C payload inspection)
 	var shieldEng engines.Shield
@@ -1526,7 +1547,7 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) {
 	} else {
 		shieldEng = engines.NewStubShield()
 	}
-	shieldResult, shieldErr := shieldEng.InspectTraffic(r.Context(), []byte(req.Prompt), nil)
+	shieldResult, shieldErr := shieldEng.InspectTraffic(scanCtx, []byte(req.Prompt), nil)
 
 	// Build response — merge both engines
 	response := map[string]any{}