diff --git a/ml/messages/__init__.py b/ml/messages/__init__.py
index e84ed0ea..6f7d8afb 100644
--- a/ml/messages/__init__.py
+++ b/ml/messages/__init__.py
@@ -1,6 +1,9 @@
 from .img_detections import ImgDetectionWithKeypoints, ImgDetectionsWithKeypoints
+from .keypoints import HandKeypoints, Keypoints
 
 __all__ = [
     "ImgDetectionWithKeypoints",
     "ImgDetectionsWithKeypoints",
-]
+    "HandKeypoints",
+    "Keypoints",
+]
\ No newline at end of file
diff --git a/ml/messages/keypoints.py b/ml/messages/keypoints.py
new file mode 100644
index 00000000..8dc7f403
--- /dev/null
+++ b/ml/messages/keypoints.py
@@ -0,0 +1,68 @@
+import depthai as dai
+from typing import List
+
+
+class Keypoints(dai.Buffer):
+    """Message holding a list of 3D keypoints behind a validated property."""
+
+    def __init__(self):
+        super().__init__()
+        self._keypoints: List[dai.Point3f] = []
+
+    @property
+    def keypoints(self) -> List[dai.Point3f]:
+        """Stored keypoint list; the setter raises TypeError for non-list or non-Point3f input."""
+        return self._keypoints
+
+    @keypoints.setter
+    def keypoints(self, value: List[dai.Point3f]):
+        # Reject anything that is not a plain list of dai.Point3f.
+        if not isinstance(value, list):
+            raise TypeError("keypoints must be a list.")
+        for item in value:
+            if not isinstance(item, dai.Point3f):
+                raise TypeError("All items in keypoints must be of type dai.Point3f.")
+        self._keypoints = value
+
+
+class HandKeypoints(Keypoints):
+    """Keypoints message that also stores a confidence and a handedness score."""
+
+    def __init__(self):
+        Keypoints.__init__(self)
+        self._confidence: float = 0.0
+        self._handdedness: float = 0.0
+
+    @property
+    def confidence(self) -> float:
+        """Stored confidence score; the setter raises TypeError for non-float input."""
+        return self._confidence
+
+    @confidence.setter
+    def confidence(self, value: float):
+        if not isinstance(value, float):
+            raise TypeError("confidence must be a float.")
+        self._confidence = value
+
+    @property
+    def handdedness(self) -> float:
+        """Stored handedness score (original, misspelled name kept for compatibility)."""
+        return self._handdedness
+
+    @handdedness.setter
+    def handdedness(self, value: float):
+        if not isinstance(value, float):
+            raise TypeError("handdedness must be a float.")
+        self._handdedness = value
+
+    # Correctly spelled alias: assigning ``msg.handedness`` without it would
+    # bypass the validated ``handdedness`` property and leave it at 0.0.
+    @property
+    def handedness(self) -> float:
+        """Stored handedness score; same value as ``handdedness``."""
+        return self._handdedness
+
+    @handedness.setter
+    def handedness(self, value: float):
+        # Delegate so the float check lives in one place.
+        self.handdedness = value
\ No newline at end of file
diff --git a/ml/postprocessing/__init__.py b/ml/postprocessing/__init__.py
index 5499c8fc..9c9bdba8 100644
--- a/ml/postprocessing/__init__.py
+++ 
b/ml/postprocessing/__init__.py @@ -2,10 +2,18 @@ from .dncnn3 import DnCNN3Parser from .depth_anything import DepthAnythingParser from .yunet import YuNetParser +from .mediapipe_hand_detection import MPHandDetectionParser +from .mediapipe_hand_landmarker import MPHandLandmarkParser +from .scrfd import SCRFDParser +from .segmentation import SegmentationParser __all__ = [ 'ZeroDCEParser', 'DnCNN3Parser', 'DepthAnythingParser', - 'YuNetParser' + 'YuNetParser', + 'MPHandDetectionParser', + 'MPHandLandmarkParser', + 'SCRFDParser', + 'SegmentationParser', ] diff --git a/ml/postprocessing/mediapipe_hand_detection.py b/ml/postprocessing/mediapipe_hand_detection.py new file mode 100644 index 00000000..6740daaa --- /dev/null +++ b/ml/postprocessing/mediapipe_hand_detection.py @@ -0,0 +1,74 @@ +import depthai as dai +import numpy as np +import cv2 + +from .utils.message_creation import create_detection_message +from .utils.medipipe import generate_anchors_and_decode + +class MPHandDetectionParser(dai.node.ThreadedHostNode): + def __init__( + self, + score_threshold=0.5, + nms_threshold=0.5, + top_k=100 + ): + dai.node.ThreadedHostNode.__init__(self) + self.input = dai.Node.Input(self) + self.out = dai.Node.Output(self) + + self.score_threshold = score_threshold + self.nms_threshold = nms_threshold + self.top_k = top_k + + def setConfidenceThreshold(self, threshold): + self.score_threshold = threshold + + def setNMSThreshold(self, threshold): + self.nms_threshold = threshold + + def setTopK(self, top_k): + self.top_k = top_k + + def run(self): + """ + Postprocessing logic for MediPipe Hand detection model. + + Returns: + dai.ImgDetections containing bounding boxes, labels, and confidence scores of detected hands. 
+        """
+
+        while self.isRunning():
+
+            try:
+                output: dai.NNData = self.input.get()
+            except dai.MessageQueue.QueueException as e:
+                break # Pipeline was stopped
+
+            tensorInfo = output.getTensorInfo("Identity")
+            bboxes = output.getTensor(f"Identity").reshape(2016, 18).astype(np.float32)
+            bboxes = (bboxes - tensorInfo.qpZp) * tensorInfo.qpScale
+            tensorInfo = output.getTensorInfo("Identity_1")
+            scores = output.getTensor(f"Identity_1").reshape(2016).astype(np.float32)
+            scores = (scores - tensorInfo.qpZp) * tensorInfo.qpScale
+
+            decoded_bboxes = generate_anchors_and_decode(bboxes=bboxes, scores=scores, threshold=self.score_threshold, scale=192)
+
+            bboxes = []
+            scores = []
+
+            for hand in decoded_bboxes:
+                extended_points = hand.rect_points  # four [x, y] corners of the hand rectangle
+                xmin = int(min(point[0] for point in extended_points))
+                ymin = int(min(point[1] for point in extended_points))
+                xmax = int(max(point[0] for point in extended_points))
+                ymax = int(max(point[1] for point in extended_points))
+                # Min/max over all four corners keeps the box valid when the rectangle is rotated.
+                bboxes.append([xmin, ymin, xmax, ymax])
+                scores.append(hand.pd_score)
+            # cv2.dnn.NMSBoxes takes [x, y, width, height] boxes, so convert from corner form for the call only.
+            indices = np.array(cv2.dnn.NMSBoxes([[x0, y0, x1 - x0, y1 - y0] for x0, y0, x1, y1 in bboxes], scores, self.score_threshold, self.nms_threshold, top_k=self.top_k), dtype=int).reshape(-1)
+            bboxes = np.array(bboxes).reshape(-1, 4)[indices]  # stays (N, 4) even when N == 0
+            scores = np.array(scores)[indices]
+
+            detections_msg = create_detection_message(bboxes, scores, labels=None)
+            self.out.send(detections_msg)
\ No newline at end of file
diff --git a/ml/postprocessing/mediapipe_hand_landmarker.py b/ml/postprocessing/mediapipe_hand_landmarker.py
new file mode 100644
index 00000000..9c5cca8e
--- /dev/null
+++ b/ml/postprocessing/mediapipe_hand_landmarker.py
@@ -0,0 +1,57 @@
+import depthai as dai
+import numpy as np
+import cv2
+
+from .utils.message_creation import create_hand_keypoints_message
+
+class MPHandLandmarkParser(dai.node.ThreadedHostNode):
+    def __init__(
+        self,
+        score_threshold=0.5,
+        scale_factor=224
+    ):
+        dai.node.ThreadedHostNode.__init__(self)
+        self.input = dai.Node.Input(self)
+        self.out = 
dai.Node.Output(self) + + self.score_threshold = score_threshold + self.scale_factor = scale_factor + + def setScoreThreshold(self, threshold): + self.score_threshold = threshold + + def setScaleFactor(self, scale_factor): + self.scale_factor = scale_factor + + def run(self): + """ + Postprocessing logic for MediaPipe Hand landmark model. + + Returns: + HandLandmarks containing normalized 21 landmarks, confidence score, and handdedness score (right or left hand). + """ + + while self.isRunning(): + + try: + output: dai.NNData = self.input.get() + except dai.MessageQueue.QueueException as e: + break # Pipeline was stopped + + tensorInfo = output.getTensorInfo("Identity") + landmarks = output.getTensor(f"Identity").reshape(21, 3).astype(np.float32) + landmarks = (landmarks - tensorInfo.qpZp) * tensorInfo.qpScale + tensorInfo = output.getTensorInfo("Identity_1") + hand_score = output.getTensor(f"Identity_1").reshape(-1).astype(np.float32) + hand_score = (hand_score - tensorInfo.qpZp) * tensorInfo.qpScale + hand_score = hand_score[0] + tensorInfo = output.getTensorInfo("Identity_2") + handedness = output.getTensor(f"Identity_2").reshape(-1).astype(np.float32) + handedness = (handedness - tensorInfo.qpZp) * tensorInfo.qpScale + handedness = handedness[0] + + # normalize landmarks + landmarks /= self.scale_factor + + hand_landmarks_msg = create_hand_keypoints_message(landmarks, float(handedness), float(hand_score), self.score_threshold) + self.out.send(hand_landmarks_msg) \ No newline at end of file diff --git a/ml/postprocessing/scrfd.py b/ml/postprocessing/scrfd.py index 939b1dab..22f609e6 100644 --- a/ml/postprocessing/scrfd.py +++ b/ml/postprocessing/scrfd.py @@ -2,21 +2,19 @@ import numpy as np import cv2 -from ..custom_messages.img_detections import ImgDetectionsWithKeypoints +from ..messages import ImgDetectionsWithKeypoints class SCRFDParser(dai.node.ThreadedHostNode): def __init__( self, score_threshold=0.5, nms_threshold=0.5, - top_k=100, - input_size=(640, 
640), # WH + top_k=100 ): dai.node.ThreadedHostNode.__init__(self) self.input = dai.Node.Input(self) self.out = dai.Node.Output(self) - self.input_size = input_size self.score_threshold = score_threshold self.nms_threshold = nms_threshold self.top_k = top_k @@ -30,9 +28,6 @@ def setNMSThreshold(self, threshold): def setTopK(self, top_k): self.top_k = top_k - def setInputSize(self, width, height): - self.input_size = (width, height) - def run(self): """ Postprocessing logic for SCRFD model. diff --git a/ml/postprocessing/segmentation.py b/ml/postprocessing/segmentation.py new file mode 100644 index 00000000..5fd6f25a --- /dev/null +++ b/ml/postprocessing/segmentation.py @@ -0,0 +1,35 @@ +import depthai as dai +import numpy as np +import cv2 +from .utils.message_creation import create_segmentation_message + +class SegmentationParser(dai.node.ThreadedHostNode): + def __init__( + self, + ): + dai.node.ThreadedHostNode.__init__(self) + self.input = dai.Node.Input(self) + self.out = dai.Node.Output(self) + + def run(self): + """ + Postprocessing logic for Segmentation model with `num_classes` classes including background at index 0. + + Returns: + Segmenation mask with `num_classes` classes, 0 - background. 
+ """ + + while self.isRunning(): + + try: + output: dai.NNData = self.input.get() + except dai.MessageQueue.QueueException as e: + break # Pipeline was stopped + + segmentation_mask = output.getTensor("output") + segmentation_mask = segmentation_mask[0] # num_clases x H x W + segmentation_mask = np.vstack((np.zeros((1, segmentation_mask.shape[1], segmentation_mask.shape[2]), dtype=np.float32), segmentation_mask)) + overlay_image = np.argmax(segmentation_mask, axis=0).reshape(segmentation_mask.shape[1], segmentation_mask.shape[2], 1).astype(np.uint8) + + imgFrame = create_segmentation_message(overlay_image) + self.out.send(imgFrame) \ No newline at end of file diff --git a/ml/postprocessing/selfie_seg.py b/ml/postprocessing/selfie_seg.py deleted file mode 100644 index 4c9979e1..00000000 --- a/ml/postprocessing/selfie_seg.py +++ /dev/null @@ -1,58 +0,0 @@ -import depthai as dai -import numpy as np -import cv2 - -class SeflieSegParser(dai.node.ThreadedHostNode): - def __init__( - self, - threshold=0.5, - input_size=(256, 144), - mask_color=[0, 255, 0], - ): - dai.node.ThreadedHostNode.__init__(self) - self.input = dai.Node.Input(self) - self.out = dai.Node.Output(self) - - self.input_size = input_size - self.threshold = threshold - self.mask_color = mask_color - - def setMaskColor(self, mask_color): - self.mask_color = mask_color - - def setConfidenceThreshold(self, threshold): - self.threshold = threshold - - def setInputSize(self, width, height): - self.input_size = (width, height) - - def run(self): - """ - Postprocessing logic for SCRFD model. - - Returns: - ... 
- """ - - while self.isRunning(): - - try: - output: dai.NNData = self.input.get() - print(f"output = {output}") - except dai.MessageQueue.QueueException as e: - break # Pipeline was stopped - - print(f"Layer names = {output.getAllLayerNames()}") - - segmentation_mask = output.getTensor("output") - segmentation_mask = segmentation_mask[0].squeeze() > self.threshold - overlay_image = np.ones((segmentation_mask.shape[0], segmentation_mask.shape[1], 3), dtype=np.uint8) * 255 - overlay_image[segmentation_mask] = self.mask_color - - imgFrame = dai.ImgFrame() - imgFrame.setFrame(overlay_image) - imgFrame.setWidth(overlay_image.shape[1]) - imgFrame.setHeight(overlay_image.shape[0]) - imgFrame.setType(dai.ImgFrame.Type.BGR888i) - - self.out.send(imgFrame) \ No newline at end of file diff --git a/ml/postprocessing/utils/message_creation/depth_segmentation.py b/ml/postprocessing/utils/detection.py similarity index 100% rename from ml/postprocessing/utils/message_creation/depth_segmentation.py rename to ml/postprocessing/utils/detection.py diff --git a/ml/postprocessing/utils/medipipe.py b/ml/postprocessing/utils/medipipe.py new file mode 100644 index 00000000..ad762e1c --- /dev/null +++ b/ml/postprocessing/utils/medipipe.py @@ -0,0 +1,346 @@ +""" +mediapipe.py + +Description: This script contains utility functions for decoding the output of the MediaPipe hand tracking model. 
+ +This script contains code that is based on or directly taken from a public GitHub repository: +https://github.com/geaxgx/depthai_hand_tracker + +Original code author(s): geaxgx + +License: MIT License + +MIT License +----------- + +Copyright (c) [2021] [geax] + +""" + +import math +import numpy as np +from collections import namedtuple + +class HandRegion: + """ + Attributes: + pd_score : detection score + pd_box : detection box [x, y, w, h], normalized [0,1] in the squared image + pd_kps : detection keypoints coordinates [x, y], normalized [0,1] in the squared image + rect_x_center, rect_y_center : center coordinates of the rotated bounding rectangle, normalized [0,1] in the squared image + rect_w, rect_h : width and height of the rotated bounding rectangle, normalized in the squared image (may be > 1) + rotation : rotation angle of rotated bounding rectangle with y-axis in radian + rect_x_center_a, rect_y_center_a : center coordinates of the rotated bounding rectangle, in pixels in the squared image + rect_w, rect_h : width and height of the rotated bounding rectangle, in pixels in the squared image + rect_points : list of the 4 points coordinates of the rotated bounding rectangle, in pixels + expressed in the squared image during processing, + expressed in the source rectangular image when returned to the user + """ + def __init__(self, pd_score=None, pd_box=None, pd_kps=None): + self.pd_score = pd_score # Palm detection score + self.pd_box = pd_box # Palm detection box [x, y, w, h] normalized + self.pd_kps = pd_kps # Palm detection keypoints + +SSDAnchorOptions = namedtuple('SSDAnchorOptions',[ + 'num_layers', + 'min_scale', + 'max_scale', + 'input_size_height', + 'input_size_width', + 'anchor_offset_x', + 'anchor_offset_y', + 'strides', + 'aspect_ratios', + 'reduce_boxes_in_lowest_layer', + 'interpolated_scale_aspect_ratio', + 'fixed_anchor_size']) + +def calculate_scale(min_scale, max_scale, stride_index, num_strides): + if num_strides == 1: + return 
(min_scale + max_scale) / 2 + else: + return min_scale + (max_scale - min_scale) * stride_index / (num_strides - 1) + +def generate_anchors(options): + """ + option : SSDAnchorOptions + # https://github.com/google/mediapipe/blob/master/mediapipe/calculators/tflite/ssd_anchors_calculator.cc + """ + anchors = [] + layer_id = 0 + n_strides = len(options.strides) + while layer_id < n_strides: + anchor_height = [] + anchor_width = [] + aspect_ratios = [] + scales = [] + # For same strides, we merge the anchors in the same order. + last_same_stride_layer = layer_id + while last_same_stride_layer < n_strides and \ + options.strides[last_same_stride_layer] == options.strides[layer_id]: + scale = calculate_scale(options.min_scale, options.max_scale, last_same_stride_layer, n_strides) + if last_same_stride_layer == 0 and options.reduce_boxes_in_lowest_layer: + # For first layer, it can be specified to use predefined anchors. + aspect_ratios += [1.0, 2.0, 0.5] + scales += [0.1, scale, scale] + else: + aspect_ratios += options.aspect_ratios + scales += [scale] * len(options.aspect_ratios) + if options.interpolated_scale_aspect_ratio > 0: + if last_same_stride_layer == n_strides -1: + scale_next = 1.0 + else: + scale_next = calculate_scale(options.min_scale, options.max_scale, last_same_stride_layer+1, n_strides) + scales.append(math.sqrt(scale * scale_next)) + aspect_ratios.append(options.interpolated_scale_aspect_ratio) + last_same_stride_layer += 1 + + for i,r in enumerate(aspect_ratios): + ratio_sqrts = math.sqrt(r) + anchor_height.append(scales[i] / ratio_sqrts) + anchor_width.append(scales[i] * ratio_sqrts) + + stride = options.strides[layer_id] + feature_map_height = math.ceil(options.input_size_height / stride) + feature_map_width = math.ceil(options.input_size_width / stride) + + for y in range(feature_map_height): + for x in range(feature_map_width): + for anchor_id in range(len(anchor_height)): + x_center = (x + options.anchor_offset_x) / feature_map_width + y_center 
= (y + options.anchor_offset_y) / feature_map_height + # new_anchor = Anchor(x_center=x_center, y_center=y_center) + if options.fixed_anchor_size: + new_anchor = [x_center, y_center, 1.0, 1.0] + # new_anchor.w = 1.0 + # new_anchor.h = 1.0 + else: + new_anchor = [x_center, y_center, anchor_width[anchor_id], anchor_height[anchor_id]] + # new_anchor.w = anchor_width[anchor_id] + # new_anchor.h = anchor_height[anchor_id] + anchors.append(new_anchor) + + layer_id = last_same_stride_layer + return np.array(anchors) + +def generate_handtracker_anchors(input_size_width, input_size_height): + # https://github.com/google/mediapipe/blob/master/mediapipe/modules/palm_detection/palm_detection_cpu.pbtxt + anchor_options = SSDAnchorOptions(num_layers=4, + min_scale=0.1484375, + max_scale=0.75, + input_size_height=input_size_height, + input_size_width=input_size_width, + anchor_offset_x=0.5, + anchor_offset_y=0.5, + strides=[8, 16, 16, 16], + aspect_ratios= [1.0], + reduce_boxes_in_lowest_layer=False, + interpolated_scale_aspect_ratio=1.0, + fixed_anchor_size=True) + return generate_anchors(anchor_options) + +def decode_bboxes(score_thresh, scores, bboxes, anchors, scale=128, best_only=False): + """ + wi, hi : NN input shape + mediapipe/calculators/tflite/tflite_tensors_to_detections_calculator.cc + # Decodes the detection tensors generated by the model, based on + # the SSD anchors and the specification in the options, into a vector of + # detections. Each detection describes a detected object. 
+ + https://github.com/google/mediapipe/blob/master/mediapipe/modules/palm_detection/palm_detection_cpu.pbtxt : + node { + calculator: "TensorsToDetectionsCalculator" + input_stream: "TENSORS:detection_tensors" + input_side_packet: "ANCHORS:anchors" + output_stream: "DETECTIONS:unfiltered_detections" + options: { + [mediapipe.TensorsToDetectionsCalculatorOptions.ext] { + num_classes: 1 + num_boxes: 896 + num_coords: 18 + box_coord_offset: 0 + keypoint_coord_offset: 4 + num_keypoints: 7 + num_values_per_keypoint: 2 + sigmoid_score: true + score_clipping_thresh: 100.0 + reverse_output_order: true + + x_scale: 128.0 + y_scale: 128.0 + h_scale: 128.0 + w_scale: 128.0 + min_score_thresh: 0.5 + } + } + } + node { + calculator: "TensorsToDetectionsCalculator" + input_stream: "TENSORS:detection_tensors" + input_side_packet: "ANCHORS:anchors" + output_stream: "DETECTIONS:unfiltered_detections" + options: { + [mediapipe.TensorsToDetectionsCalculatorOptions.ext] { + num_classes: 1 + num_boxes: 2016 + num_coords: 18 + box_coord_offset: 0 + keypoint_coord_offset: 4 + num_keypoints: 7 + num_values_per_keypoint: 2 + sigmoid_score: true + score_clipping_thresh: 100.0 + reverse_output_order: true + + x_scale: 192.0 + y_scale: 192.0 + w_scale: 192.0 + h_scale: 192.0 + min_score_thresh: 0.5 + } + } + } + + scores: shape = [number of anchors 896 or 2016] + bboxes: shape = [ number of anchors x 18], 18 = 4 (bounding box : (cx,cy,w,h) + 14 (7 palm keypoints) + """ + regions = [] + scores = 1 / (1 + np.exp(-scores)) + if best_only: + best_id = np.argmax(scores) + if scores[best_id] < score_thresh: return regions + det_scores = scores[best_id:best_id+1] + det_bboxes2 = bboxes[best_id:best_id+1] + det_anchors = anchors[best_id:best_id+1] + else: + detection_mask = scores > score_thresh + det_scores = scores[detection_mask] + if det_scores.size == 0: return regions + det_bboxes2 = bboxes[detection_mask] + det_anchors = anchors[detection_mask] + + det_bboxes = det_bboxes2* 
np.tile(det_anchors[:,2:4], 9) / scale + np.tile(det_anchors[:,0:2],9) + det_bboxes[:,2:4] = det_bboxes[:,2:4] - det_anchors[:,0:2] + det_bboxes[:,0:2] = det_bboxes[:,0:2] - det_bboxes[:,3:4] * 0.5 + + for i in range(det_bboxes.shape[0]): + score = det_scores[i] + box = det_bboxes[i,0:4] + # Decoded detection boxes could have negative values for width/height due + # to model prediction. Filter out those boxes + if box[2] < 0 or box[3] < 0: continue + kps = [] + # 0 : wrist + # 1 : index finger joint + # 2 : middle finger joint + # 3 : ring finger joint + # 4 : little finger joint + # 5 : + # 6 : thumb joint + for kp in range(7): + kps.append(det_bboxes[i,4+kp*2:6+kp*2]) + regions.append(HandRegion(float(score), box, kps)) + return regions + +def rect_transformation(regions, w, h): + """ + w, h : image input shape + """ + # https://github.com/google/mediapipe/blob/master/mediapipe/modules/hand_landmark/palm_detection_detection_to_roi.pbtxt + # # Expands and shifts the rectangle that contains the palm so that it's likely + # # to cover the entire hand. + # node { + # calculator: "RectTransformationCalculator" + # input_stream: "NORM_RECT:raw_roi" + # input_stream: "IMAGE_SIZE:image_size" + # output_stream: "roi" + # options: { + # [mediapipe.RectTransformationCalculatorOptions.ext] { + # scale_x: 2.6 + # scale_y: 2.6 + # shift_y: -0.5 + # square_long: true + # } + # } + # IMHO 2.9 is better than 2.6. 
With 2.6, it may happen that finger tips stay outside of the bounding rotated rectangle
+    scale_x = 2.9
+    scale_y = 2.9
+    shift_x = 0
+    shift_y = -0.5
+    for region in regions:
+        width = region.rect_w
+        height = region.rect_h
+        rotation = 0  # NOTE(review): hard-coded, so the else branch never runs, yet rect_points below uses region.rotation - confirm intent
+        if rotation == 0:
+            region.rect_x_center_a = (region.rect_x_center + width * shift_x) * w
+            region.rect_y_center_a = (region.rect_y_center + height * shift_y) * h
+        else:
+            x_shift = (w * width * shift_x * math.cos(rotation) - h * height * shift_y * math.sin(rotation)) #/ w
+            y_shift = (w * width * shift_x * math.sin(rotation) + h * height * shift_y * math.cos(rotation)) #/ h
+            region.rect_x_center_a = region.rect_x_center*w + x_shift
+            region.rect_y_center_a = region.rect_y_center*h + y_shift
+
+        long_side = max(width * w, height * h)  # both sides use the longer edge, so the rectangle is square
+        region.rect_w_a = long_side * scale_x
+        region.rect_h_a = long_side * scale_y
+        region.rect_points = rotated_rect_to_points(region.rect_x_center_a, region.rect_y_center_a, region.rect_w_a, region.rect_h_a, region.rotation)
+
+def rotated_rect_to_points(cx, cy, w, h, rotation):
+    b = math.cos(rotation) * 0.5  # half of cos(rotation)
+    a = math.sin(rotation) * 0.5  # half of sin(rotation)
+    points = []  # never read in this function
+    p0x = cx - a*h - b*w
+    p0y = cy + b*h - a*w
+    p1x = cx + a*h - b*w
+    p1y = cy - b*h - a*w
+    p2x = int(2*cx - p0x)  # p2 is p0 mirrored through the centre (cx, cy)
+    p2y = int(2*cy - p0y)
+    p3x = int(2*cx - p1x)  # p3 is p1 mirrored through the centre (cx, cy)
+    p3y = int(2*cy - p1y)
+    p0x, p0y, p1x, p1y = int(p0x), int(p0y), int(p1x), int(p1y)  # truncated only after the mirrored points were derived
+    return [[p0x,p0y], [p1x,p1y], [p2x,p2y], [p3x,p3y]]
+
+def detections_to_rect(regions):
+    # https://github.com/google/mediapipe/blob/master/mediapipe/modules/hand_landmark/palm_detection_detection_to_roi.pbtxt
+    # # Converts results of palm detection into a rectangle (normalized by image size)
+    # # that encloses the palm and is rotated such that the line connecting center of
+    # # the wrist and MCP of the middle finger is aligned with the Y-axis of the
+    # # rectangle. 
+ # node { + # calculator: "DetectionsToRectsCalculator" + # input_stream: "DETECTION:detection" + # input_stream: "IMAGE_SIZE:image_size" + # output_stream: "NORM_RECT:raw_roi" + # options: { + # [mediapipe.DetectionsToRectsCalculatorOptions.ext] { + # rotation_vector_start_keypoint_index: 0 # Center of wrist. + # rotation_vector_end_keypoint_index: 2 # MCP of middle finger. + # rotation_vector_target_angle_degrees: 90 + # } + # } + + target_angle = math.pi * 0.5 # 90 = pi/2 + for region in regions: + + region.rect_w = region.pd_box[2] + region.rect_h = region.pd_box[3] + region.rect_x_center = region.pd_box[0] + region.rect_w / 2 + region.rect_y_center = region.pd_box[1] + region.rect_h / 2 + + x0, y0 = region.pd_kps[0] # wrist center + x1, y1 = region.pd_kps[2] # middle finger + rotation = target_angle - math.atan2(-(y1 - y0), x1 - x0) + region.rotation = normalize_radians(rotation) + +def normalize_radians(angle): + return angle - 2 * math.pi * math.floor((angle + math.pi) / (2 * math.pi)) + +def generate_anchors_and_decode(bboxes, scores, threshold=0.5, scale=192): + """ + Generate anchors and decode bounding boxes for mediapipe hand detection model. 
+ """ + anchors = generate_handtracker_anchors(scale, scale) + decoded_bboxes = decode_bboxes(threshold, scores, bboxes, anchors, scale=scale) + detections_to_rect(decoded_bboxes) + rect_transformation(decoded_bboxes, scale, scale) + return decoded_bboxes \ No newline at end of file diff --git a/ml/postprocessing/utils/message_creation/__init__.py b/ml/postprocessing/utils/message_creation/__init__.py new file mode 100644 index 00000000..e0b7be5f --- /dev/null +++ b/ml/postprocessing/utils/message_creation/__init__.py @@ -0,0 +1,11 @@ +from .depth import create_depth_message +from .segmentation import create_segmentation_message +from .keypoints import create_hand_keypoints_message +from .detection import create_detection_message + +__all__ = [ + "create_depth_message", + "create_segmentation_message", + "create_hand_keypoints_message", + "create_detection_message", +] \ No newline at end of file diff --git a/ml/postprocessing/utils/message_creation/depth.py b/ml/postprocessing/utils/message_creation/depth.py new file mode 100644 index 00000000..52ee9fcd --- /dev/null +++ b/ml/postprocessing/utils/message_creation/depth.py @@ -0,0 +1,28 @@ +import depthai as dai +import numpy as np + +def create_depth_message(x: np.array) -> dai.ImgFrame: + """ + Create a message for the depth image. Input is of the shape (H, W, 1). + In the third dimesion we specify the depth in the image. + + Args: + x (np.array): Input from the depth node. + + Returns: + dai.ImgFrame: Output depth message in ImgFrame.Type.RAW16. 
+ """ + + if not isinstance(x, np.ndarray): + raise ValueError(f"Expected numpy array, got {type(x)}.") + if len(x.shape) != 3: + raise ValueError(f"Expected 3D input, got {len(x.shape)}D input.") + if x.shape[2] != 1: + raise ValueError(f"Expected 1 channel in the third dimension, got {x.shape[2]} channels.") + + imgFrame = dai.ImgFrame() + imgFrame.setFrame(x) + imgFrame.setWidth(x.shape[1]) + imgFrame.setHeight(x.shape[0]) + imgFrame.setType(dai.ImgFrame.Type.RAW16) + return imgFrame \ No newline at end of file diff --git a/ml/postprocessing/utils/message_creation/detection.py b/ml/postprocessing/utils/message_creation/detection.py index e69de29b..c2791b46 100644 --- a/ml/postprocessing/utils/message_creation/detection.py +++ b/ml/postprocessing/utils/message_creation/detection.py @@ -0,0 +1,56 @@ +import depthai as dai +import numpy as np +from typing import List + +def create_detection_message(bboxes: np.ndarray, scores: np.ndarray, labels: List[int] = None) -> dai.ImgDetections: + """ + Create a message for the detection. The message contains the bounding boxes, labels, and confidence scores of detected objects. + If there are no labels or we only have one class, we can set labels to None and all detections will have label set to 0. + + Args: + bboxes (np.ndarray): Detected bounding boxes of shape (N,4) meaning [...,[x_min, y_min, x_max, y_max],...]. + scores (np.ndarray): Confidence scores of detected objects of shape (N,). + labels (List[int], optional): Labels of detected objects of shape (N,). Defaults to None. + + Returns: + dai.ImgDetections: Message containing the bounding boxes, labels, and confidence scores of detected objects. 
+ """ + + if not isinstance(bboxes, np.ndarray): + raise ValueError(f"bboxes should be numpy array, got {type(bboxes)}.") + if len(bboxes.shape) != 2: + raise ValueError(f"bboxes should be of shape (N,4) meaning [...,[x_min, y_min, x_max, y_max],...], got {bboxes.shape}.") + if bboxes.shape[1] != 4: + raise ValueError(f"bboxes 2nd dimension should be of size 4 e.g. [x_min, y_min, x_max, y_max] got {bboxes.shape[1]}.") + if not isinstance(scores, np.ndarray): + raise ValueError(f"scores should be numpy array, got {type(scores)}.") + if len(scores.shape) != 1: + raise ValueError(f"scores should be of shape (N,) meaning, got {scores.shape}.") + if scores.shape[0] != bboxes.shape[0]: + raise ValueError(f"scores should have same length as bboxes, got {scores.shape[0]} and {bboxes.shape[0]}.") + if labels is not None: + if not isinstance(labels, List): + raise ValueError(f"labels should be list, got {type(labels)}.") + for label in labels: + if not isinstance(label, int): + raise ValueError(f"labels should be list of integers, got {type(label)}.") + if len(labels) != bboxes.shape[0]: + raise ValueError(f"labels should have same length as bboxes, got {len(labels)} and {bboxes.shape[0]}.") + + if labels is None: + labels = [0 for _ in range(bboxes.shape[0])] + + detections = [] + for bbox, score, label in zip(bboxes, scores, labels): + detection = dai.ImgDetection() + detection.confidence = score + detection.label = label + detection.xmin = bbox[0] + detection.ymin = bbox[1] + detection.xmax = bbox[2] + detection.ymax = bbox[3] + detections.append(detection) + + detections_msg = dai.ImgDetections() + detections_msg.detections = detections + return detections_msg \ No newline at end of file diff --git a/ml/postprocessing/utils/message_creation/keypoints.py b/ml/postprocessing/utils/message_creation/keypoints.py new file mode 100644 index 00000000..a8bd1482 --- /dev/null +++ b/ml/postprocessing/utils/message_creation/keypoints.py @@ -0,0 +1,44 @@ +import depthai as dai 
+import numpy as np +from typing import List +from ....messages import HandKeypoints + +def create_hand_keypoints_message(hand_keypoints: np.ndarray, handedness: float, confidence: float, confidence_threshold: float) -> HandKeypoints: + """ + Create a message for the hand keypoint detection. The message contains the 3D coordinates of the detected hand keypoints, handedness, and confidence score. + + Args: + hand_keypoints (np.ndarray): Detected hand keypoints of shape (N,3) meaning [...,[x, y, z],...]. + handedness (float): Handedness score of the detected hand (left or right). + confidence (float): Confidence score of the detected hand. + confidence_threshold (float): Confidence threshold for the overall hand. + + Returns: + HandKeypoints: Message containing the 3D coordinates of the detected hand keypoints, handedness, and confidence score. + """ + + if not isinstance(hand_keypoints, np.ndarray): + raise ValueError(f"hand_keypoints should be numpy array, got {type(hand_keypoints)}.") + if len(hand_keypoints.shape) != 2: + raise ValueError(f"hand_keypoints should be of shape (N,3) meaning [...,[x, y, z],...], got {hand_keypoints.shape}.") + if hand_keypoints.shape[1] != 3: + raise ValueError(f"hand_keypoints 2nd dimension should be of size 3 e.g. 
[x, y, z], got {hand_keypoints.shape[1]}.")
+    if not isinstance(handedness, float):
+        raise ValueError(f"handedness should be float, got {type(handedness)}.")
+    if not isinstance(confidence, float):
+        raise ValueError(f"confidence should be float, got {type(confidence)}.")
+
+    hand_keypoints_msg = HandKeypoints()
+    hand_keypoints_msg.handdedness = handedness  # HandKeypoints spells its validated property "handdedness"
+    hand_keypoints_msg.confidence = confidence
+    points = []
+    if confidence >= confidence_threshold:  # below the threshold the message carries an empty keypoint list
+        for i in range(hand_keypoints.shape[0]):
+            pt = dai.Point3f()
+            pt.x = hand_keypoints[i][0]
+            pt.y = hand_keypoints[i][1]
+            pt.z = hand_keypoints[i][2]
+            points.append(pt)
+    hand_keypoints_msg.keypoints = points
+
+    return hand_keypoints_msg
\ No newline at end of file
diff --git a/ml/postprocessing/utils/message_creation/segmentation.py b/ml/postprocessing/utils/message_creation/segmentation.py
new file mode 100644
index 00000000..25da189a
--- /dev/null
+++ b/ml/postprocessing/utils/message_creation/segmentation.py
@@ -0,0 +1,28 @@
+import depthai as dai
+import numpy as np
+
+def create_segmentation_message(x: np.array) -> dai.ImgFrame:
+    """
+    Create a message for the segmentation node output. Input is of the shape (H, W, 1).
+    In the third dimension we specify the class of the segmented objects.
+
+    Args:
+        x (np.array): Input from the segmentation node.
+
+    Returns:
+        dai.ImgFrame: Output segmentation message in ImgFrame.Type.RAW8.
+    """
+
+    if not isinstance(x, np.ndarray):
+        raise ValueError(f"Expected numpy array, got {type(x)}.")
+    if len(x.shape) != 3:
+        raise ValueError(f"Expected 3D input, got {len(x.shape)}D input.")
+    if x.shape[2] != 1:
+        raise ValueError(f"Expected 1 channel in the third dimension, got {x.shape[2]} channels.")
+
+    imgFrame = dai.ImgFrame()
+    imgFrame.setFrame(x)
+    imgFrame.setWidth(x.shape[1])
+    imgFrame.setHeight(x.shape[0])
+    imgFrame.setType(dai.ImgFrame.Type.RAW8)
+    return imgFrame
\ No newline at end of file