From 4930bc4d2b64aeaf05053310a2612173d8df8ee0 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Sat, 11 Apr 2026 20:22:35 +0100 Subject: [PATCH] =?UTF-8?q?trustgraph-base/trustgraph/base/text=5Fcompleti?= =?UTF-8?q?on=5Fclient.py=20-=20New=20TextCompletionResult=20dataclass:=20?= =?UTF-8?q?text:=20Optional[str],=20=20=20in=5Ftoken,=20out=5Ftoken,=20mod?= =?UTF-8?q?el.=20-=20text=5Fcompletion(system,=20prompt,=20timeout=3D600)?= =?UTF-8?q?=20=E2=80=94=20non-streaming=20=20=20only.=20Returns=20TextComp?= =?UTF-8?q?letionResult=20with=20text=20set=20and=20=20=20tokens/model=20p?= =?UTF-8?q?opulated=20from=20the=20response.=20-=20New=20text=5Fcompletion?= =?UTF-8?q?=5Fstream(system,=20prompt,=20handler,=20=20=20timeout=3D600)?= =?UTF-8?q?=20=E2=80=94=20streaming.=20Invokes=20handler(chunk)=20with=20e?= =?UTF-8?q?ach=20=20=20TextCompletionResponse=20as=20it=20arrives=20(inclu?= =?UTF-8?q?ding=20the=20final=20one,=20=20=20so=20the=20caller=20can=20see?= =?UTF-8?q?=20end=5Fof=5Fstream).=20Raises=20on=20=20=20resp.error.=20Retu?= =?UTF-8?q?rns=20a=20TextCompletionResult=20with=20text=3DNone=20and=20=20?= =?UTF-8?q?=20in=5Ftoken/out=5Ftoken/model=20pulled=20from=20the=20final?= =?UTF-8?q?=20message.=20-=20Old=20streaming=3D=20kwarg=20on=20text=5Fcomp?= =?UTF-8?q?letion()=20is=20gone.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit trustgraph-base/trustgraph/base/__init__.py - Export TextCompletionClient and TextCompletionResult alongside the spec. trustgraph-flow/trustgraph/prompt/template/service.py - Non-streaming path now uses await ...text_completion(system=..., prompt=...) and reads result.text. The llm() callback still returns a plain string to PromptManager, preserving its contract. - Streaming path collapsed onto text_completion_stream(..., handler=forward_chunks). Removed the hand-rolled client.request(...) + TextCompletionRequest plumbing. The is_final / resp.error behavior is preserved (forward_chunks still checks end_of_stream; errors are now raised centrally by the client). - Dropped the now-unused TextCompletionRequest, TextCompletionResponse import. Token counts are now available to any TextCompletionClient caller - result.in_token / result.out_token / result.model — in both modes. Nothing in the prompt service consumes them yet; that's the next step on this branch. --- trustgraph-base/trustgraph/base/__init__.py | 4 +- .../trustgraph/base/text_completion_client.py | 89 ++++++++++++------- .../trustgraph/prompt/template/service.py | 29 ++---- 3 files changed, 67 insertions(+), 55 deletions(-) diff --git a/trustgraph-base/trustgraph/base/__init__.py b/trustgraph-base/trustgraph/base/__init__.py index 24b6c1f0..9511c44d 100644 --- a/trustgraph-base/trustgraph/base/__init__.py +++ b/trustgraph-base/trustgraph/base/__init__.py @@ -18,7 +18,9 @@ from . librarian_client import LibrarianClient from . chunking_service import ChunkingService from . embeddings_service import EmbeddingsService from . embeddings_client import EmbeddingsClientSpec -from . text_completion_client import TextCompletionClientSpec +from . text_completion_client import ( + TextCompletionClientSpec, TextCompletionClient, TextCompletionResult, +) from . prompt_client import PromptClientSpec from . triples_store_service import TriplesStoreService from . graph_embeddings_store_service import GraphEmbeddingsStoreService diff --git a/trustgraph-base/trustgraph/base/text_completion_client.py b/trustgraph-base/trustgraph/base/text_completion_client.py index ae93e22e..0a1358dc 100644 --- a/trustgraph-base/trustgraph/base/text_completion_client.py +++ b/trustgraph-base/trustgraph/base/text_completion_client.py @@ -1,47 +1,71 @@ +from dataclasses import dataclass +from typing import Optional + from . request_response_spec import RequestResponse, RequestResponseSpec from .. schema import TextCompletionRequest, TextCompletionResponse +@dataclass +class TextCompletionResult: + text: Optional[str] + in_token: int = 0 + out_token: int = 0 + model: str = "" + class TextCompletionClient(RequestResponse): - async def text_completion(self, system, prompt, streaming=False, timeout=600): - # If not streaming, use original behavior - if not streaming: - resp = await self.request( - TextCompletionRequest( - system = system, prompt = prompt, streaming = False - ), - timeout=timeout - ) - if resp.error: - raise RuntimeError(resp.error.message) + async def text_completion(self, system, prompt, timeout=600): - return resp.response - - # For streaming: collect all chunks and return complete response - full_response = "" - - async def collect_chunks(resp): - nonlocal full_response - - if resp.error: - raise RuntimeError(resp.error.message) - - if resp.response: - full_response += resp.response - - # Return True when end_of_stream is reached - return getattr(resp, 'end_of_stream', False) - - await self.request( + resp = await self.request( TextCompletionRequest( - system = system, prompt = prompt, streaming = True + system = system, prompt = prompt, streaming = False ), - recipient=collect_chunks, timeout=timeout ) - return full_response + if resp.error: + raise RuntimeError(resp.error.message) + + return TextCompletionResult( + text = resp.response, + in_token = getattr(resp, "in_token", 0) or 0, + out_token = getattr(resp, "out_token", 0) or 0, + model = getattr(resp, "model", "") or "", + ) + + async def text_completion_stream( + self, system, prompt, handler, timeout=600, + ): + """ + Streaming text completion. `handler` is an async callable invoked + once per chunk with the chunk's TextCompletionResponse. Returns a + TextCompletionResult with text=None and token counts / model taken + from the end_of_stream message. + """ + + async def on_chunk(resp): + + if resp.error: + raise RuntimeError(resp.error.message) + + await handler(resp) + + return getattr(resp, "end_of_stream", False) + + final = await self.request( + TextCompletionRequest( + system = system, prompt = prompt, streaming = True + ), + recipient=on_chunk, + timeout=timeout, + ) + + return TextCompletionResult( + text = None, + in_token = getattr(final, "in_token", 0) or 0, + out_token = getattr(final, "out_token", 0) or 0, + model = getattr(final, "model", "") or "", + ) class TextCompletionClientSpec(RequestResponseSpec): def __init__( @@ -54,4 +78,3 @@ class TextCompletionClientSpec(RequestResponseSpec): response_schema = TextCompletionResponse, impl = TextCompletionClient, ) - diff --git a/trustgraph-flow/trustgraph/prompt/template/service.py b/trustgraph-flow/trustgraph/prompt/template/service.py index 97298e13..c9c0c87b 100755 --- a/trustgraph-flow/trustgraph/prompt/template/service.py +++ b/trustgraph-flow/trustgraph/prompt/template/service.py @@ -11,7 +11,6 @@ import logging from ...schema import Definition, Relationship, Triple from ...schema import Topic from ...schema import PromptRequest, PromptResponse, Error -from ...schema import TextCompletionRequest, TextCompletionResponse from ...base import FlowProcessor from ...base import ProducerSpec, ConsumerSpec, TextCompletionClientSpec @@ -124,13 +123,7 @@ class Processor(FlowProcessor): logger.debug(f"System prompt: {system}") logger.debug(f"User prompt: {prompt}") - # Use the text completion client with recipient handler - client = flow("text-completion-request") - async def forward_chunks(resp): - if resp.error: - raise RuntimeError(resp.error.message) - is_final = getattr(resp, 'end_of_stream', False) # Always send a message if there's content OR if it's the final message @@ -144,15 +137,10 @@ class Processor(FlowProcessor): ) await flow("response").send(r, properties={"id": id}) - # Return True when end_of_stream - return is_final - - await client.request( - TextCompletionRequest( - system=system, prompt=prompt, streaming=True - ), - recipient=forward_chunks, - timeout=600 + await flow("text-completion-request").text_completion_stream( + system=system, prompt=prompt, + handler=forward_chunks, + timeout=600, ) # Return empty string since we already sent all chunks @@ -172,12 +160,11 @@ class Processor(FlowProcessor): logger.debug(f"System prompt: {system}") logger.debug(f"User prompt: {prompt}") - resp = await flow("text-completion-request").text_completion( - system = system, prompt = prompt, streaming = False, - ) - try: - return resp + result = await flow("text-completion-request").text_completion( + system = system, prompt = prompt, + ) + return result.text except Exception as e: logger.error(f"LLM Exception: {e}", exc_info=True) return None