trustgraph/trustgraph-flow/trustgraph/decoding/mistral_ocr/processor.py
cybermaggedon 89be656990
Release/v1.2 (#457)
* Bump setup.py versions for 1.1

* PoC MCP server (#419)

* Very initial MCP server PoC for TrustGraph

* Put service on port 8000

* Add MCP container and packages to buildout

* Update docs for API/CLI changes in 1.0 (#421)

* Update some API basics for the 0.23/1.0 API change

* Add MCP container push (#425)

* Add command args to the MCP server (#426)

* Host and port parameters

* Added websocket arg

* More docs

* MCP client support (#427)

- MCP client service
- Tool request/response schema
- API gateway support for mcp-tool
- Message translation for tool request & response
- Make mcp-tool use the configuration service to find out
  where the MCP services are.

* Feature/react call mcp (#428)

Key Features

  - MCP Tool Integration: Added core MCP tool support with ToolClientSpec and ToolClient classes
  - API Enhancement: New mcp_tool method for flow-specific tool invocation
  - CLI Tooling: New tg-invoke-mcp-tool command for testing MCP integration
  - React Agent Enhancement: Fixed and improved multi-tool invocation capabilities
  - Tool Management: Enhanced CLI for tool configuration and management

Changes

  - Added MCP tool invocation to API with flow-specific integration
  - Implemented ToolClientSpec and ToolClient for tool call handling
  - Updated agent-manager-react to invoke MCP tools with configurable types
  - Enhanced CLI with new commands and improved help text
  - Added comprehensive documentation for new CLI commands
  - Improved tool configuration management

Testing

  - Added tg-invoke-mcp-tool CLI command for isolated MCP integration testing
  - Enhanced agent capability to invoke multiple tools simultaneously

* Test suite executed from CI pipeline (#433)

* Test strategy & test cases

* Unit tests

* Integration tests

* Extending test coverage (#434)

* Contract tests

* Testing embeddings

* Agent unit tests

* Knowledge pipeline tests

* Turn on contract tests

* Increase storage test coverage (#435)

* Fixing storage and adding tests

* PR pipeline only runs quick tests

* Empty configuration is returned as empty list, previously was not in response (#436)

* Update config util to take files as well as command-line text (#437)

* Updated CLI invocation and config model for tools and mcp (#438)

* Updated CLI invocation and config model for tools and mcp

* CLI anomalies

* Tweaked the MCP tool implementation for new model

* Update agent implementation to match the new model

* Fix agent tools, now all tested

* Fixed integration tests

* Fix MCP delete tool params

* Update Python deps to 1.2

* Update to enable knowledge extraction using the agent framework (#439)

* Implement KG extraction agent (kg-extract-agent)

* Using ReAct framework (agent-manager-react)
 
* ReAct manager had an issue when emitting JSON, which conflicts with the ReAct manager's own JSON messages, so the ReAct manager was refactored to use traditional, non-JSON ReAct messages.
 
* Minor refactor to take the prompt template client out of prompt-template so it can be more readily used by other modules. kg-extract-agent uses this framework.

* Migrate from setup.py to pyproject.toml (#440)

* Converted setup.py to pyproject.toml

* Modern package infrastructure as recommended by py docs

* Install missing build deps (#441)

* Install missing build deps (#442)

* Implement logging strategy (#444)

* Logging strategy and convert all print() calls to logging invocations

* Fix/startup failure (#445)

* Fix logging startup problems

* Fix logging startup problems (#446)

* Fix logging startup problems (#447)

* Fixed Mistral OCR to use current API (#448)

* Fixed Mistral OCR to use current API

* Added PDF decoder tests

* Fix Mistral OCR ident to be standard pdf-decoder (#450)

* Fix Mistral OCR ident to be standard pdf-decoder

* Correct test

* Schema structure refactor (#451)

* Write schema refactor spec

* Implemented schema refactor spec

* Structure data mvp (#452)

* Structured data tech spec

* Architecture principles

* New schemas

* Updated schemas and specs

* Object extractor

* Add .coveragerc

* New tests

* Cassandra object storage

* Trying to get object extraction working, issues exist

* Validate librarian collection (#453)

* Fix token chunker, broken API invocation (#454)

* Fix token chunker, broken API invocation (#455)

* Knowledge load utility CLI (#456)

* Knowledge loader

* More tests
2025-08-18 20:56:09 +01:00

"""
Mistral OCR decoder: accepts PDF documents on input, runs them through the
Mistral OCR API in chunks of pages, and outputs the recognized text as a
single Markdown text document.
"""
from pypdf import PdfWriter, PdfReader
from io import BytesIO
import base64
import uuid
import os
from mistralai import Mistral
from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk
from mistralai.models import OCRResponse
from ... schema import Document, TextDocument, Metadata
from ... base import FlowProcessor, ConsumerSpec, ProducerSpec
import logging
logger = logging.getLogger(__name__)
default_ident = "pdf-decoder"
default_api_key = os.getenv("MISTRAL_TOKEN")
pages_per_chunk = 5

def chunks(lst, n):
    "Yield successive n-sized chunks from lst."
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def replace_images_in_markdown(markdown_str: str, images_dict: dict) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.

    Args:
        markdown_str: Markdown text containing image placeholders
        images_dict: Dictionary mapping image IDs to base64 strings

    Returns:
        Markdown text with images replaced by base64 data
    """
    for img_name, base64_str in images_dict.items():
        markdown_str = markdown_str.replace(
            f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
        )
    return markdown_str


def get_combined_markdown(ocr_response: OCRResponse) -> str:
    """
    Combine OCR text and images into a single markdown document.

    Args:
        ocr_response: Response from OCR processing containing text and images

    Returns:
        Combined markdown string with embedded images
    """
    markdowns: list[str] = []
    # Extract images from page
    for page in ocr_response.pages:
        image_data = {}
        for img in page.images:
            image_data[img.id] = img.image_base64
        # Replace image placeholders with actual images
        markdowns.append(replace_images_in_markdown(page.markdown, image_data))

    return "\n\n".join(markdowns)


class Processor(FlowProcessor):

    def __init__(self, **params):

        id = params.get("id", default_ident)
        api_key = params.get("api_key", default_api_key)

        super(Processor, self).__init__(
            **params | {
                "id": id,
            }
        )

        self.register_specification(
            ConsumerSpec(
                name = "input",
                schema = Document,
                handler = self.on_message,
            )
        )

        self.register_specification(
            ProducerSpec(
                name = "output",
                schema = TextDocument,
            )
        )

        if api_key is None:
            raise RuntimeError("Mistral API key not specified")

        self.mistral = Mistral(api_key=api_key)

        # Used with Mistral doc upload
        self.unique_id = str(uuid.uuid4())

        logger.info("Mistral OCR processor initialized")

    def ocr(self, blob):

        logger.debug("Parse PDF...")

        pdfbuf = BytesIO(blob)
        pdf = PdfReader(pdfbuf)

        # Collect the markdown of every chunk so that documents longer than
        # pages_per_chunk pages do not lose any pages
        markdowns = []

        for chunk in chunks(pdf.pages, pages_per_chunk):

            logger.debug("Get next pages...")

            # Build a smaller PDF holding just this chunk's pages
            part = PdfWriter()
            for page in chunk:
                part.add_page(page)

            buf = BytesIO()
            part.write_stream(buf)

            logger.debug("Upload chunk...")

            uploaded_file = self.mistral.files.upload(
                file={
                    "file_name": self.unique_id,
                    "content": buf.getvalue(),
                },
                purpose="ocr",
            )

            signed_url = self.mistral.files.get_signed_url(
                file_id=uploaded_file.id, expiry=1
            )

            logger.debug("OCR...")

            processed = self.mistral.ocr.process(
                model="mistral-ocr-latest",
                include_image_base64=True,
                document={
                    "type": "document_url",
                    "document_url": signed_url.url,
                }
            )

            logger.debug("Extract markdown...")

            markdowns.append(get_combined_markdown(processed))

        logger.info("OCR complete.")

        return "\n\n".join(markdowns)

    async def on_message(self, msg, consumer, flow):

        logger.debug("PDF message received")

        v = msg.value()

        logger.info(f"Decoding {v.metadata.id}...")

        # The document payload arrives base64-encoded
        markdown = self.ocr(base64.b64decode(v.data))

        r = TextDocument(
            metadata=v.metadata,
            text=markdown.encode("utf-8"),
        )

        await flow("output").send(r)

        logger.info("Done.")

    @staticmethod
    def add_args(parser):

        FlowProcessor.add_args(parser)

        parser.add_argument(
            '-k', '--api-key',
            default=default_api_key,
            help='Mistral API Key'
        )


def run():
    Processor.launch(default_ident, __doc__)
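
The pure helpers in this module (`chunks`, `replace_images_in_markdown`, `get_combined_markdown`) can be exercised without calling the Mistral API. The following is a minimal sketch under some assumptions: the helpers are copied here so the snippet is self-contained, and `fake_response` is a hypothetical stand-in built from `types.SimpleNamespace`, not a real `OCRResponse`. The image ID and base64 payload are made-up illustration values.

```python
from types import SimpleNamespace


def chunks(lst, n):
    "Yield successive n-sized chunks from lst."
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def replace_images_in_markdown(markdown_str, images_dict):
    # Swap each ![id](id) placeholder for ![id](<base64 data>)
    for img_name, base64_str in images_dict.items():
        markdown_str = markdown_str.replace(
            f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
        )
    return markdown_str


def get_combined_markdown(ocr_response):
    # Join the per-page markdown, with images inlined, into one document
    markdowns = []
    for page in ocr_response.pages:
        image_data = {img.id: img.image_base64 for img in page.images}
        markdowns.append(replace_images_in_markdown(page.markdown, image_data))
    return "\n\n".join(markdowns)


# Hypothetical stand-in for an OCR response: two pages, one embedded image
fake_response = SimpleNamespace(pages=[
    SimpleNamespace(
        markdown="# Title\n\n![img-0.jpeg](img-0.jpeg)",
        images=[SimpleNamespace(
            id="img-0.jpeg",
            image_base64="data:image/jpeg;base64,AAAA",
        )],
    ),
    SimpleNamespace(markdown="Second page.", images=[]),
])

# A 12-page document split with a chunk size of 5 gives groups of 5, 5 and 2
page_groups = list(chunks(list(range(12)), 5))
combined = get_combined_markdown(fake_response)

print(page_groups)
print(combined)
```

Splitting by page count keeps each upload small, and the final group simply holds whatever pages remain.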