mirror of
https://github.com/syntrex-lab/gomcp.git
synced 2026-06-23 15:38:05 +02:00
feat: SOC ghost sinkhole, rate limiter, RBAC, demo seed
This commit is contained in:
parent
cc7956d835
commit
b8097d3f1b
19 changed files with 1169 additions and 63 deletions
|
|
@ -1,6 +1,7 @@
|
|||
package soc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
|
@ -53,10 +54,11 @@ func TestLoadTest_SustainedThroughput(t *testing.T) {
|
|||
sources := []domsoc.EventSource{domsoc.SourceSentinelCore, domsoc.SourceShield, domsoc.SourceGoMCP}
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
latencies = make([]time.Duration, totalEvents)
|
||||
errors int64
|
||||
incidents int64
|
||||
wg sync.WaitGroup
|
||||
latencies = make([]time.Duration, totalEvents)
|
||||
realErrors int64
|
||||
backpressure int64
|
||||
incidents int64
|
||||
)
|
||||
|
||||
start := time.Now()
|
||||
|
|
@ -80,7 +82,11 @@ func TestLoadTest_SustainedThroughput(t *testing.T) {
|
|||
latencies[idx] = time.Since(t0)
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt64(&errors, 1)
|
||||
if errors.Is(err, domsoc.ErrCapacityFull) {
|
||||
atomic.AddInt64(&backpressure, 1)
|
||||
} else {
|
||||
atomic.AddInt64(&realErrors, 1)
|
||||
}
|
||||
}
|
||||
if inc != nil {
|
||||
atomic.AddInt64(&incidents, 1)
|
||||
|
|
@ -118,12 +124,13 @@ func TestLoadTest_SustainedThroughput(t *testing.T) {
|
|||
t.Logf(" Min: %s", latencies[0].Round(time.Microsecond))
|
||||
t.Logf(" Max: %s", latencies[len(latencies)-1].Round(time.Microsecond))
|
||||
t.Logf("───────────────────────────────────────────────")
|
||||
t.Logf(" Errors: %d (%.1f%%)", errors, float64(errors)/float64(totalEvents)*100)
|
||||
t.Logf(" Real Errors: %d (%.1f%%)", realErrors, float64(realErrors)/float64(totalEvents)*100)
|
||||
t.Logf(" Backpressure: %d (%.1f%%) [§20.1 semaphore]", backpressure, float64(backpressure)/float64(totalEvents)*100)
|
||||
t.Logf(" Incidents: %d", incidents)
|
||||
t.Logf("═══════════════════════════════════════════════")
|
||||
|
||||
// Assertions: basic sanity checks.
|
||||
require.Less(t, float64(errors)/float64(totalEvents), 0.05, "error rate should be < 5%%")
|
||||
// Assertions: backpressure rejections are expected; only real errors are failures.
|
||||
require.Less(t, float64(realErrors)/float64(totalEvents), 0.05, "real error rate should be < 5%")
|
||||
require.Greater(t, eventsPerSec, float64(100), "should sustain > 100 events/sec")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -74,6 +74,10 @@ type Service struct {
|
|||
|
||||
// P-1 FIX: In-memory sliding window for correlation (avoids DB query per ingest).
|
||||
recentEvents []domsoc.SOCEvent
|
||||
|
||||
// Scan semaphore (§20.1): limits concurrent ingest processing to prevent OOM.
|
||||
// Non-blocking acquire → returns ErrCapacityFull if all slots in use.
|
||||
scanSemaphore chan struct{}
|
||||
}
|
||||
|
||||
// NewService creates a SOC service with persistence and decision logging.
|
||||
|
|
@ -110,9 +114,15 @@ func NewService(repo domsoc.SOCRepository, logger *audit.DecisionLogger) *Servic
|
|||
anomaly: domsoc.NewAnomalyDetector(),
|
||||
threatIntelEngine: domsoc.NewThreatIntelEngine(),
|
||||
retention: domsoc.NewDataRetentionPolicy(),
|
||||
scanSemaphore: make(chan struct{}, 8), // §20.1: max 8 concurrent scans
|
||||
}
|
||||
}
|
||||
|
||||
// Repo returns the underlying SOC repository (used for demo seed injection).
|
||||
func (s *Service) Repo() domsoc.SOCRepository {
|
||||
return s.repo
|
||||
}
|
||||
|
||||
// AddCustomRules appends YAML-loaded custom correlation rules (§7.5).
|
||||
func (s *Service) AddCustomRules(rules []domsoc.SOCCorrelationRule) {
|
||||
s.mu.Lock()
|
||||
|
|
@ -375,6 +385,19 @@ func (s *Service) IngestEvent(event domsoc.SOCEvent) (string, *domsoc.Incident,
|
|||
return "", nil, fmt.Errorf("%w: sensor %s (max %d events/sec)", domsoc.ErrRateLimited, sensorID, MaxEventsPerSecondPerSensor)
|
||||
}
|
||||
|
||||
// Step 0.6: Scan semaphore — backpressure guard (§20.1)
|
||||
select {
|
||||
case s.scanSemaphore <- struct{}{}:
|
||||
defer func() { <-s.scanSemaphore }()
|
||||
default:
|
||||
if s.logger != nil {
|
||||
s.logger.Record(audit.ModuleSOC,
|
||||
"CAPACITY_FULL:REJECT",
|
||||
fmt.Sprintf("concurrent_scans=%d", cap(s.scanSemaphore)))
|
||||
}
|
||||
return "", nil, fmt.Errorf("%w: max %d concurrent scans", domsoc.ErrCapacityFull, cap(s.scanSemaphore))
|
||||
}
|
||||
|
||||
// Step 1: Log decision with Zero-G tagging (§13.4)
|
||||
if s.logger != nil {
|
||||
zeroGTag := ""
|
||||
|
|
@ -500,10 +523,14 @@ func (s *Service) isRateLimited(sensorID string) bool {
|
|||
pruned = append(pruned, ts)
|
||||
}
|
||||
}
|
||||
pruned = append(pruned, now)
|
||||
|
||||
rateLimited := len(pruned) >= MaxEventsPerSecondPerSensor
|
||||
if !rateLimited {
|
||||
pruned = append(pruned, now)
|
||||
}
|
||||
s.sensorRates[sensorID] = pruned
|
||||
|
||||
return len(pruned) > MaxEventsPerSecondPerSensor
|
||||
return rateLimited
|
||||
}
|
||||
|
||||
// updateSensor registers/updates sentinel sensor on event ingest (§11.3 auto-discovery).
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue