Spaces:
No application file
No application file
| import base64 | |
| import re | |
| import sqlite3 | |
| from datetime import datetime, date, time | |
| from decimal import Decimal, InvalidOperation | |
| from pathlib import Path | |
| from typing import Optional, Dict | |
| from dataclasses import dataclass | |
| import pytesseract | |
| from PIL import Image | |
@dataclass
class Settings:
    """Configuration values consumed by the functions in this module.

    Declared as a dataclass so an instance can be built from keyword
    arguments; all fields are required (none has a default).
    """

    telegram_bot_token: str
    authorized_user_id: int
    gemini_api_key: str
    gemini_model: str
    pms_blueprint_path: Path
    # SQLite file opened by verify_booking_amount().
    database_path: Path
    slip_storage_dir: Path
    webapp_base_url: str
    webapp_static_dir: Path
    webapp_host: str
    webapp_port: int
    # Passed as the ``lang`` argument to pytesseract in extract_slip_information().
    ocr_language: str
    tesseract_cmd: Optional[str]
    # Largest absolute difference between the expected and the extracted
    # amount that verify_booking_amount() still reports as "verified".
    amount_tolerance: Decimal
@dataclass
class SlipExtractionResult:
    """Values pulled out of a payment slip by OCR.

    Must be a dataclass: extract_slip_information() constructs it with
    keyword arguments. Every parsed field is ``None`` when the matching
    pattern was not found in the OCR text or could not be converted.
    """

    # Unmodified text returned by the OCR engine.
    raw_text: str
    amount: Optional[Decimal]
    payment_date: Optional[date]
    payment_time: Optional[time]
def save_base64_image(b64: str, dest: Path) -> None:
    """Decode a base64-encoded image and write the bytes to *dest*.

    Args:
        b64: Base64 payload. A leading data-URI header
            (``data:<mime>;base64,``) is accepted and stripped.
        dest: File to create or overwrite with the decoded bytes.

    Raises:
        RuntimeError: If decoding or writing fails; the original exception
            is attached as ``__cause__``.
    """
    try:
        # Without this, the header's letters would be decoded as payload
        # (b64decode ignores ':' ';' ',' in non-validating mode) and the
        # written file would start with garbage bytes.
        if isinstance(b64, str) and b64.startswith("data:"):
            _header, comma, payload = b64.partition(",")
            if comma:
                b64 = payload
        data = base64.b64decode(b64)
        dest.write_bytes(data)
    except Exception as exc:
        raise RuntimeError(f"ไม่สามารถบันทึกรูปภาพได้: {exc}") from exc
# Money-like number with exactly two decimals, optionally grouped in
# thousands by ',' or a single space (the only separators stripped before
# conversion). The look-arounds stop the pattern from matching a fragment of
# a dotted date such as "25.12" in "25.12.2024".
_AMOUNT_RE = re.compile(
    r'(?<!\d)(?<!\d\.)'
    r'(\d{1,3}(?:[, ]\d{3})*\.\d{2}|\d+\.\d{2})'
    r'(?!\.?\d)'
)

# Year-first ISO dates are tried first so that "2024-12-25" is captured
# whole instead of as the day-first fragment "24-12-25".
_DATE_RE = re.compile(
    r'(\d{4}-\d{1,2}-\d{1,2}|\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{2,4})'
)

# NOTE: the date pattern also matches 2- and 3-digit years, but no format
# below accepts them, so such dates yield None.
_DATE_FORMATS = ("%d/%m/%Y", "%d-%m-%Y", "%Y-%m-%d", "%d.%m.%Y")

_TIME_RE = re.compile(r'(\d{1,2}:\d{2})(?:\s?(AM|PM))?', re.IGNORECASE)


def _parse_amount(raw_text: str) -> Optional[Decimal]:
    """Return the first money-like number in *raw_text*, or None."""
    found = _AMOUNT_RE.search(raw_text)
    if found is None:
        return None
    digits = found.group(1).replace(',', '').replace(' ', '')
    try:
        return Decimal(digits)
    except InvalidOperation:
        return None


def _parse_date(raw_text: str) -> Optional[date]:
    """Return the first date-like token in *raw_text* as a date, or None."""
    found = _DATE_RE.search(raw_text)
    if found is None:
        return None
    token = found.group(1)
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(token, fmt).date()
        except ValueError:
            continue
    return None


def _parse_time(raw_text: str) -> Optional[time]:
    """Return the first HH:MM token (optionally AM/PM) as a time, or None."""
    found = _TIME_RE.search(raw_text)
    if found is None:
        return None
    clock, meridiem = found.group(1), found.group(2)
    try:
        if meridiem:
            return datetime.strptime(f"{clock} {meridiem}", "%I:%M %p").time()
        return datetime.strptime(clock, "%H:%M").time()
    except ValueError:
        return None


def extract_slip_information(image_path: Path, settings: Settings) -> SlipExtractionResult:
    """Run Tesseract OCR on a payment slip and parse amount, date and time.

    Each field is taken from the first matching pattern in the OCR text and
    is ``None`` when nothing matches or the match cannot be converted.

    Args:
        image_path: Image file to read.
        settings: Supplies ``ocr_language``, passed to Tesseract as ``lang``.

    Returns:
        A SlipExtractionResult holding the raw OCR text and parsed fields.

    Raises:
        RuntimeError: If the image cannot be opened; the original exception
            is attached as ``__cause__``. Errors raised by the OCR call
            itself propagate unchanged.
    """
    try:
        img = Image.open(image_path)
    except Exception as exc:
        raise RuntimeError(f"เปิดไฟล์ภาพไม่ได้: {exc}") from exc
    # Close the underlying file handle as soon as OCR is done.
    with img:
        raw_text = pytesseract.image_to_string(img, lang=settings.ocr_language)

    return SlipExtractionResult(
        raw_text=raw_text,
        amount=_parse_amount(raw_text),
        payment_date=_parse_date(raw_text),
        payment_time=_parse_time(raw_text),
    )
def verify_booking_amount(
    settings: Settings,
    booking_id: str,
    extracted_amount: Optional[Decimal],
) -> Dict[str, str]:
    """Compare an extracted slip amount with the booking's ``total_due``.

    Args:
        settings: Supplies ``database_path`` (SQLite file) and
            ``amount_tolerance`` (largest accepted absolute difference).
        booking_id: Value matched against ``bookings.id``.
        extracted_amount: Amount read from the slip, or ``None`` if none
            was found.

    Returns:
        A dict whose ``"status"`` is one of:

        * ``"booking_not_found"`` - no row has this id;
        * ``"amount_missing"`` - the booking exists but
          ``extracted_amount`` is ``None``;
        * ``"verified"`` - the amounts differ by at most the tolerance;
        * ``"amount_mismatch"`` - the amounts differ by more.

        The last two also carry ``"expected_amount"`` as a string.

    Raises:
        decimal.InvalidOperation: If the stored ``total_due`` cannot be
            converted to a Decimal (for example when it is NULL).
    """
    # sqlite3's connection context manager only commits or rolls back; it
    # does not close the connection, so close it explicitly.
    conn = sqlite3.connect(settings.database_path)
    try:
        row = conn.execute(
            "SELECT total_due FROM bookings WHERE id = ?", (booking_id,)
        ).fetchone()
    finally:
        conn.close()

    if row is None:
        return {"status": "booking_not_found"}

    # Go through str() so a REAL column value does not carry binary
    # floating-point noise into the Decimal.
    expected = Decimal(str(row[0]))
    if extracted_amount is None:
        return {"status": "amount_missing"}

    if abs(expected - extracted_amount) <= settings.amount_tolerance:
        return {"status": "verified", "expected_amount": str(expected)}
    return {"status": "amount_mismatch", "expected_amount": str(expected)}
def chunk_for_telegram(text: str, limit: int = 4096) -> list[str]:
    """Split *text* into consecutive pieces of at most *limit* characters.

    Pieces are cut at fixed offsets (not at word or line boundaries), so
    joining them reproduces *text* exactly. Text that already fits,
    including the empty string, is returned as a single-element list.

    Args:
        text: Message to split.
        limit: Maximum length of each piece; must be positive.

    Returns:
        The pieces in order.

    Raises:
        ValueError: If *limit* is not positive.
    """
    # A negative limit would make range() empty and silently drop the text;
    # zero would raise an unrelated-looking range() error.
    if limit <= 0:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")
    if len(text) <= limit:
        return [text]
    return [text[start:start + limit] for start in range(0, len(text), limit)]