mirror of
https://github.com/syntrex-lab/gomcp.git
synced 2026-05-09 11:32:37 +02:00
initial: Syntrex extraction from sentinel-community (615 files)
This commit is contained in:
commit
2c50c993b1
175 changed files with 32396 additions and 0 deletions
290
internal/transport/p2p/ws_transport.go
Normal file
290
internal/transport/p2p/ws_transport.go
Normal file
|
|
@ -0,0 +1,290 @@
|
|||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sentinel-community/gomcp/internal/domain/peer"
|
||||
)
|
||||
|
||||
// WSTransport provides WebSocket-based P2P communication (v3.5).
|
||||
// Enables real-time fact sync between GoMCP instances.
|
||||
type WSTransport struct {
|
||||
mu sync.RWMutex
|
||||
registry *peer.Registry
|
||||
listener net.Listener
|
||||
port int
|
||||
running bool
|
||||
onSync func(payload peer.SyncPayload) error // Callback for incoming syncs
|
||||
}
|
||||
|
||||
// WSConfig holds WebSocket transport configuration.
|
||||
type WSConfig struct {
|
||||
Port int `json:"port"` // Listen port (default: 9741)
|
||||
Host string `json:"host"` // Bind address (default: localhost)
|
||||
Enabled bool `json:"enabled"` // Enable WebSocket transport
|
||||
}
|
||||
|
||||
// NewWSTransport creates a new WebSocket transport.
|
||||
func NewWSTransport(cfg WSConfig, reg *peer.Registry) *WSTransport {
|
||||
if cfg.Port < 0 {
|
||||
cfg.Port = 9741
|
||||
}
|
||||
if cfg.Host == "" {
|
||||
cfg.Host = "localhost"
|
||||
}
|
||||
return &WSTransport{
|
||||
registry: reg,
|
||||
port: cfg.Port,
|
||||
}
|
||||
}
|
||||
|
||||
// OnSync registers a callback for incoming sync payloads.
|
||||
func (t *WSTransport) OnSync(fn func(peer.SyncPayload) error) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.onSync = fn
|
||||
}
|
||||
|
||||
// Message is the wire protocol for P2P communication.
|
||||
type Message struct {
|
||||
Type string `json:"type"` // "handshake", "sync", "delta_sync_req", "delta_sync_res", "ping", "pong"
|
||||
Payload json.RawMessage `json:"payload"` // Type-specific data
|
||||
From string `json:"from"` // Sender peer ID
|
||||
SentAt time.Time `json:"sent_at"`
|
||||
}
|
||||
|
||||
// Start begins listening for WebSocket connections.
|
||||
func (t *WSTransport) Start() error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/p2p", t.handleP2P)
|
||||
mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(w, `{"status":"ok","peer_id":"%s","node":"%s"}`, t.registry.SelfID(), t.registry.NodeName())
|
||||
})
|
||||
|
||||
addr := fmt.Sprintf("localhost:%d", t.port)
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ws listen %s: %w", addr, err)
|
||||
}
|
||||
|
||||
t.mu.Lock()
|
||||
t.listener = listener
|
||||
t.running = true
|
||||
t.mu.Unlock()
|
||||
|
||||
log.Printf("ws-transport: listening on %s (peer=%s)", addr, t.registry.SelfID())
|
||||
|
||||
go func() {
|
||||
srv := &http.Server{Handler: mux}
|
||||
if err := srv.Serve(listener); err != nil && t.isRunning() {
|
||||
log.Printf("ws-transport: serve error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleP2P handles incoming WebSocket-like HTTP connections.
|
||||
// Uses simple HTTP POST for compatibility (true WebSocket upgrade optional in v3.6).
|
||||
func (t *WSTransport) handleP2P(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "POST only", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var msg Message
|
||||
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
|
||||
http.Error(w, "invalid message", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
switch msg.Type {
|
||||
case "ping":
|
||||
resp := Message{Type: "pong", From: t.registry.SelfID(), SentAt: time.Now()}
|
||||
json.NewEncoder(w).Encode(resp)
|
||||
|
||||
case "handshake":
|
||||
var req peer.HandshakeRequest
|
||||
json.Unmarshal(msg.Payload, &req)
|
||||
// Process handshake through registry.
|
||||
respData, _ := json.Marshal(map[string]string{"status": "received", "peer_id": t.registry.SelfID()})
|
||||
resp := Message{Type: "handshake", From: t.registry.SelfID(), Payload: respData, SentAt: time.Now()}
|
||||
json.NewEncoder(w).Encode(resp)
|
||||
|
||||
case "sync":
|
||||
var payload peer.SyncPayload
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
http.Error(w, "invalid sync payload", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
t.mu.RLock()
|
||||
handler := t.onSync
|
||||
t.mu.RUnlock()
|
||||
if handler != nil {
|
||||
if err := handler(payload); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
resp := Message{
|
||||
Type: "sync",
|
||||
From: t.registry.SelfID(),
|
||||
Payload: json.RawMessage(fmt.Sprintf(`{"accepted":%d}`, len(payload.Facts))),
|
||||
SentAt: time.Now(),
|
||||
}
|
||||
json.NewEncoder(w).Encode(resp)
|
||||
|
||||
case "delta_sync_req":
|
||||
var req peer.DeltaSyncRequest
|
||||
if err := json.Unmarshal(msg.Payload, &req); err != nil {
|
||||
http.Error(w, "invalid delta sync request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// Respond with empty for now — actual fact retrieval connected at startup.
|
||||
resp := peer.DeltaSyncResponse{
|
||||
FromPeerID: t.registry.SelfID(),
|
||||
SyncedAt: time.Now(),
|
||||
HasMore: false,
|
||||
}
|
||||
respData, _ := json.Marshal(resp)
|
||||
json.NewEncoder(w).Encode(Message{Type: "delta_sync_res", From: t.registry.SelfID(), Payload: respData, SentAt: time.Now()})
|
||||
|
||||
default:
|
||||
http.Error(w, "unknown message type", http.StatusBadRequest)
|
||||
}
|
||||
}
|
||||
|
||||
// SendSync sends a sync payload to a remote peer via HTTP POST.
|
||||
func (t *WSTransport) SendSync(ctx context.Context, addr string, payload peer.SyncPayload) error {
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msg := Message{
|
||||
Type: "sync",
|
||||
From: t.registry.SelfID(),
|
||||
Payload: data,
|
||||
SentAt: time.Now(),
|
||||
}
|
||||
return t.send(ctx, addr, msg)
|
||||
}
|
||||
|
||||
// SendDeltaSync sends a delta sync request to a remote peer.
|
||||
func (t *WSTransport) SendDeltaSync(ctx context.Context, addr string, req peer.DeltaSyncRequest) (*peer.DeltaSyncResponse, error) {
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msg := Message{
|
||||
Type: "delta_sync_req",
|
||||
From: t.registry.SelfID(),
|
||||
Payload: data,
|
||||
SentAt: time.Now(),
|
||||
}
|
||||
|
||||
respMsg, err := t.sendAndReceive(ctx, addr, msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resp peer.DeltaSyncResponse
|
||||
if err := json.Unmarshal(respMsg.Payload, &resp); err != nil {
|
||||
return nil, fmt.Errorf("decode delta response: %w", err)
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// Ping checks if a remote peer is alive.
|
||||
func (t *WSTransport) Ping(ctx context.Context, addr string) (peerID string, err error) {
|
||||
msg := Message{Type: "ping", From: t.registry.SelfID(), SentAt: time.Now()}
|
||||
resp, err := t.sendAndReceive(ctx, addr, msg)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return resp.From, nil
|
||||
}
|
||||
|
||||
// Stop shuts down the transport.
|
||||
func (t *WSTransport) Stop() error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.running = false
|
||||
if t.listener != nil {
|
||||
return t.listener.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addr returns the listen address.
|
||||
func (t *WSTransport) Addr() string {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
if t.listener != nil {
|
||||
return t.listener.Addr().String()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (t *WSTransport) isRunning() bool {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
return t.running
|
||||
}
|
||||
|
||||
func (t *WSTransport) send(ctx context.Context, addr string, msg Message) error {
|
||||
_, err := t.sendAndReceive(ctx, addr, msg)
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *WSTransport) sendAndReceive(_ context.Context, addr string, msg Message) (*Message, error) {
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/p2p", addr)
|
||||
|
||||
resp, err := client.Post(url, "application/json", jsonReader(data))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("p2p send to %s: %w", addr, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("p2p %s returned %d", addr, resp.StatusCode)
|
||||
}
|
||||
|
||||
var respMsg Message
|
||||
if err := json.NewDecoder(resp.Body).Decode(&respMsg); err != nil {
|
||||
return nil, fmt.Errorf("decode response from %s: %w", addr, err)
|
||||
}
|
||||
return &respMsg, nil
|
||||
}
|
||||
|
||||
func jsonReader(data []byte) *jsonBody { return &jsonBody{data: data} }
|
||||
|
||||
type jsonBody struct {
|
||||
data []byte
|
||||
off int
|
||||
}
|
||||
|
||||
func (j *jsonBody) Read(p []byte) (n int, err error) {
|
||||
if j.off >= len(j.data) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n = copy(p, j.data[j.off:])
|
||||
j.off += n
|
||||
return n, nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue