mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
fix: render prompt template for variable extraction
This commit is contained in:
parent
87c8c5e2c8
commit
8b3dc02722
4 changed files with 48 additions and 14 deletions
|
|
@ -95,14 +95,38 @@ async def duplicate_workflow(
|
|||
|
||||
# 5. Copy template_context_variables and workflow_configurations from source definition
|
||||
source_tcv = source_def.template_context_variables
|
||||
source_wc = source_def.workflow_configurations
|
||||
source_wc = (
|
||||
copy.deepcopy(source_def.workflow_configurations)
|
||||
if source_def.workflow_configurations
|
||||
else None
|
||||
)
|
||||
|
||||
# 5a. Copy custom ambient noise file if present
|
||||
if source_wc:
|
||||
ambient_cfg = source_wc.get("ambient_noise_configuration")
|
||||
if ambient_cfg and ambient_cfg.get("storage_key"):
|
||||
old_key = ambient_cfg["storage_key"]
|
||||
filename = posixpath.basename(old_key)
|
||||
new_key = f"ambient-noise/{organization_id}/{new_workflow.id}/{filename}"
|
||||
try:
|
||||
if await _copy_storage_object(
|
||||
old_key, new_key, ambient_cfg.get("storage_backend", "")
|
||||
):
|
||||
ambient_cfg["storage_key"] = new_key
|
||||
else:
|
||||
logger.warning(
|
||||
f"Failed to copy ambient noise file {old_key}, keeping original reference"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error copying ambient noise file: {e}")
|
||||
|
||||
if source_tcv or source_wc:
|
||||
new_workflow = await db_client.update_workflow(
|
||||
workflow_id=new_workflow.id,
|
||||
name=None,
|
||||
workflow_definition=None,
|
||||
template_context_variables=copy.deepcopy(source_tcv),
|
||||
workflow_configurations=copy.deepcopy(source_wc),
|
||||
workflow_configurations=source_wc,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
|
|
@ -177,9 +201,9 @@ async def _duplicate_recordings(
|
|||
f"/{filename}"
|
||||
)
|
||||
|
||||
# Copy the file in storage (server-side copy)
|
||||
fs = _get_storage_for_recording(rec.storage_backend)
|
||||
copied = await fs.acopy_file(rec.storage_key, new_storage_key)
|
||||
copied = await _copy_storage_object(
|
||||
rec.storage_key, new_storage_key, rec.storage_backend
|
||||
)
|
||||
if not copied:
|
||||
logger.warning(
|
||||
f"Failed to copy recording file {rec.recording_id}, skipping"
|
||||
|
|
@ -229,9 +253,14 @@ def _replace_recording_ids(
|
|||
return json.loads(definition_str)
|
||||
|
||||
|
||||
def _get_storage_for_recording(storage_backend: str):
|
||||
"""Get the appropriate storage filesystem for a recording's backend."""
|
||||
async def _copy_storage_object(
|
||||
source_key: str, dest_key: str, storage_backend: str
|
||||
) -> bool:
|
||||
"""Copy a file in storage, resolving the correct backend. Returns True on success."""
|
||||
current_backend = StorageBackend.get_current_backend()
|
||||
if storage_backend == current_backend.value:
|
||||
return storage_fs
|
||||
return get_storage_for_backend(storage_backend)
|
||||
fs = (
|
||||
storage_fs
|
||||
if storage_backend == current_backend.value
|
||||
else get_storage_for_backend(storage_backend)
|
||||
)
|
||||
return await fs.acopy_file(source_key, dest_key)
|
||||
|
|
|
|||
|
|
@ -341,7 +341,12 @@ class PipecatEngine:
|
|||
parent_context = self._get_otel_context()
|
||||
|
||||
extraction_prompt = self._format_prompt(node.extraction_prompt)
|
||||
extraction_variables = node.extraction_variables
|
||||
extraction_variables = [
|
||||
v.model_copy(update={"prompt": self._format_prompt(v.prompt)})
|
||||
if v.prompt
|
||||
else v
|
||||
for v in node.extraction_variables
|
||||
]
|
||||
|
||||
async def _do_extraction():
|
||||
try:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue