diff --git a/trustgraph-base/trustgraph/api/__init__.py b/trustgraph-base/trustgraph/api/__init__.py index 7a3fc86d..0ecb760e 100644 --- a/trustgraph-base/trustgraph/api/__init__.py +++ b/trustgraph-base/trustgraph/api/__init__.py @@ -34,7 +34,26 @@ from .types import ( ) # Exceptions -from .exceptions import ProtocolException, ApplicationException +from .exceptions import ( + ProtocolException, + TrustGraphException, + AgentError, + ConfigError, + DocumentRagError, + FlowError, + GatewayError, + GraphRagError, + LLMError, + LoadError, + LookupError, + NLPQueryError, + ObjectsQueryError, + RequestError, + StructuredQueryError, + UnexpectedError, + # Legacy alias + ApplicationException, +) __all__ = [ # Core API @@ -75,6 +94,21 @@ __all__ = [ # Exceptions "ProtocolException", - "ApplicationException", + "TrustGraphException", + "AgentError", + "ConfigError", + "DocumentRagError", + "FlowError", + "GatewayError", + "GraphRagError", + "LLMError", + "LoadError", + "LookupError", + "NLPQueryError", + "ObjectsQueryError", + "RequestError", + "StructuredQueryError", + "UnexpectedError", + "ApplicationException", # Legacy alias ] diff --git a/trustgraph-base/trustgraph/api/async_socket_client.py b/trustgraph-base/trustgraph/api/async_socket_client.py index b68a69d5..7c2a5aab 100644 --- a/trustgraph-base/trustgraph/api/async_socket_client.py +++ b/trustgraph-base/trustgraph/api/async_socket_client.py @@ -130,18 +130,26 @@ class AsyncSocketClient: content=resp.get("content", ""), end_of_message=resp.get("end_of_message", False) ) - elif chunk_type == "final-answer": + elif chunk_type == "answer" or chunk_type == "final-answer": return AgentAnswer( content=resp.get("content", ""), end_of_message=resp.get("end_of_message", False), end_of_dialog=resp.get("end_of_dialog", False) ) + elif chunk_type == "action": + # Agent action chunks - treat as thoughts for display purposes + return AgentThought( + content=resp.get("content", ""), + end_of_message=resp.get("end_of_message", False) + ) else: # RAG-style chunk (or generic chunk) + # Text-completion uses "response" field, RAG uses "chunk" field, Prompt uses "text" field + content = resp.get("response", resp.get("chunk", resp.get("text", ""))) return RAGChunk( - content=resp.get("chunk", ""), + content=content, end_of_stream=resp.get("end_of_stream", False), - error=resp.get("error") + error=None # Errors are always thrown, never stored ) async def aclose(self): diff --git a/trustgraph-base/trustgraph/api/exceptions.py b/trustgraph-base/trustgraph/api/exceptions.py index b3f732d4..311d2651 100644 --- a/trustgraph-base/trustgraph/api/exceptions.py +++ b/trustgraph-base/trustgraph/api/exceptions.py @@ -1,6 +1,134 @@ +""" +TrustGraph API Exceptions +Exception hierarchy for errors returned by TrustGraph services. +Each service error type maps to a specific exception class. +""" + +# Protocol-level exceptions (communication errors) class ProtocolException(Exception): + """Raised when WebSocket protocol errors occur""" pass -class ApplicationException(Exception): + +# Base class for all TrustGraph application errors +class TrustGraphException(Exception): + """Base class for all TrustGraph service errors""" + def __init__(self, message: str, error_type: str = None): + super().__init__(message) + self.message = message + self.error_type = error_type + + +# Service-specific exceptions +class AgentError(TrustGraphException): + """Agent service error""" pass + + +class ConfigError(TrustGraphException): + """Configuration service error""" + pass + + +class DocumentRagError(TrustGraphException): + """Document RAG retrieval error""" + pass + + +class FlowError(TrustGraphException): + """Flow management error""" + pass + + +class GatewayError(TrustGraphException): + """API Gateway error""" + pass + + +class GraphRagError(TrustGraphException): + """Graph RAG retrieval error""" + pass + + +class LLMError(TrustGraphException): + """LLM service error""" + pass + + +class LoadError(TrustGraphException): + """Data loading error""" + pass + + +class LookupError(TrustGraphException): + """Lookup/search error""" + pass + + +class NLPQueryError(TrustGraphException): + """NLP query service error""" + pass + + +class ObjectsQueryError(TrustGraphException): + """Objects query service error""" + pass + + +class RequestError(TrustGraphException): + """Request processing error""" + pass + + +class StructuredQueryError(TrustGraphException): + """Structured query service error""" + pass + + +class UnexpectedError(TrustGraphException): + """Unexpected/unknown error""" + pass + + +# Mapping from error type string to exception class +ERROR_TYPE_MAPPING = { + "agent-error": AgentError, + "config-error": ConfigError, + "document-rag-error": DocumentRagError, + "flow-error": FlowError, + "gateway-error": GatewayError, + "graph-rag-error": GraphRagError, + "llm-error": LLMError, + "load-error": LoadError, + "lookup-error": LookupError, + "nlp-query-error": NLPQueryError, + "objects-query-error": ObjectsQueryError, + "request-error": RequestError, + "structured-query-error": StructuredQueryError, + "unexpected-error": UnexpectedError, +} + + +def raise_from_error_dict(error_dict: dict) -> None: + """ + Raise appropriate exception from TrustGraph error dictionary. + + Args: + error_dict: Dictionary with 'type' and 'message' keys + + Raises: + Appropriate TrustGraphException subclass based on error type + """ + error_type = error_dict.get("type", "unexpected-error") + message = error_dict.get("message", "Unknown error") + + # Look up exception class, default to UnexpectedError + exception_class = ERROR_TYPE_MAPPING.get(error_type, UnexpectedError) + + # Raise the appropriate exception + raise exception_class(message, error_type) + + +# Legacy exception for backwards compatibility +ApplicationException = TrustGraphException diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index 7e5dfaaf..c0fd9cd9 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -6,7 +6,7 @@ from typing import Optional, Dict, Any, Iterator, Union, List from threading import Lock from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, StreamingChunk -from . exceptions import ProtocolException, ApplicationException +from . exceptions import ProtocolException, raise_from_error_dict class SocketClient: @@ -126,7 +126,7 @@ class SocketClient: raise ProtocolException(f"Response ID mismatch") if "error" in response: - raise ApplicationException(response["error"]) + raise_from_error_dict(response["error"]) if "response" not in response: raise ProtocolException(f"Missing response in message") @@ -171,11 +171,15 @@ class SocketClient: continue # Ignore messages for other requests if "error" in response: - raise ApplicationException(response["error"]) + raise_from_error_dict(response["error"]) if "response" in response: resp = response["response"] + # Check for errors in response chunks + if "error" in resp: + raise_from_error_dict(resp["error"]) + # Parse different chunk types chunk = self._parse_chunk(resp) yield chunk @@ -198,18 +202,26 @@ class SocketClient: content=resp.get("content", ""), end_of_message=resp.get("end_of_message", False) ) - elif chunk_type == "final-answer": + elif chunk_type == "answer" or chunk_type == "final-answer": return AgentAnswer( content=resp.get("content", ""), end_of_message=resp.get("end_of_message", False), end_of_dialog=resp.get("end_of_dialog", False) ) + elif chunk_type == "action": + # Agent action chunks - treat as thoughts for display purposes + return AgentThought( + content=resp.get("content", ""), + end_of_message=resp.get("end_of_message", False) + ) else: # RAG-style chunk (or generic chunk) + # Text-completion uses "response" field, RAG uses "chunk" field, Prompt uses "text" field + content = resp.get("response", resp.get("chunk", resp.get("text", ""))) return RAGChunk( - content=resp.get("chunk", ""), + content=content, end_of_stream=resp.get("end_of_stream", False), - error=resp.get("error") + error=None # Errors are always thrown, never stored ) def close(self) -> None: diff --git a/trustgraph-base/trustgraph/api/types.py b/trustgraph-base/trustgraph/api/types.py index 016f2c7a..bba566a6 100644 --- a/trustgraph-base/trustgraph/api/types.py +++ b/trustgraph-base/trustgraph/api/types.py @@ -79,5 +79,6 @@ class AgentAnswer(StreamingChunk): @dataclasses.dataclass class RAGChunk(StreamingChunk): """RAG streaming chunk""" + chunk_type: str = "rag" end_of_stream: bool = False error: Optional[Dict[str, str]] = None diff --git a/trustgraph-cli/trustgraph/cli/invoke_agent.py b/trustgraph-cli/trustgraph/cli/invoke_agent.py index 6af12cd5..de70021b 100644 --- a/trustgraph-cli/trustgraph/cli/invoke_agent.py +++ b/trustgraph-cli/trustgraph/cli/invoke_agent.py @@ -161,6 +161,11 @@ def question( # Output the chunk if current_outputter: current_outputter.output(content) + # Flush word buffer after each chunk to avoid delay + if current_outputter.word_buffer: + print(current_outputter.word_buffer, end="", flush=True) + current_outputter.column += len(current_outputter.word_buffer) + current_outputter.word_buffer = "" elif chunk_type == "final-answer": print(content, end="", flush=True) diff --git a/trustgraph-cli/trustgraph/cli/invoke_llm.py b/trustgraph-cli/trustgraph/cli/invoke_llm.py index 261993d9..a1611625 100644 --- a/trustgraph-cli/trustgraph/cli/invoke_llm.py +++ b/trustgraph-cli/trustgraph/cli/invoke_llm.py @@ -28,7 +28,7 @@ def query(url, flow_id, system, prompt, streaming=True, token=None): if streaming: # Stream output to stdout without newline for chunk in response: - print(chunk.content, end="", flush=True) + print(chunk, end="", flush=True) # Add final newline after streaming print() else: diff --git a/trustgraph-cli/trustgraph/cli/invoke_prompt.py b/trustgraph-cli/trustgraph/cli/invoke_prompt.py index b1eba5aa..09cc9043 100644 --- a/trustgraph-cli/trustgraph/cli/invoke_prompt.py +++ b/trustgraph-cli/trustgraph/cli/invoke_prompt.py @@ -31,36 +31,16 @@ def query(url, flow_id, template_id, variables, streaming=True, token=None): ) if streaming: - full_response = {"text": "", "object": ""} - - # Stream output + # Stream output (prompt yields strings directly) for chunk in response: - content = chunk.content - if content: - print(content, end="", flush=True) - full_response["text"] += content - - # Check if this is an object response (JSON) - if hasattr(chunk, 'object') and chunk.object: - full_response["object"] = chunk.object - - # Handle final output - if full_response["text"]: - # Add final newline after streaming text - print() - elif full_response["object"]: - # Print JSON object (pretty-printed) - print(json.dumps(json.loads(full_response["object"]), indent=4)) + if chunk: + print(chunk, end="", flush=True) + # Add final newline after streaming + print() else: - # Non-streaming: handle response - if isinstance(response, str): - print(response) - elif isinstance(response, dict): - if "text" in response: - print(response["text"]) - elif "object" in response: - print(json.dumps(json.loads(response["object"]), indent=4)) + # Non-streaming: print complete response + print(response) finally: # Clean up socket connection diff --git a/trustgraph-cli/trustgraph/cli/verify_system_status.py b/trustgraph-cli/trustgraph/cli/verify_system_status.py index e576b7e8..294a3738 100644 --- a/trustgraph-cli/trustgraph/cli/verify_system_status.py +++ b/trustgraph-cli/trustgraph/cli/verify_system_status.py @@ -171,14 +171,12 @@ def check_api_gateway(url: str, timeout: int, token: Optional[str] = None) -> Tu def check_processors(url: str, min_processors: int, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]: - """Check if processors are running via show-processor-state.""" + """Check if processors are running via metrics endpoint.""" try: - api = Api(url, token=token) - - # Use the metrics endpoint similar to show_processor_state - # This is a simplified check - we'll use requests to check the metrics - metrics_url = url.replace('http://', '').replace('https://', '').split('/')[0] - metrics_url = f"http://{metrics_url}:8088/api/metrics/query?query=processor_info" + # Construct metrics URL from API URL + if not url.endswith('/'): + url += '/' + metrics_url = f"{url}api/metrics/query?query=processor_info" resp = requests.get(metrics_url, timeout=timeout) if resp.status_code == 200: @@ -199,7 +197,7 @@ def check_processors(url: str, min_processors: int, timeout: int, token: Optiona def check_flow_classes(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]: """Check if flow classes are loaded.""" try: - api = Api(url, token=token) + api = Api(url, token=token, timeout=timeout) flow_api = api.flow() classes = flow_api.list_classes() @@ -216,7 +214,7 @@ def check_flow_classes(url: str, timeout: int, token: Optional[str] = None) -> T def check_flows(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]: """Check if flow manager is responding.""" try: - api = Api(url, token=token) + api = Api(url, token=token, timeout=timeout) flow_api = api.flow() flows = flow_api.list() @@ -231,12 +229,22 @@ def check_flows(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bo def check_prompts(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]: """Check if prompts are loaded.""" try: - api = Api(url, token=token) + api = Api(url, token=token, timeout=timeout) + config = api.config() - prompts = api.prompts().list() + # Import ConfigKey here to avoid top-level import issues + from trustgraph.api.types import ConfigKey + import json - if prompts and len(prompts) > 0: - return True, f"Found {len(prompts)} prompt(s)" + # Get the template-index which lists all prompts + values = config.get([ + ConfigKey(type="prompt", key="template-index") + ]) + + ix = json.loads(values[0].value) + + if ix and len(ix) > 0: + return True, f"Found {len(ix)} prompt(s)" else: return False, "No prompts found" @@ -247,7 +255,7 @@ def check_prompts(url: str, timeout: int, token: Optional[str] = None) -> Tuple[ def check_library(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]: """Check if library service is responding.""" try: - api = Api(url, token=token) + api = Api(url, token=token, timeout=timeout) library_api = api.library() # Try to get documents (with default user) @@ -365,10 +373,10 @@ def main(): print("=" * 60) print("TrustGraph System Status Verification") print("=" * 60) - print(f"Global timeout: {args.global_timeout}s") - print(f"Check timeout: {args.check_timeout}s") - print(f"Retry delay: {args.retry_delay}s") - print("=" * 60) +# print(f"Global timeout: {args.global_timeout}s") +# print(f"Check timeout: {args.check_timeout}s") +# print(f"Retry delay: {args.retry_delay}s") +# print("=" * 60) print() # Phase 1: Infrastructure