mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-12 19:55:14 +02:00
Performance and precision pass (#64)
This commit is contained in:
parent
c7c5e0f3a1
commit
fb698d2c27
97 changed files with 9932 additions and 517 deletions
|
|
@ -0,0 +1,44 @@
|
|||
"""Recall guard for the Phase 1 caller-scope IPA fix.
|
||||
|
||||
Same shape as `safe_caller_scope_helper_under_authorized_route.py`, but
|
||||
the router carries no route-level auth dep (`router = APIRouter()`).
|
||||
The helper's `session.add` is reached from a route handler with no
|
||||
authorization, so the engine MUST still fire
|
||||
`missing_ownership_check` (and `token_override_without_validation`)
|
||||
on the helper's sink.
|
||||
|
||||
Triggers `apply_caller_scope_propagation`'s soundness rule: a helper's
|
||||
caller list must contain at least one caller with route-level non-Login
|
||||
auth checks. When no caller is authorized, no propagation happens and
|
||||
the helper's sinks fire as expected.
|
||||
"""
|
||||
from typing import Annotated
|
||||
from uuid import UUID
|
||||
from fastapi import APIRouter, Body
|
||||
|
||||
|
||||
# Bare router — no Security dep at the boundary.
|
||||
ti_id_router = APIRouter()
|
||||
|
||||
|
||||
def _create_state_update(
|
||||
*,
|
||||
task_instance_id: UUID,
|
||||
payload: dict,
|
||||
session,
|
||||
) -> None:
|
||||
if payload.get("kind") == "reschedule":
|
||||
session.add({"id": task_instance_id, "data": payload})
|
||||
|
||||
|
||||
@ti_id_router.patch("/{task_instance_id}/state")
|
||||
def ti_update_state(
|
||||
task_instance_id: UUID,
|
||||
payload: Annotated[dict, Body()],
|
||||
session,
|
||||
) -> None:
|
||||
_create_state_update(
|
||||
task_instance_id=task_instance_id,
|
||||
payload=payload,
|
||||
session=session,
|
||||
)
|
||||
|
|
@ -1,19 +1,26 @@
|
|||
"""
|
||||
Vulnerable counterpart to safe_fastapi_route_dependencies_auth.py: same
|
||||
shape but with NO `dependencies=[Depends(...)]` keyword arg on the route
|
||||
decorator. The FastAPI ownership-check rule must still fire — the
|
||||
recognizer must not blanket-suppress every FastAPI route, only those
|
||||
with an actual dependency-injected auth check.
|
||||
FastAPI route shape but with NO `dependencies=[Depends(...)]` keyword
|
||||
arg on the route decorator. The ownership-check rule must still fire
|
||||
— the dependency-injection recogniser must not blanket-suppress every
|
||||
FastAPI route, only those with an actual dependency-injected auth
|
||||
check.
|
||||
|
||||
Sink uses a qualified Django-style ORM call so the post-fix
|
||||
classifier still recognises it (`receiver_is_simple_chain` requires a
|
||||
non-chained receiver dot).
|
||||
"""
|
||||
from fastapi import FastAPI
|
||||
|
||||
router = FastAPI()
|
||||
|
||||
|
||||
class Connection:
|
||||
objects = None
|
||||
|
||||
|
||||
@router.delete("/{connection_id}")
|
||||
def delete_connection(connection_id: str, session):
|
||||
def delete_connection(connection_id: str):
|
||||
"""No auth — must still fire missing_ownership_check."""
|
||||
connection = session.scalar(select(Connection).filter_by(conn_id=connection_id))
|
||||
if connection is None:
|
||||
raise HTTPException(404, "not found")
|
||||
session.delete(connection)
|
||||
Connection.objects.filter(id=connection_id).delete()
|
||||
return {"ok": True}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,27 @@
|
|||
"""SQLAlchemy variant of vuln_fastapi_route_no_dependencies.py: same FastAPI
|
||||
route shape with NO `dependencies=[Depends(...)]` keyword arg, but the sink
|
||||
is a real-world airflow-style SQLAlchemy queryset chain
|
||||
`session.scalar(select(C).filter_by(conn_id=user_input))`.
|
||||
|
||||
Pre-fix the chain reduced to bare `["filter_by"]` and was suppressed by
|
||||
`receiver_is_simple_chain`, blocking recall on this real-repo airflow shape.
|
||||
The member_chain Python `function`-field traversal + `db_query_builder_roots`
|
||||
extension restores recall.
|
||||
|
||||
Recall guard: ownership-check rule must fire on the chained query — the
|
||||
caller has no auth check.
|
||||
"""
|
||||
from fastapi import FastAPI
|
||||
from sqlalchemy import select
|
||||
|
||||
router = FastAPI()
|
||||
|
||||
|
||||
class Connection:
|
||||
pass
|
||||
|
||||
|
||||
@router.delete("/{connection_id}")
|
||||
def delete_connection(connection_id: str, session):
|
||||
"""No auth — must fire missing_ownership_check on the chained query."""
|
||||
return session.scalar(select(Connection).filter_by(conn_id=connection_id))
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
"""Recall counterpart to safe_fastapi_route_security_scopes.py.
|
||||
|
||||
Precision guard for the Security-without-scopes path: a bare
|
||||
`Security(callable)` with no `scopes=[...]` kwarg, or with an empty
|
||||
`scopes=[]`, is NOT promoted from LoginGuard to AuthorizationCheck —
|
||||
the OAuth2 scope semantic only fires when scopes is non-empty. Without
|
||||
scope enforcement the wrapper is functionally equivalent to
|
||||
`Depends(callable)` plus a bare login check, so `missing_ownership_check`
|
||||
must still fire on a downstream id-targeted ORM filter.
|
||||
|
||||
Recall guard: ownership-check rule must fire — Security with no scopes
|
||||
is conservative (treated as login-only), so the route is not promoted
|
||||
to authorized.
|
||||
"""
|
||||
from fastapi import FastAPI, Security
|
||||
|
||||
|
||||
def require_auth():
|
||||
pass
|
||||
|
||||
|
||||
router = FastAPI()
|
||||
|
||||
|
||||
class TaskInstance:
|
||||
pass
|
||||
|
||||
|
||||
@router.patch(
|
||||
"/{task_instance_id}/run",
|
||||
dependencies=[Security(require_auth, scopes=[])],
|
||||
)
|
||||
def ti_run(task_instance_id: str, session):
|
||||
return session.scalar(select(TaskInstance).filter_by(id=task_instance_id))
|
||||
|
||||
|
||||
def select(_):
|
||||
pass
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
"""Recall guard for the router-level Security-prop fix. When a router
|
||||
is declared with NO `dependencies=` kwarg (`router = APIRouter(...)`),
|
||||
attached routes that don't supply inline deps are genuinely
|
||||
unauthorized — the engine must still flag id-targeted writes as
|
||||
`missing_ownership_check`. Without the gate the router-level extractor
|
||||
would over-fire by treating every router as auth-providing.
|
||||
|
||||
Distilled from airflow
|
||||
`task_instances.py:1036-1082` where `router = VersionedAPIRouter()`
|
||||
(bare, no deps) attaches `@router.get("/states", ...)` — the route is
|
||||
auth-attached only via the cross-file `include_router` chain in
|
||||
`routes/__init__.py`, which is a separate gap (see deep_engine_fixes.md).
|
||||
For the per-file case where the router has no router-level deps
|
||||
declared, the route is correctly an un-guarded ownership-check FN.
|
||||
"""
|
||||
from cadwyn import VersionedAPIRouter
|
||||
|
||||
|
||||
# Bare router — no router-level dependencies declared.
|
||||
router = VersionedAPIRouter()
|
||||
|
||||
|
||||
class TaskInstance:
|
||||
pass
|
||||
|
||||
|
||||
@router.get("/states/{run_id}/{task_id}")
|
||||
def get_task_instance_states(run_id: str, task_id: str, session):
|
||||
rows = session.scalars(
|
||||
select(TaskInstance)
|
||||
.where(TaskInstance.run_id == run_id)
|
||||
.where(TaskInstance.task_id == task_id)
|
||||
).all()
|
||||
[
|
||||
run_id_task_state_map[task.run_id].update(
|
||||
{task.task_id: task.state}
|
||||
)
|
||||
for task in rows
|
||||
]
|
||||
|
||||
|
||||
def select(_):
|
||||
pass
|
||||
|
||||
|
||||
run_id_task_state_map = {}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
# py-auth-realrepo-XXX (vuln pair): same bare-`set()` / `dict()` /
|
||||
# `defaultdict()` local collection shape as
|
||||
# safe_local_set_update_no_orm.py, but the helper *also* runs an
|
||||
# id-targeted ORM query whose filter argument is a user-supplied id
|
||||
# (`team_id` in the function signature, no caller-scope-entity
|
||||
# exemption applies).
|
||||
#
|
||||
# Recall guard: the bare-callee constructor recogniser must only
|
||||
# suppress the InMemoryLocal `.update` / `.add` calls — the
|
||||
# id-targeted ORM `.filter(id=team_id)` must still fire
|
||||
# `py.auth.missing_ownership_check`.
|
||||
class Team:
|
||||
pass
|
||||
|
||||
|
||||
def get_team_with_history(request, team_id):
|
||||
seen_ids = set()
|
||||
audit = dict()
|
||||
seen_ids.add(team_id)
|
||||
audit["team"] = team_id
|
||||
|
||||
return Team.objects.filter(id=team_id).first()
|
||||
|
||||
|
||||
def archive_team(request, team_id):
|
||||
pending = set()
|
||||
pending.add(team_id)
|
||||
Team.objects.filter(id=team_id).delete()
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
# py-path-traversal-no-relative-to: regression guard companion to
|
||||
# safe_relative_to_validator.py. Same source/sink shape but without
|
||||
# the `filepath.relative_to(base)` validator — taint must propagate.
|
||||
from pathlib import Path
|
||||
|
||||
from flask import request, send_file
|
||||
|
||||
|
||||
def download() -> None:
|
||||
base = Path("/var/www/static")
|
||||
rel_url = request.args.get("path")
|
||||
filepath = base.joinpath(rel_url).resolve()
|
||||
send_file(str(filepath))
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
# py-auth-realrepo-011: bare-identifier callees without a receiver dot
|
||||
# are never DB / ORM operations. Distilled from sentry
|
||||
# `src/sentry/tasks/statistical_detectors.py` (line 743:
|
||||
# `org_ids = list({p.organization_id for p in projects})`),
|
||||
# `src/sentry/utils/query.py:90` (`events = list(method(...))`),
|
||||
# `src/sentry/api/helpers/group_index/delete.py` (bare `delete_group_list`,
|
||||
# `create_audit_entry`, `create_audit_entries` helper calls), and
|
||||
# `src/sentry/seer/autofix/coding_agent.py` (bare `update_coding_agent_state`).
|
||||
#
|
||||
# Before the fix, the verb-name fallback in `classify_sink_class`
|
||||
# matched bare callees `list`, `filter`, `update`, `create`, `add`,
|
||||
# `delete` against the Python read/mutation indicator vocabulary and
|
||||
# classified them as `DbCrossTenantRead` / `DbMutation`. Combined with
|
||||
# the user-input-evidence precondition (`request: Request` triggers it),
|
||||
# every internal helper firing one of these builtins / locally-defined
|
||||
# helpers produced a `py.auth.missing_ownership_check` finding.
|
||||
#
|
||||
# A real ORM / DB call always carries a receiver
|
||||
# (`Model.objects.filter(...)`, `repo.find(id)`, `db.query(...)`); a
|
||||
# bare-identifier callee is a Python builtin or a locally-defined
|
||||
# helper, neither of which has the cross-tenant read / mutation
|
||||
# semantics the rule is checking for. The fix gates the verb fallback
|
||||
# on `receiver_is_simple_chain(callee)` (callee contains a dot AND the
|
||||
# receiver isn't itself a call expression).
|
||||
from typing import Any, Iterable
|
||||
|
||||
|
||||
def fetch_continuous_examples(raw_examples):
|
||||
# `list(...)` is a Python builtin — no DB op happens here.
|
||||
project_ids = list({pid for pid, _ in raw_examples.keys()})
|
||||
return project_ids
|
||||
|
||||
|
||||
def detect_function_change_points(projects, start, transactions_per_project=None):
|
||||
# Bare `list({...})` set-comprehension materialisation; both args
|
||||
# come from internally-supplied `projects` collection iteration.
|
||||
org_ids = list({p.organization_id for p in projects})
|
||||
project_ids = list({p.id for p in projects})
|
||||
return org_ids, project_ids
|
||||
|
||||
|
||||
def delete_group_list(request, project, group_list, delete_type):
|
||||
# Bare-name local helper invocation — `create_audit_entries` is a
|
||||
# function defined in the same module, not a DB write. Used to
|
||||
# fire `py.auth.missing_ownership_check`.
|
||||
transaction_id = "tx"
|
||||
create_audit_entries(request, project, group_list, delete_type, transaction_id)
|
||||
|
||||
|
||||
def create_audit_entries(request, project, group_list, delete_type, transaction_id):
|
||||
for group in group_list:
|
||||
# Bare `create_audit_entry` is a helper, not a DB INSERT.
|
||||
create_audit_entry(
|
||||
request=request,
|
||||
target_object=group.id,
|
||||
event="ISSUE_DELETE",
|
||||
data={"issue_id": group.id, "project_slug": project.slug},
|
||||
)
|
||||
|
||||
|
||||
def create_audit_entry(**kwargs):
|
||||
pass
|
||||
|
||||
|
||||
def update_coding_agent_state(state, action):
|
||||
# Bare `update_*` helper called inside an outer task — Python lets
|
||||
# you name local helpers freely, the verb prefix does not imply a
|
||||
# DB mutation.
|
||||
pass
|
||||
|
||||
|
||||
def materialise_filter_chain(events: Iterable[Any]):
|
||||
# `filter(...)` is the Python builtin (`Iterable.filter`), and the
|
||||
# bare-name local helper pattern below is endemic in real-repo
|
||||
# Python code.
|
||||
return list(filter(lambda e: e is not None, events))
|
||||
|
|
@ -0,0 +1,63 @@
|
|||
"""Distilled from airflow
|
||||
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:516-628`:
|
||||
The route handler `ti_update_state` is route-level authorized via the
|
||||
`ti_id_router = VersionedAPIRouter(dependencies=[Security(require_auth,
|
||||
scopes=["ti:self"])])` declaration (closed by the session-0010 fix).
|
||||
The handler then delegates the actual `session.add(TaskReschedule(...))`
|
||||
sink to a private helper `_create_ti_state_update_query_and_update_state`
|
||||
that has no inline auth check of its own.
|
||||
|
||||
Pre-fix the helper fired `missing_ownership_check` +
|
||||
`token_override_without_validation` at the helper's body sink because
|
||||
`check_ownership_gaps` is scoped per AnalysisUnit — the caller's
|
||||
route-level auth check did not propagate to the callee.
|
||||
|
||||
The Phase 1 caller-scope IPA fix (`apply_caller_scope_propagation` in
|
||||
`src/auth_analysis/mod.rs`) walks the call graph DOWN: when every
|
||||
in-file caller of a helper carries route-level non-Login auth
|
||||
(Other / Membership / Ownership / AdminGuard), the helper inherits the
|
||||
caller's checks via synthetic `is_route_level=true` AuthChecks. This
|
||||
lifts the airflow shape exactly, both findings cleared post-fix.
|
||||
|
||||
Precision guard: helper must NOT fire `missing_ownership_check` or
|
||||
`token_override_without_validation` despite holding the auth-relevant
|
||||
sinks (`session.add` with caller-passed scoped id).
|
||||
"""
|
||||
from typing import Annotated
|
||||
from uuid import UUID
|
||||
from fastapi import APIRouter, Body, Security
|
||||
|
||||
|
||||
def require_auth():
|
||||
pass
|
||||
|
||||
|
||||
# Router-level Security carries the JWT scope check on every attached
|
||||
# route at runtime. Closes the prior session-0010 gap.
|
||||
ti_id_router = APIRouter(
|
||||
dependencies=[Security(require_auth, scopes=["ti:self"])],
|
||||
)
|
||||
|
||||
|
||||
def _create_state_update(
|
||||
*,
|
||||
task_instance_id: UUID,
|
||||
payload: dict,
|
||||
session,
|
||||
) -> None:
|
||||
"""Helper: caller-scope IPA must propagate route-level auth into here."""
|
||||
if payload.get("kind") == "reschedule":
|
||||
session.add({"id": task_instance_id, "data": payload})
|
||||
|
||||
|
||||
@ti_id_router.patch("/{task_instance_id}/state")
|
||||
def ti_update_state(
|
||||
task_instance_id: UUID,
|
||||
payload: Annotated[dict, Body()],
|
||||
session,
|
||||
) -> None:
|
||||
_create_state_update(
|
||||
task_instance_id=task_instance_id,
|
||||
payload=payload,
|
||||
session=session,
|
||||
)
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
"""Distilled from airflow
|
||||
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:101-117`:
|
||||
FastAPI route declares its auth dependency as
|
||||
`dependencies=[Security(require_auth, scopes=["token:execution"])]`.
|
||||
`Security(...)` is FastAPI's OAuth2-scope-checked variant of `Depends(...)`
|
||||
— the JWT must carry one of the listed scopes, so the route is fully
|
||||
authorized at the boundary.
|
||||
|
||||
Pre-fix `is_depends_callee` only matched `Depends`; `Security(...)` was
|
||||
ignored, leaving the route as if no auth dep were declared. Even after
|
||||
recognising the marker, `require_auth` is a registered login-guard, and a
|
||||
`LoginGuard` AuthCheckKind would have been filtered by
|
||||
`has_prior_subject_auth` — the route would still fire
|
||||
`missing_ownership_check`. The deeper fix promotes a scoped Security
|
||||
wrapper to `AuthCheckKind::Other` so the route counts as authorized for
|
||||
ownership / membership checks at any sink the handler reaches.
|
||||
|
||||
Precision guard: route must NOT fire `missing_ownership_check` even
|
||||
though the handler does an id-targeted ORM filter.
|
||||
"""
|
||||
from fastapi import FastAPI, Security
|
||||
|
||||
|
||||
def require_auth(scopes):
|
||||
pass
|
||||
|
||||
|
||||
router = FastAPI()
|
||||
|
||||
|
||||
class TaskInstance:
|
||||
pass
|
||||
|
||||
|
||||
@router.patch(
|
||||
"/{task_instance_id}/run",
|
||||
dependencies=[Security(require_auth, scopes=["token:execution", "token:workload"])],
|
||||
)
|
||||
def ti_run(task_instance_id: str, session):
|
||||
return session.scalar(select(TaskInstance).filter_by(id=task_instance_id))
|
||||
|
||||
|
||||
def select(_):
|
||||
pass
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
"""Distilled from airflow
|
||||
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:89-318`:
|
||||
FastAPI declares its auth dependency once at the router constructor —
|
||||
`ti_id_router = VersionedAPIRouter(dependencies=[Security(require_auth,
|
||||
scopes=["ti:self"])])` — and every per-task route attaches via
|
||||
`@ti_id_router.<verb>(...)` with no inline deps. FastAPI propagates
|
||||
router-level dependencies to every attached route at runtime, so the
|
||||
JWT-validated scope check guards every `session.add` / row-update sink
|
||||
the handler body reaches.
|
||||
|
||||
Pre-fix the FastAPI dep extractor only walked the per-route decorator's
|
||||
`dependencies=[...]` kwarg; router-constructor `dependencies=` was
|
||||
dropped, so every `@ti_id_router.<verb>` route without inline deps fired
|
||||
`missing_ownership_check` + `token_override_without_validation` despite
|
||||
being authorized.
|
||||
|
||||
The fix walks module-level `<router> = APIRouter(...)` /
|
||||
`VersionedAPIRouter(...)` / `FastAPI(...)` assignments, captures the
|
||||
router's `dependencies=[...]` into a per-router map, and merges them
|
||||
into the per-route middleware list when the decorator's prefix matches.
|
||||
A scoped Security wrapper synthesises matching TokenExpiry +
|
||||
TokenRecipient checks (the JWT-validation semantics) so the
|
||||
token-override rule recognises the route too.
|
||||
|
||||
Precision guard: route must NOT fire `missing_ownership_check` /
|
||||
`token_override_without_validation` even though the handler writes
|
||||
through an id-targeted state update.
|
||||
"""
|
||||
from fastapi import Security
|
||||
from cadwyn import VersionedAPIRouter
|
||||
|
||||
|
||||
def require_auth(scopes):
|
||||
pass
|
||||
|
||||
|
||||
# Router-level Security with non-empty scopes. Every route attached to
|
||||
# this router inherits the dep; no inline declaration needed.
|
||||
ti_id_router = VersionedAPIRouter(
|
||||
dependencies=[
|
||||
Security(require_auth, scopes=["ti:self"]),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class Log:
|
||||
pass
|
||||
|
||||
|
||||
class TaskInstance:
|
||||
pass
|
||||
|
||||
|
||||
@ti_id_router.patch("/{task_instance_id}/state")
|
||||
def ti_update_state(task_instance_id: str, session):
|
||||
session.add(
|
||||
Log(
|
||||
task_instance_id=task_instance_id,
|
||||
event="state_update",
|
||||
)
|
||||
)
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
# py-auth-realrepo-XXX: bare-callee Python container constructors
|
||||
# (`set()` / `dict()` / `defaultdict()`) bind a non-sink local
|
||||
# collection. Subsequent method calls on the bound var
|
||||
# (`verified_ids.update(..)`, `cache[k] = v`, `requested_teams.add(..)`)
|
||||
# are in-memory mutations, not ORM/DB writes, so the route handler
|
||||
# must NOT fire `py.auth.missing_ownership_check`.
|
||||
#
|
||||
# Distilled from sentry `src/sentry/api/helpers/teams.py::get_teams`:
|
||||
#
|
||||
# def get_teams(request, organization, teams=None):
|
||||
# requested_teams = set(request.GET.getlist("team", []) ...)
|
||||
# verified_ids: set[int] = set()
|
||||
# ...
|
||||
# verified_ids.update(myteams) # <-- LOCAL set update
|
||||
# requested_teams.update(verified_ids)
|
||||
# teams_query = Team.objects.filter(
|
||||
# id__in=requested_teams, organization_id=organization.id
|
||||
# )
|
||||
#
|
||||
# Without the bare-callee constructor recogniser, `set()` / `dict()`
|
||||
# go untracked, the bound vars miss `non_sink_vars`, and the
|
||||
# `.update(..)` / `.add(..)` calls classify as `DbMutation` —
|
||||
# triggering the false missing-ownership-check finding. See
|
||||
# `AuthAnalysisRules::is_non_sink_constructor_callee` and the
|
||||
# `assignment` arm in `collect_unit_state`.
|
||||
from collections import Counter, defaultdict
|
||||
from collections.abc import Iterable
|
||||
|
||||
|
||||
class Organization:
|
||||
pass
|
||||
|
||||
|
||||
class Team:
|
||||
pass
|
||||
|
||||
|
||||
def get_teams(request, organization: Organization, teams: Iterable[int] | None = None):
|
||||
requested_teams = set(request.GET.getlist("team", []) if teams is None else teams)
|
||||
verified_ids: set[int] = set()
|
||||
seen_counter = Counter()
|
||||
cache = defaultdict(list)
|
||||
metadata = dict()
|
||||
pending = list()
|
||||
|
||||
if "myteams" in requested_teams:
|
||||
requested_teams.remove("myteams")
|
||||
myteams = request.access.team_ids_with_membership
|
||||
verified_ids.update(myteams)
|
||||
requested_teams.update(verified_ids)
|
||||
seen_counter.update(myteams)
|
||||
cache["my"].append(myteams)
|
||||
metadata["count"] = len(myteams)
|
||||
pending.append(myteams)
|
||||
|
||||
return Team.objects.filter(
|
||||
id__in=requested_teams, organization_id=organization.id
|
||||
)
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
# py-safe-relative-to: pathlib `relative_to(base)` raise-on-escape
|
||||
# pattern recognised as a receiver-side FILE_IO validator. Captures
|
||||
# the canonical Python path-containment idiom — the receiver is proven
|
||||
# contained in `base` if execution reaches the next statement.
|
||||
# Motivated by CVE-2024-23334 patched fixture.
|
||||
from pathlib import Path
|
||||
|
||||
from flask import request, send_file
|
||||
|
||||
|
||||
def download() -> None:
|
||||
base = Path("/var/www/static")
|
||||
rel_url = request.args.get("path")
|
||||
filepath = base.joinpath(rel_url).resolve()
|
||||
try:
|
||||
filepath.relative_to(base)
|
||||
except ValueError:
|
||||
return
|
||||
send_file(str(filepath))
|
||||
Loading…
Add table
Add a link
Reference in a new issue