#import tensorflow as tf # Tensorflow use deprecated
import torch
import pandas as pd
import numpy as np
from typing import Type, Dict, List, Tuple, Iterator
from datetime import datetime

PandasDataFrame = Type[pd.DataFrame]
PandasSeries = Type[pd.Series]
MatchedResults = Dict[str, Tuple[str, int]]
array = List[str]

today = datetime.now().strftime("%d%m%Y")
today_rev = datetime.now().strftime("%Y%m%d")
# # Neural net functions

def vocab_lookup(characters: str, vocab) -> Tuple[int, np.ndarray]:
    """
    Adapted from the addressnet package by Jason Rigby.
    Converts a string into a list of vocab indices
    :param characters: the string to convert
    :param vocab: the list of known characters; anything not in it maps to 0
    :return: the string length and an array of vocab indices
    """
    result = list()
    for c in characters.lower():
        try:
            result.append(vocab.index(c) + 1)
        except ValueError:
            result.append(0)
    return len(characters), np.array(result, dtype=np.int64)
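
# Illustrative sketch of the lookup (this toy vocab is an assumption, not the
# one shipped with the model): indices are 1-based, with 0 reserved for
# out-of-vocabulary characters.
#
#   vocab = list("abcdefghijklmnopqrstuvwxyz0123456789 ")
#   length, indices = vocab_lookup("12 High St", vocab)
#   # length == 10
#   # indices == array([28, 29, 37, 8, 9, 7, 8, 37, 19, 20])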
# ## Neural net predictor functions

def text_to_model_input_local(in_text, vocab, model_type="estimator"):
    addresses_out = []
    model_input_out = []
    encoded_text = []

    # Length of the longest input string, used as the padding target
    longest_string = max(len(text) for text in in_text)

    for text in in_text:
        out = vocab_lookup(text, vocab)
        addresses_out.append(out)

        # Tensorflow model use deprecated
        # if model_type == "estimator":
        #     model_input_add = tf.train.Example(features=tf.train.Features(feature={
        #         'lengths': tf.train.Feature(int64_list=tf.train.Int64List(value=[out[0]])),
        #         'encoded_text': tf.train.Feature(int64_list=tf.train.Int64List(value=out[1].tolist()))
        #     })).SerializeToString()
        #     model_input_out.append(model_input_add)

        if model_type == "keras":
            encoded_text.append(out[1])

    if model_type == "keras":
        # Pad (or truncate) every sequence to the longest string. NumPy stand-in
        # for the deprecated tf.keras.utils.pad_sequences call
        # (padding="post", truncating="post", value=0).
        model_input_out = np.array([
            np.pad(seq[:longest_string], (0, max(longest_string - len(seq), 0)), constant_values=0)
            for seq in encoded_text
        ])

    return addresses_out, model_input_out
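
# Example usage (illustrative; assumes the same toy vocab as above). For the
# "keras" path the second return value is a (batch, longest_string) int64
# array, post-padded with zeros:
#
#   lengths_and_codes, model_input = text_to_model_input_local(
#       ["12 HIGH ST", "FLAT 2"], vocab, model_type="keras")
#   # model_input.shape == (2, 10)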
def reformat_predictions_local(predict_out):
    predictions_list_reformat = []
    for class_ids, probs in zip(predict_out['pred_output_classes'], predict_out['probabilities']):
        predictions_list_reformat.append({'class_ids': class_ids, 'probabilities': probs})
    return predictions_list_reformat
def predict_serve_conv_local(in_text: List[str], labels_list, predictions) -> Iterator[Dict[str, str]]:
    class_names = [l.replace("_code", "") for l in labels_list]
    class_names = [l.replace("_abbreviation", "") for l in class_names]

    for addr, res in zip(in_text, predictions):
        mappings = dict()
        for char, class_id in zip(addr.upper(), res['class_ids']):
            # Class 0 is the null/padding class, so it contributes nothing
            if class_id == 0:
                continue
            cls = class_names[class_id - 1]
            mappings[cls] = mappings.get(cls, "") + char
        yield mappings
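
# Each yielded dict maps a cleaned label name to the characters the model
# assigned to that class, e.g. (label names here are assumptions for
# illustration):
#
#   {'PaoStartNumber': '12', 'Street': 'HIGH ST'}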
def prep_predict_export(prediction_outputs, in_text):
    out_list = list(prediction_outputs)
    df_out = pd.DataFrame(out_list)
    df_out["address"] = in_text
    return out_list, df_out
def full_predict_func(list_to_predict, model, vocab, labels_list):
    if hasattr(model, "summary"):  # Indicates this is a Keras model rather than an estimator
        model_type = "keras"
    else:
        model_type = "estimator"

    list_to_predict = [x.upper() for x in list_to_predict]

    addresses_out, model_input = text_to_model_input_local(list_to_predict, vocab, model_type)

    if model_type == "keras":
        probs = model.predict(model_input, use_multiprocessing=True)
        classes = probs.argmax(axis=-1)
        predictions = {'pred_output_classes': classes, 'probabilities': probs}
    else:
        # Tensorflow estimator use deprecated - fail explicitly rather than
        # letting 'predictions' go undefined below
        #predictions = model.signatures["predict_output"](predictor_inputs=tf.constant(model_input)) # This was for when using the contrib module
        #predictions = model.signatures["serving_default"](predictor_inputs=tf.constant(model_input))
        raise ValueError("Tensorflow estimator use deprecated; only Keras models are supported")

    predictions_list_reformat = reformat_predictions_local(predictions)

    #### Final output as list or dataframe
    output = predict_serve_conv_local(list(list_to_predict), labels_list, predictions_list_reformat)

    list_out, predict_df = prep_predict_export(output, list_to_predict)

    # Add organisation as a column if it doesn't already exist
    if 'Organisation' not in predict_df.columns:
        predict_df['Organisation'] = ""

    return list_out, predict_df
# -
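
# Example call (illustrative; `keras_model`, `vocab` and `labels_list` are
# assumed to be loaded elsewhere):
#
#   list_out, predict_df = full_predict_func(
#       ["12 high st", "flat 2 low road"], keras_model, vocab, labels_list)
#   # predict_df: one row per address, one column per predicted label,
#   # plus 'address' and 'Organisation'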
def predict_torch(model, model_type, input_text, word_to_index, device):
    # Convert input_text to lists of character indices, mapping unknown characters to <UNK>
    indexed_texts = [[word_to_index.get(char, word_to_index['<UNK>']) for char in text] for text in input_text]

    # Calculate max_len based on indexed_texts
    max_len = max(len(text) for text in indexed_texts)

    # Pad sequences and convert to tensor
    padded_texts = torch.tensor([text + [word_to_index['<pad>']] * (max_len - len(text)) for text in indexed_texts])

    model.eval()  # Make sure dropout etc. are in inference mode
    with torch.no_grad():
        texts = padded_texts.to(device)

        if model_type in ("lstm", "gru"):
            text_lengths = texts.ne(word_to_index['<pad>']).sum(dim=1)
            predictions = model(texts, text_lengths)
        elif model_type == "transformer":
            # Call model with texts and pad_idx
            predictions = model(texts, word_to_index['<pad>'])
        else:
            raise ValueError("Unknown model_type: " + str(model_type))

    # Convert predictions to the most likely category indices
    _, predicted_indices = predictions.max(2)

    return predicted_indices
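
# Example call (illustrative; the model and index mappings are assumed to be
# loaded elsewhere, and word_to_index must contain '<UNK>' and '<pad>'):
#
#   device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#   predicted_indices = predict_torch(gru_model, "gru", ["12 HIGH ST"], word_to_index, device)
#   # predicted_indices.shape == (1, 10): one category index per character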
def torch_predictions_to_dicts(input_text, predicted_indices, index_to_category):
    results = []

    for i, text in enumerate(input_text):
        # Treat each character in the input text as a "token"
        tokens = list(text)

        # Create a dictionary for the current text
        curr_dict = {}

        # Iterate over the predicted categories and the tokens together
        for category_index, token in zip(predicted_indices[i], tokens):
            # Convert the category index to its name
            category_name = index_to_category[category_index.item()]

            # Append the token to the category in the dictionary, creating the
            # category if needed (no space needed between characters)
            curr_dict[category_name] = curr_dict.get(category_name, "") + token

        results.append(curr_dict)

    return results
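
# Illustrative output (category names are assumptions): characters sharing a
# category are concatenated in input order, e.g.
#
#   torch_predictions_to_dicts(["12 HIGH ST"], predicted_indices, index_to_category)
#   # -> [{'PaoStartNumber': '12', 'IGNORE': '  ', 'Street': 'HIGHST'}]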
def torch_prep_predict_export(prediction_outputs, in_text):
    # Drop the null "IGNORE" category if the model predicted it anywhere
    df_out = pd.DataFrame(prediction_outputs).drop("IGNORE", axis=1, errors="ignore")
    df_out["address"] = in_text
    return df_out
def full_predict_torch(model, model_type, input_text, word_to_index, cat_to_idx, device):
    input_text = [x.upper() for x in input_text]

    predicted_indices = predict_torch(model, model_type, input_text, word_to_index, device)

    index_to_category = {v: k for k, v in cat_to_idx.items()}

    results_dict = torch_predictions_to_dicts(input_text, predicted_indices, index_to_category)

    df_out = torch_prep_predict_export(results_dict, input_text)

    return results_dict, df_out
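
# End-to-end example (illustrative; the model and its artefacts are assumed to
# be loaded elsewhere):
#
#   results_dict, df_out = full_predict_torch(
#       gru_model, "gru", ["12 high st"], word_to_index, cat_to_idx, device)
#   # df_out: one row per address with the predicted component columns
#   # (minus "IGNORE") plus the original 'address'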
def post_predict_clean(predict_df, orig_search_df, ref_address_cols, search_df_key_field):
    # Add address to ref_address_cols
    ref_address_cols_add = ref_address_cols.copy()
    ref_address_cols_add.extend(['address'])

    # Create columns that don't exist yet
    for x in ref_address_cols:
        predict_df[x] = predict_df.get(x, np.nan)

    predict_df = predict_df[ref_address_cols_add]

    # Columns that are in the ref and model, but are not matched in this instance, need to be filled in with blanks
    predict_cols_match = list(predict_df.drop(["address"], axis=1).columns)

    pred_output_missing_cols = list(set(ref_address_cols) - set(predict_cols_match))
    predict_df[pred_output_missing_cols] = np.nan
    predict_df = predict_df.fillna("").infer_objects(copy=False)

    # Convert all columns to string and strip the trailing ".0" left over from float conversion
    all_columns = list(predict_df)  # Creates list of all column headers
    predict_df[all_columns] = predict_df[all_columns].astype(str)
    predict_df = predict_df.replace(r"\.0", "", regex=True)

    # When comparing with ref, the postcode already present in the data is used rather than the postcode predicted by the model, to minimise matching errors
    predict_df = predict_df.rename(columns={"Postcode": "Postcode_predict"})

    orig_search_df_pc = orig_search_df[[search_df_key_field, "postcode"]].rename(columns={"postcode": "Postcode"}).reset_index(drop=True)
    predict_df = predict_df.merge(orig_search_df_pc, left_index=True, right_index=True, how="left")

    predict_df[search_df_key_field] = predict_df[search_df_key_field].astype(str)

    return predict_df
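
# Sketch of the cleaning step (illustrative column names; note the merge is
# positional on the index, so predict_df and orig_search_df must share row order):
#
#   predict_df = post_predict_clean(
#       predict_df, orig_search_df,
#       ref_address_cols=["Organisation", "PaoStartNumber", "Street", "Postcode"],
#       search_df_key_field="id")
#   # 'Postcode' is renamed to 'Postcode_predict' and the postcode from
#   # orig_search_df is merged back in as 'Postcode'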