# Bot_telegram/app/utils.py
# nssuwan186 - Initial upload of Hotel OS Bot project
# commit cf064e5 (verified), 3.93 kB
import base64
import re
import sqlite3
from datetime import datetime, date, time
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Optional, Dict
from dataclasses import dataclass
import pytesseract
from PIL import Image
@dataclass
class Settings:
    """Application configuration container.

    The field comments below describe only uses that can be confirmed from
    the functions in this module; anything else is either left uncommented
    or explicitly marked as an assumption.
    """
    telegram_bot_token: str
    authorized_user_id: int
    gemini_api_key: str
    gemini_model: str
    pms_blueprint_path: Path
    database_path: Path  # SQLite database file opened by verify_booking_amount
    slip_storage_dir: Path  # presumably where uploaded slip images are stored - confirm with callers
    webapp_base_url: str
    webapp_static_dir: Path
    webapp_host: str
    webapp_port: int
    ocr_language: str  # handed to pytesseract as the ``lang`` argument
    tesseract_cmd: Optional[str]  # optional; presumably the path to the tesseract executable - confirm
    amount_tolerance: Decimal  # largest |expected - extracted| that still counts as "verified"
@dataclass
class SlipExtractionResult:
    """Values pulled out of a payment slip by ``extract_slip_information``.

    Every field except ``raw_text`` is ``None`` when no matching text was
    found in the OCR output or the matched text could not be parsed.
    """
    raw_text: str  # text returned by the OCR engine, unmodified
    amount: Optional[Decimal]  # amount parsed from the OCR text
    payment_date: Optional[date]  # date parsed from the OCR text
    payment_time: Optional[time]  # time of day parsed from the OCR text
def save_base64_image(b64: str, dest: Path) -> None:
    """Decode a base64-encoded image and write the bytes to ``dest``.

    Accepts either a bare base64 payload or a data URI of the form
    ``data:<mime>;base64,<payload>``; the URI header is stripped before
    decoding. Missing parent directories of ``dest`` are created.

    Args:
        b64: Base64 text, optionally prefixed with a data-URI header.
        dest: Target file path. An existing file is overwritten.

    Raises:
        RuntimeError: If decoding or writing fails. The underlying
            exception is attached as ``__cause__``.
    """
    try:
        payload = b64
        # ':' is not part of the base64 alphabet, so a leading "data:" can
        # only be a data-URI header. Left in place, its letters would be
        # decoded as if they were payload and corrupt the image.
        if isinstance(payload, str) and payload.lstrip().startswith("data:"):
            _header, comma, body = payload.partition(",")
            if comma:
                payload = body
        data = base64.b64decode(payload)
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(data)
    except Exception as exc:
        # Callers rely on RuntimeError; chain the cause so the real failure
        # stays visible in tracebacks.
        raise RuntimeError(f"ไม่สามารถบันทึกรูปภาพได้: {exc}") from exc
# Amount with exactly two decimals: either comma/space-grouped thousands
# ("1,500.00", "1 500.00") or plain digits ("1500.00"). The lookarounds keep
# the pattern from starting or ending inside a longer number, so the tail of
# a year ("2024 500.00") or a dotted date ("12.05.2024") is not read as money.
# Only ',' and ' ' are accepted as separators because those are the only
# characters removed before the text is handed to Decimal().
_SLIP_AMOUNT_RE = re.compile(
    r'(?<![\d.])(\d{1,3}(?:[, ]\d{3})*\.\d{2}|\d+\.\d{2})(?!\d|\.\d)'
)

# Year-first dates are listed first so "2024-05-12" is captured whole instead
# of being truncated to "24-05-12" by the day-first alternative.
_SLIP_DATE_RE = re.compile(
    r'(\d{4}[\/\-\.]\d{1,2}[\/\-\.]\d{1,2}'
    r'|\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{2,4})'
)

_SLIP_DATE_FORMATS = (
    "%d/%m/%Y",
    "%d-%m-%Y",
    "%d.%m.%Y",
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y.%m.%d",
)

_SLIP_TIME_RE = re.compile(r'(\d{1,2}:\d{2})(?:\s?(AM|PM))?', re.IGNORECASE)


def _parse_slip_amount(raw_text: str) -> Optional[Decimal]:
    """Return the first amount-like number in ``raw_text``, or ``None``."""
    match = _SLIP_AMOUNT_RE.search(raw_text)
    if match is None:
        return None
    digits = match.group(1).replace(',', '').replace(' ', '')
    try:
        return Decimal(digits)
    except InvalidOperation:
        return None


def _parse_slip_date(raw_text: str) -> Optional[date]:
    """Return the first date-like token in ``raw_text`` as a date, or ``None``."""
    match = _SLIP_DATE_RE.search(raw_text)
    if match is None:
        return None
    token = match.group(1)
    # NOTE(review): two-digit years are matched by the pattern but none of the
    # formats accepts them, so such dates yield None (as before). Whether
    # years on these slips are Gregorian or Buddhist Era is not visible
    # here - confirm before adding "%y" or any year conversion.
    for fmt in _SLIP_DATE_FORMATS:
        try:
            return datetime.strptime(token, fmt).date()
        except ValueError:
            continue
    return None


def _parse_slip_time(raw_text: str) -> Optional[time]:
    """Return the first ``H:MM`` time (optionally AM/PM) in ``raw_text``, or ``None``."""
    match = _SLIP_TIME_RE.search(raw_text)
    if match is None:
        return None
    clock, meridiem = match.group(1), match.group(2)
    try:
        if meridiem:
            return datetime.strptime(f"{clock} {meridiem}", "%I:%M %p").time()
        return datetime.strptime(clock, "%H:%M").time()
    except ValueError:
        return None


def extract_slip_information(image_path: Path, settings: Settings) -> SlipExtractionResult:
    """Run Tesseract OCR on a payment slip and pull out amount, date and time.

    Each value is taken from the first matching piece of the OCR text. A
    value that is absent or cannot be parsed is reported as ``None`` rather
    than raising.

    Args:
        image_path: Path of the slip image to read.
        settings: Supplies ``ocr_language`` (OCR language code) and,
            optionally, ``tesseract_cmd`` (tesseract executable to use).

    Returns:
        A ``SlipExtractionResult`` holding the raw OCR text and the parsed
        values.

    Raises:
        RuntimeError: If the image file cannot be opened. The underlying
            exception is attached as ``__cause__``. Errors raised by the
            OCR call itself are not wrapped.
    """
    # pytesseract locates the binary through this module attribute; without
    # this assignment the configured path would never take effect here.
    if settings.tesseract_cmd:
        pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd

    try:
        img = Image.open(image_path)
    except Exception as exc:
        raise RuntimeError(f"เปิดไฟล์ภาพไม่ได้: {exc}") from exc

    # Image.open keeps the file handle open; run OCR inside the context
    # manager so the handle is released even if OCR fails.
    with img:
        raw_text = pytesseract.image_to_string(img, lang=settings.ocr_language)

    return SlipExtractionResult(
        raw_text=raw_text,
        amount=_parse_slip_amount(raw_text),
        payment_date=_parse_slip_date(raw_text),
        payment_time=_parse_slip_time(raw_text),
    )
def verify_booking_amount(
    settings: Settings,
    booking_id: str,
    extracted_amount: Optional[Decimal],
) -> Dict[str, str]:
    """Compare an amount read from a slip with the booking's ``total_due``.

    Args:
        settings: Supplies ``database_path`` (SQLite file containing the
            ``bookings`` table) and ``amount_tolerance`` (largest absolute
            difference still accepted).
        booking_id: Value looked up in ``bookings.id``.
        extracted_amount: Amount taken from the slip, or ``None`` if none
            was found.

    Returns:
        A dict whose ``"status"`` is one of:

        * ``"booking_not_found"`` - no row has that id;
        * ``"amount_missing"`` - ``extracted_amount`` is ``None``;
        * ``"verified"`` - the amounts differ by at most the tolerance;
        * ``"amount_mismatch"`` - the amounts differ by more.

        The last two also carry ``"expected_amount"`` (the stored total as
        a string).

    Raises:
        decimal.InvalidOperation: If the stored ``total_due`` cannot be read
            as a decimal number (for example when it is NULL).
    """
    # sqlite3's own context manager only commits or rolls back and leaves the
    # connection open, so close it explicitly to avoid leaking one per call.
    conn = sqlite3.connect(settings.database_path)
    try:
        row = conn.execute(
            "SELECT total_due FROM bookings WHERE id = ?", (booking_id,)
        ).fetchone()
    finally:
        conn.close()

    if row is None:
        return {"status": "booking_not_found"}

    # Go through str() so a REAL column value is not converted via binary float.
    expected = Decimal(str(row[0]))
    if extracted_amount is None:
        return {"status": "amount_missing"}

    if abs(expected - extracted_amount) <= settings.amount_tolerance:
        return {"status": "verified", "expected_amount": str(expected)}
    return {"status": "amount_mismatch", "expected_amount": str(expected)}
def chunk_for_telegram(text: str, limit: int = 4096) -> list[str]:
    """Split ``text`` into consecutive pieces of at most ``limit`` characters.

    The pieces are plain fixed-width slices, so joining them reproduces
    ``text`` exactly. Text that already fits (including the empty string)
    is returned as a single-element list.

    Args:
        text: The message to split.
        limit: Maximum length of each piece; must be at least 1.

    Returns:
        The pieces in order.

    Raises:
        ValueError: If ``limit`` is less than 1.
    """
    # A negative step would make range() yield nothing and silently drop the
    # whole message; zero would fail with an unrelated range() error.
    if limit < 1:
        raise ValueError(f"limit must be a positive integer, got {limit}")
    if len(text) <= limit:
        return [text]
    return [text[start:start + limit] for start in range(0, len(text), limit)]