Chunking dynamic params (#536)

* Chunking params are dynamic

* Update tests
This commit is contained in:
cybermaggedon 2025-09-26 10:53:32 +01:00 committed by GitHub
parent 43cfcb18a0
commit 8929a680a1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 584 additions and 478 deletions

View file

@ -13,6 +13,7 @@ from . producer_spec import ProducerSpec
from . subscriber_spec import SubscriberSpec
from . request_response_spec import RequestResponseSpec
from . llm_service import LlmService, LlmResult
from . chunking_service import ChunkingService
from . embeddings_service import EmbeddingsService
from . embeddings_client import EmbeddingsClientSpec
from . text_completion_client import TextCompletionClientSpec

View file

@ -0,0 +1,62 @@
"""
Base chunking service that provides parameter specification functionality
for chunk-size and chunk-overlap parameters
"""
import logging
from .flow_processor import FlowProcessor
from .parameter_spec import ParameterSpec
# Module-level logger, named after this module (__name__); used by
# ChunkingService for its debug output.
logger = logging.getLogger(__name__)
class ChunkingService(FlowProcessor):
    """Flow processor base class for chunkers.

    On construction it registers a ``ParameterSpec`` for each of the
    ``chunk-size`` and ``chunk-overlap`` parameters, and it offers
    ``chunk_document`` to resolve the values to use for a given flow.
    """

    def __init__(self, **params):
        """Initialise the parent processor and register the chunk parameters.

        Args:
            **params: Keyword arguments passed through unchanged to the
                parent constructor.
        """
        super().__init__(**params)

        # One specification per parameter, registered in this order:
        # chunk-size first, then chunk-overlap.
        for spec_name in ("chunk-size", "chunk-overlap"):
            self.register_specification(ParameterSpec(name=spec_name))

        logger.debug("ChunkingService initialized with parameter specifications")

    async def chunk_document(
        self, msg, consumer, flow, default_chunk_size, default_chunk_overlap
    ):
        """Resolve the chunk size and overlap to use for this flow.

        Each parameter is looked up by calling ``flow`` with its name; a
        result of ``None`` selects the corresponding default instead.

        Args:
            msg: The message containing the document to chunk (not read
                by this method).
            consumer: The consumer spec (not read by this method).
            flow: The flow context; called as ``flow("chunk-size")`` and
                ``flow("chunk-overlap")``.
            default_chunk_size: Value used when the flow yields ``None``
                for ``chunk-size``.
            default_chunk_overlap: Value used when the flow yields
                ``None`` for ``chunk-overlap``.

        Returns:
            tuple: ``(chunk_size, chunk_overlap)`` - the effective values.
        """
        # Query the flow first; size is looked up before overlap.
        flow_size = flow("chunk-size")
        flow_overlap = flow("chunk-overlap")

        # Only None triggers the fallback; other falsy values such as 0
        # are kept as supplied by the flow.
        if flow_size is None:
            effective_chunk_size = default_chunk_size
        else:
            effective_chunk_size = flow_size

        if flow_overlap is None:
            effective_chunk_overlap = default_chunk_overlap
        else:
            effective_chunk_overlap = flow_overlap

        logger.debug(f"Using chunk-size: {effective_chunk_size}")
        logger.debug(f"Using chunk-overlap: {effective_chunk_overlap}")

        return effective_chunk_size, effective_chunk_overlap

    @staticmethod
    def add_args(parser):
        """Add the parent FlowProcessor's arguments to ``parser``.

        No arguments of its own are added here; the call is delegated to
        ``FlowProcessor.add_args``.
        """
        FlowProcessor.add_args(parser)