Spaces:
Runtime error
Runtime error
| # add_field.py | |
| import utils | |
| import os | |
| import folium | |
| import pandas as pd | |
| import streamlit as st | |
| import geopandas as gpd | |
| from folium.plugins import Draw | |
| from shapely.geometry import Polygon | |
| from streamlit_folium import st_folium | |
| from authentication import greeting, check_password | |
| import geopy | |
| from pyproj import Transformer | |
| from shapely.ops import transform | |
| from geopy.geocoders import Nominatim | |
| from shapely.ops import transform | |
| from geopy.geocoders import Nominatim | |
| def check_authentication(): | |
| if not check_password(): | |
| st.stop() | |
| # Function to get coordinates from a location name | |
| def get_location_coordinates(location_name,geolocator): | |
| try: | |
| location = geolocator.geocode(location_name) | |
| if location: | |
| return location.latitude, location.longitude | |
| else: | |
| return None, None | |
| except: | |
| return None, None | |
| def display_existing_fields(current_user): | |
| with st.expander("Existing Fields", expanded=False): | |
| if os.path.exists(f"fields_{current_user}.parquet"): | |
| gdf = gpd.read_parquet(f"fields_{current_user}.parquet") | |
| st.table(gdf.name.tolist()) | |
| mm = gdf.explore() | |
| st_folium(mm) | |
| else: | |
| st.info("No Fields Added Yet!") | |
| def add_existing_fields_to_map(field_map, current_user): | |
| if os.path.exists(f"fields_{current_user}.parquet"): | |
| fg = folium.FeatureGroup(name="Existing Fields", control=True).add_to(field_map) | |
| gdf = gpd.read_parquet(f"fields_{current_user}.parquet") | |
| for i, row in gdf.iterrows(): | |
| edges = row['geometry'].exterior.coords.xy | |
| edges = [[i[1], i[0]] for i in zip(*edges)] | |
| folium.Polygon(edges, color='blue', fill=True, fill_color='blue', fill_opacity=0.6).add_to(fg) | |
| return field_map | |
| def get_center_of_existing_fields(current_user): | |
| location_name = st.text_input('Enter a location to search:') | |
| if location_name: | |
| geolocator = Nominatim(user_agent=current_user) | |
| geopy.geocoders.options.default_user_agent = current_user | |
| lat, lon = get_location_coordinates(location_name,geolocator) | |
| if lat is not None and lon is not None: | |
| return [lat, lon] | |
| else: | |
| st.error('Location not found. Please try again.') | |
| if os.path.exists(f"fields_{current_user}.parquet"): | |
| gdf = gpd.read_parquet(f"fields_{current_user}.parquet") | |
| edges = gdf['geometry'][0].exterior.coords.xy | |
| edges = [[i[1], i[0]] for i in zip(*edges)] | |
| edges_center = [sum([i[0] for i in edges]) / len(edges), sum([i[1] for i in edges]) / len(edges)] | |
| return edges_center | |
| return [15.572363674301132, 32.69167103104079] | |
| def display_map_and_drawing_controls(field_map, center_start): | |
| zoom_start = 13 | |
| if st.session_state['active_drawing'] is None: | |
| st.info("IMPORTANT: Click on the drawing to confirm the drawn field", icon="🚨") | |
| sat_basemap = utils.basemaps['Google Satellite Hybrid'] # Change this line to use 'Google Satellite Hybrid' | |
| sat_basemap.add_to(field_map) | |
| folium.plugins.Geocoder().add_to(field_map) | |
| folium.LayerControl().add_to(field_map) | |
| output = st_folium(field_map, center=center_start, zoom=zoom_start, key="new", width=900) | |
| active_drawing = output['last_active_drawing'] | |
| st.session_state['active_drawing'] = active_drawing | |
| return False | |
| else: | |
| st.info("Drawing Captured! Click on the button below to Clear Drawing and Draw Again") | |
| active_drawing = st.session_state['active_drawing'] | |
| new_map = folium.Map(location=center_start, zoom_start=8) | |
| edges = [[i[1], i[0]] for i in active_drawing['geometry']['coordinates'][0]] | |
| edges_center = [sum([i[0] for i in edges]) / len(edges), sum([i[1] for i in edges]) / len(edges)] | |
| folium.Polygon(edges, color='green', fill=True, fill_color='green', fill_opacity=0.6, name="New Field").add_to(new_map) | |
| sat_basemap = utils.basemaps['Google Satellite'] | |
| sat_basemap.add_to(new_map) | |
| folium.LayerControl().add_to(new_map) | |
| st_folium(new_map, center=edges_center, zoom=zoom_start, key="drawn", width=900) | |
| return True | |
| def handle_user_actions(active_drawing, current_user, intersects, within_area): | |
| draw_again_col, add_field_info_col = st.columns([1, 1]) | |
| with draw_again_col: | |
| draw_again = st.button("Draw Again", key="draw_again", help="Click to Clear Drawing and Draw Again", | |
| type="primary", use_container_width=True, disabled=st.session_state['active_drawing'] is None) | |
| if draw_again: | |
| st.session_state['active_drawing'] = None | |
| st.rerun() | |
| with add_field_info_col: | |
| if st.session_state['active_drawing'] is None: | |
| st.info("Drawing not captured yet!") | |
| else: | |
| field_name = st.text_input("Field Name*", help="Enter a distinct name for the field", key="field_name") | |
| if field_name == "": | |
| st.warning("Field Name cannot be empty!") | |
| if os.path.exists(f"fields_{current_user}.parquet"): | |
| gdf = gpd.read_parquet(f"fields_{current_user}.parquet") | |
| if field_name in gdf['name'].tolist(): | |
| st.warning("Field Name already exists. Please enter a different name!") | |
| submit = st.button("Submit", key="submit", help="Click to Submit Field Information", type="primary", | |
| use_container_width=True,disabled=(st.session_state['active_drawing'] is None or field_name == "") or intersects or not within_area) | |
| if submit: | |
| save_field_information(active_drawing, field_name, current_user) | |
| st.success("Field Information Submitted Successfully!") | |
| st.session_state['active_drawing'] = None | |
| st.rerun() | |
| def save_field_information(active_drawing, field_name, current_user): | |
| edges = [[i[0], i[1]] for i in active_drawing['geometry']['coordinates'][0]] | |
| geom = Polygon(edges) | |
| field_dict = { | |
| "name": field_name, | |
| "geometry": geom | |
| } | |
| gdf = gpd.GeoDataFrame([field_dict], geometry='geometry') | |
| gdf.crs = "EPSG:4326" | |
| if os.path.exists(f"fields_{current_user}.parquet"): | |
| old_gdf = gpd.read_parquet(f"fields_{current_user}.parquet") | |
| gdf = gpd.GeoDataFrame(pd.concat([old_gdf, gdf], ignore_index=True), crs="EPSG:4326") | |
| gdf.to_parquet(f"fields_{current_user}.parquet") | |
| def initialize_active_drawing_state(): | |
| if 'active_drawing' not in st.session_state: | |
| st.session_state['active_drawing'] = None | |
| if 'current_user' not in st.session_state: | |
| st.session_state['current_user'] = None | |
| def check_intersection_with_existing_fields(active_drawing, current_user): | |
| if active_drawing is None: | |
| return False | |
| if os.path.exists(f"fields_{current_user}.parquet"): | |
| gdf = gpd.read_parquet(f"fields_{current_user}.parquet") | |
| edges = [[i[0], i[1]] for i in active_drawing['geometry']['coordinates'][0]] | |
| geom = Polygon(edges) | |
| geom = gpd.GeoSeries([geom]*len(gdf), crs="EPSG:4326") | |
| geom1 = geom.to_crs(gdf.crs) | |
| geom2 = gdf.geometry.to_crs(gdf.crs) | |
| if geom1.overlaps(geom2).any(): | |
| st.warning("Field intersects with existing fields. Please draw again!") | |
| with st.expander("Intersecting Fields", expanded=False): | |
| field_map = geom1.explore(name= "New Field", color="red") | |
| field_map = gdf.explore(m=field_map, name="Existing Fields", color="blue") | |
| st_folium(field_map) | |
| return True | |
| return False | |
| def check_polygon_area_within_range(active_drawing, min_area_km2=1, max_area_km2=10): | |
| if active_drawing is None: | |
| return | |
| transformer = Transformer.from_crs("EPSG:4326", "EPSG:6933", always_xy=True) | |
| edges = [[i[0], i[1]] for i in active_drawing['geometry']['coordinates'][0]] | |
| geom = Polygon(edges) | |
| transformed_geom = transform(transformer.transform, geom) | |
| area_km2 = transformed_geom.area / 10**6 | |
| if area_km2 < min_area_km2: | |
| st.warning(f"Field area {area_km2 :.2f} is less than {min_area_km2} km². Please draw again!") | |
| return False | |
| if area_km2 > max_area_km2: | |
| st.warning(f"Field area {area_km2 :.2f} is more than {max_area_km2} km². Please draw again!") | |
| return False | |
| st.success(f"Field area is {area_km2 :.2f} km², now give it a unique name {st.session_state['current_user']}!") | |
| return True | |
| def add_drawing(): | |
| initialize_active_drawing_state() | |
| current_user = greeting("Drag and Zoom and draw your fields on the map, make sure to name them uniquely") | |
| current_user = st.session_state['current_user'] | |
| display_existing_fields(current_user) | |
| center_start = get_center_of_existing_fields(current_user) | |
| zoom_start = 13 | |
| field_map = folium.Map(location=center_start, zoom_start=zoom_start) | |
| draw_options = {'polyline': False, 'polygon': True, 'rectangle': True, 'circle': False, 'marker': False, 'circlemarker': False} | |
| Draw(export=True, draw_options=draw_options).add_to(field_map) | |
| field_map = add_existing_fields_to_map(field_map, current_user) | |
| captured = display_map_and_drawing_controls(field_map, center_start) | |
| if captured: | |
| intersects = check_intersection_with_existing_fields(st.session_state['active_drawing'], current_user) | |
| within_area = check_polygon_area_within_range(st.session_state['active_drawing']) | |
| handle_user_actions(st.session_state['active_drawing'], current_user, intersects, within_area) | |
| if __name__ == '__main__': | |
| check_authentication() | |
| add_drawing() |