Module ambianic.pipeline.ai.fall_detect

Fall detection pipe element.

Expand source code
"""Fall detection pipe element."""
import logging
import math
import time
from pathlib import Path

from ambianic.configuration import DEFAULT_DATA_DIR
from ambianic.pipeline.ai.pose_engine import PoseEngine
from ambianic.pipeline.ai.tf_detect import TFDetectionModel
from PIL import Image, ImageDraw

log = logging.getLogger(__name__)


class FallDetector(TFDetectionModel):

    """Detects falls comparing two images spaced about 1-2 seconds apart."""

    def __init__(self, model=None, confidence_threshold=0.15, **kwargs):
        """Initialize detector with config parameters.

        :Parameters:
        ----------
        model: dict
            Model file locations, for example:
            {
                'tflite':
                    'ai_models/posenet_mobilenet_v1_100_257x257_multi_kpt_stripped.tflite',
                'edgetpu':
                    'ai_models/posenet_mobilenet_v1_075_721_1281_quant_decoder_edgetpu.tflite'
            }
        confidence_threshold: float
            Minimum score used when accepting body lines and fall scores.
        **kwargs
            Passed through unchanged to the parent class initializer.
        """
        super().__init__(
            model=model, confidence_threshold=confidence_threshold, **kwargs
        )

        # Directory used by draw_lines() when it saves debug thumbnails.
        data_dir = self.context.data_dir if self.context else DEFAULT_DATA_DIR
        self._sys_data_dir = Path(data_dir)

        # Data of previous frames lookup constants
        self.POSE_VAL = "_prev_pose_dix"
        self.TIMESTAMP = "_prev_time"
        self.THUMBNAIL = "_prev_thumbnail"
        self.LEFT_ANGLE_WITH_YAXIS = "_prev_left_angle_with_yaxis"
        self.RIGHT_ANGLE_WITH_YAXIS = "_prev_right_angle_with_yaxis"
        self.BODY_VECTOR_SCORE = "_prev_body_vector_score"

        # Previous pose detection information for the frames at time t-2 and
        # t-1, to compare pose changes against:
        #   self._prev_data[0] : data of frame at t-2
        #   self._prev_data[1] : data of frame at t-1
        # Each slot gets its own record (and its own empty pose container) so
        # that mutating one slot can never leak into the other.
        init_time = time.monotonic()
        self._prev_data = [
            {
                self.POSE_VAL: [],
                self.TIMESTAMP: init_time,
                self.THUMBNAIL: None,
                self.LEFT_ANGLE_WITH_YAXIS: None,
                self.RIGHT_ANGLE_WITH_YAXIS: None,
                self.BODY_VECTOR_SCORE: 0,
            }
            for _ in range(2)
        ]

        self._pose_engine = PoseEngine(self._tfengine, context=self.context)
        # Minimum change (degrees) of a shoulder-hip line between two frames
        # for the change to count as a fall candidate (see fall_detect()).
        self._fall_factor = 60
        self.confidence_threshold = confidence_threshold
        log.debug(
            "Initializing FallDetector with confidence threshold: %s",
            self.confidence_threshold,
        )

        # Require a minimum amount of time between two video frames in seconds.
        # Otherwise on high performing hardware, the poses could be too close
        # to each other and have negligible difference
        # for fall detection purpose.
        self.min_time_between_frames = 1
        # Require the time distance between two video frames not to exceed
        # a certain limit in seconds.
        # Otherwise there could be data noise which could lead to
        # false positive detections.
        self.max_time_between_frames = 10

        # keypoint lookup constants
        self.LEFT_SHOULDER = "left shoulder"
        self.LEFT_HIP = "left hip"
        self.RIGHT_SHOULDER = "right shoulder"
        self.RIGHT_HIP = "right hip"

        self.fall_detect_corr = [
            self.LEFT_SHOULDER,
            self.LEFT_HIP,
            self.RIGHT_SHOULDER,
            self.RIGHT_HIP,
        ]

    def process_sample(self, **sample):
        """Run fall detection on one pipeline sample.

        Generator. An empty sample is passed through as a single ``None``.
        Otherwise the ``image`` entry of the sample is analysed and one dict
        is yielded with the keys ``image``, ``thumbnail``,
        ``inference_result`` and ``inference_meta``. Any exception raised
        while processing is logged and the sample is dropped (nothing is
        yielded for it).
        """
        log.debug("%s received new sample", self.__class__.__name__)
        if not sample:
            # pass through empty samples to next element
            yield None
            return

        try:
            image = sample["image"]
            raw_result, thumbnail = self.fall_detect(image=image)
            converted = self.convert_inference_result(raw_result)
            # pass on the results to the next connected pipe element
            yield {
                "image": image,
                "thumbnail": thumbnail,
                "inference_result": converted,
                "inference_meta": {"display": "Fall Detection"},
            }
        except Exception as e:
            log.exception(
                'Error "%s" while processing sample. ' "Dropping sample: %s",
                str(e),
                str(sample),
            )

    def calculate_angle(self, p):
        """
        Calculate the angle in degrees b/w two lines such as
        left shoulder-hip with prev frame's left shoulder-hip or
        right shoulder-hip with prev frame's right shoulder-hip or
        shoulder-hip line with vertical axis.

        :Parameters:
        ----------
        p: sequence of two lines
            Each line is a pair of points and each point unpacks
            into two numbers.

        :Returns:
        -------
        The absolute angle between the two line directions,
        normalized to the range [0, 180].
        """

        x1, y1 = p[0][0]
        x2, y2 = p[0][1]

        x3, y3 = p[1][0]
        x4, y4 = p[1][1]

        theta1 = math.degrees(math.atan2(y2 - y1, x1 - x2))
        theta2 = math.degrees(math.atan2(y4 - y3, x3 - x4))

        angle = abs(theta1 - theta2)
        # atan2 returns values in [-180, 180], so the raw difference can be
        # as large as 360 when the two directions sit on either side of the
        # +/-180 seam (e.g. 179 and -179 are only 2 degrees apart).
        # Fold the result back so that it is the real angle between them.
        if angle > 180:
            angle = 360 - angle
        return angle

    def is_body_line_motion_downward(
        self, left_angle_with_yaxis, rigth_angle_with_yaxis, inx
    ):
        """Check whether a body line's angle with the y axis has grown.

        Compares the given angles with the ones stored in
        ``self._prev_data[inx]``. A side is only considered when both its
        current and its stored angle are truthy (so ``None`` and ``0`` count
        as "not available").

        :Returns:
        -------
        True if the left or the right angle is larger than the stored one,
        False otherwise.
        """

        def has_grown(current, record_key):
            # a falsy current angle means the line is not available
            if not current:
                return False
            before = self._prev_data[inx][record_key]
            if not before:
                return False
            return current > before

        left_grew = has_grown(left_angle_with_yaxis, self.LEFT_ANGLE_WITH_YAXIS)
        right_grew = has_grown(rigth_angle_with_yaxis, self.RIGHT_ANGLE_WITH_YAXIS)
        return True if (left_grew or right_grew) else False

    def find_keypoints(self, image):
        """Detect a pose in the image, retrying on rotated copies if needed.

        The pose engine is first run on the image as given. While the spinal
        vector score of the first detected pose stays below
        ``self.confidence_threshold``, detection is retried on the image
        transposed with ROTATE_90 and then ROTATE_270. Keypoints found on a
        rotated image are converted back in place before returning.

        :Returns:
        -------
        tuple (pose, thumbnail, spinal_vector_score, pose_dix)
            ``pose`` is None when no pose reached the confidence threshold.
            ``thumbnail`` is the one from the detection on the unrotated image.
        """

        # this score value should be related to the configuration \
        # confidence_threshold parameter
        min_score = self.confidence_threshold
        rotations = [Image.ROTATE_270, Image.ROTATE_90]
        angle = 0

        def score_first_pose(candidates):
            # currently only looking at pose[0] because we are focused
            # on a lone person falls.
            # An empty detection result scores 0 instead of raising, so the
            # rotated attempts still run and the caller gets "no pose".
            if not candidates or not candidates[0]:
                return 0, {}
            return self.estimate_spinal_vector_score(candidates[0])

        poses, thumbnail, _ = self._pose_engine.detect_poses(image)
        width, height = thumbnail.size
        spinal_vector_score, pose_dix = score_first_pose(poses)

        # if no pose detected with high confidence,
        # try rotating the image +/- 90' to find a fallen person
        while spinal_vector_score < min_score and rotations:
            angle = rotations.pop()
            transposed = image.transpose(angle)
            # we are interested in the poses but not the rotated thumbnail
            poses, _, _ = self._pose_engine.detect_poses(transposed)
            spinal_vector_score, pose_dix = score_first_pose(poses)

        pose = poses[0] if poses and poses[0] else None

        # lets check if we found a good pose candidate
        if pose and spinal_vector_score >= min_score:
            # if the image was rotated, we need to rotate back to the original
            # image coordinates before comparing with poses in other frames.
            # NOTE(review): the earlier comment here labelled yx[0] as both
            # the x and the y coordinate; confirm the axis order of
            # keypoint.yx against PoseEngine.
            if angle == Image.ROTATE_90:
                # ROTATE_90 rotates 90' counter clockwise
                # from ^ to < orientation.
                for keypoint in pose.keypoints.values():
                    tmp_swap = keypoint.yx[0]
                    keypoint.yx[0] = width - keypoint.yx[1]
                    keypoint.yx[1] = tmp_swap
            elif angle == Image.ROTATE_270:
                # ROTATE_270 rotates 90' clockwise from ^ to > orientation.
                for keypoint in pose.keypoints.values():
                    tmp_swap = keypoint.yx[0]
                    keypoint.yx[0] = keypoint.yx[1]
                    keypoint.yx[1] = height - tmp_swap
            log.info(
                "A pose detected with spinal_vector_score=%s >= %s "
                "confidence threshold. Pose keypoints: %s",
                spinal_vector_score,
                min_score,
                pose_dix,
            )
        else:
            # we could not detect a pose with sufficient confidence
            pose = None

        return pose, thumbnail, spinal_vector_score, pose_dix

    def find_changes_in_angle(self, pose_dix, inx):
        """
        Find the change in angle of the shoulder-hip lines
        b/w the current frame and the stored frame ``self._prev_data[inx]``.

        A side contributes only when both its shoulder and its hip are
        present in the stored pose and in ``pose_dix``; otherwise it counts
        as 0. Returns the larger of the left and right angle changes.
        """
        earlier_pose = self._prev_data[inx][self.POSE_VAL]
        sides = (
            (self.LEFT_SHOULDER, self.LEFT_HIP, "Left shoulder-hip angle: %r"),
            (self.RIGHT_SHOULDER, self.RIGHT_HIP, "Right shoulder-hip angle: %r"),
        )

        # decide for every side up front whether both frames have the line
        comparable = [
            (shoulder in earlier_pose and hip in earlier_pose)
            and (shoulder in pose_dix and hip in pose_dix)
            for shoulder, hip, _ in sides
        ]

        changes = []
        for (shoulder, hip, message), usable in zip(sides, comparable):
            change = 0
            if usable:
                line_pair = [
                    [earlier_pose[shoulder], earlier_pose[hip]],
                    [pose_dix[shoulder], pose_dix[hip]],
                ]
                change = self.calculate_angle(line_pair)
                log.debug(message, change)
            changes.append(change)

        return max(changes[0], changes[1])

    def assign_prev_records(
        self,
        pose_dix,
        left_angle_with_yaxis,
        rigth_angle_with_yaxis,
        now,
        thumbnail,
        current_body_vector_score,
    ):
        """Store the current frame's data as the most recent history record.

        The record that was the most recent one moves to the second-to-last
        slot of ``self._prev_data`` and a new record built from the arguments
        takes the last slot.
        """
        record = dict(
            [
                (self.POSE_VAL, pose_dix),
                (self.TIMESTAMP, now),
                (self.THUMBNAIL, thumbnail),
                (self.LEFT_ANGLE_WITH_YAXIS, left_angle_with_yaxis),
                (self.RIGHT_ANGLE_WITH_YAXIS, rigth_angle_with_yaxis),
                (self.BODY_VECTOR_SCORE, current_body_vector_score),
            ]
        )

        # shift history: t-1 becomes t-2, the new record becomes t-1
        history = self._prev_data
        history[-2], history[-1] = history[-1], record

    def draw_lines(self, thumbnail, pose_dix, score):
        """Draw the available shoulder-hip lines on the thumbnail and save it.

        Intended for debugging. For each side whose shoulder and hip are both
        in ``pose_dix`` a red line is drawn on ``thumbnail``. The thumbnail is
        then saved as a JPEG file in ``self._sys_data_dir`` (also when no line
        could be drawn). If ``pose_dix`` is None nothing is drawn or saved.

        :Returns:
        -------
        Number of body lines drawn (0, 1 or 2).
        """
        canvas = ImageDraw.Draw(thumbnail)

        if pose_dix is None:
            return 0

        body_lines_drawn = 0
        for shoulder, hip in (
            (self.LEFT_SHOULDER, self.LEFT_HIP),
            (self.RIGHT_SHOULDER, self.RIGHT_HIP),
        ):
            # both ends of the line must be known
            if pose_dix.keys() >= {shoulder, hip}:
                end_points = [tuple(pose_dix[shoulder]), tuple(pose_dix[hip])]
                canvas.line(end_points, fill="red")
                body_lines_drawn += 1

        # save a thumbnail for debugging
        timestr = int(time.monotonic() * 1000)
        debug_image_file_name = f"tmp-fall-detect-thumbnail-{timestr}-score-{score}.jpg"
        target = Path(self._sys_data_dir, debug_image_file_name)
        thumbnail.save(target, format="JPEG")
        return body_lines_drawn

    def get_line_angles_with_yaxis(self, pose_dix):
        """
        Find the angles b/w the shoulder-hip lines and the y axis.

        The y axis is represented by the line from [0, 0] to
        [0, tensor image height of the pose engine].

        :Returns:
        -------
        tuple (left angle, right angle); a side whose shoulder or hip is
        missing from ``pose_dix`` gets 0.
        """
        y_axis_corr = [[0, 0], [0, self._pose_engine._tensor_image_height]]

        has_left = self.LEFT_SHOULDER in pose_dix and self.LEFT_HIP in pose_dix
        has_right = self.RIGHT_SHOULDER in pose_dix and self.RIGHT_HIP in pose_dix

        def angle_with_axis(shoulder, hip):
            body_line = [pose_dix[shoulder], pose_dix[hip]]
            return self.calculate_angle([y_axis_corr, body_line])

        l_angle = (
            angle_with_axis(self.LEFT_SHOULDER, self.LEFT_HIP) if has_left else 0
        )
        r_angle = (
            angle_with_axis(self.RIGHT_SHOULDER, self.RIGHT_HIP) if has_right else 0
        )

        return (l_angle, r_angle)

    def estimate_spinal_vector_score(self, pose):
        """Score how confidently the shoulder-hip lines of a pose were found.

        The score of a side is the lower of its shoulder and hip keypoint
        scores. A side counts as detected when that score is above
        ``self.confidence_threshold``; its shoulder and hip coordinates
        (``keypoint.yx``) are then added to the returned dict.

        :Returns:
        -------
        tuple (score, pose_dix)
            score is the average of both side scores when both sides are
            detected, 90% of the single side score when only one side is
            detected, and 0 when neither is.
        """
        threshold = self.confidence_threshold

        left_score = min(
            pose.keypoints[self.LEFT_SHOULDER].score,
            pose.keypoints[self.LEFT_HIP].score,
        )
        right_score = min(
            pose.keypoints[self.RIGHT_SHOULDER].score,
            pose.keypoints[self.RIGHT_HIP].score,
        )
        left_found = left_score > threshold
        right_found = right_score > threshold

        pose_dix = {}
        if left_found:
            for name in (self.LEFT_SHOULDER, self.LEFT_HIP):
                pose_dix[name] = pose.keypoints[name].yx
        if right_found:
            for name in (self.RIGHT_SHOULDER, self.RIGHT_HIP):
                pose_dix[name] = pose.keypoints[name].yx

        if left_found and right_found:
            spinal_score = (left_score + right_score) / 2.0
        elif left_found:
            # 10% score penalty in confidence as only
            # left shoulder-hip line is detected
            spinal_score = left_score * 0.9
        elif right_found:
            # 10% score penalty in confidence as only
            # right shoulder-hip line is detected
            spinal_score = right_score * 0.9
        else:
            spinal_score = 0

        log.debug(f"Estimated spinal vector score: {spinal_score}")
        return spinal_score, pose_dix

    def fall_detect(self, image=None):
        """Look for a fall by comparing the pose in image with stored frames.

        If a pose was stored less than ``self.min_time_between_frames``
        seconds ago, the image is skipped and the stored thumbnail is
        returned. Otherwise a pose is searched for in the image and compared
        with the stored frames at t-1 and then t-2. A stored frame is used
        only when it has a pose and is not older than
        ``self.max_time_between_frames`` seconds. A fall is reported when the
        body line angle with the vertical axis has grown, the shoulder-hip
        line changed by more than ``self._fall_factor`` degrees and the
        averaged body vector score reaches ``self.confidence_threshold``.
        Whenever a pose was found, it is saved for later comparisons.

        :Returns:
        -------
        tuple (inference_result, thumbnail)
            inference_result is None when the frame was skipped or no pose
            was found; otherwise a list that is empty or holds one
            ("FALL", fall_score, leaning_angle, pose_dix) tuple.
        """
        # NOTE: kept as an assert so that the exception type stays the same
        # for callers; be aware that it is stripped when running with -O.
        assert image
        log.debug("Calling TF engine for inference")
        start_time = time.monotonic()

        now = time.monotonic()
        latest = self._prev_data[-1]
        lapse = now - latest[self.TIMESTAMP]

        if latest[self.POSE_VAL] and lapse < self.min_time_between_frames:
            # time.monotonic() differences are in seconds, not milliseconds
            log.debug(
                "Received an image frame too soon after the previous frame. "
                "Only %.2f seconds apart. "
                "Minimum %.2f seconds period required for fall detection.",
                lapse,
                self.min_time_between_frames,
            )
            inference_result = None
            thumbnail = latest[self.THUMBNAIL]
        else:
            # Detection using tensorflow posenet module
            pose, thumbnail, spinal_vector_score, pose_dix = self.find_keypoints(image)

            inference_result = None
            if not pose:
                log.debug(
                    "No pose detected or detection score does not meet "
                    "confidence threshold of %s.",
                    self.confidence_threshold,
                )
            else:
                inference_result = []

                current_body_vector_score = spinal_vector_score

                # Find line angle with vertical axis
                (
                    left_angle_with_yaxis,
                    rigth_angle_with_yaxis,
                ) = self.get_line_angles_with_yaxis(pose_dix)

                # save an image with drawn lines for debugging
                if log.getEffectiveLevel() <= logging.DEBUG:
                    # development mode
                    self.draw_lines(thumbnail, pose_dix, spinal_vector_score)

                # compare with the frame at t-1 first, then the one at t-2
                for t in [-1, -2]:
                    lapse = now - self._prev_data[t][self.TIMESTAMP]

                    if (
                        not self._prev_data[t][self.POSE_VAL]
                        or lapse > self.max_time_between_frames
                    ):
                        log.debug(
                            "No recent pose to compare to. Will save "
                            "this frame pose for subsequent comparison."
                        )
                    elif not self.is_body_line_motion_downward(
                        left_angle_with_yaxis, rigth_angle_with_yaxis, inx=t
                    ):
                        log.debug(
                            "The body-line angle with vertical axis is "
                            "decreasing from the previous frame. "
                            "Not likely to be a fall."
                        )
                    else:
                        leaning_angle = self.find_changes_in_angle(pose_dix, inx=t)

                        # Get leaning_probability by comparing leaning_angle
                        # with fall_factor probability.
                        leaning_probability = (
                            1 if leaning_angle > self._fall_factor else 0
                        )

                        # Calculate fall score using average of current and
                        # previous frame's body vector score with
                        # leaning_probability
                        fall_score = (
                            leaning_probability
                            * (
                                self._prev_data[t][self.BODY_VECTOR_SCORE]
                                + current_body_vector_score
                            )
                            / 2
                        )

                        if fall_score >= self.confidence_threshold:
                            inference_result.append(
                                ("FALL", fall_score, leaning_angle, pose_dix)
                            )
                            log.info("Fall detected: %r", inference_result)
                            # one detection per frame is enough
                            break
                        else:
                            log.debug(
                                "No fall detected due to low confidence "
                                "score: %s < %s min threshold. "
                                "Inference result: %s",
                                fall_score,
                                self.confidence_threshold,
                                inference_result,
                            )

                log.debug("Saving pose for subsequent comparison.")
                self.assign_prev_records(
                    pose_dix,
                    left_angle_with_yaxis,
                    rigth_angle_with_yaxis,
                    now,
                    thumbnail,
                    current_body_vector_score,
                )

        self.log_stats(start_time=start_time)
        log.debug("thumbnail: %r", thumbnail)
        return inference_result, thumbnail

    def convert_inference_result(self, inference_result):
        """Convert fall detections into a list of plain dicts.

        :Parameters:
        ----------
        inference_result: iterable of tuples or None
            Each tuple is (label, confidence, leaning_angle, keypoint_corr)
            where keypoint_corr maps keypoint names to coordinates.

        :Returns:
        -------
        list of dicts with the keys ``label``, ``confidence``,
        ``leaning_angle`` and ``keypoint_corr``. ``keypoint_corr`` always has
        the four shoulder/hip keys; missing keypoints map to None.
        An empty or None input gives an empty list.
        """
        if not inference_result:
            return []

        keypoint_names = ("left shoulder", "left hip", "right shoulder", "right hip")
        inf_json = []

        for label, confidence, leaning_angle, keypoint_corr in inference_result:
            # confidence is logged with two decimals; with no decimals a
            # fractional score would be rounded to 0 or 1 in the log
            log.info(
                "label: %s , confidence: %.2f, leaning_angle: %.0f, "
                "keypoint_corr: %s",
                label,
                confidence,
                leaning_angle,
                keypoint_corr,
            )
            inf_json.append(
                {
                    "label": label,
                    "confidence": confidence,
                    "leaning_angle": leaning_angle,
                    "keypoint_corr": {
                        name: keypoint_corr.get(name, None)
                        for name in keypoint_names
                    },
                }
            )

        return inf_json

Classes

class FallDetector (model=None, confidence_threshold=0.15, **kwargs)

Detects falls comparing two images spaced about 1-2 seconds apart.

Initialize detector with config parameters.

Parameters:

model: dict, for example { 'tflite': 'ai_models/posenet_mobilenet_v1_100_257x257_multi_kpt_stripped.tflite', 'edgetpu': 'ai_models/posenet_mobilenet_v1_075_721_1281_quant_decoder_edgetpu.tflite' }

Expand source code
class FallDetector(TFDetectionModel):

    """Detects falls comparing two images spaced about 1-2 seconds apart."""

    def __init__(self, model=None, confidence_threshold=0.15, **kwargs):
        """Initialize detector with config parameters.

        :Parameters:
        ----------
        model: dict
            Model file locations, for example:
            {
                'tflite':
                    'ai_models/posenet_mobilenet_v1_100_257x257_multi_kpt_stripped.tflite',
                'edgetpu':
                    'ai_models/posenet_mobilenet_v1_075_721_1281_quant_decoder_edgetpu.tflite'
            }
        confidence_threshold: float
            Minimum score used when accepting body lines and fall scores.
        **kwargs
            Passed through unchanged to the parent class initializer.
        """
        super().__init__(
            model=model, confidence_threshold=confidence_threshold, **kwargs
        )

        # Directory used by draw_lines() when it saves debug thumbnails.
        data_dir = self.context.data_dir if self.context else DEFAULT_DATA_DIR
        self._sys_data_dir = Path(data_dir)

        # Data of previous frames lookup constants
        self.POSE_VAL = "_prev_pose_dix"
        self.TIMESTAMP = "_prev_time"
        self.THUMBNAIL = "_prev_thumbnail"
        self.LEFT_ANGLE_WITH_YAXIS = "_prev_left_angle_with_yaxis"
        self.RIGHT_ANGLE_WITH_YAXIS = "_prev_right_angle_with_yaxis"
        self.BODY_VECTOR_SCORE = "_prev_body_vector_score"

        # Previous pose detection information for the frames at time t-2 and
        # t-1, to compare pose changes against:
        #   self._prev_data[0] : data of frame at t-2
        #   self._prev_data[1] : data of frame at t-1
        # Each slot gets its own record (and its own empty pose container) so
        # that mutating one slot can never leak into the other.
        init_time = time.monotonic()
        self._prev_data = [
            {
                self.POSE_VAL: [],
                self.TIMESTAMP: init_time,
                self.THUMBNAIL: None,
                self.LEFT_ANGLE_WITH_YAXIS: None,
                self.RIGHT_ANGLE_WITH_YAXIS: None,
                self.BODY_VECTOR_SCORE: 0,
            }
            for _ in range(2)
        ]

        self._pose_engine = PoseEngine(self._tfengine, context=self.context)
        # Minimum change (degrees) of a shoulder-hip line between two frames
        # for the change to count as a fall candidate (see fall_detect()).
        self._fall_factor = 60
        self.confidence_threshold = confidence_threshold
        log.debug(
            "Initializing FallDetector with confidence threshold: %s",
            self.confidence_threshold,
        )

        # Require a minimum amount of time between two video frames in seconds.
        # Otherwise on high performing hardware, the poses could be too close
        # to each other and have negligible difference
        # for fall detection purpose.
        self.min_time_between_frames = 1
        # Require the time distance between two video frames not to exceed
        # a certain limit in seconds.
        # Otherwise there could be data noise which could lead to
        # false positive detections.
        self.max_time_between_frames = 10

        # keypoint lookup constants
        self.LEFT_SHOULDER = "left shoulder"
        self.LEFT_HIP = "left hip"
        self.RIGHT_SHOULDER = "right shoulder"
        self.RIGHT_HIP = "right hip"

        self.fall_detect_corr = [
            self.LEFT_SHOULDER,
            self.LEFT_HIP,
            self.RIGHT_SHOULDER,
            self.RIGHT_HIP,
        ]

    def process_sample(self, **sample):
        """Run fall detection on one pipeline sample.

        Generator. An empty sample is passed through as a single ``None``.
        Otherwise the ``image`` entry of the sample is analysed and one dict
        is yielded with the keys ``image``, ``thumbnail``,
        ``inference_result`` and ``inference_meta``. Any exception raised
        while processing is logged and the sample is dropped (nothing is
        yielded for it).
        """
        log.debug("%s received new sample", self.__class__.__name__)
        if not sample:
            # pass through empty samples to next element
            yield None
            return

        try:
            image = sample["image"]
            raw_result, thumbnail = self.fall_detect(image=image)
            converted = self.convert_inference_result(raw_result)
            # pass on the results to the next connected pipe element
            yield {
                "image": image,
                "thumbnail": thumbnail,
                "inference_result": converted,
                "inference_meta": {"display": "Fall Detection"},
            }
        except Exception as e:
            log.exception(
                'Error "%s" while processing sample. ' "Dropping sample: %s",
                str(e),
                str(sample),
            )

    def calculate_angle(self, p):
        """
        Calculate the angle in degrees b/w two lines such as
        left shoulder-hip with prev frame's left shoulder-hip or
        right shoulder-hip with prev frame's right shoulder-hip or
        shoulder-hip line with vertical axis.

        :Parameters:
        ----------
        p: sequence of two lines
            Each line is a pair of points and each point unpacks
            into two numbers.

        :Returns:
        -------
        The absolute angle between the two line directions,
        normalized to the range [0, 180].
        """

        x1, y1 = p[0][0]
        x2, y2 = p[0][1]

        x3, y3 = p[1][0]
        x4, y4 = p[1][1]

        theta1 = math.degrees(math.atan2(y2 - y1, x1 - x2))
        theta2 = math.degrees(math.atan2(y4 - y3, x3 - x4))

        angle = abs(theta1 - theta2)
        # atan2 returns values in [-180, 180], so the raw difference can be
        # as large as 360 when the two directions sit on either side of the
        # +/-180 seam (e.g. 179 and -179 are only 2 degrees apart).
        # Fold the result back so that it is the real angle between them.
        if angle > 180:
            angle = 360 - angle
        return angle

    def is_body_line_motion_downward(
        self, left_angle_with_yaxis, rigth_angle_with_yaxis, inx
    ):
        """Check whether a body line's angle with the y axis has grown.

        Compares the given angles with the ones stored in
        ``self._prev_data[inx]``. A side is only considered when both its
        current and its stored angle are truthy (so ``None`` and ``0`` count
        as "not available").

        :Returns:
        -------
        True if the left or the right angle is larger than the stored one,
        False otherwise.
        """

        def has_grown(current, record_key):
            # a falsy current angle means the line is not available
            if not current:
                return False
            before = self._prev_data[inx][record_key]
            if not before:
                return False
            return current > before

        left_grew = has_grown(left_angle_with_yaxis, self.LEFT_ANGLE_WITH_YAXIS)
        right_grew = has_grown(rigth_angle_with_yaxis, self.RIGHT_ANGLE_WITH_YAXIS)
        return True if (left_grew or right_grew) else False

    def find_keypoints(self, image):
        """Detect a pose in the image, retrying on rotated copies if needed.

        The pose engine is first run on the image as given. While the spinal
        vector score of the first detected pose stays below
        ``self.confidence_threshold``, detection is retried on the image
        transposed with ROTATE_90 and then ROTATE_270. Keypoints found on a
        rotated image are converted back in place before returning.

        :Returns:
        -------
        tuple (pose, thumbnail, spinal_vector_score, pose_dix)
            ``pose`` is None when no pose reached the confidence threshold.
            ``thumbnail`` is the one from the detection on the unrotated image.
        """

        # this score value should be related to the configuration \
        # confidence_threshold parameter
        min_score = self.confidence_threshold
        rotations = [Image.ROTATE_270, Image.ROTATE_90]
        angle = 0

        def score_first_pose(candidates):
            # currently only looking at pose[0] because we are focused
            # on a lone person falls.
            # An empty detection result scores 0 instead of raising, so the
            # rotated attempts still run and the caller gets "no pose".
            if not candidates or not candidates[0]:
                return 0, {}
            return self.estimate_spinal_vector_score(candidates[0])

        poses, thumbnail, _ = self._pose_engine.detect_poses(image)
        width, height = thumbnail.size
        spinal_vector_score, pose_dix = score_first_pose(poses)

        # if no pose detected with high confidence,
        # try rotating the image +/- 90' to find a fallen person
        while spinal_vector_score < min_score and rotations:
            angle = rotations.pop()
            transposed = image.transpose(angle)
            # we are interested in the poses but not the rotated thumbnail
            poses, _, _ = self._pose_engine.detect_poses(transposed)
            spinal_vector_score, pose_dix = score_first_pose(poses)

        pose = poses[0] if poses and poses[0] else None

        # lets check if we found a good pose candidate
        if pose and spinal_vector_score >= min_score:
            # if the image was rotated, we need to rotate back to the original
            # image coordinates before comparing with poses in other frames.
            # NOTE(review): the earlier comment here labelled yx[0] as both
            # the x and the y coordinate; confirm the axis order of
            # keypoint.yx against PoseEngine.
            if angle == Image.ROTATE_90:
                # ROTATE_90 rotates 90' counter clockwise
                # from ^ to < orientation.
                for keypoint in pose.keypoints.values():
                    tmp_swap = keypoint.yx[0]
                    keypoint.yx[0] = width - keypoint.yx[1]
                    keypoint.yx[1] = tmp_swap
            elif angle == Image.ROTATE_270:
                # ROTATE_270 rotates 90' clockwise from ^ to > orientation.
                for keypoint in pose.keypoints.values():
                    tmp_swap = keypoint.yx[0]
                    keypoint.yx[0] = keypoint.yx[1]
                    keypoint.yx[1] = height - tmp_swap
            log.info(
                "A pose detected with spinal_vector_score=%s >= %s "
                "confidence threshold. Pose keypoints: %s",
                spinal_vector_score,
                min_score,
                pose_dix,
            )
        else:
            # we could not detect a pose with sufficient confidence
            pose = None

        return pose, thumbnail, spinal_vector_score, pose_dix

    def find_changes_in_angle(self, pose_dix, inx):
        """
        Find the change in angle of the shoulder-hip lines
        b/w the current frame and the stored frame ``self._prev_data[inx]``.

        A side contributes only when both its shoulder and its hip are
        present in the stored pose and in ``pose_dix``; otherwise it counts
        as 0. Returns the larger of the left and right angle changes.
        """
        earlier_pose = self._prev_data[inx][self.POSE_VAL]
        sides = (
            (self.LEFT_SHOULDER, self.LEFT_HIP, "Left shoulder-hip angle: %r"),
            (self.RIGHT_SHOULDER, self.RIGHT_HIP, "Right shoulder-hip angle: %r"),
        )

        # decide for every side up front whether both frames have the line
        comparable = [
            (shoulder in earlier_pose and hip in earlier_pose)
            and (shoulder in pose_dix and hip in pose_dix)
            for shoulder, hip, _ in sides
        ]

        changes = []
        for (shoulder, hip, message), usable in zip(sides, comparable):
            change = 0
            if usable:
                line_pair = [
                    [earlier_pose[shoulder], earlier_pose[hip]],
                    [pose_dix[shoulder], pose_dix[hip]],
                ]
                change = self.calculate_angle(line_pair)
                log.debug(message, change)
            changes.append(change)

        return max(changes[0], changes[1])

    def assign_prev_records(
        self,
        pose_dix,
        left_angle_with_yaxis,
        rigth_angle_with_yaxis,
        now,
        thumbnail,
        current_body_vector_score,
    ):
        """
        Store the current frame's data as the most recent previous record.

        The record that was most recent so far moves to the second-to-last
        slot of ``self._prev_data`` and the new record takes the last slot.
        """
        record = {}
        record[self.POSE_VAL] = pose_dix
        record[self.TIMESTAMP] = now
        record[self.THUMBNAIL] = thumbnail
        record[self.LEFT_ANGLE_WITH_YAXIS] = left_angle_with_yaxis
        record[self.RIGHT_ANGLE_WITH_YAXIS] = rigth_angle_with_yaxis
        record[self.BODY_VECTOR_SCORE] = current_body_vector_score

        # shift the history by one frame and append the new record
        self._prev_data[-2], self._prev_data[-1] = self._prev_data[-1], record

    def draw_lines(self, thumbnail, pose_dix, score):
        """
        Draw the detected shoulder-hip lines on the thumbnail for debugging.

        The annotated thumbnail is saved as a JPEG file in the system data
        directory. Nothing is drawn or saved when ``pose_dix`` is None.

        :Returns:
        -------
        Number of body lines drawn (0, 1 or 2).
        """
        canvas = ImageDraw.Draw(thumbnail)
        if pose_dix is None:
            return 0

        drawn = 0
        body_sides = (
            (self.LEFT_SHOULDER, self.LEFT_HIP),
            (self.RIGHT_SHOULDER, self.RIGHT_HIP),
        )
        for shoulder, hip in body_sides:
            # a line needs both of its end points
            if pose_dix.keys() >= {shoulder, hip}:
                end_points = [tuple(pose_dix[shoulder]), tuple(pose_dix[hip])]
                canvas.line(end_points, fill="red")
                drawn += 1

        # the file name carries a millisecond timestamp and the pose score
        timestr = int(time.monotonic() * 1000)
        debug_image_file_name = f"tmp-fall-detect-thumbnail-{timestr}-score-{score}.jpg"
        target = Path(self._sys_data_dir, debug_image_file_name)
        thumbnail.save(target, format="JPEG")
        return drawn

    def get_line_angles_with_yaxis(self, pose_dix):
        """
        Compute the angle of each shoulder-hip line against the vertical axis.

        :Returns:
        -------
        Tuple ``(left_angle, right_angle)``. A side whose shoulder or hip is
        missing from ``pose_dix`` is reported as 0.
        """
        # reference line running along the y axis of the model input image
        vertical_axis = [[0, 0], [0, self._pose_engine._tensor_image_height]]

        def _angle_to_vertical(shoulder, hip):
            if shoulder in pose_dix and hip in pose_dix:
                body_line = [pose_dix[shoulder], pose_dix[hip]]
                return self.calculate_angle([vertical_axis, body_line])
            return 0

        left = _angle_to_vertical(self.LEFT_SHOULDER, self.LEFT_HIP)
        right = _angle_to_vertical(self.RIGHT_SHOULDER, self.RIGHT_HIP)
        return (left, right)

    def estimate_spinal_vector_score(self, pose):
        """
        Estimate how confidently the body line of a pose was detected.

        Each side's score is the lower of its shoulder and hip keypoint
        scores. A side counts as detected only when that score is strictly
        above ``self.confidence_threshold``.

        :Parameters:
        ----------
        pose:
            object with a ``keypoints`` mapping whose values expose ``score``
            and ``yx`` attributes.

        :Returns:
        -------
        Tuple ``(spinal_vector_score, pose_dix)``. ``pose_dix`` maps the
        keypoint names of the detected sides to the keypoints' ``yx`` objects
        (the same objects, not copies). The score is the mean of both side
        scores when both sides are detected, 90% of the single side score
        when only one is, and 0 when neither is.
        """
        keypoints = pose.keypoints
        pose_dix = {}

        left_score = min(
            keypoints[self.LEFT_SHOULDER].score,
            keypoints[self.LEFT_HIP].score,
        )
        right_score = min(
            keypoints[self.RIGHT_SHOULDER].score,
            keypoints[self.RIGHT_HIP].score,
        )

        has_left = left_score > self.confidence_threshold
        has_right = right_score > self.confidence_threshold

        if has_left:
            pose_dix[self.LEFT_SHOULDER] = keypoints[self.LEFT_SHOULDER].yx
            pose_dix[self.LEFT_HIP] = keypoints[self.LEFT_HIP].yx

        if has_right:
            pose_dix[self.RIGHT_SHOULDER] = keypoints[self.RIGHT_SHOULDER].yx
            pose_dix[self.RIGHT_HIP] = keypoints[self.RIGHT_HIP].yx

        if has_left and has_right:
            spinal_vector_score = (left_score + right_score) / 2.0
        elif has_left:
            # 10% confidence penalty: only the left shoulder-hip line is seen
            spinal_vector_score = left_score * 0.9
        elif has_right:
            # 10% confidence penalty: only the right shoulder-hip line is seen
            spinal_vector_score = right_score * 0.9
        else:
            spinal_vector_score = 0

        log.debug("Estimated spinal vector score: %s", spinal_vector_score)
        return spinal_vector_score, pose_dix

    def fall_detect(self, image=None):
        """
        Detect a fall by comparing the pose in this image with earlier frames.

        The frame is skipped when a pose was saved less than
        ``self.min_time_between_frames`` ago. Otherwise keypoints are
        detected and compared with the two saved frames, most recent first.
        A saved frame is ignored when it holds no pose or is older than
        ``self.max_time_between_frames``. Whenever a pose is detected, the
        current frame is saved for later comparisons.

        :Parameters:
        ----------
        image:
            the image to analyse; must be truthy.

        :Returns:
        -------
        Tuple ``(inference_result, thumbnail)``. ``inference_result`` is None
        when the frame was skipped or no pose was detected; otherwise it is a
        list holding at most one
        ``("FALL", fall_score, leaning_angle, pose_dix)`` tuple.
        """
        assert image
        log.debug("Calling TF engine for inference")
        start_time = time.monotonic()

        now = time.monotonic()
        # time elapsed since the most recently saved frame
        lapse = now - self._prev_data[-1][self.TIMESTAMP]

        # NOTE(review): lapse is a difference of time.monotonic() values,
        # i.e. seconds, while the message below labels it "ms" - confirm
        # the unit of min_time_between_frames.
        if self._prev_data[-1][self.POSE_VAL] and lapse < self.min_time_between_frames:
            log.debug(
                "Received an image frame too soon after the previous \
                frame. Only %.2f ms apart.\
                Minimum %.2f ms period required for fall detection.",
                lapse,
                self.min_time_between_frames,
            )
            # frame skipped: no inference, reuse the previous thumbnail
            inference_result = None
            thumbnail = self._prev_data[-1][self.THUMBNAIL]
        else:
            # Detection using tensorflow posenet module
            pose, thumbnail, spinal_vector_score, pose_dix = self.find_keypoints(image)

            inference_result = None
            if not pose:
                log.debug(
                    f"No pose detected or detection score does not meet \
                    confidence threshold of {self.confidence_threshold}."
                )
            else:
                # a pose was found: from here on the result is a list,
                # empty unless a fall is detected below
                inference_result = []

                current_body_vector_score = spinal_vector_score

                # Find line angle with vertical axis
                (
                    left_angle_with_yaxis,
                    rigth_angle_with_yaxis,
                ) = self.get_line_angles_with_yaxis(pose_dix)

                # save an image with drawn lines for debugging
                if log.getEffectiveLevel() <= logging.DEBUG:
                    # development mode
                    self.draw_lines(thumbnail, pose_dix, spinal_vector_score)

                # compare with the most recent saved frame first (-1),
                # then with the one before it (-2)
                for t in [-1, -2]:
                    lapse = now - self._prev_data[t][self.TIMESTAMP]

                    if (
                        not self._prev_data[t][self.POSE_VAL]
                        or lapse > self.max_time_between_frames
                    ):
                        log.debug(
                            "No recent pose to compare to. Will save \
                            this frame pose for subsequent comparison."
                        )
                    elif not self.is_body_line_motion_downward(
                        left_angle_with_yaxis, rigth_angle_with_yaxis, inx=t
                    ):
                        log.debug(
                            "The body-line angle with vertical axis is \
                                    decreasing from the previous frame. \
                                    Not likely to be a fall."
                        )
                    else:
                        leaning_angle = self.find_changes_in_angle(pose_dix, inx=t)

                        # leaning_probability is binary: 1 when the angle
                        # change exceeds the configured fall factor, else 0
                        leaning_probability = (
                            1 if leaning_angle > self._fall_factor else 0
                        )

                        # Fall score is the average of the current and the
                        # previous frame's body vector scores, multiplied by
                        # leaning_probability
                        fall_score = (
                            leaning_probability
                            * (
                                self._prev_data[t][self.BODY_VECTOR_SCORE]
                                + current_body_vector_score
                            )
                            / 2
                        )

                        if fall_score >= self.confidence_threshold:
                            inference_result.append(
                                ("FALL", fall_score, leaning_angle, pose_dix)
                            )
                            log.info("Fall detected: %r", inference_result)
                            # one detection is enough; skip the older frame
                            break
                        else:
                            log.debug(
                                f"No fall detected due to low \
                            confidence score:  \
                            {fall_score} < {self.confidence_threshold} \
                            min threshold.Inference result: {inference_result}"
                            )

                log.debug("Saving pose for subsequent comparison.")
                self.assign_prev_records(
                    pose_dix,
                    left_angle_with_yaxis,
                    rigth_angle_with_yaxis,
                    now,
                    thumbnail,
                    current_body_vector_score,
                )

                # log.debug("Logging stats")

        self.log_stats(start_time=start_time)
        log.debug("thumbnail: %r", thumbnail)
        return inference_result, thumbnail

    def convert_inference_result(self, inference_result):
        """
        Convert raw fall detections into a list of plain dictionaries.

        :Parameters:
        ----------
        inference_result: list or None
            iterable of ``(label, confidence, leaning_angle, keypoint_corr)``
            tuples, where ``keypoint_corr`` is a dict of keypoint coordinates.

        :Returns:
        -------
        List with one dict per detection holding ``label``, ``confidence``,
        ``leaning_angle`` and ``keypoint_corr``. The latter always carries
        the four shoulder/hip keys, set to None for keypoints that are
        missing. An empty list is returned for an empty or None input.
        """
        inf_json = []
        if not inference_result:
            return inf_json

        keypoint_names = ("left shoulder", "left hip", "right shoulder", "right hip")

        for label, confidence, leaning_angle, keypoint_corr in inference_result:
            # confidence is logged with two decimals: with no decimals a
            # fractional score is rounded to a whole number in the log
            log.info(
                "label: %s , confidence: %.2f, leaning_angle: %.0f, "
                "keypoint_corr: %s",
                label,
                confidence,
                leaning_angle,
                keypoint_corr,
            )
            inf_json.append(
                {
                    "label": label,
                    "confidence": confidence,
                    "leaning_angle": leaning_angle,
                    "keypoint_corr": {
                        name: keypoint_corr.get(name, None) for name in keypoint_names
                    },
                }
            )

        return inf_json

Ancestors

Methods

def assign_prev_records(self, pose_dix, left_angle_with_yaxis, rigth_angle_with_yaxis, now, thumbnail, current_body_vector_score)
Expand source code
def assign_prev_records(
    self,
    pose_dix,
    left_angle_with_yaxis,
    rigth_angle_with_yaxis,
    now,
    thumbnail,
    current_body_vector_score,
):
    """
    Store the current frame's data as the most recent previous record.

    The record that was most recent so far moves to the second-to-last
    slot of ``self._prev_data`` and the new record takes the last slot.
    """
    record = {}
    record[self.POSE_VAL] = pose_dix
    record[self.TIMESTAMP] = now
    record[self.THUMBNAIL] = thumbnail
    record[self.LEFT_ANGLE_WITH_YAXIS] = left_angle_with_yaxis
    record[self.RIGHT_ANGLE_WITH_YAXIS] = rigth_angle_with_yaxis
    record[self.BODY_VECTOR_SCORE] = current_body_vector_score

    # shift the history by one frame and append the new record
    self._prev_data[-2], self._prev_data[-1] = self._prev_data[-1], record
def calculate_angle(self, p)

Calculate the angle between two lines: the left shoulder-hip line and the previous frame's left shoulder-hip line, the right shoulder-hip line and the previous frame's right shoulder-hip line, or a shoulder-hip line and the vertical axis.

Expand source code
def calculate_angle(self, p):
    """
    Calculate the angle in degrees between two lines, such as
    left shoulder-hip with prev frame's left shoulder-hip or
    right shoulder-hip with prev frame's right shoulder-hip or
    shoulder-hip line with vertical axis.

    :Parameters:
    ----------
    p: sequence
        two lines, each given as a pair of two-element points:
        ``[[point1, point2], [point3, point4]]``.

    :Returns:
    -------
    The absolute angle between the directions of the two lines,
    in the range 0 to 180 degrees.
    """

    x1, y1 = p[0][0]
    x2, y2 = p[0][1]

    x3, y3 = p[1][0]
    x4, y4 = p[1][1]

    # direction of each line in degrees, in the range -180..180
    theta1 = math.degrees(math.atan2(y2 - y1, x1 - x2))
    theta2 = math.degrees(math.atan2(y4 - y3, x3 - x4))

    angle = abs(theta1 - theta2)
    # The raw difference of two atan2 results can reach 360 degrees when
    # the directions lie on opposite sides of the +/-180 boundary. Wrap it
    # so that nearly parallel lines do not report a near-full-turn angle.
    if angle > 180:
        angle = 360 - angle
    return angle
def convert_inference_result(self, inference_result)
Expand source code
def convert_inference_result(self, inference_result):
    """
    Convert raw fall detections into a list of plain dictionaries.

    :Parameters:
    ----------
    inference_result: list or None
        iterable of ``(label, confidence, leaning_angle, keypoint_corr)``
        tuples, where ``keypoint_corr`` is a dict of keypoint coordinates.

    :Returns:
    -------
    List with one dict per detection holding ``label``, ``confidence``,
    ``leaning_angle`` and ``keypoint_corr``. The latter always carries
    the four shoulder/hip keys, set to None for keypoints that are
    missing. An empty list is returned for an empty or None input.
    """
    inf_json = []
    if not inference_result:
        return inf_json

    keypoint_names = ("left shoulder", "left hip", "right shoulder", "right hip")

    for label, confidence, leaning_angle, keypoint_corr in inference_result:
        # confidence is logged with two decimals: with no decimals a
        # fractional score is rounded to a whole number in the log
        log.info(
            "label: %s , confidence: %.2f, leaning_angle: %.0f, "
            "keypoint_corr: %s",
            label,
            confidence,
            leaning_angle,
            keypoint_corr,
        )
        inf_json.append(
            {
                "label": label,
                "confidence": confidence,
                "leaning_angle": leaning_angle,
                "keypoint_corr": {
                    name: keypoint_corr.get(name, None) for name in keypoint_names
                },
            }
        )

    return inf_json
def draw_lines(self, thumbnail, pose_dix, score)

Draw body lines if available. Return number of lines drawn.

Expand source code
def draw_lines(self, thumbnail, pose_dix, score):
    """
    Draw the detected shoulder-hip lines on the thumbnail for debugging.

    The annotated thumbnail is saved as a JPEG file in the system data
    directory. Nothing is drawn or saved when ``pose_dix`` is None.

    :Returns:
    -------
    Number of body lines drawn (0, 1 or 2).
    """
    canvas = ImageDraw.Draw(thumbnail)
    if pose_dix is None:
        return 0

    drawn = 0
    body_sides = (
        (self.LEFT_SHOULDER, self.LEFT_HIP),
        (self.RIGHT_SHOULDER, self.RIGHT_HIP),
    )
    for shoulder, hip in body_sides:
        # a line needs both of its end points
        if pose_dix.keys() >= {shoulder, hip}:
            end_points = [tuple(pose_dix[shoulder]), tuple(pose_dix[hip])]
            canvas.line(end_points, fill="red")
            drawn += 1

    # the file name carries a millisecond timestamp and the pose score
    timestr = int(time.monotonic() * 1000)
    debug_image_file_name = f"tmp-fall-detect-thumbnail-{timestr}-score-{score}.jpg"
    target = Path(self._sys_data_dir, debug_image_file_name)
    thumbnail.save(target, format="JPEG")
    return drawn
def estimate_spinal_vector_score(self, pose)
Expand source code
def estimate_spinal_vector_score(self, pose):
    """
    Estimate how confidently the body line of a pose was detected.

    Each side's score is the lower of its shoulder and hip keypoint
    scores. A side counts as detected only when that score is strictly
    above ``self.confidence_threshold``.

    :Parameters:
    ----------
    pose:
        object with a ``keypoints`` mapping whose values expose ``score``
        and ``yx`` attributes.

    :Returns:
    -------
    Tuple ``(spinal_vector_score, pose_dix)``. ``pose_dix`` maps the
    keypoint names of the detected sides to the keypoints' ``yx`` objects
    (the same objects, not copies). The score is the mean of both side
    scores when both sides are detected, 90% of the single side score
    when only one is, and 0 when neither is.
    """
    keypoints = pose.keypoints
    pose_dix = {}

    left_score = min(
        keypoints[self.LEFT_SHOULDER].score,
        keypoints[self.LEFT_HIP].score,
    )
    right_score = min(
        keypoints[self.RIGHT_SHOULDER].score,
        keypoints[self.RIGHT_HIP].score,
    )

    has_left = left_score > self.confidence_threshold
    has_right = right_score > self.confidence_threshold

    if has_left:
        pose_dix[self.LEFT_SHOULDER] = keypoints[self.LEFT_SHOULDER].yx
        pose_dix[self.LEFT_HIP] = keypoints[self.LEFT_HIP].yx

    if has_right:
        pose_dix[self.RIGHT_SHOULDER] = keypoints[self.RIGHT_SHOULDER].yx
        pose_dix[self.RIGHT_HIP] = keypoints[self.RIGHT_HIP].yx

    if has_left and has_right:
        spinal_vector_score = (left_score + right_score) / 2.0
    elif has_left:
        # 10% confidence penalty: only the left shoulder-hip line is seen
        spinal_vector_score = left_score * 0.9
    elif has_right:
        # 10% confidence penalty: only the right shoulder-hip line is seen
        spinal_vector_score = right_score * 0.9
    else:
        spinal_vector_score = 0

    log.debug("Estimated spinal vector score: %s", spinal_vector_score)
    return spinal_vector_score, pose_dix
def fall_detect(self, image=None)
Expand source code
def fall_detect(self, image=None):
    """
    Detect a fall by comparing the pose in this image with earlier frames.

    The frame is skipped when a pose was saved less than
    ``self.min_time_between_frames`` ago. Otherwise keypoints are
    detected and compared with the two saved frames, most recent first.
    A saved frame is ignored when it holds no pose or is older than
    ``self.max_time_between_frames``. Whenever a pose is detected, the
    current frame is saved for later comparisons.

    :Parameters:
    ----------
    image:
        the image to analyse; must be truthy.

    :Returns:
    -------
    Tuple ``(inference_result, thumbnail)``. ``inference_result`` is None
    when the frame was skipped or no pose was detected; otherwise it is a
    list holding at most one
    ``("FALL", fall_score, leaning_angle, pose_dix)`` tuple.
    """
    assert image
    log.debug("Calling TF engine for inference")
    start_time = time.monotonic()

    now = time.monotonic()
    # time elapsed since the most recently saved frame
    lapse = now - self._prev_data[-1][self.TIMESTAMP]

    # NOTE(review): lapse is a difference of time.monotonic() values,
    # i.e. seconds, while the message below labels it "ms" - confirm
    # the unit of min_time_between_frames.
    if self._prev_data[-1][self.POSE_VAL] and lapse < self.min_time_between_frames:
        log.debug(
            "Received an image frame too soon after the previous \
            frame. Only %.2f ms apart.\
            Minimum %.2f ms period required for fall detection.",
            lapse,
            self.min_time_between_frames,
        )
        # frame skipped: no inference, reuse the previous thumbnail
        inference_result = None
        thumbnail = self._prev_data[-1][self.THUMBNAIL]
    else:
        # Detection using tensorflow posenet module
        pose, thumbnail, spinal_vector_score, pose_dix = self.find_keypoints(image)

        inference_result = None
        if not pose:
            log.debug(
                f"No pose detected or detection score does not meet \
                confidence threshold of {self.confidence_threshold}."
            )
        else:
            # a pose was found: from here on the result is a list,
            # empty unless a fall is detected below
            inference_result = []

            current_body_vector_score = spinal_vector_score

            # Find line angle with vertical axis
            (
                left_angle_with_yaxis,
                rigth_angle_with_yaxis,
            ) = self.get_line_angles_with_yaxis(pose_dix)

            # save an image with drawn lines for debugging
            if log.getEffectiveLevel() <= logging.DEBUG:
                # development mode
                self.draw_lines(thumbnail, pose_dix, spinal_vector_score)

            # compare with the most recent saved frame first (-1),
            # then with the one before it (-2)
            for t in [-1, -2]:
                lapse = now - self._prev_data[t][self.TIMESTAMP]

                if (
                    not self._prev_data[t][self.POSE_VAL]
                    or lapse > self.max_time_between_frames
                ):
                    log.debug(
                        "No recent pose to compare to. Will save \
                        this frame pose for subsequent comparison."
                    )
                elif not self.is_body_line_motion_downward(
                    left_angle_with_yaxis, rigth_angle_with_yaxis, inx=t
                ):
                    log.debug(
                        "The body-line angle with vertical axis is \
                                decreasing from the previous frame. \
                                Not likely to be a fall."
                    )
                else:
                    leaning_angle = self.find_changes_in_angle(pose_dix, inx=t)

                    # leaning_probability is binary: 1 when the angle
                    # change exceeds the configured fall factor, else 0
                    leaning_probability = (
                        1 if leaning_angle > self._fall_factor else 0
                    )

                    # Fall score is the average of the current and the
                    # previous frame's body vector scores, multiplied by
                    # leaning_probability
                    fall_score = (
                        leaning_probability
                        * (
                            self._prev_data[t][self.BODY_VECTOR_SCORE]
                            + current_body_vector_score
                        )
                        / 2
                    )

                    if fall_score >= self.confidence_threshold:
                        inference_result.append(
                            ("FALL", fall_score, leaning_angle, pose_dix)
                        )
                        log.info("Fall detected: %r", inference_result)
                        # one detection is enough; skip the older frame
                        break
                    else:
                        log.debug(
                            f"No fall detected due to low \
                        confidence score:  \
                        {fall_score} < {self.confidence_threshold} \
                        min threshold.Inference result: {inference_result}"
                        )

            log.debug("Saving pose for subsequent comparison.")
            self.assign_prev_records(
                pose_dix,
                left_angle_with_yaxis,
                rigth_angle_with_yaxis,
                now,
                thumbnail,
                current_body_vector_score,
            )

            # log.debug("Logging stats")

    self.log_stats(start_time=start_time)
    log.debug("thumbnail: %r", thumbnail)
    return inference_result, thumbnail
def find_changes_in_angle(self, pose_dix, inx)

Find the change in angle of the shoulder-hip lines between the current and the previous frame.

Expand source code
def find_changes_in_angle(self, pose_dix, inx):
    """
    Measure how far the shoulder-hip lines rotated since an earlier frame.

    :Parameters:
    ----------
    pose_dix: dict
        keypoint coordinates of the current frame, keyed by keypoint name.
    inx: int
        index into ``self._prev_data`` of the earlier frame to compare to.

    :Returns:
    -------
    The larger of the left and right shoulder-hip angle changes, as
    reported by ``self.calculate_angle``. A side contributes 0 when its
    shoulder or hip is missing from either frame.
    """
    earlier_pose = self._prev_data[inx][self.POSE_VAL]

    def _side_change(shoulder, hip, debug_message):
        # a side is comparable only when both of its keypoints are
        # present in the earlier frame and in the current frame
        in_earlier = shoulder in earlier_pose and hip in earlier_pose
        in_current = shoulder in pose_dix and hip in pose_dix
        if not (in_earlier and in_current):
            return 0
        line_pair = [
            [earlier_pose[shoulder], earlier_pose[hip]],
            [pose_dix[shoulder], pose_dix[hip]],
        ]
        change = self.calculate_angle(line_pair)
        log.debug(debug_message, change)
        return change

    left_change = _side_change(
        self.LEFT_SHOULDER, self.LEFT_HIP, "Left shoulder-hip angle: %r"
    )
    right_change = _side_change(
        self.RIGHT_SHOULDER, self.RIGHT_HIP, "Right shoulder-hip angle: %r"
    )
    return max(left_change, right_change)
def find_keypoints(self, image)
Expand source code
def find_keypoints(self, image):
    """
    Detect the pose of a person in the image, trying rotated views if needed.

    The pose engine is first run on the image as given. While the spinal
    vector score of the first detected pose stays below
    ``self.confidence_threshold``, the image is transposed by 90 and then
    270 degrees and detection is repeated. Only the first pose of each
    detection is considered. If a rotated view produced the accepted pose,
    its keypoint coordinates are mapped back in place.

    :Returns:
    -------
    Tuple ``(pose, thumbnail, spinal_vector_score, pose_dix)``. ``pose`` is
    None when no pose reached the confidence threshold. ``thumbnail`` is
    the one produced for the unrotated image. When the pose engine returns
    no poses, the score is 0 and ``pose_dix`` is empty.
    """
    # this score value should be related to the configuration
    # confidence_threshold parameter
    min_score = self.confidence_threshold
    rotations = [Image.ROTATE_270, Image.ROTATE_90]
    angle = 0

    def _score_first_pose(candidates):
        # Guard against an empty detection result: indexing poses[0]
        # unconditionally would raise when no pose was detected at all.
        if candidates and candidates[0]:
            return self.estimate_spinal_vector_score(candidates[0])
        return 0, {}

    poses, thumbnail, _ = self._pose_engine.detect_poses(image)
    width, height = thumbnail.size

    # if no pose detected with high confidence,
    # try rotating the image +/- 90' to find a fallen person
    # currently only looking at pose[0] because we are focused
    # on a lone person falls
    spinal_vector_score, pose_dix = _score_first_pose(poses)
    while spinal_vector_score < min_score and rotations:
        angle = rotations.pop()
        transposed = image.transpose(angle)
        # we are interested in the poses but not the rotated thumbnail
        poses, _, _ = self._pose_engine.detect_poses(transposed)
        spinal_vector_score, pose_dix = _score_first_pose(poses)

    pose = poses[0] if poses and poses[0] else None

    # lets check if we found a good pose candidate
    if not (pose and spinal_vector_score >= min_score):
        # we could not detect a pose with sufficient confidence
        return None, thumbnail, spinal_vector_score, pose_dix

    # if the image was rotated, we need to rotate back to the original
    # image coordinates before comparing with poses in other frames.
    # The yx objects are modified in place, so the entries of pose_dix,
    # which reference the same objects, are updated as well.
    # NOTE(review): the previous comments described yx[0] as the x
    # coordinate despite the attribute name "yx" - confirm the axis order.
    if angle == Image.ROTATE_90:
        # ROTATE_90 rotates 90' counter clockwise
        # from ^ to < orientation.
        for keypoint in pose.keypoints.values():
            tmp_swap = keypoint.yx[0]
            keypoint.yx[0] = width - keypoint.yx[1]
            keypoint.yx[1] = tmp_swap
    elif angle == Image.ROTATE_270:
        # ROTATE_270 rotates 90' clockwise from ^ to > orientation.
        for keypoint in pose.keypoints.values():
            tmp_swap = keypoint.yx[0]
            keypoint.yx[0] = keypoint.yx[1]
            keypoint.yx[1] = height - tmp_swap

    log.info(
        "A pose detected with spinal_vector_score=%s >= %s "
        "confidence threshold. Pose keypoints: %s",
        spinal_vector_score,
        min_score,
        pose_dix,
    )
    return pose, thumbnail, spinal_vector_score, pose_dix
def get_line_angles_with_yaxis(self, pose_dix)

Find the angle between each shoulder-hip line and the y-axis.

Expand source code
def get_line_angles_with_yaxis(self, pose_dix):
    """
    Compute the angle of each shoulder-hip line against the vertical axis.

    :Returns:
    -------
    Tuple ``(left_angle, right_angle)``. A side whose shoulder or hip is
    missing from ``pose_dix`` is reported as 0.
    """
    # reference line running along the y axis of the model input image
    vertical_axis = [[0, 0], [0, self._pose_engine._tensor_image_height]]

    def _angle_to_vertical(shoulder, hip):
        if shoulder in pose_dix and hip in pose_dix:
            body_line = [pose_dix[shoulder], pose_dix[hip]]
            return self.calculate_angle([vertical_axis, body_line])
        return 0

    left = _angle_to_vertical(self.LEFT_SHOULDER, self.LEFT_HIP)
    right = _angle_to_vertical(self.RIGHT_SHOULDER, self.RIGHT_HIP)
    return (left, right)
def is_body_line_motion_downward(self, left_angle_with_yaxis, rigth_angle_with_yaxis, inx)
Expand source code
def is_body_line_motion_downward(
    self, left_angle_with_yaxis, rigth_angle_with_yaxis, inx
):
    """
    Tell whether a body line leans further from the vertical than before.

    A side qualifies when its current angle and the angle saved in
    ``self._prev_data[inx]`` are both truthy and the current angle is
    greater than the saved one.

    :Returns:
    -------
    True when the left or the right side qualifies, False otherwise.
    """

    def _has_grown(current, key):
        # a falsy angle (0 or None) means there is nothing to compare
        if not current:
            return False
        earlier = self._prev_data[inx][key]
        if not earlier:
            return False
        return bool(current > earlier)

    left_grew = _has_grown(left_angle_with_yaxis, self.LEFT_ANGLE_WITH_YAXIS)
    right_grew = _has_grown(rigth_angle_with_yaxis, self.RIGHT_ANGLE_WITH_YAXIS)
    return left_grew or right_grew
def process_sample(self, **sample)

Detect objects in sample image.

Expand source code
def process_sample(self, **sample):
    """
    Run fall detection on a sample and yield the result for the next element.

    An empty sample is passed through as a single None. Otherwise one dict
    is yielded with the original image, the thumbnail, the converted
    inference result and display metadata. Any exception raised while
    processing is logged and the sample is dropped (nothing is yielded).
    """
    log.debug("%s received new sample", self.__class__.__name__)
    if not sample:
        # pass through empty samples to next element
        yield None
        return

    try:
        image = sample["image"]
        detections, thumbnail = self.fall_detect(image=image)
        # pass on the results to the next connected pipe element
        yield {
            "image": image,
            "thumbnail": thumbnail,
            "inference_result": self.convert_inference_result(detections),
            "inference_meta": {"display": "Fall Detection"},
        }
    except Exception as e:
        log.exception(
            'Error "%s" while processing sample. ' "Dropping sample: %s",
            str(e),
            str(sample),
        )

Inherited members