nyx/tests/fixtures/flask_app/app.py
Eli Peter f96a89e7c1
Feat/full cfg (#30)
* feat: Enhance control flow analysis with function summaries and taint analysis

* feat: Update taint analysis to utilize function summaries for enhanced tracking

* Refactor `walk.rs` batch processing and override handling:

- Renamed `Batcher` to `BatchSender` for clarity.
- Added `BatchSender::new` constructor for cleaner initialization.
- Simplified batch size management in `BatchSender`.
- Extracted `build_overrides` function for reusable override construction.
- Improved error handling and validation in override building.
- Enhanced performance with directory and file type filtering in `walk`.

* Improve logging and streamline directory walk process:

- Added detailed `tracing` logs for debugging batch flushes, override construction, and walk initialization/completion.
- Optimized and simplified `filter_entry` logic for directory and file type filters.
- Improved metadata checks and max file size enforcement during the scan.

* Refactor and optimize taint tracking, label rules, and directory walk process:

- Replaced `DefaultHasher` with `blake3::Hasher` for improved taint hashing.
- Enhanced sorting and hashing logic in `taint.rs` for consistency and efficiency.
- Removed unused `set_hash` function and redundant imports across files.
- Improved batch sender logic in `walk.rs`, renaming key components for clarity.
- Unified `spawn_senders` and `spawn_file_walker` with thread handling and channel tuple return.
- Expanded label rules with additional matchers for sources, sanitizers, and sinks.
- Deprecated `dump_cfg` and specific logging utilities in `cfg.rs` for code cleanup.

* fix: fixed let chains error in walk.rs

* fix: updated dependencies

* fix: updated dependencies

* chore: Remove standard error in scan.rs

* feat: Introduce function summaries for enhanced taint and control flow analysis

* feat: Enhance taint analysis with interop support and function summaries

* feat: Add configuration analysis module and enhance matcher rules

* feat: Add arity column to function_summaries and handle schema migration

* fix: fixed clippy &PathBuf warnings

* chore: Update dependencies and versioning in Cargo files

* docs: Update README to enhance clarity and detail on features and analysis modes

* chore: Update CHANGELOG for version 0.2.0 with new features, changes, and fixes

* docs: Update SECURITY.md to clarify version support status

---------

Co-authored-by: elipeter <eli.peter@es.fcm.travel>
2026-02-24 23:44:07 -05:00

115 lines
3.7 KiB
Python

import os
import subprocess
import sqlite3
import pickle
import shlex
# ───── Configuration ─────
DATABASE_PATH = os.getenv("DB_PATH", "/var/lib/app/data.db")
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/tmp/uploads")
REDIS_URL = os.getenv("REDIS_URL")
# ───── Request handlers ─────
def handle_admin_exec(request):
"""POST /admin/exec
Runs an admin command from environment config.
VULN: os.getenv flows into subprocess.run (command injection)
"""
admin_cmd = os.getenv("ADMIN_COMMAND")
result = subprocess.run(admin_cmd, shell=True, capture_output=True)
return {"status": result.returncode, "output": result.stdout.decode()}
def handle_report_generate(request):
"""POST /reports/generate
Generates a report by calling an external script.
VULN: os.getenv flows into subprocess.Popen
"""
script_path = os.getenv("REPORT_SCRIPT")
proc = subprocess.Popen(
[script_path, "--format", "pdf"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate()
return {"report": stdout.decode()}
def handle_eval_expression(request):
"""POST /api/eval
Evaluates a mathematical expression from user input.
VULN: request.form flows into eval (code injection)
"""
expression = request.form.get("expr")
result = eval(expression)
return {"result": result}
def handle_dynamic_import(request):
"""POST /api/plugins/load
Loads a plugin by executing its setup code.
VULN: request.json flows into exec (arbitrary code execution)
"""
plugin_code = request.json.get("setup_code")
exec(plugin_code)
return {"status": "loaded"}
def handle_search(request):
"""GET /api/search
Searches the database with user-supplied query.
VULN: request.args flows into cursor.execute (SQL injection)
"""
query = request.args.get("q")
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM items WHERE name LIKE '%" + query + "%'")
rows = cursor.fetchall()
conn.close()
return {"results": rows}
def handle_lookup(request):
"""GET /api/lookup
Looks up a record by user-supplied ID.
VULN: request.args flows into os.popen (command injection)
"""
record_id = request.args.get("id")
output = os.popen("grep " + record_id + " /var/log/audit.log").read()
return {"matches": output}
def handle_backup(request):
"""POST /admin/backup
Creates a database backup.
VULN: os.environ flows into subprocess.call
"""
backup_dir = os.environ.get("BACKUP_DIR", "/backups")
subprocess.call(["pg_dump", "-f", backup_dir + "/dump.sql", REDIS_URL])
return {"status": "ok"}
# ───── Input handling ─────
def handle_interactive_setup():
"""Interactive setup wizard.
VULN: input() flows into os.system (command injection from stdin)
"""
db_host = input("Enter database host: ")
os.system("ping -c 1 " + db_host)
db_password = input("Enter database password: ")
return {"host": db_host, "password": db_password}
# ───── Safe patterns ─────
def handle_safe_exec():
"""SAFE: shlex.quote sanitizes before shell execution."""
user_dir = os.getenv("USER_DIR")
safe_dir = shlex.quote(user_dir)
subprocess.run(["ls", "-la", safe_dir], capture_output=True)
def handle_safe_search(request):
"""SAFE: parameterized query prevents SQL injection."""
query = request.args.get("q")
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM items WHERE name LIKE ?", ("%" + query + "%",))
rows = cursor.fetchall()
conn.close()
return {"results": rows}