import logging
import re
from pathlib import Path

from app.models.document import Document

logger = logging.getLogger(__name__)

class DocumentProcessor:
    """Process and chunk documents for indexing"""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        logger.debug(
            f"Initializing DocumentProcessor with chunk_size={chunk_size}, chunk_overlap={chunk_overlap}"
        )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        logger.debug("DocumentProcessor initialized successfully")

    def load_markdown_files(self, directory: str) -> list[Document]:
        """Load all markdown files from a directory"""
        logger.info(f"Loading markdown files from directory: {directory}")
        documents = []
        markdown_path = Path(directory)

        if not markdown_path.exists():
            logger.error(f"Directory {directory} does not exist")
            raise ValueError(f"Directory {directory} does not exist")

        logger.debug(f"Searching for markdown files in {markdown_path}")
        md_files = list(markdown_path.glob("**/*.md"))
        if not md_files:
            logger.error(f"No markdown files found in {directory}")
            raise ValueError(f"No markdown files found in {directory}")

        logger.info(f"Found {len(md_files)} markdown files to process")

        successful_loads = 0
        failed_loads = 0

        for i, md_file in enumerate(md_files):
            if i > 0 and i % 100 == 0:
                logger.debug(f"Processing file {i}/{len(md_files)}: {md_file.name}")
            try:
                logger.debug(f"Reading file: {md_file}")
                with open(md_file, encoding="utf-8") as f:
                    content = f.read()
                logger.debug(
                    f"File {md_file.name} loaded, size: {len(content)} characters"
                )
                doc = Document(
                    content=content,
                    metadata={
                        "source": str(md_file),
                        "filename": md_file.name,
                        "file_size": len(content),
                        "file_path": str(md_file.relative_to(markdown_path)),
                    },
                )
                documents.append(doc)
                successful_loads += 1
                logger.debug(f"Document created for {md_file.name}")
            except Exception as e:
                logger.error(f"Error reading {md_file}: {e}")
                failed_loads += 1
                continue

        logger.info(
            f"Successfully loaded {len(documents)} documents (successful: {successful_loads}, failed: {failed_loads})"
        )
        return documents

    def create_chunks(self, documents: list[Document]) -> list[Document]:
        """Create chunks from documents with overlap"""
        logger.info(f"Creating chunks from {len(documents)} documents")
        all_chunks = []

        for i, doc in enumerate(documents):
            if i > 0 and i % 50 == 0:
                logger.debug(f"Chunking document {i}/{len(documents)}")
            logger.debug(
                f"Chunking document: {doc.metadata.get('filename', 'unknown')}"
            )
            chunks = self._chunk_document(doc)
            logger.debug(
                f"Generated {len(chunks)} chunks for document {doc.metadata.get('filename', 'unknown')}"
            )
            all_chunks.extend(chunks)

        logger.info(f"Created {len(all_chunks)} chunks from {len(documents)} documents")
        return all_chunks

    def _chunk_document(self, document: Document) -> list[Document]:
        """Chunk a single document with markdown awareness"""
        logger.debug(
            f"Starting to chunk document with {len(document.content)} characters"
        )
        text = document.content
        chunks = []

        # Split on markdown headers first; any section still larger than
        # chunk_size is split again word-by-word with overlap.
        logger.debug("Splitting document by headers")
        sections = self._split_by_headers(text)
        logger.debug(f"Split into {len(sections)} sections")

        for i, section in enumerate(sections):
            logger.debug(
                f"Processing section {i + 1}/{len(sections)}, length: {len(section)}"
            )
            if len(section) <= self.chunk_size:
                logger.debug(f"Section {i + 1} fits in single chunk")
                chunks.append(section)
            else:
                logger.debug(f"Section {i + 1} too large, splitting into sub-chunks")
                sub_chunks = self._split_large_section(section)
                logger.debug(f"Section {i + 1} split into {len(sub_chunks)} sub-chunks")
                chunks.extend(sub_chunks)

        logger.debug(f"Total chunks created: {len(chunks)}")

        # Wrap each non-empty chunk in a Document, carrying over the source
        # metadata and adding chunk-level fields.
        chunk_documents = []
        for i, chunk_text in enumerate(chunks):
            if chunk_text.strip():
                chunk_doc = Document(
                    content=chunk_text,
                    metadata={
                        **document.metadata,
                        "chunk_id": i,
                        "chunk_length": len(chunk_text),
                        "total_chunks": len(chunks),
                    },
                )
                chunk_documents.append(chunk_doc)
                logger.debug(
                    f"Created chunk {i + 1}/{len(chunks)}, length: {len(chunk_text)}"
                )
            else:
                logger.debug(f"Skipping empty chunk {i + 1}")

        logger.debug(f"Generated {len(chunk_documents)} non-empty chunk documents")
        return chunk_documents

    def _split_by_headers(self, text: str) -> list[str]:
        """Split text by markdown headers while preserving structure"""
        logger.debug(f"Splitting text by headers, input length: {len(text)}")

        # Split before each markdown heading (# through ######); the lookahead
        # keeps the heading line attached to the section that follows it.
        header_pattern = r"\n(?=#{1,6}\s+)"
        sections = re.split(header_pattern, text)
        logger.debug(f"Initial split resulted in {len(sections)} raw sections")

        # Greedily merge consecutive sections so each stays under chunk_size.
        cleaned_sections = []
        current_section = ""
        for i, section in enumerate(sections):
            if not section.strip():
                logger.debug(f"Skipping empty section {i + 1}")
                continue

            section_length = len(section)
            current_length = len(current_section)
            combined_length = current_length + section_length
            logger.debug(
                f"Processing section {i + 1}: current={current_length}, section={section_length}, combined={combined_length}"
            )

            if current_section and combined_length > self.chunk_size:
                logger.debug(
                    f"Section combination would exceed chunk_size ({self.chunk_size}), finalizing current section"
                )
                cleaned_sections.append(current_section.strip())
                current_section = section
            else:
                current_section += "\n" + section if current_section else section
                logger.debug(
                    f"Added section to current, new length: {len(current_section)}"
                )

        if current_section:
            cleaned_sections.append(current_section.strip())
            logger.debug("Added final section")

        logger.debug(
            f"Header splitting completed: {len(cleaned_sections)} final sections"
        )
        return cleaned_sections

    def _split_large_section(self, text: str) -> list[str]:
        """Split large sections into smaller chunks with overlap"""
        logger.debug(f"Splitting large section of {len(text)} characters")
        chunks = []
        words = text.split()
        logger.debug(f"Section contains {len(words)} words")

        current_chunk = []
        current_size = 0
        # Convert the character overlap into a word count, assuming roughly
        # 10 characters per word.
        overlap_words = self.chunk_overlap // 10
        logger.debug(f"Using overlap of {overlap_words} words")

        for i, word in enumerate(words):
            word_size = len(word) + 1  # +1 for the joining space

            if current_size + word_size > self.chunk_size and current_chunk:
                chunk_text = " ".join(current_chunk)
                chunks.append(chunk_text)
                logger.debug(
                    f"Created chunk {len(chunks)}: {len(chunk_text)} characters, {len(current_chunk)} words"
                )

                # Seed the next chunk with the tail of the previous one so
                # adjacent chunks overlap.
                overlap_size = min(len(current_chunk), overlap_words)
                if overlap_size > 0:
                    current_chunk = current_chunk[-overlap_size:]
                    current_size = sum(len(w) + 1 for w in current_chunk)
                    logger.debug(
                        f"Applied overlap: kept {overlap_size} words, new size: {current_size}"
                    )
                else:
                    current_chunk = []
                    current_size = 0
                    logger.debug("No overlap applied")

            current_chunk.append(word)
            current_size += word_size

            if i > 0 and i % 1000 == 0:
                logger.debug(f"Processed {i}/{len(words)} words")

        if current_chunk:
            chunk_text = " ".join(current_chunk)
            chunks.append(chunk_text)
            logger.debug(
                f"Created final chunk {len(chunks)}: {len(chunk_text)} characters, {len(current_chunk)} words"
            )

        logger.debug(f"Large section splitting completed: {len(chunks)} chunks created")
        return chunks
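

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: "data/docs" is a
    # hypothetical example directory, and basicConfig is only here so the
    # logging above is visible when the file is run directly.
    logging.basicConfig(level=logging.INFO)

    processor = DocumentProcessor(chunk_size=1000, chunk_overlap=200)
    docs = processor.load_markdown_files("data/docs")  # assumed example path
    doc_chunks = processor.create_chunks(docs)
    print(f"Loaded {len(docs)} documents and produced {len(doc_chunks)} chunks")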