from datetime import date, datetime, time, timedelta
import json
from pathlib import Path
import ssl
import tempfile
import xml.etree.ElementTree as ET
from typing import List, Optional, Tuple

import gradio as gr
import folium
from folium.plugins import MarkerCluster
import pandas as pd
from huggingface_hub import hf_hub_download

try:
    from gradio.components import Date as GrDateComponent
except (ImportError, AttributeError):
    GrDateComponent = getattr(gr, "Date", None) or getattr(gr, "DatePicker", None)

try:
    from shapely import wkt as shapely_wkt
    from shapely.geometry import Point

    SHAPELY_AVAILABLE = True
except Exception:  # ImportError or attribute issues
    shapely_wkt = None
    Point = None
    SHAPELY_AVAILABLE = False
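# Both blocks above guard optional dependencies: if Gradio exposes no date-picker
# component the UI falls back to a plain textbox further down, and if Shapely is
# unavailable the AOI filter is skipped with a warning instead of failing.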
DEFAULT_CENTER = "41.9028,12.4964"
DEFAULT_ZOOM = 12
DEFAULT_TILES = "CartoDB positron"
DEFAULT_DATE_PROMPT = "Select the date to pull AIS data."
DEFAULT_TIME_PROMPT = "Set start and end times to describe the daily window."
DEFAULT_DATE = "2025-08-25"
DEFAULT_START_TIME = "10:00:00"
DEFAULT_END_TIME = "12:00:00"
DEFAULT_AOI_WKT = """POLYGON((4.2100 51.3700,4.4800 51.3700,4.5100 51.2900,4.4650 51.1700,4.2500 51.1700,4.1900 51.2500,4.2100 51.3700))"""
HF_REPO_ID = "Lore0123/AISPortal"
HF_FILE_TEMPLATE = "{date}_ais.parquet"
DATE_FMT = "%Y-%m-%d"
DEFAULT_DATE_OBJ = datetime.strptime(DEFAULT_DATE, DATE_FMT).date()
MAX_POINTS = 10_000
BANNER_PATH = Path(__file__).resolve().parent / "src" / "banner.png"

TILE_OPTIONS = {
    "OpenStreetMap": {
        "tiles": "OpenStreetMap",
        "attr": "© OpenStreetMap contributors",
    },
    "Stamen Terrain": {
        "tiles": "Stamen Terrain",
        "attr": "Map tiles by Stamen Design, CC BY 3.0 — Data © OpenStreetMap contributors",
    },
    "CartoDB positron": {
        "tiles": "https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png",
        "attr": "© OpenStreetMap contributors © CARTO",
    },
    "CartoDB dark_matter": {
        "tiles": "https://{s}.basemaps.cartocdn.com/dark_all/{z}/{x}/{y}{r}.png",
        "attr": "© OpenStreetMap contributors © CARTO",
    },
}
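# Additional basemaps can be registered by extending TILE_OPTIONS with a Leaflet tile
# URL template plus an attribution string. A hypothetical entry (not part of the app)
# might look like:
#
#     TILE_OPTIONS["Esri WorldImagery"] = {
#         "tiles": "https://server.arcgisonline.com/ArcGIS/rest/services/"
#                  "World_Imagery/MapServer/tile/{z}/{y}/{x}",
#         "attr": "Tiles © Esri",
#     }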
def _parse_center(center: str) -> Tuple[float, float]:
    """
    Parse "lat,lon" into (lat, lon).
    """
    try:
        lat_str, lon_str = [x.strip() for x in center.split(",")]
        lat, lon = float(lat_str), float(lon_str)
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            raise ValueError
        return lat, lon
    except Exception:
        # Default: Rome
        return 41.9028, 12.4964


def _parse_date(value) -> Optional[date]:
    if not value:
        return None
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        raw = value.strip()
        if not raw:
            return None
        try:
            return datetime.strptime(raw, DATE_FMT).date()
        except ValueError:
            return None
    return None


def _iterate_dates(start: Optional[date], end: Optional[date]) -> List[date]:
    if start and end:
        if end < start:
            start, end = end, start
    elif start:
        end = start
    elif end:
        start = end
    else:
        return []
    current = start
    dates: List[date] = []
    while current <= end:
        dates.append(current)
        current += timedelta(days=1)
    return dates
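# _iterate_dates normalises partial or reversed inputs: a single bound yields that one
# day, and a reversed range is swapped, e.g. (2025-08-27, 2025-08-25) expands to
# [2025-08-25, 2025-08-26, 2025-08-27].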
def _normalize_column_key(value: str) -> str:
    return "".join(ch for ch in value.lower() if ch.isalnum())


def _find_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    normalized_map = {}
    for col in df.columns:
        normalized_map.setdefault(_normalize_column_key(col), col)
    for candidate in candidates:
        key = _normalize_column_key(candidate)
        if key in normalized_map:
            return normalized_map[key]
    return None


def _parse_time(value: Optional[str]) -> Optional[time]:
    if not value:
        return None
    if isinstance(value, str):
        raw = value.strip()
        if not raw:
            return None
        for fmt in ("%H:%M:%S", "%H:%M"):
            try:
                parsed = datetime.strptime(raw, fmt)
                return parsed.time()
            except ValueError:
                continue
        return None
    return None
def _build_time_mask(datetimes: pd.Series,
                     start_time_obj: Optional[time],
                     end_time_obj: Optional[time]) -> Optional[pd.Series]:
    if start_time_obj is None and end_time_obj is None:
        return None
    dt_series = pd.to_datetime(datetimes, errors="coerce", utc=False)
    valid = dt_series.notna()
    times = dt_series.dt.time
    cond = pd.Series(True, index=dt_series.index)
    if start_time_obj and end_time_obj:
        if start_time_obj <= end_time_obj:
            cond &= (times >= start_time_obj) & (times <= end_time_obj)
        else:
            cond &= (times >= start_time_obj) | (times <= end_time_obj)
    elif start_time_obj:
        cond &= times >= start_time_obj
    else:
        cond &= times <= end_time_obj
    return cond & valid
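# When the start time is later than the end time (e.g. 22:00:00 -> 02:00:00) the mask
# wraps past midnight, keeping rows whose time-of-day is >= start OR <= end; rows whose
# timestamps fail to parse are always excluded through the `valid` mask.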
def _load_ais_points(start_date: Optional[str],
                     end_date: Optional[str],
                     start_time: Optional[str],
                     end_time: Optional[str]) -> Tuple[pd.DataFrame, List[str]]:
    """Download AIS parquet files, filter them, and return the full filtered rows."""
    start = _parse_date(start_date)
    end = _parse_date(end_date)
    dates = _iterate_dates(start, end)
    if not dates:
        return pd.DataFrame(columns=["name", "lat", "lon", "source_date", "timestamp", "mmsi"]), []
    frames: List[pd.DataFrame] = []
    errors: List[str] = []
    start_time_obj = _parse_time(start_time)
    end_time_obj = _parse_time(end_time)
    for day in dates:
        filename = HF_FILE_TEMPLATE.format(date=day.isoformat())
        try:
            local_path = hf_hub_download(
                repo_id=HF_REPO_ID,
                filename=filename,
                repo_type="dataset",
            )
        except Exception as exc:  # pragma: no cover - network dependent
            errors.append(f"{day}: download failed ({exc})")
            continue
        try:
            df = pd.read_parquet(local_path)
        except Exception as exc:  # pragma: no cover - file dependent
            errors.append(f"{day}: failed to read parquet ({exc})")
            continue
        lat_col = _find_column(df, ["lat", "latitude"])
        lon_col = _find_column(df, ["lon", "longitude", "long", "lng"])
        if lat_col is None or lon_col is None:
            errors.append(f"{day}: missing latitude/longitude columns")
            continue
        time_col = _find_column(df, [
            "tstamp",
            "timestamp",
            "time",
            "datetime",
            "basedatetime",
            "baseDateTime",
            "received_time",
            "receivedtime",
        ])
        if time_col is not None:
            mask = _build_time_mask(df[time_col], start_time_obj, end_time_obj)
            if mask is not None:
                df = df[mask.fillna(False)]
        elif start_time_obj or end_time_obj:
            errors.append(f"{day}: no timestamp column for time filtering")
        if df.empty:
            continue
        lat_series = pd.to_numeric(df[lat_col], errors="coerce")
        lon_series = pd.to_numeric(df[lon_col], errors="coerce")
        valid_mask = lat_series.notna() & lon_series.notna()
        if not valid_mask.any():
            continue
        subset = df.loc[valid_mask].copy()
        subset["lat"] = lat_series.loc[valid_mask].astype(float)
        subset["lon"] = lon_series.loc[valid_mask].astype(float)
        name_col = _find_column(df, ["name", "shipname", "vessel", "imo", "callsign", "vesselname"])
        if name_col is not None:
            subset_names = subset[name_col].fillna("").astype(str)
        else:
            subset_names = pd.Series("", index=subset.index)
        subset["name"] = subset_names.replace({"nan": "", "None": ""})
        subset["source_date"] = day.isoformat()
        mmsi_col = _find_column(df, ["mmsi", "mmsi_id"])
        if mmsi_col is not None:
            subset_mmsi = subset[mmsi_col].fillna("").astype(str)
            subset_mmsi = subset_mmsi.replace({"nan": "", "None": ""})
            subset["mmsi"] = subset_mmsi
        else:
            subset["mmsi"] = ""
        if time_col is not None:
            ts_series = pd.to_datetime(subset[time_col], errors="coerce", utc=True)
            try:
                ts_local = ts_series.dt.tz_convert(None)
            except TypeError:  # already naive
                ts_local = ts_series
            subset["timestamp"] = ts_local.dt.strftime("%Y-%m-%d %H:%M:%S").fillna("")
        else:
            subset["timestamp"] = ""
        frames.append(subset.reset_index(drop=True))
    if not frames:
        return pd.DataFrame(columns=[
            "name",
            "lat",
            "lon",
            "source_date",
            "timestamp",
            "mmsi",
        ]), errors
    result = pd.concat(frames, ignore_index=True)
    return result, errors
def render_map(selected_date,
               start_time: Optional[str],
               end_time: Optional[str],
               aoi_wkt: Optional[str]) -> Tuple[str, str, str]:
    """
    Build a Leaflet map and return full HTML (rendered by Gradio HTML component).
    """
    lat, lon = _parse_center(DEFAULT_CENTER)
    tile_cfg = TILE_OPTIONS[DEFAULT_TILES]
    map_kwargs = {
        "location": [lat, lon],
        "zoom_start": DEFAULT_ZOOM,
        "tiles": tile_cfg.get("tiles", DEFAULT_TILES),
        "control_scale": True,
        "width": "100%",
        "height": "600px",
    }
    attr = tile_cfg.get("attr")
    if attr:
        map_kwargs["attr"] = attr
    m = folium.Map(**map_kwargs)
    # Points
    bounds: List[Tuple[float, float]] = []
    point_count = 0
    error_message: Optional[str] = None
    error_marker_added = False
    selected_date_str = _coerce_date_string(selected_date)
    export_df = pd.DataFrame()
    try:
        export_df, errors = _load_ais_points(selected_date_str, selected_date_str, start_time, end_time)
        if not export_df.empty:
            export_df, aoi_error = _filter_by_aoi(export_df, aoi_wkt)
            if aoi_error:
                errors.append(aoi_error)
        map_df = pd.DataFrame()
        if not export_df.empty:
            map_df = export_df[["name", "lat", "lon", "source_date", "timestamp", "mmsi"]].copy()
            if len(map_df) > MAX_POINTS:
                sampled_idx = map_df.sample(MAX_POINTS, random_state=0).index
                map_df = map_df.loc[sampled_idx]
            map_df = map_df.reset_index(drop=True)
        if not map_df.empty:
            cluster = MarkerCluster(name="AIS Points").add_to(m)
            for _, r in map_df.iterrows():
                name_raw = r.get("name")
                name = str(name_raw).strip() if name_raw is not None else ""
                if name.lower() == "nan":
                    name = ""
                source_date = r.get("source_date", "?")
                timestamp = r.get("timestamp")
                mmsi = str(r.get("mmsi") or "").strip()
                details = []
                if name:
                    details.append(f"Name: {name}")
                if mmsi:
                    details.append(f"MMSI: {mmsi}")
                details.append(f"Date: {source_date}")
                if isinstance(timestamp, str) and timestamp:
                    details.append(f"Timestamp: {timestamp}")
                details.append(f"Lat: {r['lat']:.6f}")
                details.append(f"Lon: {r['lon']:.6f}")
                popup = "<br>".join(details)
                folium.Marker([r["lat"], r["lon"]], popup=popup).add_to(cluster)
                bounds.append((r["lat"], r["lon"]))
            point_count = len(map_df)
        error_message = _summarize_errors(errors)
    except Exception as e:
        error_message = f"AIS data error: {e}"
        _add_error_marker(m, lat, lon, error_message)
        error_marker_added = True
    if error_message and not error_marker_added:
        _add_error_marker(m, lat, lon, error_message)
    # Fit to data if any bounds collected
    if bounds:
        m.fit_bounds(bounds, padding=(20, 20))
    html = m._repr_html_()
    date_range = _format_date_display(selected_date_str, default_prompt=DEFAULT_DATE_PROMPT)
    time_range = _format_range(start_time, end_time, default_prompt=DEFAULT_TIME_PROMPT)
    info_lines = [
        "### Selected Period",
        f"- Date: {date_range}",
        f"- Times: {time_range}",
        f"- Points on map: {point_count}",
    ]
    if error_message:
        info_lines.append(f"- Error: {error_message}")
    ssl_msg = _ssl_warning()
    if ssl_msg:
        info_lines.append(f"- SSL: {ssl_msg}")
    export_payload = export_df.reset_index(drop=True)
    data_json = export_payload.to_json(orient="records") if not export_payload.empty else "[]"
    return html, "\n".join(info_lines), data_json
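# render_map returns three values that map one-to-one onto the Gradio outputs wired up
# below: the Folium HTML for the map view, a Markdown summary of the selected period,
# and a JSON string of the filtered rows kept in gr.State for the export handler.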
def _format_range(start: Optional[str], end: Optional[str], default_prompt: str) -> str:
    start_clean = _clean_input(start)
    end_clean = _clean_input(end)
    if not start_clean and not end_clean:
        return default_prompt
    return f"{start_clean or '—'} → {end_clean or '—'}"


def _clean_input(value: Optional[str]) -> Optional[str]:
    if value is None:
        return None
    if isinstance(value, str):
        cleaned = value.strip()
        return cleaned or None
    return str(value)
def _filter_by_aoi(df: pd.DataFrame, wkt_text: Optional[str]) -> Tuple[pd.DataFrame, Optional[str]]:
    wkt_clean = _clean_input(wkt_text)
    if not wkt_clean:
        return df, None
    if not SHAPELY_AVAILABLE or shapely_wkt is None or Point is None:
        return df, "AOI filter unavailable: install shapely."
    try:
        geom = shapely_wkt.loads(wkt_clean)
    except Exception as exc:
        return df, f"AOI parse error: {exc}"
    if geom.is_empty:
        return df, "AOI geometry is empty."

    def contains_point(row) -> bool:
        try:
            pt = Point(float(row["lon"]), float(row["lat"]))
        except Exception:
            return False
        return geom.contains(pt) or geom.touches(pt)

    mask = df.apply(contains_point, axis=1)
    if mask.sum() == 0:
        return df.iloc[0:0].copy(), "AOI filter removed all points."
    return df[mask].reset_index(drop=True), None
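# WKT polygons are expressed as "lon lat" (x y) pairs, which matches how the filter
# builds Point(lon, lat) above. An illustrative check against the default AOI (requires
# Shapely) should evaluate to True:
#
#     shapely_wkt.loads(DEFAULT_AOI_WKT).contains(Point(4.35, 51.25))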
def _summarize_errors(errors: List[str]) -> Optional[str]:
    if not errors:
        return None
    unique: List[str] = []
    for err in errors:
        if err not in unique:
            unique.append(err)
        if len(unique) == 3:
            break
    extra = len(errors) - len(unique)
    message = "; ".join(unique)
    if extra > 0:
        message += f"; (+{extra} more)"
    return message


def _add_error_marker(map_obj: folium.Map, lat: float, lon: float, message: str) -> None:
    folium.Marker(
        [lat, lon],
        icon=folium.DivIcon(html=f"<div style='color:red;font-weight:bold;'>{message}</div>"),
    ).add_to(map_obj)


def _ssl_warning() -> Optional[str]:
    backend = getattr(ssl, "OPENSSL_VERSION", "")
    if "LibreSSL" in backend:
        return "Detected LibreSSL; Hugging Face downloads need OpenSSL 1.1.1+. Use Python from python.org or upgrade SSL."
    return None
def export_data(fmt: str, data_json: Optional[str]) -> str:
    fmt_clean = (fmt or "").strip().upper()
    if not data_json or not data_json.strip():
        raise gr.Error("No AIS data available to export.")
    try:
        records = json.loads(data_json)
    except json.JSONDecodeError as exc:
        raise gr.Error(f"Export failed: invalid data ({exc}).")
    if not records:
        raise gr.Error("No AIS data available to export.")
    df = pd.DataFrame(records)
    if df.empty:
        raise gr.Error("No AIS data available to export.")
    suffix = {
        "CSV": ".csv",
        "JSON": ".json",
        "XML": ".xml",
    }.get(fmt_clean)
    if suffix is None:
        raise gr.Error(f"Unsupported format: {fmt}.")
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        path = tmp.name
    if fmt_clean == "CSV":
        df.to_csv(path, index=False)
    elif fmt_clean == "JSON":
        df.to_json(path, orient="records", indent=2)
    else:  # XML
        root = ET.Element("AISData")
        for record in records:
            entry = ET.SubElement(root, "Record")
            for key, value in record.items():
                child = ET.SubElement(entry, key)
                child.text = "" if value is None else str(value)
        tree = ET.ElementTree(root)
        tree.write(path, encoding="utf-8", xml_declaration=True)
    return path
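# The XML branch writes a flat document, one <Record> element per AIS row, roughly of
# the form (unindented in the actual file):
#
#     <?xml version='1.0' encoding='utf-8'?>
#     <AISData>
#       <Record><name>VESSEL A</name><lat>51.25</lat>...</Record>
#     </AISData>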
def _coerce_date_string(value) -> Optional[str]:
    parsed = _parse_date(value)
    if parsed is not None:
        return parsed.isoformat()
    cleaned = _clean_input(value)
    return cleaned


def _format_date_display(value: Optional[str], default_prompt: str) -> str:
    parsed = _parse_date(value)
    if parsed is not None:
        return parsed.isoformat()
    cleaned = _clean_input(value)
    return cleaned or default_prompt
with gr.Blocks(title="AIS MAP - ESA") as demo:
    if BANNER_PATH.exists():
        gr.Image(
            value=str(BANNER_PATH),
            show_label=False,
            interactive=False,
            elem_id="banner",
        )
    gr.Markdown(
        """
#### This data access provides globally collected Automatic Identification System (AIS) data, structured and organized on a daily basis for consistent access and analysis. It includes lightweight utilities to fetch and normalize AIS data from the AIS Hub webservice.
"""
    )
    gr.Markdown(
        """
*Developed by ESA Φ-lab, accelerating the future of Earth Observation (EO) through disruptive/transformational innovations and commercialisation.*
"""
    )
| gr.Markdown("## Φ-lab Interactive AIS Map") | |
| gr.Markdown( | |
| """ | |
| ### Quick guide | |
| Select the **date** to retrieve AIS snapshots, optionally narrow the **UTC time window**, and focus on your study area by pasting an **AOI polygon** in WKT form. Hit **Apply Filters** to refresh the map; use **Export** to download the full table of filtered messages. | |
| """ | |
| ) | |
| initial_date_value = DEFAULT_DATE_OBJ if GrDateComponent is not None else DEFAULT_DATE | |
| with gr.Row(): | |
| if GrDateComponent is not None: | |
| selected_date = GrDateComponent( | |
| label="Date", | |
| value=initial_date_value, | |
| ) | |
| else: | |
| selected_date = gr.Textbox( | |
| label="Date (YYYY-MM-DD)", | |
| value=initial_date_value, | |
| placeholder="YYYY-MM-DD", | |
| scale=1, | |
| max_lines=1, | |
| min_width=160, | |
| ) | |
| start_time = gr.Textbox( | |
| label="Start time", | |
| placeholder="HH:MM:SS", | |
| value=DEFAULT_START_TIME, | |
| scale=1, | |
| max_lines=1, | |
| min_width=120, | |
| ) | |
| end_time = gr.Textbox( | |
| label="End time", | |
| placeholder="HH:MM:SS", | |
| value=DEFAULT_END_TIME, | |
| scale=1, | |
| max_lines=1, | |
| min_width=120, | |
| ) | |
| with gr.Row(): | |
| aoi_wkt = gr.Textbox( | |
| label="AOI (Polygon WKT)", | |
| placeholder="POLYGON((lon lat, ...))", | |
| value=DEFAULT_AOI_WKT, | |
| lines=3, | |
| max_lines=6, | |
| ) | |
| btn = gr.Button("Apply Filters", variant="primary") | |
| initial_map, initial_info, initial_data = render_map( | |
| initial_date_value, | |
| DEFAULT_START_TIME, | |
| DEFAULT_END_TIME, | |
| DEFAULT_AOI_WKT | |
| ) | |
| out = gr.HTML(label="Map", value=initial_map, elem_id="map-view") | |
| period = gr.Markdown(value=initial_info, elem_id="period-info") | |
| data_state = gr.State(initial_data) | |
| input_components = [selected_date, start_time, end_time, aoi_wkt] | |
| with gr.Row(): | |
| export_format = gr.Dropdown( | |
| ["CSV", "JSON", "XML"], | |
| value="CSV", | |
| label="Export format", | |
| scale=1, | |
| ) | |
| export_btn = gr.Button("Export", variant="secondary") | |
| download = gr.File(label="Download", file_count="single") | |
| demo.load(render_map, inputs=input_components, outputs=[out, period, data_state]) | |
| btn.click(render_map, inputs=input_components, outputs=[out, period, data_state]) | |
| export_btn.click(export_data, inputs=[export_format, data_state], outputs=download) | |
| if __name__ == "__main__": | |
| demo.launch(share=True) | |