"""Text processing utilities for evidence handling."""
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from src.services.embeddings import EmbeddingService
from src.utils.models import Evidence
def truncate_at_sentence(text: str, max_chars: int = 300) -> str:
    """Truncate text at sentence boundary, preserving meaning.

    Args:
        text: The text to truncate
        max_chars: Maximum characters (default 300)

    Returns:
        Text truncated at last complete sentence within limit
    """
    if len(text) <= max_chars:
        return text
    # Find truncation point
    truncated = text[:max_chars]
    # Use the LATEST sentence boundary across all terminators. (Checking each
    # separator in turn and returning on the first hit could cut at an earlier
    # ". " even when a later "! " / "? " boundary exists within the limit.)
    boundary = max(truncated.rfind(sep) for sep in (". ", "! ", "? ", ".\n", "!\n", "?\n"))
    if boundary > max_chars // 2:  # Don't truncate too aggressively (less than half)
        return text[: boundary + 1].strip()
    # Fallback: bare terminator not followed by whitespace (e.g. sitting at the
    # very end of the slice). Covers ! and ?, not just periods.
    last_end = max(truncated.rfind(ch) for ch in ".!?")
    if last_end > max_chars // 2:
        return text[: last_end + 1].strip()
    # Last resort: truncate at word boundary
    last_space = truncated.rfind(" ")
    if last_space > 0:
        return text[:last_space].strip() + "..."
    return truncated + "..."
async def select_diverse_evidence(
    evidence: list["Evidence"], n: int, query: str, embeddings: "EmbeddingService | None" = None
) -> list["Evidence"]:
    """Select n most diverse and relevant evidence items.

    Uses Maximal Marginal Relevance (MMR) when embeddings available,
    falls back to relevance_score sorting otherwise.

    Args:
        evidence: All available evidence
        n: Number of items to select
        query: Original query for relevance scoring
        embeddings: Optional EmbeddingService for semantic diversity

    Returns:
        Selected evidence items, diverse and relevant
    """
    if not evidence:
        return []
    if n >= len(evidence):
        return evidence
    # Fallback: sort by relevance score if no embeddings
    if embeddings is None:
        return sorted(
            evidence,
            key=lambda e: e.relevance,  # Use .relevance (from Pydantic model)
            reverse=True,
        )[:n]
    # MMR: Maximal Marginal Relevance for diverse selection
    # Score = λ * relevance - (1-λ) * max_similarity_to_selected
    lambda_param = 0.7  # Balance relevance vs diversity
    # Get query embedding
    query_emb = await embeddings.embed(query)
    # Get all evidence embeddings
    evidence_embs = await embeddings.embed_batch([e.content for e in evidence])
    # Hoist list->ndarray conversion and norm computation out of the selection
    # loop (previously rebuilt on every cosine call).
    arrs = [np.array(emb) for emb in evidence_embs]
    norms = [float(np.linalg.norm(a)) for a in arrs]
    q_arr = np.array(query_emb)
    q_norm = float(np.linalg.norm(q_arr))

    def cosine(a: "np.ndarray", na: float, b: "np.ndarray", nb: float) -> float:
        # Cosine similarity with precomputed norms; 0.0 for zero vectors.
        denominator = na * nb
        if denominator == 0:
            return 0.0
        return float(np.dot(a, b) / denominator)

    # Compute relevance scores (cosine similarity to query)
    # Note: We use semantic relevance to query, not the keyword search 'relevance' score
    relevance_scores = [cosine(q_arr, q_norm, a, na) for a, na in zip(arrs, norms)]
    # Greedy MMR selection. max_sims[i] holds item i's max similarity to the
    # already-selected set, updated incrementally when an item is selected:
    # O(k*n) cosine evaluations instead of recomputing all pairs (O(k^2 * n)).
    selected_indices: list[int] = []
    remaining = set(range(len(evidence)))
    max_sims = [0.0] * len(evidence)
    for _ in range(n):
        best_score = float("-inf")
        best_idx = -1
        for idx in remaining:
            # MMR score: relevance component minus diversity penalty
            mmr_score = lambda_param * relevance_scores[idx] - (1 - lambda_param) * max_sims[idx]
            if mmr_score > best_score:
                best_score = mmr_score
                best_idx = idx
        if best_idx < 0:
            break  # remaining is empty; cannot happen while n < len(evidence)
        selected_indices.append(best_idx)
        remaining.remove(best_idx)
        # Fold the newly selected item into each remaining item's running max
        for idx in remaining:
            sim = cosine(arrs[idx], norms[idx], arrs[best_idx], norms[best_idx])
            if sim > max_sims[idx]:
                max_sims[idx] = sim
    return [evidence[i] for i in selected_indices]