# app/services/preprocessing.py

import logging
import re
from pathlib import Path
from app.models.document import Document
logger = logging.getLogger(__name__)
class DocumentProcessor:
"""Process and chunk documents for indexing"""
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """Initialize the processor.

        Args:
            chunk_size: Target maximum chunk length, in characters.
            chunk_overlap: Approximate overlap between consecutive chunks, in characters.
        """
        logger.debug(
            f"Initializing DocumentProcessor with chunk_size={chunk_size}, chunk_overlap={chunk_overlap}"
        )
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
logger.debug("DocumentProcessor initialized successfully")
    def load_markdown_files(self, directory: str) -> list[Document]:
        """Recursively load all markdown (*.md) files from a directory.

        Raises:
            ValueError: If the directory does not exist or contains no markdown files.
        """
logger.info(f"Loading markdown files from directory: {directory}")
documents = []
markdown_path = Path(directory)
if not markdown_path.exists():
logger.error(f"Directory {directory} does not exist")
raise ValueError(f"Directory {directory} does not exist")
logger.debug(f"Searching for markdown files in {markdown_path}")
md_files = list(markdown_path.glob("**/*.md"))
if not md_files:
logger.error(f"No markdown files found in {directory}")
raise ValueError(f"No markdown files found in {directory}")
logger.info(f"Found {len(md_files)} markdown files to process")
successful_loads = 0
failed_loads = 0
for i, md_file in enumerate(md_files):
if i > 0 and i % 100 == 0:
logger.debug(f"Processing file {i}/{len(md_files)}: {md_file.name}")
try:
logger.debug(f"Reading file: {md_file}")
with open(md_file, encoding="utf-8") as f:
content = f.read()
logger.debug(
f"File {md_file.name} loaded, size: {len(content)} characters"
)
doc = Document(
content=content,
metadata={
"source": str(md_file),
"filename": md_file.name,
"file_size": len(content),
"file_path": str(md_file.relative_to(markdown_path)),
},
)
documents.append(doc)
successful_loads += 1
logger.debug(f"Document created for {md_file.name}")
except Exception as e:
logger.error(f"Error reading {md_file}: {e}")
failed_loads += 1
continue
logger.info(
f"Successfully loaded {len(documents)} documents (successful: {successful_loads}, failed: {failed_loads})"
)
return documents
def create_chunks(self, documents: list[Document]) -> list[Document]:
"""Create chunks from documents with overlap"""
logger.info(f"Creating chunks from {len(documents)} documents")
all_chunks = []
for i, doc in enumerate(documents):
if i > 0 and i % 50 == 0:
logger.debug(f"Chunking document {i}/{len(documents)}")
logger.debug(
f"Chunking document: {doc.metadata.get('filename', 'unknown')}"
)
chunks = self._chunk_document(doc)
logger.debug(
f"Generated {len(chunks)} chunks for document {doc.metadata.get('filename', 'unknown')}"
)
all_chunks.extend(chunks)
logger.info(f"Created {len(all_chunks)} chunks from {len(documents)} documents")
return all_chunks
def _chunk_document(self, document: Document) -> list[Document]:
"""Chunk a single document with markdown awareness"""
logger.debug(
f"Starting to chunk document with {len(document.content)} characters"
)
text = document.content
chunks = []
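        # Two-stage strategy: first split on markdown headers, then subdivide
        # any section that still exceeds chunk_size with word-level overlap.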
logger.debug("Splitting document by headers")
sections = self._split_by_headers(text)
logger.debug(f"Split into {len(sections)} sections")
for i, section in enumerate(sections):
logger.debug(
f"Processing section {i + 1}/{len(sections)}, length: {len(section)}"
)
if len(section) <= self.chunk_size:
logger.debug(f"Section {i + 1} fits in single chunk")
chunks.append(section)
else:
logger.debug(f"Section {i + 1} too large, splitting into sub-chunks")
sub_chunks = self._split_large_section(section)
logger.debug(f"Section {i + 1} split into {len(sub_chunks)} sub-chunks")
chunks.extend(sub_chunks)
logger.debug(f"Total chunks created: {len(chunks)}")
        chunk_documents = []
        # Filter out empty chunks first so chunk_id is contiguous and
        # total_chunks counts only the chunks that are actually emitted.
        non_empty_chunks = [c for c in chunks if c.strip()]
        skipped = len(chunks) - len(non_empty_chunks)
        if skipped:
            logger.debug(f"Skipping {skipped} empty chunks")
        for i, chunk_text in enumerate(non_empty_chunks):
            chunk_doc = Document(
                content=chunk_text,
                metadata={
                    **document.metadata,
                    "chunk_id": i,
                    "chunk_length": len(chunk_text),
                    "total_chunks": len(non_empty_chunks),
                },
            )
            chunk_documents.append(chunk_doc)
            logger.debug(
                f"Created chunk {i + 1}/{len(non_empty_chunks)}, length: {len(chunk_text)}"
            )
        logger.debug(f"Generated {len(chunk_documents)} non-empty chunk documents")
        return chunk_documents
def _split_by_headers(self, text: str) -> list[str]:
"""Split text by markdown headers while preserving structure"""
logger.debug(f"Splitting text by headers, input length: {len(text)}")
header_pattern = r"\n(?=#{1,6}\s+)"
sections = re.split(header_pattern, text)
logger.debug(f"Initial split resulted in {len(sections)} raw sections")
cleaned_sections = []
current_section = ""
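        # Greedily merge consecutive sections so each emitted section
        # approaches chunk_size without separating a header from its text.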
for i, section in enumerate(sections):
if not section.strip():
logger.debug(f"Skipping empty section {i + 1}")
continue
section_length = len(section)
current_length = len(current_section)
combined_length = current_length + section_length
logger.debug(
f"Processing section {i + 1}: current={current_length}, section={section_length}, combined={combined_length}"
)
if current_section and combined_length > self.chunk_size:
logger.debug(
f"Section combination would exceed chunk_size ({self.chunk_size}), finalizing current section"
)
cleaned_sections.append(current_section.strip())
current_section = section
else:
                current_section += ("\n" + section) if current_section else section
logger.debug(
f"Added section to current, new length: {len(current_section)}"
)
if current_section:
cleaned_sections.append(current_section.strip())
logger.debug("Added final section")
logger.debug(
f"Header splitting completed: {len(cleaned_sections)} final sections"
)
return cleaned_sections
def _split_large_section(self, text: str) -> list[str]:
"""Split large sections into smaller chunks with overlap"""
logger.debug(f"Splitting large section of {len(text)} characters")
chunks = []
words = text.split()
logger.debug(f"Section contains {len(words)} words")
current_chunk = []
current_size = 0
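        # chunk_overlap is expressed in characters; dividing by 10 converts it
        # to an approximate word count (rough heuristic: ~10 characters per
        # word, including the trailing space).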
overlap_words = self.chunk_overlap // 10
logger.debug(f"Using overlap of {overlap_words} words")
for i, word in enumerate(words):
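            # +1 accounts for the space that " ".join() inserts between words.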
word_size = len(word) + 1
if current_size + word_size > self.chunk_size and current_chunk:
chunk_text = " ".join(current_chunk)
chunks.append(chunk_text)
logger.debug(
f"Created chunk {len(chunks)}: {len(chunk_text)} characters, {len(current_chunk)} words"
)
overlap_size = min(len(current_chunk), overlap_words)
if overlap_size > 0:
current_chunk = current_chunk[-overlap_size:]
current_size = sum(len(w) + 1 for w in current_chunk)
logger.debug(
f"Applied overlap: kept {overlap_size} words, new size: {current_size}"
)
else:
current_chunk = []
current_size = 0
logger.debug("No overlap applied")
current_chunk.append(word)
current_size += word_size
if i > 0 and i % 1000 == 0:
logger.debug(f"Processed {i}/{len(words)} words")
if current_chunk:
chunk_text = " ".join(current_chunk)
chunks.append(chunk_text)
logger.debug(
f"Created final chunk {len(chunks)}: {len(chunk_text)} characters, {len(current_chunk)} words"
)
logger.debug(f"Large section splitting completed: {len(chunks)} chunks created")
return chunks
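

if __name__ == "__main__":
    # Minimal usage sketch (illustrative, not part of the original module).
    # Assumes `Document` exposes the `content`/`metadata` fields exactly as
    # constructed above; the sample file and parameters here are made up.
    import tempfile

    logging.basicConfig(level=logging.INFO)
    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "sample.md"
        sample.write_text(
            "# Title\n\nIntro paragraph.\n\n## Details\n\n" + "word " * 400,
            encoding="utf-8",
        )
        processor = DocumentProcessor(chunk_size=500, chunk_overlap=100)
        docs = processor.load_markdown_files(tmp)
        chunks = processor.create_chunks(docs)
        for chunk in chunks:
            print(chunk.metadata["chunk_id"], chunk.metadata["chunk_length"])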