Release prep: 54 engines, self-hosted signatures, i18n, dashboard updates

This commit is contained in:
DmitrL-dev 2026-03-23 16:45:40 +10:00
parent 694e32be26
commit 41cbfd6e0a
178 changed files with 36008 additions and 399 deletions

View file

@ -0,0 +1,159 @@
package sidecar
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"
domsoc "github.com/syntrex/gomcp/internal/domain/soc"
)
// BusClient sends security events to the SOC Event Bus via HTTP POST.
type BusClient struct {
baseURL string
sensorID string
apiKey string
httpClient *http.Client
maxRetries int
}
// NewBusClient creates a client for the SOC Event Bus.
func NewBusClient(baseURL, sensorID, apiKey string) *BusClient {
return &BusClient{
baseURL: baseURL,
sensorID: sensorID,
apiKey: apiKey,
httpClient: &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 5,
},
},
maxRetries: 3,
}
}
// ingestPayload matches the SOC ingest API expected JSON.
type ingestPayload struct {
Source string `json:"source"`
SensorID string `json:"sensor_id"`
SensorKey string `json:"sensor_key,omitempty"`
Severity string `json:"severity"`
Category string `json:"category"`
Subcategory string `json:"subcategory,omitempty"`
Confidence float64 `json:"confidence"`
Description string `json:"description"`
SessionID string `json:"session_id,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
// SendEvent posts a SOCEvent to the Event Bus.
// Accepts context for graceful cancellation during retries (L-2 fix).
func (c *BusClient) SendEvent(ctx context.Context, evt *domsoc.SOCEvent) error {
payload := ingestPayload{
Source: string(evt.Source),
SensorID: c.sensorID,
SensorKey: c.apiKey,
Severity: string(evt.Severity),
Category: evt.Category,
Subcategory: evt.Subcategory,
Confidence: evt.Confidence,
Description: evt.Description,
SessionID: evt.SessionID,
Metadata: evt.Metadata,
}
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("sidecar: marshal event: %w", err)
}
url := c.baseURL + "/api/v1/soc/events"
var lastErr error
for attempt := 0; attempt <= c.maxRetries; attempt++ {
if attempt > 0 {
// Context-aware backoff: cancellable during shutdown (H-1 fix).
backoff := time.Duration(attempt*attempt) * 500 * time.Millisecond
select {
case <-ctx.Done():
return fmt.Errorf("sidecar: send cancelled during retry: %w", ctx.Err())
case <-time.After(backoff):
}
}
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("sidecar: create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
lastErr = err
slog.Warn("sidecar: bus POST failed, retrying",
"attempt", attempt+1, "error", err)
continue
}
resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
lastErr = fmt.Errorf("bus returned %d", resp.StatusCode)
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
// Client error — don't retry.
return lastErr
}
slog.Warn("sidecar: bus returned server error, retrying",
"attempt", attempt+1, "status", resp.StatusCode)
}
return fmt.Errorf("sidecar: exhausted retries: %w", lastErr)
}
// Heartbeat sends a sensor heartbeat to the Event Bus.
func (c *BusClient) Heartbeat() error {
payload := map[string]string{
"sensor_id": c.sensorID,
}
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("sidecar: marshal heartbeat: %w", err)
}
url := c.baseURL + "/api/soc/sensors/heartbeat"
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
return fmt.Errorf("heartbeat returned %d", resp.StatusCode)
}
// Healthy checks if the bus is reachable (M-4 fix: /healthz not /health).
func (c *BusClient) Healthy() bool {
resp, err := c.httpClient.Get(c.baseURL + "/healthz")
if err != nil {
return false
}
resp.Body.Close()
return resp.StatusCode == http.StatusOK
}

View file

@ -0,0 +1,214 @@
// Package sidecar implements the Universal Sidecar (§5.5) — a zero-dependency
// Go binary that runs alongside SENTINEL sensors, tails their STDOUT/logs,
// and pushes parsed security events to the SOC Event Bus.
package sidecar
import (
"log/slog"
"regexp"
"strconv"
"strings"
domsoc "github.com/syntrex/gomcp/internal/domain/soc"
)
// Parser converts a raw log line into a SOCEvent.
// Returns nil, false if the line is not a security event.
type Parser interface {
Parse(line string) (*domsoc.SOCEvent, bool)
}
// ── sentinel-core Parser ─────────────────────────────────────────────────────
// SentinelCoreParser parses sentinel-core detection output.
// Expected format: [DETECT] engine=<name> confidence=<float> pattern=<desc> [severity=<sev>]
type SentinelCoreParser struct{}
var coreDetectRe = regexp.MustCompile(
`\[DETECT\]\s+engine=(\S+)\s+confidence=([0-9.]+)\s+pattern=(.+?)(?:\s+severity=(\S+))?$`)
func (p *SentinelCoreParser) Parse(line string) (*domsoc.SOCEvent, bool) {
m := coreDetectRe.FindStringSubmatch(strings.TrimSpace(line))
if m == nil {
return nil, false
}
engine := m[1]
conf, _ := strconv.ParseFloat(m[2], 64)
pattern := m[3]
severity := mapConfidenceToSeverity(conf)
if m[4] != "" {
severity = domsoc.EventSeverity(strings.ToUpper(m[4]))
}
evt := domsoc.NewSOCEvent(domsoc.SourceSentinelCore, severity, engine,
engine+": "+pattern)
evt.Confidence = conf
evt.Subcategory = pattern
return &evt, true
}
// ── shield Parser ────────────────────────────────────────────────────────────
// ShieldParser parses shield network block logs.
// Expected format: BLOCKED protocol=<proto> reason=<reason> source_ip=<ip>
type ShieldParser struct{}
var shieldBlockRe = regexp.MustCompile(
`BLOCKED\s+protocol=(\S+)\s+reason=(.+?)\s+source_ip=(\S+)`)
func (p *ShieldParser) Parse(line string) (*domsoc.SOCEvent, bool) {
m := shieldBlockRe.FindStringSubmatch(strings.TrimSpace(line))
if m == nil {
return nil, false
}
protocol := m[1]
reason := m[2]
sourceIP := m[3]
evt := domsoc.NewSOCEvent(domsoc.SourceShield, domsoc.SeverityMedium, "network_block",
"Shield blocked "+protocol+" from "+sourceIP+": "+reason)
evt.Subcategory = protocol
evt.Metadata = map[string]string{
"source_ip": sourceIP,
"protocol": protocol,
"reason": reason,
}
return &evt, true
}
// ── immune Parser ────────────────────────────────────────────────────────────
// ImmuneParser parses immune system anomaly/response logs.
// Expected format: [ANOMALY] type=<type> score=<float> detail=<text>
//
// or: [RESPONSE] action=<action> target=<target> reason=<text>
type ImmuneParser struct{}
var immuneAnomalyRe = regexp.MustCompile(
`\[ANOMALY\]\s+type=(\S+)\s+score=([0-9.]+)\s+detail=(.+)`)
var immuneResponseRe = regexp.MustCompile(
`\[RESPONSE\]\s+action=(\S+)\s+target=(\S+)\s+reason=(.+)`)
func (p *ImmuneParser) Parse(line string) (*domsoc.SOCEvent, bool) {
trimmed := strings.TrimSpace(line)
if m := immuneAnomalyRe.FindStringSubmatch(trimmed); m != nil {
anomalyType := m[1]
score, _ := strconv.ParseFloat(m[2], 64)
detail := m[3]
evt := domsoc.NewSOCEvent(domsoc.SourceImmune, mapConfidenceToSeverity(score),
"anomaly", "Immune anomaly: "+anomalyType+": "+detail)
evt.Confidence = score
evt.Subcategory = anomalyType
return &evt, true
}
if m := immuneResponseRe.FindStringSubmatch(trimmed); m != nil {
action := m[1]
target := m[2]
reason := m[3]
evt := domsoc.NewSOCEvent(domsoc.SourceImmune, domsoc.SeverityHigh,
"immune_response", "Immune response: "+action+" on "+target+": "+reason)
evt.Subcategory = action
evt.Metadata = map[string]string{
"action": action,
"target": target,
"reason": reason,
}
return &evt, true
}
return nil, false
}
// ── Generic Parser ───────────────────────────────────────────────────────────
// GenericParser uses a configurable regex with named groups.
// Named groups: "category", "severity", "description", "confidence".
type GenericParser struct {
Pattern *regexp.Regexp
Source domsoc.EventSource
}
func NewGenericParser(pattern string, source domsoc.EventSource) (*GenericParser, error) {
re, err := regexp.Compile(pattern)
if err != nil {
return nil, err
}
return &GenericParser{Pattern: re, Source: source}, nil
}
func (p *GenericParser) Parse(line string) (*domsoc.SOCEvent, bool) {
m := p.Pattern.FindStringSubmatch(strings.TrimSpace(line))
if m == nil {
return nil, false
}
names := p.Pattern.SubexpNames()
groups := map[string]string{}
for i, name := range names {
if i > 0 && name != "" {
groups[name] = m[i]
}
}
category := groups["category"]
if category == "" {
category = "generic"
}
description := groups["description"]
if description == "" {
description = line
}
severity := domsoc.SeverityMedium
if s, ok := groups["severity"]; ok && s != "" {
severity = domsoc.EventSeverity(strings.ToUpper(s))
}
confidence := 0.5
if c, ok := groups["confidence"]; ok {
if f, err := strconv.ParseFloat(c, 64); err == nil {
confidence = f
}
}
evt := domsoc.NewSOCEvent(p.Source, severity, category, description)
evt.Confidence = confidence
return &evt, true
}
// ── Helpers ──────────────────────────────────────────────────────────────────
// ParserForSensor returns the appropriate parser for a sensor type.
func ParserForSensor(sensorType string) Parser {
switch strings.ToLower(sensorType) {
case "sentinel-core":
return &SentinelCoreParser{}
case "shield":
return &ShieldParser{}
case "immune":
return &ImmuneParser{}
default:
slog.Warn("sidecar: unknown sensor type, using sentinel-core parser as fallback",
"sensor_type", sensorType)
return &SentinelCoreParser{} // fallback
}
}
func mapConfidenceToSeverity(conf float64) domsoc.EventSeverity {
switch {
case conf >= 0.9:
return domsoc.SeverityCritical
case conf >= 0.7:
return domsoc.SeverityHigh
case conf >= 0.5:
return domsoc.SeverityMedium
case conf >= 0.3:
return domsoc.SeverityLow
default:
return domsoc.SeverityInfo
}
}

View file

@ -0,0 +1,157 @@
package sidecar
import (
"context"
"fmt"
"io"
"log/slog"
"sync/atomic"
"time"
)
// Config holds sidecar runtime configuration.
type Config struct {
SensorType string // sentinel-core, shield, immune, generic
LogPath string // Path to sensor log file, or "stdin"
BusURL string // SOC Event Bus URL (e.g., http://localhost:9100)
SensorID string // Sensor registration ID
APIKey string // Sensor API key
PollInterval time.Duration // Log file poll interval
}
// Stats tracks sidecar runtime metrics (thread-safe via atomic).
type Stats struct {
LinesRead atomic.Int64
EventsSent atomic.Int64
Errors atomic.Int64
StartedAt time.Time
}
// StatsSnapshot is a non-atomic copy for reading/logging.
type StatsSnapshot struct {
LinesRead int64 `json:"lines_read"`
EventsSent int64 `json:"events_sent"`
Errors int64 `json:"errors"`
StartedAt time.Time `json:"started_at"`
}
// Sidecar is the main orchestrator: tailer → parser → bus client.
type Sidecar struct {
config Config
parser Parser
client *BusClient
tailer *Tailer
stats Stats
}
// New creates a Sidecar with the given config.
func New(cfg Config) *Sidecar {
return &Sidecar{
config: cfg,
parser: ParserForSensor(cfg.SensorType),
client: NewBusClient(cfg.BusURL, cfg.SensorID, cfg.APIKey),
tailer: NewTailer(cfg.PollInterval),
stats: Stats{StartedAt: time.Now()},
}
}
// Run starts the sidecar pipeline: tail → parse → send.
// Blocks until ctx is cancelled.
func (s *Sidecar) Run(ctx context.Context) error {
slog.Info("sidecar: starting",
"sensor_type", s.config.SensorType,
"log_path", s.config.LogPath,
"bus_url", s.config.BusURL,
"sensor_id", s.config.SensorID,
)
// Start line source.
var lines <-chan string
if s.config.LogPath == "stdin" || s.config.LogPath == "-" {
lines = s.tailer.FollowStdin(ctx)
} else {
var err error
lines, err = s.tailer.FollowFile(ctx, s.config.LogPath)
if err != nil {
return fmt.Errorf("sidecar: open log: %w", err)
}
}
// Heartbeat goroutine.
go s.heartbeatLoop(ctx)
// Main pipeline loop (shared with RunReader).
return s.processLines(ctx, lines)
}
// RunReader runs the sidecar from any io.Reader (for testing).
func (s *Sidecar) RunReader(ctx context.Context, r io.Reader) error {
lines := s.tailer.FollowReader(ctx, r)
return s.processLines(ctx, lines)
}
// processLines is the shared pipeline loop: parse → send → stats.
// Extracted to DRY between Run() and RunReader() (H-3 fix).
func (s *Sidecar) processLines(ctx context.Context, lines <-chan string) error {
for {
select {
case <-ctx.Done():
slog.Info("sidecar: shutting down",
"lines_read", s.stats.LinesRead.Load(),
"events_sent", s.stats.EventsSent.Load(),
"errors", s.stats.Errors.Load(),
)
return nil
case line, ok := <-lines:
if !ok {
slog.Info("sidecar: input closed")
return nil
}
s.stats.LinesRead.Add(1)
evt, ok := s.parser.Parse(line)
if !ok {
continue // Not a security event.
}
evt.SensorID = s.config.SensorID
if err := s.client.SendEvent(ctx, evt); err != nil {
s.stats.Errors.Add(1)
slog.Error("sidecar: send failed",
"error", err,
"category", evt.Category,
)
continue
}
s.stats.EventsSent.Add(1)
}
}
}
// GetStats returns a snapshot of current runtime metrics (thread-safe).
func (s *Sidecar) GetStats() StatsSnapshot {
return StatsSnapshot{
LinesRead: s.stats.LinesRead.Load(),
EventsSent: s.stats.EventsSent.Load(),
Errors: s.stats.Errors.Load(),
StartedAt: s.stats.StartedAt,
}
}
func (s *Sidecar) heartbeatLoop(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := s.client.Heartbeat(); err != nil {
slog.Warn("sidecar: heartbeat failed", "error", err)
}
}
}
}

View file

@ -0,0 +1,306 @@
package sidecar
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)
// ── Parser Tests ─────────────────────────────────────────────────────────────
func TestSentinelCoreParser(t *testing.T) {
p := &SentinelCoreParser{}
tests := []struct {
line string
wantOK bool
category string
confMin float64
}{
{"[DETECT] engine=jailbreak confidence=0.95 pattern=DAN prompt", true, "jailbreak", 0.9},
{"[DETECT] engine=injection confidence=0.6 pattern=ignore_previous", true, "injection", 0.5},
{"[DETECT] engine=exfiltration confidence=0.3 pattern=tool_call severity=HIGH", true, "exfiltration", 0.2},
{"INFO: Engine loaded successfully", false, "", 0},
{"", false, "", 0},
}
for _, tt := range tests {
evt, ok := p.Parse(tt.line)
if ok != tt.wantOK {
t.Errorf("Parse(%q) ok=%v, want %v", tt.line, ok, tt.wantOK)
continue
}
if !ok {
continue
}
if evt.Category != tt.category {
t.Errorf("Parse(%q) category=%q, want %q", tt.line, evt.Category, tt.category)
}
if evt.Confidence < tt.confMin {
t.Errorf("Parse(%q) confidence=%.2f, want >=%.2f", tt.line, evt.Confidence, tt.confMin)
}
}
}
func TestShieldParser(t *testing.T) {
p := &ShieldParser{}
tests := []struct {
line string
wantOK bool
proto string
ip string
}{
{"BLOCKED protocol=tcp reason=port_scan source_ip=192.168.1.100", true, "tcp", "192.168.1.100"},
{"BLOCKED protocol=udp reason=dns_exfil source_ip=10.0.0.5", true, "udp", "10.0.0.5"},
{"ALLOWED protocol=https from 1.2.3.4", false, "", ""},
{"", false, "", ""},
}
for _, tt := range tests {
evt, ok := p.Parse(tt.line)
if ok != tt.wantOK {
t.Errorf("Parse(%q) ok=%v, want %v", tt.line, ok, tt.wantOK)
continue
}
if !ok {
continue
}
if evt.Metadata["protocol"] != tt.proto {
t.Errorf("protocol=%q, want %q", evt.Metadata["protocol"], tt.proto)
}
if evt.Metadata["source_ip"] != tt.ip {
t.Errorf("source_ip=%q, want %q", evt.Metadata["source_ip"], tt.ip)
}
}
}
func TestImmuneParser(t *testing.T) {
p := &ImmuneParser{}
tests := []struct {
line string
wantOK bool
category string
}{
{"[ANOMALY] type=drift score=0.85 detail=behavior shift detected", true, "anomaly"},
{"[RESPONSE] action=quarantine target=session-123 reason=high risk", true, "immune_response"},
{"[INFO] system healthy", false, ""},
}
for _, tt := range tests {
evt, ok := p.Parse(tt.line)
if ok != tt.wantOK {
t.Errorf("Parse(%q) ok=%v, want %v", tt.line, ok, tt.wantOK)
continue
}
if !ok {
continue
}
if evt.Category != tt.category {
t.Errorf("category=%q, want %q", evt.Category, tt.category)
}
}
}
func TestGenericParser(t *testing.T) {
p, err := NewGenericParser(
`ALERT\s+(?P<category>\S+)\s+(?P<severity>\S+)\s+(?P<description>.+)`,
"external",
)
if err != nil {
t.Fatalf("NewGenericParser: %v", err)
}
evt, ok := p.Parse("ALERT injection HIGH suspicious sql in query string")
if !ok {
t.Fatal("expected match")
}
if evt.Category != "injection" {
t.Errorf("category=%q, want injection", evt.Category)
}
if string(evt.Severity) != "HIGH" {
t.Errorf("severity=%q, want HIGH", evt.Severity)
}
}
func TestParserForSensor(t *testing.T) {
tests := map[string]string{
"sentinel-core": "*sidecar.SentinelCoreParser",
"shield": "*sidecar.ShieldParser",
"immune": "*sidecar.ImmuneParser",
"unknown": "*sidecar.SentinelCoreParser", // fallback
}
for sensorType, wantType := range tests {
p := ParserForSensor(sensorType)
if p == nil {
t.Errorf("ParserForSensor(%q) returned nil", sensorType)
continue
}
gotType := fmt.Sprintf("%T", p)
if gotType != wantType {
t.Errorf("ParserForSensor(%q) = %s, want %s", sensorType, gotType, wantType)
}
}
}
// ── Tailer Tests ─────────────────────────────────────────────────────────────
func TestTailer_FollowReader(t *testing.T) {
input := "[DETECT] engine=jailbreak confidence=0.95 pattern=DAN\nINFO: done\n[DETECT] engine=exfil confidence=0.7 pattern=tool_call\n"
reader := strings.NewReader(input)
tailer := NewTailer(50 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
ch := tailer.FollowReader(ctx, reader)
var lines []string
for line := range ch {
lines = append(lines, line)
}
if len(lines) != 3 {
t.Fatalf("expected 3 lines, got %d: %v", len(lines), lines)
}
if lines[0] != "[DETECT] engine=jailbreak confidence=0.95 pattern=DAN" {
t.Errorf("line[0]=%q", lines[0])
}
}
// ── BusClient Tests ──────────────────────────────────────────────────────────
func TestBusClient_SendEvent(t *testing.T) {
var received []map[string]any
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/v1/soc/events" {
var payload map[string]any
json.NewDecoder(r.Body).Decode(&payload)
received = append(received, payload)
w.WriteHeader(http.StatusCreated)
return
}
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
client := NewBusClient(ts.URL, "test-sensor", "test-key")
p := &SentinelCoreParser{}
evt, ok := p.Parse("[DETECT] engine=jailbreak confidence=0.95 pattern=DAN")
if !ok {
t.Fatal("parse failed")
}
err := client.SendEvent(context.Background(), evt)
if err != nil {
t.Fatalf("SendEvent: %v", err)
}
if len(received) != 1 {
t.Fatalf("expected 1 received event, got %d", len(received))
}
if received[0]["source"] != "sentinel-core" {
t.Errorf("source=%v, want sentinel-core", received[0]["source"])
}
if received[0]["category"] != "jailbreak" {
t.Errorf("category=%v, want jailbreak", received[0]["category"])
}
if received[0]["sensor_id"] != "test-sensor" {
t.Errorf("sensor_id=%v, want test-sensor", received[0]["sensor_id"])
}
}
func TestBusClient_Healthy(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
client := NewBusClient(ts.URL, "s1", "k1")
if !client.Healthy() {
t.Error("expected healthy")
}
// Unreachable server.
client2 := NewBusClient("http://localhost:1", "s2", "k2")
if client2.Healthy() {
t.Error("expected unhealthy")
}
}
// ── E2E Pipeline Test ────────────────────────────────────────────────────────
func TestSidecar_E2E_Pipeline(t *testing.T) {
var receivedEvents []map[string]any
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v1/soc/events":
var payload map[string]any
json.NewDecoder(r.Body).Decode(&payload)
receivedEvents = append(receivedEvents, payload)
w.WriteHeader(http.StatusCreated)
case "/health":
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusOK)
}
}))
defer ts.Close()
input := strings.Join([]string{
"[DETECT] engine=jailbreak confidence=0.95 pattern=DAN",
"INFO: processing complete",
"[DETECT] engine=injection confidence=0.7 pattern=ignore_previous",
"DEBUG: internal state update",
"[DETECT] engine=exfiltration confidence=0.5 pattern=tool_call",
}, "\n")
cfg := Config{
SensorType: "sentinel-core",
LogPath: "stdin",
BusURL: ts.URL,
SensorID: "e2e-test-sensor",
APIKey: "test-key",
}
sc := New(cfg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := sc.RunReader(ctx, strings.NewReader(input))
if err != nil {
t.Fatalf("RunReader: %v", err)
}
stats := sc.GetStats()
if stats.LinesRead != 5 {
t.Errorf("LinesRead=%d, want 5", stats.LinesRead)
}
if stats.EventsSent != 3 {
t.Errorf("EventsSent=%d, want 3 (3 DETECT lines, 2 skipped)", stats.EventsSent)
}
if len(receivedEvents) != 3 {
t.Fatalf("received %d events, want 3", len(receivedEvents))
}
// Verify first event.
first := receivedEvents[0]
if first["category"] != "jailbreak" {
t.Errorf("first event category=%v, want jailbreak", first["category"])
}
if first["sensor_id"] != "e2e-test-sensor" {
t.Errorf("first event sensor_id=%v, want e2e-test-sensor", first["sensor_id"])
}
}

View file

@ -0,0 +1,162 @@
package sidecar
import (
"bufio"
"context"
"io"
"log/slog"
"os"
"time"
)
// Tailer follows a log file or stdin, emitting lines via a channel.
type Tailer struct {
pollInterval time.Duration
}
// NewTailer creates a Tailer with the given poll interval for file changes.
func NewTailer(pollInterval time.Duration) *Tailer {
if pollInterval <= 0 {
pollInterval = 200 * time.Millisecond
}
return &Tailer{pollInterval: pollInterval}
}
// FollowFile tails a file, seeking to end on start.
// Sends lines on the returned channel until ctx is cancelled.
func (t *Tailer) FollowFile(ctx context.Context, path string) (<-chan string, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
// Seek to end — only process new lines.
if _, err := f.Seek(0, io.SeekEnd); err != nil {
f.Close()
return nil, err
}
ch := make(chan string, 256)
go func() {
defer f.Close()
defer close(ch)
// H-2 fix: Use Scanner with 1MB max line size to prevent OOM.
const maxLineSize = 1 << 20 // 1MB
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
for {
select {
case <-ctx.Done():
return
default:
}
if scanner.Scan() {
line := scanner.Text()
if line != "" {
select {
case ch <- line:
case <-ctx.Done():
return
}
}
continue
}
// Scanner stopped — either EOF or error.
if err := scanner.Err(); err != nil {
slog.Error("sidecar: read error", "error", err)
return
}
// EOF — wait and check for rotation.
time.Sleep(t.pollInterval)
if t.fileRotated(f, path) {
slog.Info("sidecar: log rotated, reopening", "path", path)
f.Close()
newF, err := os.Open(path)
if err != nil {
slog.Error("sidecar: reopen failed", "path", path, "error", err)
return
}
f = newF
scanner = bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
} else {
// Same file, re-create scanner at current position.
scanner = bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
}
}
}()
return ch, nil
}
// FollowStdin reads from stdin line by line.
func (t *Tailer) FollowStdin(ctx context.Context) <-chan string {
ch := make(chan string, 256)
go func() {
defer close(ch)
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
}
line := scanner.Text()
if line != "" {
select {
case ch <- line:
case <-ctx.Done():
return
}
}
}
}()
return ch
}
// FollowReader reads from any io.Reader (for testing).
func (t *Tailer) FollowReader(ctx context.Context, r io.Reader) <-chan string {
ch := make(chan string, 256)
go func() {
defer close(ch)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
}
line := scanner.Text()
if line != "" {
select {
case ch <- line:
case <-ctx.Done():
return
}
}
}
}()
return ch
}
// fileRotated checks if the file path now points to a different inode.
func (t *Tailer) fileRotated(current *os.File, path string) bool {
curInfo, err1 := current.Stat()
newInfo, err2 := os.Stat(path)
if err1 != nil || err2 != nil {
return false
}
return !os.SameFile(curInfo, newInfo)
}