From 5c00ffef750e3403d7a1b0c78834cd86bb0acf2b Mon Sep 17 00:00:00 2001 From: DmitrL-dev <84296377+DmitrL-dev@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:25:43 +1000 Subject: [PATCH] perf: full optimization pass for 10K battle --- internal/transport/http/server.go | 20 +++++- internal/transport/http/soc_handlers.go | 92 +++++++++++++++++++++---- 2 files changed, 99 insertions(+), 13 deletions(-) diff --git a/internal/transport/http/server.go b/internal/transport/http/server.go index 58d58fe..e2da7c5 100644 --- a/internal/transport/http/server.go +++ b/internal/transport/http/server.go @@ -6,12 +6,15 @@ package httpserver import ( "context" + "crypto/sha256" "crypto/tls" "database/sql" + "encoding/hex" "encoding/json" "fmt" "log/slog" "net/http" + "sync" "time" shadowai "github.com/syntrex/gomcp/internal/application/shadow_ai" @@ -41,6 +44,8 @@ type Server struct { wsHub *WSHub usageTracker *auth.UsageTracker scanSem chan struct{} // Limits concurrent CPU-heavy scans + scanCache map[string]*cachedScan + scanCacheMu sync.RWMutex sovereignEnabled bool sovereignMode string pprofEnabled bool @@ -50,6 +55,18 @@ type Server struct { tlsKey string } +// cachedScan stores a cached scan result with expiry. +type cachedScan struct { + response map[string]any + expiry time.Time +} + +// promptHash returns a fast SHA-256 hash of the prompt for cache keying. +func promptHash(prompt string) string { + h := sha256.Sum256([]byte(prompt)) + return hex.EncodeToString(h[:16]) // 128-bit is enough for cache key +} + // New creates an HTTP server bound to the given port. 
func New(socSvc *appsoc.Service, port int) *Server { return &Server{ @@ -60,7 +77,8 @@ func New(socSvc *appsoc.Service, port int) *Server { metrics: NewMetrics(), logger: NewRequestLogger(true), wsHub: NewWSHub(), - scanSem: make(chan struct{}, 4), // Max 4 concurrent scans (1 per CPU) + scanSem: make(chan struct{}, 6), // Max 6 concurrent scans (~2 per CPU) + scanCache: make(map[string]*cachedScan, 500), } } diff --git a/internal/transport/http/soc_handlers.go b/internal/transport/http/soc_handlers.go index 5db0dee..5167e72 100644 --- a/internal/transport/http/soc_handlers.go +++ b/internal/transport/http/soc_handlers.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "strconv" + "sync" "time" appsoc "github.com/syntrex/gomcp/internal/application/soc" @@ -1520,15 +1521,33 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) { } } - // ── Concurrency limiter: prevent OOM under burst load ── + // ── Scan cache: return cached result for identical prompts ── + cacheKey := promptHash(req.Prompt) + s.scanCacheMu.RLock() + if cached, ok := s.scanCache[cacheKey]; ok && time.Now().Before(cached.expiry) { + s.scanCacheMu.RUnlock() + resp := make(map[string]any, len(cached.response)+1) + for k, v := range cached.response { + resp[k] = v + } + resp["cached"] = true + slog.Debug("scan cache hit", "key", cacheKey[:8]) + writeJSON(w, http.StatusOK, resp) + return + } + s.scanCacheMu.RUnlock() + + // ── Concurrency limiter: queue up to 5s before 503 ── select { case s.scanSem <- struct{}{}: - defer func() { <-s.scanSem }() // Release slot when done - default: - // All scan slots busy → backpressure - w.Header().Set("Retry-After", "2") - slog.Warn("scan backpressure: all slots busy", "capacity", cap(s.scanSem)) - writeError(w, http.StatusServiceUnavailable, "scan engine busy — retry in 2 seconds") + defer func() { <-s.scanSem }() + case <-time.After(5 * time.Second): + // Waited 5s, still no slot → 503 + w.Header().Set("Retry-After", "3") + slog.Warn("scan 
backpressure: queue timeout", "capacity", cap(s.scanSem)) + writeError(w, http.StatusServiceUnavailable, "scan engine busy — retry in 3 seconds") + return + case <-r.Context().Done(): return } @@ -1536,18 +1555,39 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) { scanCtx, cancel := context.WithTimeout(r.Context(), 30*time.Second) defer cancel() - // Run sentinel-core (54 Rust engines) + // ── Parallel scan: sentinel-core + shield run concurrently ── + // Latency = max(core, shield) instead of core + shield coreEngine := s.getEngine("sentinel-core") - coreResult, coreErr := coreEngine.ScanPrompt(scanCtx, req.Prompt) - - // Run Shield (C payload inspection) var shieldEng engines.Shield if s.shieldEngine != nil { shieldEng = s.shieldEngine } else { shieldEng = engines.NewStubShield() } - shieldResult, shieldErr := shieldEng.InspectTraffic(scanCtx, []byte(req.Prompt), nil) + + var ( + coreResult *engines.ScanResult + coreErr error + shieldResult *engines.ScanResult + shieldErr error + wg sync.WaitGroup + ) + + wg.Add(2) + + // Goroutine 1: sentinel-core (54 Rust engines) — the heavy path + go func() { + defer wg.Done() + coreResult, coreErr = coreEngine.ScanPrompt(scanCtx, req.Prompt) + }() + + // Goroutine 2: shield (C11 payload inspection) — lighter path + go func() { + defer wg.Done() + shieldResult, shieldErr = shieldEng.InspectTraffic(scanCtx, []byte(req.Prompt), nil) + }() + + wg.Wait() // Build response — merge both engines response := map[string]any{} @@ -1592,6 +1632,34 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) { response["latency_ms"] = float64(coreResult.Duration.Microseconds()) / 1000.0 response["shield_status"] = shieldStatus + // ── Store in cache (5 min TTL, evict oldest if >500 entries) ── + s.scanCacheMu.Lock() + if len(s.scanCache) >= 500 { + // Simple eviction: remove any expired entries + now := time.Now() + for k, v := range s.scanCache { + if now.After(v.expiry) { + 
delete(s.scanCache, k) + } + } + // If still full, evict an arbitrary 25% (Go map iteration order is randomized, so this is not LRU) + if len(s.scanCache) >= 500 { + i := 0 + for k := range s.scanCache { + delete(s.scanCache, k) + i++ + if i >= 125 { + break + } + } + } + } + s.scanCache[cacheKey] = &cachedScan{ + response: response, + expiry: time.Now().Add(5 * time.Minute), + } + s.scanCacheMu.Unlock() + writeJSON(w, http.StatusOK, response) }