mirror of
https://github.com/syntrex-lab/gomcp.git
synced 2026-05-09 19:42:37 +02:00
perf: full optimization pass for 10K battle
This commit is contained in:
parent
af945d5008
commit
5c00ffef75
2 changed files with 99 additions and 13 deletions
|
|
@ -6,12 +6,15 @@ package httpserver
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
shadowai "github.com/syntrex/gomcp/internal/application/shadow_ai"
|
||||
|
|
@ -41,6 +44,8 @@ type Server struct {
|
|||
wsHub *WSHub
|
||||
usageTracker *auth.UsageTracker
|
||||
scanSem chan struct{} // Limits concurrent CPU-heavy scans
|
||||
scanCache map[string]*cachedScan
|
||||
scanCacheMu sync.RWMutex
|
||||
sovereignEnabled bool
|
||||
sovereignMode string
|
||||
pprofEnabled bool
|
||||
|
|
@ -50,6 +55,18 @@ type Server struct {
|
|||
tlsKey string
|
||||
}
|
||||
|
||||
// cachedScan stores a cached scan result with expiry.
|
||||
type cachedScan struct {
|
||||
response map[string]any
|
||||
expiry time.Time
|
||||
}
|
||||
|
||||
// promptHash returns a fast SHA-256 hash of the prompt for cache keying.
|
||||
func promptHash(prompt string) string {
|
||||
h := sha256.Sum256([]byte(prompt))
|
||||
return hex.EncodeToString(h[:16]) // 128-bit is enough for cache key
|
||||
}
|
||||
|
||||
// New creates an HTTP server bound to the given port.
|
||||
func New(socSvc *appsoc.Service, port int) *Server {
|
||||
return &Server{
|
||||
|
|
@ -60,7 +77,8 @@ func New(socSvc *appsoc.Service, port int) *Server {
|
|||
metrics: NewMetrics(),
|
||||
logger: NewRequestLogger(true),
|
||||
wsHub: NewWSHub(),
|
||||
scanSem: make(chan struct{}, 4), // Max 4 concurrent scans (1 per CPU)
|
||||
scanSem: make(chan struct{}, 6), // Max 6 concurrent scans (~2 per CPU)
|
||||
scanCache: make(map[string]*cachedScan, 500),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
appsoc "github.com/syntrex/gomcp/internal/application/soc"
|
||||
|
|
@ -1520,15 +1521,33 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
// ── Concurrency limiter: prevent OOM under burst load ──
|
||||
// ── Scan cache: return cached result for identical prompts ──
|
||||
cacheKey := promptHash(req.Prompt)
|
||||
s.scanCacheMu.RLock()
|
||||
if cached, ok := s.scanCache[cacheKey]; ok && time.Now().Before(cached.expiry) {
|
||||
s.scanCacheMu.RUnlock()
|
||||
resp := make(map[string]any, len(cached.response)+1)
|
||||
for k, v := range cached.response {
|
||||
resp[k] = v
|
||||
}
|
||||
resp["cached"] = true
|
||||
slog.Debug("scan cache hit", "key", cacheKey[:8])
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
return
|
||||
}
|
||||
s.scanCacheMu.RUnlock()
|
||||
|
||||
// ── Concurrency limiter: queue up to 5s before 503 ──
|
||||
select {
|
||||
case s.scanSem <- struct{}{}:
|
||||
defer func() { <-s.scanSem }() // Release slot when done
|
||||
default:
|
||||
// All scan slots busy → backpressure
|
||||
w.Header().Set("Retry-After", "2")
|
||||
slog.Warn("scan backpressure: all slots busy", "capacity", cap(s.scanSem))
|
||||
writeError(w, http.StatusServiceUnavailable, "scan engine busy — retry in 2 seconds")
|
||||
defer func() { <-s.scanSem }()
|
||||
case <-time.After(5 * time.Second):
|
||||
// Waited 5s, still no slot → 503
|
||||
w.Header().Set("Retry-After", "3")
|
||||
slog.Warn("scan backpressure: queue timeout", "capacity", cap(s.scanSem))
|
||||
writeError(w, http.StatusServiceUnavailable, "scan engine busy — retry in 3 seconds")
|
||||
return
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -1536,18 +1555,39 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) {
|
|||
scanCtx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Run sentinel-core (54 Rust engines)
|
||||
// ── Parallel scan: sentinel-core + shield run concurrently ──
|
||||
// Latency = max(core, shield) instead of core + shield
|
||||
coreEngine := s.getEngine("sentinel-core")
|
||||
coreResult, coreErr := coreEngine.ScanPrompt(scanCtx, req.Prompt)
|
||||
|
||||
// Run Shield (C payload inspection)
|
||||
var shieldEng engines.Shield
|
||||
if s.shieldEngine != nil {
|
||||
shieldEng = s.shieldEngine
|
||||
} else {
|
||||
shieldEng = engines.NewStubShield()
|
||||
}
|
||||
shieldResult, shieldErr := shieldEng.InspectTraffic(scanCtx, []byte(req.Prompt), nil)
|
||||
|
||||
var (
|
||||
coreResult *engines.ScanResult
|
||||
coreErr error
|
||||
shieldResult *engines.ScanResult
|
||||
shieldErr error
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
// Goroutine 1: sentinel-core (54 Rust engines) — the heavy path
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
coreResult, coreErr = coreEngine.ScanPrompt(scanCtx, req.Prompt)
|
||||
}()
|
||||
|
||||
// Goroutine 2: shield (C11 payload inspection) — lighter path
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
shieldResult, shieldErr = shieldEng.InspectTraffic(scanCtx, []byte(req.Prompt), nil)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Build response — merge both engines
|
||||
response := map[string]any{}
|
||||
|
|
@ -1592,6 +1632,34 @@ func (s *Server) handlePublicScan(w http.ResponseWriter, r *http.Request) {
|
|||
response["latency_ms"] = float64(coreResult.Duration.Microseconds()) / 1000.0
|
||||
response["shield_status"] = shieldStatus
|
||||
|
||||
// ── Store in cache (5 min TTL, evict oldest if >500 entries) ──
|
||||
s.scanCacheMu.Lock()
|
||||
if len(s.scanCache) >= 500 {
|
||||
// Simple eviction: remove any expired entries
|
||||
now := time.Now()
|
||||
for k, v := range s.scanCache {
|
||||
if now.After(v.expiry) {
|
||||
delete(s.scanCache, k)
|
||||
}
|
||||
}
|
||||
// If still full, clear oldest 25%
|
||||
if len(s.scanCache) >= 500 {
|
||||
i := 0
|
||||
for k := range s.scanCache {
|
||||
delete(s.scanCache, k)
|
||||
i++
|
||||
if i >= 125 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
s.scanCache[cacheKey] = &cachedScan{
|
||||
response: response,
|
||||
expiry: time.Now().Add(5 * time.Minute),
|
||||
}
|
||||
s.scanCacheMu.Unlock()
|
||||
|
||||
writeJSON(w, http.StatusOK, response)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue