# monitor.py
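"""Streamlit page for monitoring crop fields.

Pulls Sentinel-2 imagery for a user's fields via Sentinel Hub, visualizes
vegetation metrics (NDVI, LAI, CAB) per observation date, forecasts the
selected metric with a TimesFM gRPC service, and asks OpenAI for a short
agronomic recommendation.
"""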
import os
import utils
import streamlit as st
import geopandas as gpd
from authentication import greeting, check_password
from senHub import SenHub
from datetime import datetime, timedelta
from sentinelhub import SHConfig
import requests
import process
from zipfile import ZipFile
import plotly.express as px
import threading
import pandas as pd
import grpc
import pb.timesfm_pb2_grpc
import pb.timesfm_pb2
from openai import OpenAI
from dotenv import load_dotenv
import time

load_dotenv()

def check_authentication():
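    """Stop the Streamlit script run if the user is not authenticated."""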
    if not check_password():
        st.stop()
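
# Sentinel Hub configuration.
# NOTE: these credentials are hardcoded in the source; in a real deployment
# they belong in environment variables (like the OpenAI key below).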
config = SHConfig()
config.instance_id = '44e79764-8b9d-43b0-a4bf-15799db2899d'
config.sh_client_id = '4ae34b53-3f81-4ba0-9c7d-b6fb0606dac3'
config.sh_client_secret = '3IPSSqE75fqK38vP85hxttR9PJEs5OxX'
config.sh_timesfm_IP = "34.121.141.161"

# Load the OpenAI key from the environment, falling back to a placeholder
# (os.getenv never raises, so a try/except around it is unnecessary).
OpenAI_key = os.getenv('OPENAI_KEY', 'sk-')
client = OpenAI(api_key=OpenAI_key)

def select_field(gdf):
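    """Render a field-selection dropdown and return the chosen field name.

    The sentinel entry "Select Field" is appended and used as the default,
    so the caller can tell that nothing has been picked yet.
    """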
    st.markdown("""
        <style>
        .stSelectbox > div > div {cursor: pointer;}
        </style>
        """, unsafe_allow_html=True)
    names = gdf['name'].tolist()
    names.append("Select Field")
    field_name = st.selectbox("Select Field", options=names, key="field_name_monitor", help="Select the field to edit", index=len(names)-1)
    return field_name

def calculate_bbox(df, field):
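    """Return the bounding box [minx, miny, maxx, maxy] of the named field."""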
    bbox = df.loc[df['name'] == field].bounds
    r = bbox.iloc[0]
    return [r.minx, r.miny, r.maxx, r.maxy]

def get_available_dates_for_field(df, field, year, start_date='', end_date=''):
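    """Query the Sentinel Hub catalog for the distinct Sentinel-2 L2A
    acquisition dates covering the field's bounding box in the given range
    (or the whole given year if no range is provided).
    """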
    bbox = calculate_bbox(df, field)
    token = SenHub(config).token
    headers = utils.get_bearer_token_headers(token)
    if start_date == '' or end_date == '':
        start_date = f'{year}-01-01'
        end_date = f'{year}-12-31'
    data = f'{{ "collections": [ "sentinel-2-l2a" ], "datetime": "{start_date}T00:00:00Z/{end_date}T23:59:59Z", "bbox": {bbox}, "limit": 100, "distinct": "date" }}'
    response = requests.post('https://services.sentinel-hub.com/api/v1/catalog/search', headers=headers, data=data)
    try:
        features = response.json()['features']
    except (KeyError, ValueError):
        # Log the raw body; calling response.json() again here would re-raise
        # if the reply is not valid JSON.
        print(response.text)
        features = []
    return features

@st.cache_data
def get_and_cache_available_dates(_df, field, year, start_date, end_date):
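    """Cached wrapper around get_available_dates_for_field.

    The leading underscore on _df follows Streamlit's convention for telling
    st.cache_data to skip hashing the (unhashable) GeoDataFrame argument.
    """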
    dates = get_available_dates_for_field(_df, field, year, start_date, end_date)
    print(f'Caching Dates for {field}')
    return dates

def get_cuarted_df_for_field(df, field, date, metric, clientName, dates=None):
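    """Load the curated GeoDataFrame for a field/metric/date, downloading and
    processing the imagery first if it is not already on disk.

    If a list of dates is passed, the remaining dates are fetched in a
    background thread so the UI stays responsive.
    """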
    curated_date_path = utils.get_curated_location_img_path(clientName, metric, date, field)
    if curated_date_path is not None:
        curated_df = gpd.read_file(curated_date_path)
    else:
        download_date_data(df, field, [date], metric, clientName)
        curated_date_path = utils.get_curated_location_img_path(clientName, metric, date, field)
        print("curr selected date processed")
        if dates:
            # Prefetch the other dates in a background thread.
            old_dates = [prev_date for prev_date in dates if prev_date != date]
            download_thread = threading.Thread(target=download_date_data, name="Downloader", args=(df, field, old_dates, metric, clientName))
            download_thread.start()
        curated_df = gpd.read_file(curated_date_path)
    return curated_df

def download_date_data(df, field, dates, metric, clientName):
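    """Download, mask, and vectorize the imagery for each date in `dates`."""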
    for date in dates:
        process.Download_image_in_given_date(clientName, metric, df, field, date)
        process.mask_downladed_image(clientName, metric, df, field, date)
        process.convert_maske_image_to_geodataframe(clientName, metric, df, field, date, df.crs)
    return

def track(metric, field_name, src_df, client_name):
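    """Let the user pick an observation date for the field, then display the
    selected metric map, the cloud cover, and shapefile/TIFF download links.
    """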
    st.subheader(":green[Select Date and Start Monitoring]")
    dates = []
    date = -1
    if 'dates' not in st.session_state:
        st.session_state['dates'] = dates
    else:
        dates = st.session_state['dates']
    if 'date' not in st.session_state:
        st.session_state['date'] = date
    else:
        date = st.session_state['date']
    start_date = '2024-01-01'
    today = datetime.today()
    end_date = today.strftime('%Y-%m-%d')
    year = '2024'
    dates = get_and_cache_available_dates(src_df, field_name, year, start_date, end_date)
    # Sort the dates from earliest to latest
    dates = sorted(dates)
    # Add the dates to the session state
    st.session_state['dates'] = dates
    # Display the dropdown menu
    if len(dates) > 0:
        st.markdown("""
            <style>
            .stSelectbox > div > div {cursor: pointer;}
            </style>
            """, unsafe_allow_html=True)
        # Append -1 as a "no date selected" sentinel and default to it.
        dates.append(-1)
        date = st.selectbox('Select Observation Date: ', dates, index=len(dates)-1, key=f'Select Date Dropdown Menu - {metric}')
        if date != -1:
            st.write(f'You selected: {date}')
            # Add the date to the session state
            st.session_state['date'] = date
        else:
            st.write('Please Select A Date')
    else:
        st.info('No dates available for the selected field and date range; select a different range or click the button to fetch the dates again')
    st.markdown('---')
    st.subheader('Show Field Data')
    # If a field and a date are selected, display the field data
    if date != -1:
        # Get the field data at the selected date
        with st.spinner('Loading Field Data...'):
            # Get the metric and cloud cover data for the selected field and date;
            # pass dates=dates instead of dates=None to prefetch the other dates in the background.
            metric_data = get_cuarted_df_for_field(src_df, field_name, date, metric, client_name, dates=None)
            cloud_cover_data = get_cuarted_df_for_field(src_df, field_name, date, 'CLP', client_name, dates=None)
        # Merge the metric and cloud cover data on the geometry column
        field_data = metric_data.merge(cloud_cover_data, on='geometry')
        # Display the field data
        avg_clp = field_data[f'CLP_{date}'].mean() * 100
        avg_metric = field_data[f'{metric}_{date}'].mean()
        st.write(f'Field Data for (Field ID: {field_name}) on {date}')
        # The unused middle columns act as spacers between the two metrics.
        col1, col3, col5, col2, col4 = st.columns(5)
        col1.metric(f":orange[Average {metric}]", value=f"{avg_metric:.2f}")
        col2.metric(":green[Cloud Cover]", value=f"{avg_clp:.2f}%")
        # If the average cloud cover is greater than 80%, display a warning message
        if avg_clp > 80:
            st.warning(f'⚠️ The Average Cloud Cover is {avg_clp:.2f}%')
            st.info('Please Select A Different Date')
        df = field_data.copy()
        df['latitude'] = df['geometry'].y
        df['longitude'] = df['geometry'].x
        # Create a scatter plot
        fig = px.scatter_mapbox(
            df,
            lat='latitude',
            lon='longitude',
            color=f'{metric}_{date}',
            color_continuous_scale='RdYlGn',
            range_color=(0, 1),
            width=800,
            height=600,
            size_max=15,
            zoom=13,
        )
        # Add the base map
        with st.expander("Show Map", expanded=False):
            with open("token.mapbox_token") as token_file:
                token = token_file.read()
            fig.update_layout(mapbox_style="satellite", mapbox_accesstoken=token)
            st.plotly_chart(fig, use_container_width=True)
        # Download links
        # If the field data is not empty, display the download links
        if len(field_data) > 0:
            # Create two columns for the download links
            download_as_shp_col, download_as_tiff_col = st.columns(2)
            # Create a shapefile of the field data and add a download link
            with download_as_shp_col:
                # Set the shapefile name and path based on the field id, metric and date
                extension = 'shp'
                shapefilename = f"{field_name}_{metric}_{date}.{extension}"
                path = f'./shapefiles/{field_name}/{metric}/{extension}'
                # Create the target directory if it doesn't exist
                os.makedirs(path, exist_ok=True)
                # Save the field data as a shapefile
                field_data.to_file(f'{path}/{shapefilename}')
                # Zip the shapefile together with its sidecar files (.shx, .dbf, ...),
                # which share the same base name; skip any previously written zip.
                basename = f"{field_name}_{metric}_{date}"
                files = []
                for i in os.listdir(path):
                    full_path = os.path.join(path, i)
                    if os.path.isfile(full_path) and i.startswith(basename) and not i.endswith('.zip'):
                        files.append(full_path)
                zipFileName = f'{path}/{basename}.zip'
                with ZipFile(zipFileName, 'w') as zipObj:
                    for file in files:
                        zipObj.write(file)
                # Add a download link for the zip file
                with open(zipFileName, 'rb') as f:
                    st.download_button('Download as ShapeFile', f, file_name=zipFileName)
            # Get the tiff file path and create a download link
            with download_as_tiff_col:
                tiff_path = utils.get_masked_location_img_path(client_name, metric, date, field_name)
                # Add a download link for the tiff file
                download_filename = f'{metric}_{field_name}_{date}.tiff'
                with open(tiff_path, 'rb') as f:
                    st.download_button('Download as Tiff File', f, file_name=download_filename)
    else:
        st.info('Please Select A Field and A Date')

def monitor_fields():
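    """Main page: pick a field and metric, monitor it, and (given a valid
    API token) forecast its health and generate a recommendation.
    """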
    st.title(":orange[Field Monitoring]")
    row1, row2 = st.columns([1, 2])
    with row1:
        current_user = greeting("Let's take a look at how these fields are doing")
        if os.path.exists(f"fields_{current_user}.parquet"):
            gdf = gpd.read_parquet(f"fields_{current_user}.parquet")
            field_name = select_field(gdf)
            if field_name == "Select Field":
                st.info("No Field Selected Yet!")
            else:
                metric = st.radio("Select Metric to Monitor", ["NDVI", "LAI", "CAB"], key="metric", index=0, help="Select the metric to monitor")
                st.write(f"Monitoring {metric} for {field_name}")
                with st.expander("Metrics Explanation", expanded=False):
                    st.write("NDVI: Normalized Difference Vegetation Index, mainly used to monitor the health of vegetation")
                    st.write("LAI: Leaf Area Index, mainly used to monitor the productivity of vegetation")
                    st.write("CAB: Chlorophyll Absorption in the Blue band, mainly used to monitor the chlorophyll content of vegetation")
                    # st.write("NDMI: Normalized Difference Moisture Index, mainly used to monitor the moisture content of vegetation")
        else:
            st.info("No Fields Added Yet!")
            return
    with row2:
        if field_name != "Select Field":
            track(metric, field_name, gdf, current_user)
    if field_name != "Select Field":
        st.title(":orange[Field Health Forecast]")
        st.write(f"Press the button below to predict {metric} for the next 30 weeks")
        # Reset session state if any of the required keys are missing
        required_keys = ['api_token', 'api_token_confirmed', 'valid_until']
        if any(key not in st.session_state for key in required_keys):
            st.session_state['api_token'] = ''
            st.session_state['api_token_confirmed'] = False
            st.session_state['valid_until'] = ''
        if not st.session_state['api_token_confirmed']:
            st.warning("No Valid API Token Found")
            with st.expander("Need a new API Token?", expanded=True):
                st.markdown(utils.NEW_TOKEN_INSTRUCTIONS, unsafe_allow_html=True)
            with st.expander("Token Usage History", expanded=False):
                filename = f'{current_user}_tokens.csv'
                if os.path.exists(filename):
                    token_usage = pd.read_csv(filename)
                    token_usage['is_expired'] = token_usage['valid_until'].apply(lambda x: 'Yes' if datetime.strptime(x, '%Y-%m-%d %H:%M:%S') < datetime.now() else 'No')
                    st.write(token_usage)
                else:
                    st.write("No Token Usage History Found")
            api_token = st.text_input("API Token", key="api_token_input", help="Enter the API Token From SNET")
            if st.button("Submit API Token", key="confirm_api_token"):
                # Validate once and reuse the result rather than calling the service twice.
                token_check = utils.confirm_api_token(api_token)
                if token_check['valid']:
                    st.session_state['api_token'] = api_token
                    st.session_state['api_token_confirmed'] = True
                    st.session_state['valid_until'] = utils.load_token_expiration(api_token).strftime('%Y-%m-%d %H:%M:%S')
                    st.rerun()
                else:
                    st.error(f"Invalid API Token; {token_check['message']}")
        else:
            now = datetime.now()
            valid_until = datetime.strptime(st.session_state['valid_until'], '%Y-%m-%d %H:%M:%S')
            time_remaining = valid_until - now
            minutes_remaining = int(time_remaining.total_seconds() // 60)
            seconds_remaining = int(time_remaining.total_seconds() % 60)
            time_left_column, clear_token_column = st.columns([1, 1])
            with time_left_column:
                st.success(f"API Token Confirmed. Token valid for {minutes_remaining} minutes and {seconds_remaining} seconds")
            with clear_token_column:
                if st.button("Clear API Token", key="clear_api_token"):
                    st.session_state['api_token'] = ''
                    st.session_state['api_token_confirmed'] = False
                    st.session_state['valid_until'] = ''
                    st.rerun()
            with st.expander("Need a new API Token?", expanded=False):
                st.markdown(utils.NEW_TOKEN_INSTRUCTIONS, unsafe_allow_html=True)
            with st.expander("Token Usage History", expanded=False):
                token_usage = utils.manage_user_tokens(current_user, st.session_state['api_token'], valid_until.strftime('%Y-%m-%d %H:%M:%S'))
                token_usage['is_expired'] = token_usage['valid_until'].apply(lambda x: 'Yes' if datetime.strptime(x, '%Y-%m-%d %H:%M:%S') < datetime.now() else 'No')
                st.write(token_usage)
        lookback_days = st.slider("Select Lookback Days", 10, 360, 30, step=10, key="lookback_days", help="Large lookback days may take longer to load")
        subcol1, subcol2, subcol3 = st.columns(3)
        if subcol2.button('Predict & Recommend', key="predict_button", disabled=not st.session_state['api_token_confirmed']):
            today = datetime.today()
            end_date = today.strftime('%Y-%m-%d')
            start_date = (today - timedelta(days=lookback_days)).strftime('%Y-%m-%d')
            year = '2024'
            dates = get_and_cache_available_dates(gdf, field_name, year, start_date, end_date)
            # Sort ascending so the series is fed to the forecaster oldest-first.
            dates = sorted(dates)
            oldest_date, newest_date = dates[0], dates[-1]
            number_of_months = (datetime.strptime(newest_date, '%Y-%m-%d') - datetime.strptime(oldest_date, '%Y-%m-%d')).days // 30
            my_bar = st.progress(0, text=f"Downloading Data for the last {number_of_months + 1} months ...")
            counter = 0
            downloaded_prev_metrics = []
            for date in dates:
                metric_data = get_cuarted_df_for_field(gdf, field_name, date, metric, current_user, dates=None)
                avg_metric = metric_data[f'{metric}_{date}'].mean()
                downloaded_prev_metrics.append((date, avg_metric))
                counter = counter + 100 / len(dates)
                # Clamp to 100 so float accumulation can't overflow the progress bar.
                my_bar.progress(min(round(counter), 100), text=f"Downloading Data for the last {number_of_months + 1} months: {round(counter)}%")
            st.subheader('Predictions:')
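            # Stream the observed (date, value) pairs to the TimesFM gRPC
            # service, which streams back one forecast value per future week.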
            channel = grpc.insecure_channel(f"{config.sh_timesfm_IP}:50051")
            print("running client request")
            stub = pb.timesfm_pb2_grpc.PredictAgriStub(channel)
            features = stub.predict_metric(iter([pb.timesfm_pb2.prev_values(value=metric[1], date=metric[0]) for metric in downloaded_prev_metrics]))
            print("server streaming:")
            predictions = []
            for feature in features:
                predictions.append(feature.value)
            # Build one future date per prediction, spaced a week apart.
            future_dates = []
            curr_date = datetime.today()
            for pred in predictions:
                curr_date = curr_date + timedelta(days=7)
                future_dates.append(curr_date.strftime('%Y-%m-%d'))
            prev_dates = [metric[0] for metric in downloaded_prev_metrics]
            history_metric_data = [metric[1] for metric in downloaded_prev_metrics]
            future_metric_data = predictions
            # Pad each series with zeros over the other's interval so both
            # align with the combined date axis of the chart.
            interval_dates = prev_dates
            interval_dates.extend(future_dates)
            history_metric_data.extend([0 for i in range(len(predictions))])
            masked_future_metric_data = [0 for i in range(len(downloaded_prev_metrics))]
            masked_future_metric_data.extend(future_metric_data)
            prediction_chart_data = pd.DataFrame(
                {
                    f"history_{metric}_values": history_metric_data,
                    f"predicted_{metric}_values": masked_future_metric_data,
                    "date": interval_dates,
                }
            )
            graph_col, recommendation_col = st.columns([1, 1])
            with graph_col:
                st.area_chart(prediction_chart_data, x="date", y=[f"history_{metric}_values", f"predicted_{metric}_values"])
            with recommendation_col:
                st.subheader('Recommendation:')
                with st.spinner("Generating Recommendation..."):
                    # Default to Wheat when the field has no crop attribute.
                    crop = gdf.loc[gdf['name'] == field_name, 'crop'].iloc[0] if 'crop' in gdf.columns else "Wheat"
                    try:
                        weeks = future_dates
                        gdf_loc = gdf.loc[gdf['name'] == field_name].reset_index(drop=True)
                        location = utils.get_region_from_coordinates(gdf_loc.geometry[0].centroid.y, gdf_loc.geometry[0].centroid.x)
                        prompt = f"""The Field Name is {field_name} and it is located in {location}.
                        Analyze {crop} growth conditions for the next {len(weeks)} weeks, from {weeks[0]} to {weeks[-1]}, based on the forecasted weekly {metric} values.
                        {metric}: {predictions}
                        Provide a concise short report:
                        1. Field Status (use the format "Category: Status - One sentence comment", e.g. "Overall Health: Low - The NDVI values are consistently below 0.2, indicating weak vegetative growth.")
                        - Overall Health:
                        - Growth Stage:
                        - Pest Risk:
                        - Disease Risk:
                        - Stress Level:
                        2. Yield Forecast:
                        [look online for the expected yield for the crop in the region based on the {metric} values]
                        3. Recommendation:
                        [one actionable piece of advice reasoned from the forecasted {metric} values, season, crop, and region]
                        """
                        response = client.chat.completions.create(
                            model="gpt-4o",
                            messages=[
                                {
                                    "role": "user",
                                    "content": prompt
                                }
                            ],
                            temperature=1,
                            max_tokens=256,
                            top_p=1,
                            frequency_penalty=0,
                            presence_penalty=0
                        )
                        st.markdown(response.choices[0].message.content)
                        # Save the recommendation to the user's history file
                        recommendation = response.choices[0].message.content
                        recommendation_filename = f'{current_user}_recommendations.md'
                        with open(recommendation_filename, 'a') as f:
                            f.write(f'\n\n## {field_name} - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n\n')
                            f.write(recommendation)
                            f.write('\n\n')
                        # Download button for the recommendation
                        with open(recommendation_filename, 'rb') as f:
                            st.download_button('Download Recommendation', f, file_name=recommendation_filename)
                    except Exception as e:
                        st.code("Server Error: Couldn't generate recommendation!")
                        st.error(e)

if __name__ == '__main__':
    check_authentication()
    monitor_fields()