import logging
import re
from pathlib import Path

from app.models.document import Document

logger = logging.getLogger(__name__)


class DocumentProcessor:
    """Process and chunk documents for indexing"""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        logger.debug(
            f"Initializing DocumentProcessor with chunk_size={chunk_size}, chunk_overlap={chunk_overlap}"
        )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        logger.debug("DocumentProcessor initialized successfully")

    def load_markdown_files(self, directory: str) -> list[Document]:
        """Load all markdown files from a directory"""
        logger.info(f"Loading markdown files from directory: {directory}")
        documents = []
        markdown_path = Path(directory)

        if not markdown_path.exists():
            logger.error(f"Directory {directory} does not exist")
            raise ValueError(f"Directory {directory} does not exist")

        logger.debug(f"Searching for markdown files in {markdown_path}")
        md_files = list(markdown_path.glob("**/*.md"))
        if not md_files:
            logger.error(f"No markdown files found in {directory}")
            raise ValueError(f"No markdown files found in {directory}")

        logger.info(f"Found {len(md_files)} markdown files to process")
        successful_loads = 0
        failed_loads = 0

        for i, md_file in enumerate(md_files):
            if i > 0 and i % 100 == 0:
                logger.debug(f"Processing file {i}/{len(md_files)}: {md_file.name}")
            try:
                logger.debug(f"Reading file: {md_file}")
                with open(md_file, encoding="utf-8") as f:
                    content = f.read()
                logger.debug(
                    f"File {md_file.name} loaded, size: {len(content)} characters"
                )
                doc = Document(
                    content=content,
                    metadata={
                        "source": str(md_file),
                        "filename": md_file.name,
                        "file_size": len(content),
                        "file_path": str(md_file.relative_to(markdown_path)),
                    },
                )
                documents.append(doc)
                successful_loads += 1
                logger.debug(f"Document created for {md_file.name}")
            except Exception as e:
                logger.error(f"Error reading {md_file}: {e}")
                failed_loads += 1
                continue

        logger.info(
            f"Successfully loaded {len(documents)} documents (successful: {successful_loads}, failed: {failed_loads})"
        )
        return documents

    def create_chunks(self, documents: list[Document]) -> list[Document]:
        """Create chunks from documents with overlap"""
        logger.info(f"Creating chunks from {len(documents)} documents")
        all_chunks = []

        for i, doc in enumerate(documents):
            if i > 0 and i % 50 == 0:
                logger.debug(f"Chunking document {i}/{len(documents)}")
            logger.debug(
                f"Chunking document: {doc.metadata.get('filename', 'unknown')}"
            )
            chunks = self._chunk_document(doc)
            logger.debug(
                f"Generated {len(chunks)} chunks for document {doc.metadata.get('filename', 'unknown')}"
            )
            all_chunks.extend(chunks)

        logger.info(f"Created {len(all_chunks)} chunks from {len(documents)} documents")
        return all_chunks

    def _chunk_document(self, document: Document) -> list[Document]:
        """Chunk a single document with markdown awareness"""
        logger.debug(
            f"Starting to chunk document with {len(document.content)} characters"
        )
        text = document.content
        chunks = []

        logger.debug("Splitting document by headers")
        sections = self._split_by_headers(text)
        logger.debug(f"Split into {len(sections)} sections")

        for i, section in enumerate(sections):
            logger.debug(
                f"Processing section {i + 1}/{len(sections)}, length: {len(section)}"
            )
            if len(section) <= self.chunk_size:
                logger.debug(f"Section {i + 1} fits in single chunk")
                chunks.append(section)
            else:
                logger.debug(f"Section {i + 1} too large, splitting into sub-chunks")
                sub_chunks = self._split_large_section(section)
                logger.debug(f"Section {i + 1} split into {len(sub_chunks)} sub-chunks")
                chunks.extend(sub_chunks)

        logger.debug(f"Total chunks created: {len(chunks)}")
        chunk_documents = []
        for i, chunk_text in enumerate(chunks):
            if chunk_text.strip():
                # Note: chunk_id indexes the pre-filter chunk list, so ids can
                # skip values when empty chunks are dropped below.
                chunk_doc = Document(
                    content=chunk_text,
                    metadata={
                        **document.metadata,
                        "chunk_id": i,
                        "chunk_length": len(chunk_text),
                        "total_chunks": len(chunks),
                    },
                )
                chunk_documents.append(chunk_doc)
                logger.debug(
                    f"Created chunk {i + 1}/{len(chunks)}, length: {len(chunk_text)}"
                )
            else:
                logger.debug(f"Skipping empty chunk {i + 1}")

        logger.debug(f"Generated {len(chunk_documents)} non-empty chunk documents")
        return chunk_documents

    def _split_by_headers(self, text: str) -> list[str]:
        """Split text by markdown headers while preserving structure"""
        logger.debug(f"Splitting text by headers, input length: {len(text)}")
        # Split immediately before any ATX header (# through ######); the
        # lookahead keeps each header line attached to the section it opens.
        header_pattern = r"\n(?=#{1,6}\s+)"
        sections = re.split(header_pattern, text)
        logger.debug(f"Initial split resulted in {len(sections)} raw sections")

        cleaned_sections = []
        current_section = ""

        for i, section in enumerate(sections):
            if not section.strip():
                logger.debug(f"Skipping empty section {i + 1}")
                continue

            section_length = len(section)
            current_length = len(current_section)
            combined_length = current_length + section_length
            logger.debug(
                f"Processing section {i + 1}: current={current_length}, section={section_length}, combined={combined_length}"
            )

            if current_section and combined_length > self.chunk_size:
                logger.debug(
                    f"Section combination would exceed chunk_size ({self.chunk_size}), finalizing current section"
                )
                cleaned_sections.append(current_section.strip())
                current_section = section
            else:
                current_section += ("\n" + section) if current_section else section
                logger.debug(
                    f"Added section to current, new length: {len(current_section)}"
                )

        if current_section:
            cleaned_sections.append(current_section.strip())
            logger.debug("Added final section")

        logger.debug(
            f"Header splitting completed: {len(cleaned_sections)} final sections"
        )
        return cleaned_sections

    def _split_large_section(self, text: str) -> list[str]:
        """Split large sections into smaller chunks with overlap"""
        logger.debug(f"Splitting large section of {len(text)} characters")
        chunks = []
        words = text.split()
        logger.debug(f"Section contains {len(words)} words")

        current_chunk = []
        current_size = 0
        # chunk_overlap is measured in characters; assume roughly 10 characters
        # per word (including the joining space) to convert it to a word count.
        overlap_words = self.chunk_overlap // 10
        logger.debug(f"Using overlap of {overlap_words} words")

        for i, word in enumerate(words):
            word_size = len(word) + 1  # +1 for the joining space
            if current_size + word_size > self.chunk_size and current_chunk:
                chunk_text = " ".join(current_chunk)
                chunks.append(chunk_text)
                logger.debug(
                    f"Created chunk {len(chunks)}: {len(chunk_text)} characters, {len(current_chunk)} words"
                )
                # Carry the tail of the finished chunk into the next one so
                # adjacent chunks share context across the boundary.
                overlap_size = min(len(current_chunk), overlap_words)
                if overlap_size > 0:
                    current_chunk = current_chunk[-overlap_size:]
                    current_size = sum(len(w) + 1 for w in current_chunk)
                    logger.debug(
                        f"Applied overlap: kept {overlap_size} words, new size: {current_size}"
                    )
                else:
                    current_chunk = []
                    current_size = 0
                    logger.debug("No overlap applied")

            current_chunk.append(word)
            current_size += word_size

            if i > 0 and i % 1000 == 0:
                logger.debug(f"Processed {i}/{len(words)} words")

        if current_chunk:
            chunk_text = " ".join(current_chunk)
            chunks.append(chunk_text)
            logger.debug(
                f"Created final chunk {len(chunks)}: {len(chunk_text)} characters, {len(current_chunk)} words"
            )

        logger.debug(f"Large section splitting completed: {len(chunks)} chunks created")
        return chunks