feat: redact historic sql staged artifacts

This commit is contained in:
Andrey Avtomonov 2026-05-11 20:10:17 +02:00
parent 7b38418900
commit 06163452b4
2 changed files with 91 additions and 1 deletions

View file

@ -165,4 +165,75 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
},
]);
});
it('redacts configured SQL substrings in staged artifacts while analyzing original SQL', async () => {
  // Fixture values shared between the arrange and assert phases.
  const templateId = 'api-events-with-secret';
  const dialect = 'postgres';
  const originalSql =
    "select * from public.api_events where api_key = 'sk_live_abc123' and note = 'Secret_Token_9f'";
  // Literals embedded in originalSql that must not appear in any staged artifact.
  const leakedLiterals = ['sk_live_abc123', 'Secret_Token_9f'];
  const frozenNow = new Date('2026-05-11T12:00:00.000Z');

  const outputDir = await tempDir();

  // Reader stub: a clean probe and exactly one aggregated template carrying the secret-bearing SQL.
  const reader: HistoricSqlReader = {
    probe: async () => ({ warnings: [], info: [] }),
    async *fetchAggregated() {
      yield aggregate({
        templateId,
        canonicalSql: originalSql,
        stats: {
          executions: 15,
          distinctUsers: 2,
          firstSeen: '2026-05-01T00:00:00.000Z',
          lastSeen: '2026-05-11T00:00:00.000Z',
          p50RuntimeMs: 12,
          p95RuntimeMs: 25,
          errorRate: 0,
          rowsProduced: 15,
        },
      });
    },
  };

  // Analysis stub: returns a canned parse result keyed by the template id.
  const sqlAnalysis: SqlAnalysisPort = {
    analyzeForFingerprint: vi.fn(),
    analyzeBatch: vi.fn(async () => new Map([
      [
        templateId,
        {
          tablesTouched: ['public.api_events'],
          columnsByClause: {
            select: [],
            where: ['api_key', 'note'],
            join: [],
            groupBy: [],
          },
        },
      ],
    ])),
  };

  await stageHistoricSqlAggregatedSnapshot({
    stagedDir: outputDir,
    connectionId: 'warehouse',
    queryClient: {},
    reader,
    sqlAnalysis,
    pullConfig: {
      dialect,
      redactionPatterns: ['sk_live_[A-Za-z0-9]+', '(?i)secret_token_[a-z0-9]+'],
    },
    now: frozenNow,
  });

  // The analyzer must have been handed the unredacted SQL.
  expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
    [{ id: templateId, sql: originalSql }],
    dialect,
  );

  const tableJson = await readFile(join(outputDir, 'tables/public.api_events.json'), 'utf-8');
  const patternsJson = await readFile(join(outputDir, 'patterns-input.json'), 'utf-8');
  const artifacts = [tableJson, patternsJson];

  // Neither staged file may contain either secret literal...
  for (const artifact of artifacts) {
    for (const literal of leakedLiterals) {
      expect(artifact).not.toContain(literal);
    }
  }
  // ...and both must carry the redaction placeholder instead.
  for (const artifact of artifacts) {
    expect(artifact).toContain('[REDACTED]');
  }
});
});

View file

@ -9,6 +9,11 @@ import {
bucketP95Runtime,
bucketRecency,
} from './buckets.js';
import {
compileHistoricSqlRedactionPatterns,
redactHistoricSqlText,
type HistoricSqlRedactionPattern,
} from './redaction.js';
import {
HISTORIC_SQL_SOURCE_KEY,
aggregatedTemplateSchema,
@ -97,6 +102,19 @@ function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUni
return false;
}
/**
 * Produces the version of an aggregated template that is safe to stage.
 *
 * @param template - The aggregated template whose `canonicalSql` may need redaction.
 * @param redactors - Compiled redaction patterns to apply to the SQL text.
 * @returns The very same `template` object when `redactors` is empty; otherwise a
 *   shallow copy whose `canonicalSql` has been passed through `redactHistoricSqlText`.
 *   All other fields are carried over untouched.
 */
function redactTemplateSql(
  template: AggregatedTemplate,
  redactors: readonly HistoricSqlRedactionPattern[],
): AggregatedTemplate {
  if (redactors.length > 0) {
    // Spread first so `canonicalSql` keeps its original key position in the copy.
    const copy = { ...template };
    copy.canonicalSql = redactHistoricSqlText(template.canonicalSql, redactors);
    return copy;
  }
  // Nothing configured: hand back the input without allocating a copy.
  return template;
}
function recordColumn(acc: TableAccumulator, clause: string, column: string, executions: number): void {
const byColumn = acc.columnsByClause.get(clause) ?? new Map<string, number>();
byColumn.set(column, (byColumn.get(column) ?? 0) + executions);
@ -212,6 +230,7 @@ function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput
export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise<void> {
const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig);
const redactors = compileHistoricSqlRedactionPatterns(config.redactionPatterns);
const now = input.now ?? new Date();
const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000);
const probe = await input.reader.probe(input.queryClient);
@ -243,7 +262,7 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
continue;
}
parsedTemplates.push({
template,
template: redactTemplateSql(template, redactors),
tablesTouched,
columnsByClause: Object.fromEntries(
Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]),