mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-06 19:35:13 +02:00
* feat: Enhance control flow analysis with function summaries and taint analysis * feat: Update taint analysis to utilize function summaries for enhanced tracking * Refactor `walk.rs` batch processing and override handling: - Renamed `Batcher` to `BatchSender` for clarity. - Added `BatchSender::new` constructor for cleaner initialization. - Simplified batch size management in `BatchSender`. - Extracted `build_overrides` function for reusable override construction. - Improved error handling and validation in override building. - Enhanced performance with directory and file type filtering in `walk`. * Improve logging and streamline directory walk process: - Added detailed `tracing` logs for debugging batch flushes, override construction, and walk initialization/completion. - Optimized and simplified `filter_entry` logic for directory and file type filters. - Improved metadata checks and max file size enforcement during the scan. * Refactor and optimize taint tracking, label rules, and directory walk process: - Replaced `DefaultHasher` with `blake3::Hasher` for improved taint hashing. - Enhanced sorting and hashing logic in `taint.rs` for consistency and efficiency. - Removed unused `set_hash` function and redundant imports across files. - Improved batch sender logic in `walk.rs`, renaming key components for clarity. - Unified `spawn_senders` and `spawn_file_walker` with thread handling and channel tuple return. - Expanded label rules with additional matchers for sources, sanitizers, and sinks. - Deprecated `dump_cfg` and specific logging utilities in `cfg.rs` for code cleanup. * fix: fixed let chains error in walk.rs * fix: updated dependencies * fix: updated dependencies * chore: Remove standard error in scan.rs * feat: Introduce function summaries for enhanced taint and control flow analysis * feat: Enhance taint analysis with interop support and function summaries * feat: Add configuration analysis module and enhance matcher rules * feat: Add arity column to function_summaries and handle schema migration * fix: fixed clippy &PathBuf warnings * chore: Update dependencies and versioning in Cargo files * docs: Update README to enhance clarity and detail on features and analysis modes * chore: Update CHANGELOG for version 0.2.0 with new features, changes, and fixes * docs: Update SECURITY.md to clarify version support status --------- Co-authored-by: elipeter <eli.peter@es.fcm.travel>
115 lines
3.7 KiB
Python
115 lines
3.7 KiB
Python
import os
|
|
import subprocess
|
|
import sqlite3
|
|
import pickle
|
|
import shlex
|
|
|
|
# ───── Configuration ─────
|
|
|
|
DATABASE_PATH = os.getenv("DB_PATH", "/var/lib/app/data.db")
|
|
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/tmp/uploads")
|
|
REDIS_URL = os.getenv("REDIS_URL")
|
|
|
|
# ───── Request handlers ─────
|
|
|
|
def handle_admin_exec(request):
|
|
"""POST /admin/exec
|
|
Runs an admin command from environment config.
|
|
VULN: os.getenv flows into subprocess.run (command injection)
|
|
"""
|
|
admin_cmd = os.getenv("ADMIN_COMMAND")
|
|
result = subprocess.run(admin_cmd, shell=True, capture_output=True)
|
|
return {"status": result.returncode, "output": result.stdout.decode()}
|
|
|
|
def handle_report_generate(request):
|
|
"""POST /reports/generate
|
|
Generates a report by calling an external script.
|
|
VULN: os.getenv flows into subprocess.Popen
|
|
"""
|
|
script_path = os.getenv("REPORT_SCRIPT")
|
|
proc = subprocess.Popen(
|
|
[script_path, "--format", "pdf"],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
stdout, stderr = proc.communicate()
|
|
return {"report": stdout.decode()}
|
|
|
|
def handle_eval_expression(request):
|
|
"""POST /api/eval
|
|
Evaluates a mathematical expression from user input.
|
|
VULN: request.form flows into eval (code injection)
|
|
"""
|
|
expression = request.form.get("expr")
|
|
result = eval(expression)
|
|
return {"result": result}
|
|
|
|
def handle_dynamic_import(request):
|
|
"""POST /api/plugins/load
|
|
Loads a plugin by executing its setup code.
|
|
VULN: request.json flows into exec (arbitrary code execution)
|
|
"""
|
|
plugin_code = request.json.get("setup_code")
|
|
exec(plugin_code)
|
|
return {"status": "loaded"}
|
|
|
|
def handle_search(request):
|
|
"""GET /api/search
|
|
Searches the database with user-supplied query.
|
|
VULN: request.args flows into cursor.execute (SQL injection)
|
|
"""
|
|
query = request.args.get("q")
|
|
conn = sqlite3.connect(DATABASE_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM items WHERE name LIKE '%" + query + "%'")
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return {"results": rows}
|
|
|
|
def handle_lookup(request):
|
|
"""GET /api/lookup
|
|
Looks up a record by user-supplied ID.
|
|
VULN: request.args flows into os.popen (command injection)
|
|
"""
|
|
record_id = request.args.get("id")
|
|
output = os.popen("grep " + record_id + " /var/log/audit.log").read()
|
|
return {"matches": output}
|
|
|
|
def handle_backup(request):
|
|
"""POST /admin/backup
|
|
Creates a database backup.
|
|
VULN: os.environ flows into subprocess.call
|
|
"""
|
|
backup_dir = os.environ.get("BACKUP_DIR", "/backups")
|
|
subprocess.call(["pg_dump", "-f", backup_dir + "/dump.sql", REDIS_URL])
|
|
return {"status": "ok"}
|
|
|
|
# ───── Input handling ─────
|
|
|
|
def handle_interactive_setup():
|
|
"""Interactive setup wizard.
|
|
VULN: input() flows into os.system (command injection from stdin)
|
|
"""
|
|
db_host = input("Enter database host: ")
|
|
os.system("ping -c 1 " + db_host)
|
|
|
|
db_password = input("Enter database password: ")
|
|
return {"host": db_host, "password": db_password}
|
|
|
|
# ───── Safe patterns ─────
|
|
|
|
def handle_safe_exec():
|
|
"""SAFE: shlex.quote sanitizes before shell execution."""
|
|
user_dir = os.getenv("USER_DIR")
|
|
safe_dir = shlex.quote(user_dir)
|
|
subprocess.run(["ls", "-la", safe_dir], capture_output=True)
|
|
|
|
def handle_safe_search(request):
|
|
"""SAFE: parameterized query prevents SQL injection."""
|
|
query = request.args.get("q")
|
|
conn = sqlite3.connect(DATABASE_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM items WHERE name LIKE ?", ("%" + query + "%",))
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return {"results": rows}
|