Spaces:
Sleeping
Sleeping
| # rag_pipeline.py | |
| import os, json, re, gzip, shutil | |
| from typing import List, Dict, Tuple | |
| from functools import lru_cache | |
| import faiss | |
| import numpy as np | |
| from providers import embed, generate, rerank, qa_extract | |
| # ========================= | |
| # Dosya yolları ve sabitler | |
| # ========================= | |
| VSTORE_DIR = "vectorstore" | |
| FAISS_FILE = "index.faiss" | |
| META_JSONL = "meta.jsonl" | |
| META_JSONL_GZ = "meta.jsonl.gz" | |
| # ========================= | |
| # Hız / kalite ayarları | |
| # ========================= | |
| TOP_K_DEFAULT = 4 # Kaç pasaj döndürelim? | |
| FETCH_K_DEFAULT = 16 # FAISS'ten kaç aday çekelim? | |
| HIGH_SCORE_THRES = 0.78 # erken karar eşiği (cosine) | |
| MARGIN_THRES = 0.06 # top1 - top2 farkı | |
| CTX_CHAR_LIMIT = 1400 # LLM'e verilecek maksimum bağlam karakteri | |
| QA_SCORE_THRES = 0.25 # ekstraktif QA güven eşiği | |
| QA_PER_PASSAGES = 4 # kaç hit üzerinde tek tek QA denensin | |
| # Basit boost ağırlıkları | |
| W_TITLE_BOOST = 0.25 | |
| W_LEXICAL = 0.15 | |
| # ========================= | |
| # Kural-tabanlı çıkarım yardımcıları (tarih/kuruluş) | |
| # ========================= | |
| DATE_RX = re.compile( | |
| r"\b(\d{1,2}\s+(Ocak|Şubat|Mart|Nisan|Mayıs|Haziran|Temmuz|Ağustos|Eylül|Ekim|Kasım|Aralık)\s+\d{3,4}" | |
| r"|\d{1,2}\.\d{1,2}\.\d{2,4}" | |
| r"|\d{4})\b", | |
| flags=re.IGNORECASE, | |
| ) | |
| DEATH_KEYS = ["öldü", "vefat", "hayatını kaybet", "ölümü", "ölüm"] | |
| FOUND_KEYS = ["kuruldu", "kuruluş", "kurulmuştur", "kuruluşu", "kuruluş tarihi"] | |
| def _split_sentences(txt: str) -> List[str]: | |
| parts = re.split(r"(?<=[.!?])\s+", (txt or "").strip()) | |
| return [p.strip() for p in parts if p.strip()] | |
| def _extract_fact_sentence(query: str, hits: List[Dict]) -> Tuple[str, str]: | |
| """ | |
| 'ne zaman öldü / ne zaman kuruldu' tipindeki sorularda | |
| tarih + anahtar kelime içeren ilk cümleyi yakala. | |
| Dönen: (cümle, kaynak_url) | ("", "") | |
| """ | |
| q = (query or "").lower() | |
| if "ne zaman" not in q: | |
| return "", "" | |
| if any(k in q for k in ["öldü", "vefat", "ölümü", "ölüm"]): | |
| keylist = DEATH_KEYS | |
| elif any(k in q for k in ["kuruldu", "kuruluş"]): | |
| keylist = FOUND_KEYS | |
| else: | |
| keylist = DEATH_KEYS + FOUND_KEYS | |
| for h in hits: | |
| for s in _split_sentences(h.get("text", "")): | |
| if any(k in s.lower() for k in keylist) and DATE_RX.search(s): | |
| return s, h.get("source", "") | |
| return "", "" | |
| # ========================= | |
| # İsim normalizasyonu (kısa span → tam özel ad) | |
| # ========================= | |
| NAME_RX = re.compile(r"\b([A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+(?:\s+[A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+){0,3})\b") | |
| def _expand_named_span(answer: str, hits: List[Dict]) -> str: | |
| """ | |
| QA'dan gelen 'Kemal' gibi kısa/eksik özel adı, | |
| bağlamdaki en uzun uygun özel adla genişletir. | |
| """ | |
| ans = (answer or "").strip() | |
| if not ans or len(ans.split()) > 2: | |
| return ans | |
| ans_low = ans.lower() | |
| preferred_aliases = [ | |
| "Mustafa Kemal Atatürk", | |
| "Sabiha Gökçen", | |
| "İsmet İnönü", | |
| ] | |
| # Tercihli alias varsa onu döndür | |
| for h in hits: | |
| text = h.get("text", "") | |
| for alias in preferred_aliases: | |
| if alias.lower().find(ans_low) != -1 and alias in text: | |
| return alias | |
| # Ans'ı içeren en uzun özel adı ara | |
| best = ans | |
| for h in hits: | |
| for sent in _split_sentences(h.get("text", "")): | |
| if ans_low not in sent.lower(): | |
| continue | |
| for m in NAME_RX.finditer(sent): | |
| cand = m.group(1).strip() | |
| if ans_low in cand.lower(): | |
| if len(cand) >= len(best) and any(ch.islower() for ch in cand): | |
| best = cand if len(cand.split()) >= len(best.split()) else best | |
| return best | |
| # ========================= | |
| # Vectorstore: LFS/Xet için otomatik indirme | |
| # ========================= | |
| def _open_meta(path: str): | |
| return gzip.open(path, "rt", encoding="utf-8") if path.endswith(".gz") else open(path, "r", encoding="utf-8") | |
| def _ensure_local_vectorstore(vstore_dir: str): | |
| """ | |
| vectorstore klasörü yoksa veya LFS/Xet pointer yüzünden gerçek içerik yoksa | |
| Space deposundan indir ve vstore_dir içine kopyala. | |
| """ | |
| os.makedirs(vstore_dir, exist_ok=True) | |
| faiss_path = os.path.join(vstore_dir, FAISS_FILE) | |
| meta_path = os.path.join(vstore_dir, META_JSONL) | |
| meta_gz = os.path.join(vstore_dir, META_JSONL_GZ) | |
| have_faiss = os.path.exists(faiss_path) | |
| have_meta = os.path.exists(meta_path) or os.path.exists(meta_gz) | |
| if have_faiss and have_meta: | |
| return # her şey hazır | |
| # huggingface_hub ile repo'dan yalnız vectorstore/* indir | |
| try: | |
| from huggingface_hub import snapshot_download | |
| except Exception as e: | |
| raise FileNotFoundError( | |
| f"'{faiss_path}' indirilemedi veya bulunamadı ve 'huggingface_hub' yok: {e}" | |
| ) | |
| repo_id = os.environ.get("HF_SPACE_REPO_ID") | |
| if not repo_id: | |
| owner = os.environ.get("SPACE_AUTHOR_NAME") | |
| space = os.environ.get("SPACE_REPO_NAME") | |
| if owner and space: | |
| repo_id = f"{owner}/{space}" | |
| else: | |
| raise FileNotFoundError( | |
| "HF_SPACE_REPO_ID tanımlı değil. Settings ▸ Variables bölümüne " | |
| "HF_SPACE_REPO_ID = <kullanıcı>/<space> olarak ekleyin." | |
| ) | |
| cache_dir = snapshot_download( | |
| repo_id=repo_id, | |
| repo_type="space", | |
| allow_patterns=["vectorstore/*"], | |
| ignore_patterns=["*.ipynb", "*.png", "*.jpg", "*.jpeg", "*.gif"], | |
| local_files_only=False, | |
| ) | |
| src_faiss = os.path.join(cache_dir, "vectorstore", FAISS_FILE) | |
| src_meta = os.path.join(cache_dir, "vectorstore", META_JSONL) | |
| src_metagz = os.path.join(cache_dir, "vectorstore", META_JSONL_GZ) | |
| if not os.path.exists(src_faiss): | |
| raise FileNotFoundError(f"'{FAISS_FILE}' Space deposunda bulunamadı (repo: {repo_id}).") | |
| shutil.copy2(src_faiss, faiss_path) | |
| if os.path.exists(src_metagz): | |
| shutil.copy2(src_metagz, meta_gz) | |
| elif os.path.exists(src_meta): | |
| shutil.copy2(src_meta, meta_path) | |
| else: | |
| raise FileNotFoundError(f"'meta.jsonl(.gz)' Space deposunda bulunamadı (repo: {repo_id}).") | |
| def load_vectorstore(vstore_dir: str = VSTORE_DIR) -> Tuple[faiss.Index, List[Dict]]: | |
| """ | |
| HF Spaces'ta LFS/Xet pointer dosyaları yüzünden yerel kopya yoksa, | |
| gerekli dosyaları repo'dan indirir ve okur. | |
| """ | |
| _ensure_local_vectorstore(vstore_dir) | |
| index_path = os.path.join(vstore_dir, FAISS_FILE) | |
| meta_path_gz = os.path.join(vstore_dir, META_JSONL_GZ) | |
| meta_path = meta_path_gz if os.path.exists(meta_path_gz) else os.path.join(vstore_dir, META_JSONL) | |
| if not (os.path.exists(index_path) and os.path.exists(meta_path)): | |
| raise FileNotFoundError( | |
| "Vektör deposu bulunamadı. Lütfen 'vectorstore/index.faiss' ile " | |
| "'vectorstore/meta.jsonl' (veya meta.jsonl.gz) dosyalarının mevcut olduğundan emin olun." | |
| ) | |
| index = faiss.read_index(index_path) | |
| # IVF/HNSW için arama derinliği parametreleri | |
| # IVF/HNSW için arama derinliği parametreleri (varsa ayarla) | |
| try: | |
| # Ortam değişkeniyle özelleştirilebilir; yoksa 32 | |
| ef = int(os.environ.get("FAISS_EFSEARCH", "32")) | |
| if hasattr(index, "hnsw"): | |
| index.hnsw.efSearch = ef | |
| except Exception: | |
| pass | |
| # meta.jsonl(.gz) oku | |
| records: List[Dict] = [] | |
| with _open_meta(meta_path) as f: | |
| for line in f: | |
| if not line.strip(): | |
| continue | |
| obj = json.loads(line) | |
| records.append({ | |
| "text": obj.get("text", ""), | |
| "metadata": obj.get("metadata", {}) | |
| }) | |
| if not records: | |
| raise RuntimeError("meta.jsonl(.gz) boş görünüyor veya okunamadı.") | |
| return index, records | |
| # ========================= | |
| # Anahtar kelime çıkarımı + lexical puan | |
| # ========================= | |
| _CAP_WORD = re.compile(r"\b([A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+(?:\s+[A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+)*)\b") | |
| def _keywords_from_query(q: str) -> List[str]: | |
| q = (q or "").strip() | |
| caps = [m.group(1) for m in _CAP_WORD.finditer(q)] | |
| nums = re.findall(r"\b\d{3,4}\b", q) | |
| base = re.findall(r"[A-Za-zÇĞİIÖŞÜçğıiöşü]+", q) | |
| base = [w.lower() for w in base if len(w) > 2] | |
| # tekrarları at | |
| return list(dict.fromkeys(caps + nums + base)) | |
| def _lexical_overlap(q_tokens: List[str], text: str) -> float: | |
| toks = re.findall(r"[A-Za-zÇĞİIÖŞÜçğıiöşü]+", (text or "").lower()) | |
| if not toks: | |
| return 0.0 | |
| qset = set([t for t in q_tokens if len(t) > 2]) | |
| tset = set([t for t in toks if len(t) > 2]) | |
| inter = len(qset & tset) | |
| denom = len(qset) or 1 | |
| return inter / denom | |
| # ========================= | |
| # Retrieval + (koşullu) Rerank + title/lexical boost | |
| # ========================= | |
| def _cached_query_vec(e5_query: str) -> np.ndarray: | |
| """E5 sorgu embedding'ini cache'ler.""" | |
| v = embed([e5_query]).astype("float32") | |
| return v | |
| def search_chunks( | |
| query: str, | |
| index: faiss.Index, | |
| records: List[Dict], | |
| top_k: int = TOP_K_DEFAULT, | |
| fetch_k: int = FETCH_K_DEFAULT, | |
| ) -> List[Dict]: | |
| q = (query or "").strip() | |
| q_e5 = "query: " + q | |
| q_vec = _cached_query_vec(q_e5) | |
| faiss.normalize_L2(q_vec) | |
| scores, idxs = index.search(q_vec, fetch_k) | |
| pool: List[Dict] = [] | |
| for i, s in zip(idxs[0], scores[0]): | |
| if 0 <= i < len(records): | |
| md = records[i]["metadata"] or {} | |
| pool.append({ | |
| "text": records[i]["text"], | |
| "title": md.get("title", ""), | |
| "source": md.get("source", ""), | |
| "score_vec": float(s), | |
| }) | |
| if not pool: | |
| return [] | |
| # --- title & lexical boost --- | |
| q_tokens = _keywords_from_query(q) | |
| q_tokens_lower = [t.lower() for t in q_tokens] | |
| for p in pool: | |
| title = (p.get("title") or "").lower() | |
| # Büyük harfle başlayan query token'ı başlıkta geçiyorsa boost | |
| title_hit = any(tok.lower() in title for tok in q_tokens if tok and tok[0].isupper()) | |
| title_boost = W_TITLE_BOOST if title_hit else 0.0 | |
| lex = _lexical_overlap(q_tokens_lower, p["text"]) * W_LEXICAL | |
| p["score_boosted"] = p["score_vec"] + title_boost + lex | |
| pool_by_boost = sorted(pool, key=lambda x: x["score_boosted"], reverse=True) | |
| # --- erken karar: top1 güçlü ve fark yüksekse rerank yapma --- | |
| if len(pool_by_boost) >= 2: | |
| top1, top2 = pool_by_boost[0]["score_boosted"], pool_by_boost[1]["score_boosted"] | |
| else: | |
| top1, top2 = pool_by_boost[0]["score_boosted"], 0.0 | |
| do_rerank = not (top1 >= HIGH_SCORE_THRES and (top1 - top2) >= MARGIN_THRES) | |
| if do_rerank: | |
| try: | |
| rs = rerank(q, [p["text"] for p in pool_by_boost]) | |
| for p, r in zip(pool_by_boost, rs): | |
| p["score_rerank"] = float(r) | |
| pool_by_boost.sort( | |
| key=lambda x: (x.get("score_rerank", 0.0), x["score_boosted"]), | |
| reverse=True, | |
| ) | |
| except Exception: | |
| # Rerank başarısızsa boost'lu sırayı kullan | |
| pass | |
| return pool_by_boost[:top_k] | |
| # ========================= | |
| # LLM bağlamı ve kaynak listesi | |
| # ========================= | |
| def _format_sources(hits: List[Dict]) -> str: | |
| seen, urls = set(), [] | |
| for h in hits: | |
| u = (h.get("source") or "").strip() | |
| if u and u not in seen: | |
| urls.append(u) | |
| seen.add(u) | |
| return "\n".join(f"- {u}" for u in urls) if urls else "- (yok)" | |
| def _llm_context(hits: List[Dict], limit: int = CTX_CHAR_LIMIT) -> str: | |
| ctx, total = [], 0 | |
| for i, h in enumerate(hits, 1): | |
| block = f"[{i}] {h.get('title','')} — {h.get('source','')}\n{h.get('text','')}" | |
| if total + len(block) > limit: | |
| break | |
| ctx.append(block) | |
| total += len(block) | |
| return "\n\n---\n\n".join(ctx) | |
| # ========================= | |
| # Nihai cevap (kural → QA → LLM → güvenli özet) | |
| # ========================= | |
| def generate_answer( | |
| query: str, | |
| index: faiss.Index, | |
| records: List[Dict], | |
| top_k: int = TOP_K_DEFAULT, | |
| ) -> str: | |
| hits = search_chunks(query, index, records, top_k=top_k) | |
| if not hits: | |
| return "Bilgi bulunamadı." | |
| # 0) Kural-tabanlı hızlı çıkarım (tarih/kuruluş soruları) | |
| rule_sent, rule_src = _extract_fact_sentence(query, hits) | |
| if rule_sent: | |
| return f"{rule_sent}\n\nKaynaklar:\n- {rule_src if rule_src else _format_sources(hits)}" | |
| # 1) Pasaj bazlı ekstraktif QA | |
| best = {"answer": None, "score": 0.0, "src": None} | |
| for h in hits[:QA_PER_PASSAGES]: | |
| try: | |
| qa = qa_extract(query, h["text"]) | |
| except Exception: | |
| qa = None | |
| if qa and qa.get("answer"): | |
| score = float(qa.get("score", 0.0)) | |
| ans = (qa.get("answer") or "").strip() | |
| # Cevap tarih/özel ad içeriyorsa ekstra güven | |
| if re.search(r"\b(19\d{2}|20\d{2}|Atatürk|Gökçen|Kemal|Ankara|Fenerbahçe)\b", | |
| ans, flags=re.IGNORECASE): | |
| score += 0.30 | |
| # Çok kısa veya eksik isimse → bağlamdan tam özel ada genişlet | |
| if len(ans.split()) <= 2: | |
| ans = _expand_named_span(ans, hits) | |
| if score > best["score"]: | |
| best = {"answer": ans, "score": score, "src": h.get("source")} | |
| if best["answer"] and best["score"] >= QA_SCORE_THRES: | |
| final = best["answer"].strip() | |
| # Soru "kimdir/kim" ise doğal cümleye dök | |
| if any(k in (query or "").lower() for k in ["kimdir", "kim"]): | |
| if not final.endswith("."): | |
| final += "." | |
| final = f"{final} {query.rstrip('?')} sorusunun yanıtıdır." | |
| src_line = f"Kaynaklar:\n- {best['src']}" if best["src"] else "Kaynaklar:\n" + _format_sources(hits) | |
| return f"{final}\n\n{src_line}" | |
| # 2) QA düşük güven verdiyse → LLM (varsa) | |
| context = _llm_context(hits) | |
| prompt = ( | |
| "Aşağıdaki BAĞLAM Wikipedia parçalarından alınmıştır.\n" | |
| "Sadece bu bağlamdan yararlanarak soruya kısa, net ve doğru bir Türkçe cevap ver.\n" | |
| "Uydurma yapma, sadece metinlerde geçen bilgileri kullan.\n\n" | |
| f"Soru:\n{query}\n\nBağlam:\n{context}\n\nYanıtı 1-2 cümlede ver." | |
| ) | |
| llm_ans = (generate(prompt) or "").strip() | |
| # 3) LLM yapılandırılmamışsa → güvenli özet fallback | |
| if (not llm_ans) or ("yapılandırılmadı" in llm_ans.lower()): | |
| text = hits[0].get("text", "") | |
| first = re.split(r"(?<=[.!?])\s+", text.strip())[:2] | |
| llm_ans = " ".join(first).strip() or "Verilen bağlamda bu sorunun cevabı bulunmamaktadır." | |
| if "Kaynaklar:" not in llm_ans: | |
| llm_ans += "\n\nKaynaklar:\n" + _format_sources(hits) | |
| return llm_ans | |
| # ========================= | |
| # Hızlı test | |
| # ========================= | |
| if __name__ == "__main__": | |
| idx, recs = load_vectorstore(VSTORE_DIR) | |
| for q in [ | |
| "Atatürk ne zaman öldü?", | |
| "Türkiye'nin ilk cumhurbaşkanı kimdir?", | |
| "Fenerbahçe ne zaman kuruldu?", | |
| "Türkiye'nin başkenti neresidir?", | |
| "Türkiye'nin ilk kadın pilotu kimdir?", | |
| ]: | |
| print("Soru:", q) | |
| print(generate_answer(q, idx, recs, top_k=TOP_K_DEFAULT)) | |
| print("-" * 80) |