"""Payment-slip OCR helpers: image storage, field extraction, booking checks."""

import base64
import re
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, date, time
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Optional, Dict

import pytesseract
from PIL import Image


@dataclass
class Settings:
    """Runtime configuration consumed by the helpers in this module."""

    telegram_bot_token: str
    authorized_user_id: int
    gemini_api_key: str
    gemini_model: str
    pms_blueprint_path: Path
    database_path: Path  # SQLite file opened by verify_booking_amount
    slip_storage_dir: Path
    webapp_base_url: str
    webapp_static_dir: Path
    webapp_host: str
    webapp_port: int
    ocr_language: str  # passed to Tesseract as ``lang``
    tesseract_cmd: Optional[str]  # optional path to the tesseract executable
    amount_tolerance: Decimal  # max accepted |expected - extracted| difference


@dataclass
class SlipExtractionResult:
    """Fields recovered from a slip image; any field may be ``None``."""

    raw_text: str
    amount: Optional[Decimal]
    payment_date: Optional[date]
    payment_time: Optional[time]


# First number with exactly two decimals, optionally with "," or whitespace
# as thousands separators.
_AMOUNT_PATTERN = re.compile(
    r'(\d{1,3}(?:[,\s]\d{3})*(?:\.\d{2})|\d+\.\d{2})'
)

# ISO dates (YYYY-MM-DD) are tried first at each position: without that
# alternative the day-first pattern matches only the tail "24-01-15" of
# "2024-01-15", which no format below can parse.
_DATE_PATTERN = re.compile(
    r'(?<!\d)(\d{4}-\d{1,2}-\d{1,2})(?!\d)'
    r'|(\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{2,4})'
)
_DATE_FORMATS = ("%d/%m/%Y", "%d-%m-%Y", "%Y-%m-%d", "%d.%m.%Y")

_TIME_PATTERN = re.compile(r'(\d{1,2}:\d{2})(?:\s?(AM|PM))?', re.IGNORECASE)


def save_base64_image(b64: str, dest: Path) -> None:
    """Decode a base64-encoded image and write it to *dest*.

    A leading ``data:<mime>;base64,`` prefix (as produced by browser data
    URLs) is stripped before decoding; left in place, the non-validating
    decoder would fold the prefix letters into the payload and corrupt
    the image.

    Raises:
        RuntimeError: if decoding or writing fails; the original error is
            attached as ``__cause__``.
    """
    try:
        payload = b64
        if isinstance(payload, str) and payload.startswith("data:"):
            _, separator, remainder = payload.partition(",")
            if separator:
                payload = remainder
        dest.write_bytes(base64.b64decode(payload))
    except Exception as exc:
        raise RuntimeError(f"ไม่สามารถบันทึกรูปภาพได้: {exc}") from exc


def _parse_amount(text: str) -> Optional[Decimal]:
    """Return the first two-decimal number found in *text*, or ``None``."""
    match = _AMOUNT_PATTERN.search(text)
    if match is None:
        return None
    digits = match.group(1).replace(',', '').replace(' ', '')
    try:
        return Decimal(digits)
    except InvalidOperation:
        # A whitespace separator other than a plain space (e.g. a newline)
        # survives the replace() calls and is rejected by Decimal.
        return None


def _parse_payment_date(text: str) -> Optional[date]:
    """Return the first date-like token in *text* that parses, or ``None``.

    Every candidate is tried in text order, so an unparsable look-alike
    (for example a dotted version number) does not hide a real date that
    appears later.
    """
    for match in _DATE_PATTERN.finditer(text):
        candidate = match.group(0)
        for fmt in _DATE_FORMATS:
            try:
                return datetime.strptime(candidate, fmt).date()
            except ValueError:
                continue
    return None


def _parse_payment_time(text: str) -> Optional[time]:
    """Return the first ``H:MM`` time in *text* (optional AM/PM), or ``None``."""
    match = _TIME_PATTERN.search(text)
    if match is None:
        return None
    clock, meridiem = match.group(1), match.group(2)
    try:
        if meridiem:
            return datetime.strptime(f"{clock} {meridiem}", "%I:%M %p").time()
        return datetime.strptime(clock, "%H:%M").time()
    except ValueError:
        # Out-of-range values such as "25:99" or "13:00 PM".
        return None


def extract_slip_information(image_path: Path, settings: Settings) -> SlipExtractionResult:
    """Run Tesseract OCR on a slip image and pull out amount, date and time.

    Args:
        image_path: Image file to read.
        settings: Supplies ``ocr_language`` and, when set, ``tesseract_cmd``.

    Returns:
        A ``SlipExtractionResult`` holding the raw OCR text plus whichever
        fields could be parsed; unparsed fields are ``None``.

    Raises:
        RuntimeError: if the image cannot be opened. Errors raised by
            pytesseract itself propagate unchanged.
    """
    if settings.tesseract_cmd:
        # Honour the configured executable path; this sets a pytesseract
        # module-level value, so it also affects later calls.
        pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd

    try:
        img = Image.open(image_path)
    except Exception as exc:
        raise RuntimeError(f"เปิดไฟล์ภาพไม่ได้: {exc}") from exc

    # Close the underlying file handle as soon as OCR is done.
    with img:
        raw_text = pytesseract.image_to_string(img, lang=settings.ocr_language)

    # NOTE(review): years are returned exactly as printed. If slips use
    # Buddhist Era years (e.g. 2567) the caller must convert - confirm.
    # Two-digit years are deliberately not parsed because their era would
    # be ambiguous.
    return SlipExtractionResult(
        raw_text=raw_text,
        amount=_parse_amount(raw_text),
        payment_date=_parse_payment_date(raw_text),
        payment_time=_parse_payment_time(raw_text),
    )


def verify_booking_amount(
    settings: Settings,
    booking_id: str,
    extracted_amount: Optional[Decimal],
) -> Dict[str, str]:
    """Compare an extracted amount with the booking's ``total_due``.

    Returns:
        A dict whose ``"status"`` is one of ``"booking_not_found"``,
        ``"amount_missing"``, ``"verified"`` or ``"amount_mismatch"``; the
        last two also carry ``"expected_amount"`` as a string.

    Raises:
        decimal.InvalidOperation: if the stored ``total_due`` cannot be
            converted to a ``Decimal`` (for example when it is NULL).
    """
    # sqlite3's own context manager only commits or rolls back; closing()
    # is what actually releases the connection.
    with closing(sqlite3.connect(settings.database_path)) as conn:
        row = conn.execute(
            "SELECT total_due FROM bookings WHERE id = ?",
            (booking_id,),
        ).fetchone()

    if row is None:
        return {"status": "booking_not_found"}

    # Go through str() so a float column value keeps its printed digits
    # instead of its binary expansion.
    expected = Decimal(str(row[0]))
    if extracted_amount is None:
        return {"status": "amount_missing"}

    if abs(expected - extracted_amount) <= settings.amount_tolerance:
        return {"status": "verified", "expected_amount": str(expected)}
    return {"status": "amount_mismatch", "expected_amount": str(expected)}


def chunk_for_telegram(text: str, limit: int = 4096) -> list[str]:
    """Split *text* into consecutive pieces of at most *limit* characters.

    Text that already fits is returned as a single-element list, so an
    empty string yields ``[""]``. Pieces are cut at fixed offsets, not at
    word or line boundaries.

    Raises:
        ValueError: if *limit* is not positive. Without this check a
            negative limit would silently return no chunks at all.
    """
    if limit <= 0:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")
    if len(text) <= limit:
        return [text]
    return [text[i:i + limit] for i in range(0, len(text), limit)]