Use Predicate and FileSystem in final Effect cleanup

This commit is contained in:
elpresidank 2026-06-04 08:33:31 -05:00
parent c40bd406f8
commit 976e7ecfc5
6 changed files with 164 additions and 32 deletions

View file

@ -4,6 +4,7 @@
* Python reference: trustgraph-base/trustgraph/exceptions.py
*/
import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema";
import type { TgError } from "./schema/index.ts";
@ -315,8 +316,8 @@ export function flowParameterDecodeError(
}
export function errorMessage(error: unknown): string {
if (typeof error === "object" && error !== null && "message" in error) {
const message = (error as { message?: unknown }).message;
if (Predicate.isObject(error) && Predicate.hasProperty(error, "message")) {
const message = error.message;
if (typeof message === "string") return message;
}
return String(error);

View file

@ -20,6 +20,7 @@ import type { BackendConsumer, PubSubBackend } from "../backend/types.js";
import { Flow, type FlowDefinition } from "./flow.js";
import { topics } from "../schema/topics.js";
import {
errorMessage,
pubSubError,
type FlowRuntimeError,
type ProcessorLifecycleError,
@ -282,7 +283,7 @@ export function runFlowProcessorDefinitionScoped<
return Effect.void;
}
return Effect.logError(`[${options.id}] Config consumer error`, {
error: error instanceof Error ? error.message : String(error),
error: errorMessage(error),
}).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
);

View file

@ -113,15 +113,12 @@ function withDefault(value: string | undefined, fallback: string): string {
}
function toErrorMessage(value: unknown, fallback: string): string {
if (value instanceof Error) {
return value.message;
}
if (typeof value === "string" && value.length > 0) {
if (Predicate.isString(value) && value.length > 0) {
return value;
}
if (value !== null && typeof value === "object" && "message" in value) {
const message = (value as { message?: unknown }).message;
if (typeof message === "string" && message.length > 0) {
if (Predicate.isObject(value) && Predicate.hasProperty(value, "message")) {
const message = value.message;
if (Predicate.isString(message) && message.length > 0) {
return message;
}
}

View file

@ -1,3 +1,10 @@
import * as BunFileSystem from "@effect/platform-bun/BunFileSystem";
import { Effect, ManagedRuntime } from "effect";
import * as FileSystem from "effect/FileSystem";
import type { PlatformError } from "effect/PlatformError";
const fileSystemRuntime = ManagedRuntime.make(BunFileSystem.layer);
export function joinPath(...segments: string[]): string {
const joined = segments
.filter((segment) => segment.length > 0)
@ -15,26 +22,52 @@ export function dirnamePath(path: string): string {
return normalized.slice(0, index);
}
export const ensureDirectoryEffect = (path: string): Effect.Effect<void, PlatformError, FileSystem.FileSystem> =>
Effect.flatMap(FileSystem.FileSystem, (fs) =>
fs.makeDirectory(path, { recursive: true })
);
export function ensureDirectory(path: string): Promise<void> {
return Bun.$`mkdir -p ${path}`.quiet().then(() => undefined);
return fileSystemRuntime.runPromise(ensureDirectoryEffect(path));
}
export const readTextFileEffect = (path: string): Effect.Effect<string, PlatformError, FileSystem.FileSystem> =>
Effect.flatMap(FileSystem.FileSystem, (fs) => fs.readFileString(path));
export function readTextFile(path: string): Promise<string> {
return Bun.file(path).text();
return fileSystemRuntime.runPromise(readTextFileEffect(path));
}
export const readBinaryFileEffect = (path: string): Effect.Effect<Uint8Array, PlatformError, FileSystem.FileSystem> =>
Effect.flatMap(FileSystem.FileSystem, (fs) => fs.readFile(path));
export function readBinaryFile(path: string): Promise<Uint8Array> {
return Bun.file(path).arrayBuffer().then((buffer) => new Uint8Array(buffer));
return fileSystemRuntime.runPromise(readBinaryFileEffect(path));
}
export const writeTextFileEffect = (
path: string,
data: string,
): Effect.Effect<void, PlatformError, FileSystem.FileSystem> =>
Effect.flatMap(FileSystem.FileSystem, (fs) => fs.writeFileString(path, data));
export function writeTextFile(path: string, data: string): Promise<void> {
return Bun.write(path, data).then(() => undefined);
return fileSystemRuntime.runPromise(writeTextFileEffect(path, data));
}
export const writeBinaryFileEffect = (
path: string,
data: Uint8Array,
): Effect.Effect<void, PlatformError, FileSystem.FileSystem> =>
Effect.flatMap(FileSystem.FileSystem, (fs) => fs.writeFile(path, data));
export function writeBinaryFile(path: string, data: Uint8Array): Promise<void> {
return Bun.write(path, data).then(() => undefined);
return fileSystemRuntime.runPromise(writeBinaryFileEffect(path, data));
}
export const removePathEffect = (path: string): Effect.Effect<void, PlatformError, FileSystem.FileSystem> =>
Effect.flatMap(FileSystem.FileSystem, (fs) => fs.remove(path));
export function removePath(path: string): Promise<void> {
return Bun.file(path).delete();
return fileSystemRuntime.runPromise(removePathEffect(path));
}

View file

@ -4,6 +4,7 @@ import * as BrowserKeyValueStore from "@effect/platform-browser/BrowserKeyValueS
import { BaseApi, type ConnectionState, type DocumentMetadata, type ExplainEvent, type StreamingMetadata, type Term, type Triple } from "@trustgraph/client";
import { Cause, Clock, Context, Effect, Layer, Match, Metric, Option, Random, Schema as S } from "effect";
import * as MutableHashMap from "effect/MutableHashMap";
import * as Predicate from "effect/Predicate";
import * as Otlp from "effect/unstable/observability/Otlp";
import * as AsyncResult from "effect/unstable/reactivity/AsyncResult";
import * as Atom from "effect/unstable/reactivity/Atom";
@ -44,9 +45,9 @@ const isWorkbenchPromiseError = S.is(WorkbenchPromiseError);
function errorMessage(error: unknown): string {
if (isWorkbenchPromiseError(error)) return error.message;
if (typeof error === "object" && error !== null && "message" in error) {
const message = (error as { message?: unknown }).message;
if (typeof message === "string") return message;
if (Predicate.isObject(error) && Predicate.hasProperty(error, "message")) {
const message = error.message;
if (Predicate.isString(message)) return message;
}
return String(error);
}