| import argparse | |
| import itertools | |
| import random | |
| import time | |
| import torch | |
| import cv2 | |
| import numpy as np | |
| from PIL import ImageDraw, Image | |
| from backbone.base import Base as BackboneBase | |
| from config.eval_config import EvalConfig as Config | |
| from dataset.base import Base as DatasetBase | |
| from bbox import BBox | |
| from model import Model | |
| from roi.pooler import Pooler | |
| def _infer_stream(path_to_input_stream_endpoint: str, period_of_inference: int, path_to_checkpoint: str, dataset_name: str, backbone_name: str, prob_thresh: float): | |
| dataset_class = DatasetBase.from_name(dataset_name) | |
| backbone = BackboneBase.from_name(backbone_name)(pretrained=False) | |
| model = Model(backbone, dataset_class.num_classes(), pooler_mode=Config.POOLER_MODE, | |
| anchor_ratios=Config.ANCHOR_RATIOS, anchor_sizes=Config.ANCHOR_SIZES, | |
| rpn_pre_nms_top_n=Config.RPN_PRE_NMS_TOP_N, rpn_post_nms_top_n=Config.RPN_POST_NMS_TOP_N).cuda() | |
| model.load(path_to_checkpoint) | |
| if path_to_input_stream_endpoint.isdigit(): | |
| path_to_input_stream_endpoint = int(path_to_input_stream_endpoint) | |
| video_capture = cv2.VideoCapture(path_to_input_stream_endpoint) | |
| with torch.no_grad(): | |
| for sn in itertools.count(start=1): | |
| success, frame = video_capture.read() | |
| if not success: | |
| break | |
| if sn % period_of_inference != 0: | |
| continue | |
| timestamp = time.time() | |
| image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| image = Image.fromarray(image) | |
| image_tensor, scale = dataset_class.preprocess(image, Config.IMAGE_MIN_SIDE, Config.IMAGE_MAX_SIDE) | |
| detection_bboxes, detection_classes, detection_probs, _ = \ | |
| model.eval().forward(image_tensor.unsqueeze(dim=0).cuda()) | |
| detection_bboxes /= scale | |
| kept_indices = detection_probs > prob_thresh | |
| detection_bboxes = detection_bboxes[kept_indices] | |
| detection_classes = detection_classes[kept_indices] | |
| detection_probs = detection_probs[kept_indices] | |
| draw = ImageDraw.Draw(image) | |
| for bbox, cls, prob in zip(detection_bboxes.tolist(), detection_classes.tolist(), detection_probs.tolist()): | |
| color = random.choice(['red', 'green', 'blue', 'yellow', 'purple', 'white']) | |
| bbox = BBox(left=bbox[0], top=bbox[1], right=bbox[2], bottom=bbox[3]) | |
| category = dataset_class.LABEL_TO_CATEGORY_DICT[cls] | |
| draw.rectangle(((bbox.left, bbox.top), (bbox.right, bbox.bottom)), outline=color) | |
| draw.text((bbox.left, bbox.top), text=f'{category:s} {prob:.3f}', fill=color) | |
| image = np.array(image) | |
| frame = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) | |
| elapse = time.time() - timestamp | |
| fps = 1 / elapse | |
| cv2.putText(frame, f'FPS = {fps:.1f}', (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA) | |
| cv2.imshow('easy-faster-rcnn-pytorch', frame) | |
| if cv2.waitKey(10) == 27: | |
| break | |
| video_capture.release() | |
| cv2.destroyAllWindows() | |
| if __name__ == '__main__': | |
| def main(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('-s', '--dataset', type=str, choices=DatasetBase.OPTIONS, required=True, help='name of dataset') | |
| parser.add_argument('-b', '--backbone', type=str, choices=BackboneBase.OPTIONS, required=True, help='name of backbone model') | |
| parser.add_argument('-c', '--checkpoint', type=str, required=True, help='path to checkpoint') | |
| parser.add_argument('-p', '--probability_threshold', type=float, default=0.6, help='threshold of detection probability') | |
| parser.add_argument('--image_min_side', type=float, help='default: {:g}'.format(Config.IMAGE_MIN_SIDE)) | |
| parser.add_argument('--image_max_side', type=float, help='default: {:g}'.format(Config.IMAGE_MAX_SIDE)) | |
| parser.add_argument('--anchor_ratios', type=str, help='default: "{!s}"'.format(Config.ANCHOR_RATIOS)) | |
| parser.add_argument('--anchor_sizes', type=str, help='default: "{!s}"'.format(Config.ANCHOR_SIZES)) | |
| parser.add_argument('--pooler_mode', type=str, choices=Pooler.OPTIONS, help='default: {.value:s}'.format(Config.POOLER_MODE)) | |
| parser.add_argument('--rpn_pre_nms_top_n', type=int, help='default: {:d}'.format(Config.RPN_PRE_NMS_TOP_N)) | |
| parser.add_argument('--rpn_post_nms_top_n', type=int, help='default: {:d}'.format(Config.RPN_POST_NMS_TOP_N)) | |
| parser.add_argument('--stream_input', type=str, help='path to input stream endpoint') | |
| parser.add_argument('--period', type=int, help='period of inference') | |
| args = parser.parse_args() | |
| path_to_input_stream_endpoint = args.stream_input | |
| period_of_inference = args.period | |
| dataset_name = args.dataset | |
| backbone_name = args.backbone | |
| path_to_checkpoint = args.checkpoint | |
| prob_thresh = args.probability_threshold | |
| Config.setup(image_min_side=args.image_min_side, image_max_side=args.image_max_side, | |
| anchor_ratios=args.anchor_ratios, anchor_sizes=args.anchor_sizes, pooler_mode=args.pooler_mode, | |
| rpn_pre_nms_top_n=args.rpn_pre_nms_top_n, rpn_post_nms_top_n=args.rpn_post_nms_top_n) | |
| print('Arguments:') | |
| for k, v in vars(args).items(): | |
| print(f'\t{k} = {v}') | |
| print(Config.describe()) | |
| _infer_stream(path_to_input_stream_endpoint, period_of_inference, path_to_checkpoint, dataset_name, backbone_name, prob_thresh) | |
| main() | |