Module ambianic.pipeline.interpreter

Ambianic pipeline interpreter module.

Expand source code
"""Ambianic pipeline interpreter module."""
import logging
import threading
import time

from ambianic.configuration import DEFAULT_DATA_DIR, get_root_config
from ambianic.pipeline import HealthChecker, PipeElement, pipeline_event
from ambianic.pipeline.ai.face_detect import FaceDetector
from ambianic.pipeline.ai.fall_detect import FallDetector
from ambianic.pipeline.ai.object_detect import ObjectDetector
from ambianic.pipeline.save_event import SaveDetectionEvents
from ambianic.util import ManagedService, ThreadedJob, stacktrace

from .avsource.av_element import AVSourceElement

log = logging.getLogger(__name__)

# Pipeline class, overridden by test
PIPELINE_CLASS = None


def get_pipelines(pipelines_config, data_dir=None):
    """Initialize and return pipelines given config parameters.

    :Parameters:
    ----------
    pipelines_config : dict
        Example:
        daytime_front_door_watch:
        - source: *src_front_door_cam
          ...
        - detect_objects: # run ai inference on the input data
          ...

    :Returns:
    -------
    list
        List of configured pipelines.

    """
    pipelines = []
    if pipelines_config:
        for pname, pdef in pipelines_config.items():
            log.info("loading %s pipeline configuration", pname)
            pipeline_class = Pipeline if PIPELINE_CLASS is None else PIPELINE_CLASS
            pipe = pipeline_class(pname=pname, pconfig=pdef, data_dir=data_dir)
            pipelines.append(pipe)
    else:
        log.warning("No pipelines configured.")
    return pipelines


class PipelineServer(ManagedService):
    """Thin wrapper around PipelineServer constructs.

    Allows controlled start and stop of the web app server
    in a separate process.

    Parameters
    ----------
    config : yaml
        reference to the yaml configuration file

    """

    def __init__(self, config):
        self.config = config
        self.pipeline_server_job = None

    def start(self, **kwargs):
        log.info("PipelineServer server job starting...")
        f = PipelineServerJob(self.config)
        self.pipeline_server_job = ThreadedJob(f)
        self.pipeline_server_job.start()
        log.info("Pipeline server job started")

    def healthcheck(self):
        return time.monotonic(), True

    def heal(self):
        """Heal the server.

        TODO: Keep an eye for potential scenarios that cause this server to
         become unresponsive.
        """

    def stop(self):
        if self.pipeline_server_job:
            log.info("Pipeline server job stopping...")
            try:
                self.pipeline_server_job.stop()
                self.pipeline_server_job.join()
            except RuntimeError as err:
                log.warning("Failed stopping: %s" % err)
            self.pipeline_server_job = None
            log.info("Pipeline server job stopped.")


class PipelineServerJob(ManagedService):
    """Main pipeline interpreter class.

    Responsible for loading, running and overseeing the health
    of all Ambianic pipelines.

    """

    MAX_HEARTBEAT_INTERVAL = 40
    TERMINAL_HEALTH_INTERVAL = MAX_HEARTBEAT_INTERVAL * 3

    def __init__(self, config=None):
        """Initialize and configure a PipelineServer.

        :Parameters:
        ----------
        config : dict
            Python representation of the yaml configuration file. Example:
            pipelines:
              # sequence of piped operations for use on front door cam
              daytime_front_door_watch:
                - source: *src_front_door_cam
                  ...
                - detect_objects: # run ai inference on the input data
                  ...
                - save_detections: # save samples from the inference results
                  ...

        """
        self._threaded_jobs = []
        self._pipelines = []
        self._config = None
        self.reset(config)

    def reset(self, config=None):
        self._threaded_jobs = []
        self._pipelines = []
        if config is not None:
            self._config = config
        if self._config:
            pipelines_config = self._config.get("pipelines", None)
            if pipelines_config:
                # get main data dir config and pass
                # on to pipelines to use
                data_dir = self._config.get("data_dir", DEFAULT_DATA_DIR)
                self._pipelines = get_pipelines(pipelines_config, data_dir=data_dir)
                for pp in self._pipelines:
                    pj = ThreadedJob(pp)
                    self._threaded_jobs.append(pj)

    def _on_terminal_pipeline_health(self, pipeline=None, lapse=None):
        log.error(
            "Pipeline %s in terminal condition. "
            "Unable to recover."
            "Latest heartbeat was %f seconds ago. ",
            pipeline.name,
            lapse,
        )

    def _on_pipeline_job_ended(self, threaded_job=None):
        p = threaded_job.job
        log.debug('Pipeline "%s" has ended. ' "Removing from health watch.", p.name)
        self._threaded_jobs.remove(threaded_job)

    def healthcheck(self):
        """Check the health of all managed pipelines.

        Return the oldest heartbeat among all managed pipeline heartbeats.
        Try to heal pipelines that haven't reported a heartbeat and awhile.

        :returns: (timestamp, status) tuple with most outdated heartbeat
            and worst known status among managed pipelines
        """
        oldest_heartbeat = time.monotonic()
        # iterate over a copy of jobs, because
        # we may need to remove dead jobs in the loop
        for j in list(self._threaded_jobs):
            # get the pipeline object out of the threaded job wrapper
            p = j.job
            if j.is_alive():
                latest_heartbeat, status = p.healthcheck()
                now = time.monotonic()
                lapse = now - latest_heartbeat
                if lapse > self.TERMINAL_HEALTH_INTERVAL:
                    self._on_terminal_pipeline_health(p, lapse)
                    # more than a reasonable amount of time has passed
                    # since the pipeline reported a heartbeat.
                    # Let's recycle it
                elif lapse > self.MAX_HEARTBEAT_INTERVAL:
                    log.warning(
                        'Pipeline "%s" is not responsive. '
                        "Latest heartbeat was %f seconds ago. "
                        "Will attempt to heal it.",
                        p.name,
                        lapse,
                    )
                    self.heal_pipeline_job(j)
                if oldest_heartbeat > latest_heartbeat:
                    oldest_heartbeat = latest_heartbeat
            else:
                self._on_pipeline_job_ended(threaded_job=j)
        status = True  # At some point status may carry richer information
        return oldest_heartbeat, status

    def heal(self):
        """Heal the PipelineServer.

        PipelineServer manages its own health as best possible.
        Not much to do here at this time.
        """

    def heal_pipeline_job(self, threaded_job=None):
        assert threaded_job
        pipeline = threaded_job.job
        log.debug("pipline %s healing request...", pipeline.name)
        threaded_job.heal()
        log.debug("pipeline %s healing request completed.", pipeline.name)

    def start(self):
        # Start pipeline interpreter threads
        log.info("pipeline jobs starting...")
        for tj in self._threaded_jobs:
            tj.start()
        log.info("pipeline jobs started")

    def stop(self):
        log.info("pipeline jobs stopping...")
        # Signal pipeline interpreter threads to close
        for tj in self._threaded_jobs:
            tj.stop()
        # Wait for the pipeline interpreter threads to close...
        for tj in self._threaded_jobs:
            tj.join()
        log.info("pipeline jobs stopped.")


class HealingThread(threading.Thread):
    """A thread focused on healing a broken pipeline."""

    def __init__(self, target=None, on_finished=None):
        assert target, "Healing target required"
        assert on_finished, "on_finished callback required"
        threading.Thread.__init__(self, daemon=True)
        self._target = target
        self._on_finished = on_finished

    def run(self):
        log.debug("invoking healing target method %r", self._target)
        try:
            self._target()
        except Exception as e:
            log.warning("Error %r while running healing method %r.", e, self._target)
            log.warning(stacktrace())
        log.debug("invoking healing on_finished method %r", self._on_finished)
        try:
            self._on_finished()
        except Exception as e:
            log.warning(
                "Error %r while calling on_finished method %r.", e, self._on_finished
            )
            log.warning(stacktrace())


class Pipeline(ManagedService):
    """The main Ambianic data processing structure.

    Data flow is arranged in independent pipelines.
    """

    # valid pipeline operators
    PIPELINE_OPS = {
        "source": AVSourceElement,
        "detect_objects": ObjectDetector,
        "save_detections": SaveDetectionEvents,
        "detect_faces": FaceDetector,
        "detect_falls": FallDetector,
    }

    def _on_unknown_pipe_element(self, name=None):
        log.warning(
            "Pipeline definition has unknown "
            "pipeline element: %s ."
            " Ignoring element and moving forward.",
            name,
        )

    def __init__(self, pname=None, pconfig=None, data_dir=None):
        """Init and load pipeline config."""
        assert pname, "Pipeline name required"
        self.name = pname
        assert pconfig, "Pipeline config required"
        self.config = pconfig
        self.data_dir = data_dir
        self._pipe_elements = []
        self._latest_heartbeat_time = time.monotonic()
        # in the future status may represent a spectrum of health issues
        self._latest_health_status = True
        self._healing_thread = None
        self._context = pipeline_event.PipelineContext(unique_pipeline_name=self.name)
        self._context.data_dir = self.data_dir
        self._event_log = pipeline_event.get_event_log(pipeline_context=self._context)
        self.load_elements()

    def load_elements(self):
        """load pipeline elements based on configuration"""
        self._pipe_elements = []

        log.debug("Pipeline starts with element %r", self.config[0])
        source_element_key = [*self.config[0]][0]
        assert (
            source_element_key == "source"
        ), "Pipeline config must begin with a 'source' element instead of {}".format(
            source_element_key
        )

        for element_def in self.config:
            log.info("Pipeline %s loading next element: %s", self.name, element_def)

            is_valid = self.parse_source_config(element_def)
            if not is_valid:
                self._pipe_elements = []
                break

            is_valid = self.parse_ai_model_config(element_def)
            if not is_valid:
                self._pipe_elements = []
                break

            element_name = [*element_def][0]
            assert element_name
            element_config = element_def[element_name]

            # if dealing with a static reference, pass the whole object
            # eg. { [source]: [source-name] }
            if isinstance(element_config, str):
                element_config = {element_name: element_config}

            element_class = self.PIPELINE_OPS.get(element_name, None)

            if element_class:
                log.info(
                    "Pipeline %s adding element name %s " "with class %s and config %s",
                    self.name,
                    element_name,
                    element_class,
                    element_config,
                )
                element = element_class(
                    **element_config,
                    element_name=element_name,
                    context=self._context,
                    event_log=self._event_log
                )
                self._pipe_elements.append(element)
            else:
                self._on_unknown_pipe_element(name=element_name)

    def parse_ai_model_config(self, element_def: dict):
        """parse AI model configuration"""

        # its one
        ai_element = None
        for element_name in element_def:
            # ai_model: accept just a source_id and take it from sources
            if "ai_model" in element_def[element_name]:
                ai_element = element_def[element_name]
                break

        if ai_element is None:
            return True

        ai_model_id = None
        if isinstance(ai_element["ai_model"], str):
            ai_model_id = ai_element["ai_model"]

        if (
            ai_element["ai_model"] is not None
            and "ai_model_id" in ai_element["ai_model"]
        ):
            ai_model_id = ai_element["ai_model"]["ai_model_id"]

        if ai_model_id is None:
            return True

        root_config = get_root_config()
        ai_model = root_config.ai_models[ai_model_id]
        if ai_model is None:
            log.warning(
                "AI model id %s not found, cannot start pipeline %s",
                ai_model_id,
                self.name,
            )
            return False

        # merge the model config but keep the pipe element specific one
        for key, val in ai_model.items():
            if key not in ai_element:
                ai_element[key] = val

        # track the id
        ai_element["ai_model_id"] = ai_model_id

        return True

    def parse_source_config(self, element_def: dict):
        """parse source configuration"""
        # source: accept just a source_id and take it from sources
        if "source" not in element_def:
            return True

        source_id = None
        if isinstance(element_def["source"], str):
            source_id = element_def["source"]
        if "source_id" in element_def["source"]:
            source_id = element_def["source"]["source_id"]

        if source_id is None:
            return True

        # track the source_id
        root_config = get_root_config()
        source = root_config.sources.get(source_id, None)
        if source is None:
            log.warning(
                "Source id %s not found, cannot start pipeline %s",
                source_id,
                self.name,
            )
            return False

        element_def["source"] = source
        element_def["source"]["source_id"] = source_id

        return True

    def restart(self):
        """Restart a pipeline"""
        self.stop()
        self.reset()
        self.start()
        log.info("Pipeline restarted")

    def reset(self):
        """Reset the pipeline elements"""
        self._pipe_elements = []

    def _heartbeat(self):
        """Set the heartbeat timestamp to time.monotonic()."""
        log.debug("Pipeline %s heartbeat signal.", self.name)
        now = time.monotonic()
        lapse = now - self._latest_heartbeat_time
        log.debug("Pipeline %s heartbeat lapse %f", self.name, lapse)
        self._latest_heartbeat_time = now

    def _on_start_no_elements(self):
        return

    def start(self):
        """Start the pipeline loop.

        Run until the pipeline has input from its configured source
        or until a stop() signal is received.
        """
        if len(self._pipe_elements) == 0:
            self.load_elements()

        log.info("Starting %s main pipeline loop", self.__class__.__name__)
        if not self._pipe_elements:
            return self._on_start_no_elements()

        self._heartbeat()
        # connect pipeline elements as defined by user
        for i in range(1, len(self._pipe_elements)):
            e = self._pipe_elements[i - 1]
            assert isinstance(e, PipeElement)
            e_next = self._pipe_elements[i]
            e.connect_to_next_element(e_next)
        last_element = self._pipe_elements[len(self._pipe_elements) - 1]
        hc = HealthChecker(
            health_status_callback=self._heartbeat, element_name="health_check"
        )
        last_element.connect_to_next_element(hc)
        self._pipe_elements[0].start()
        log.info("Started %s", self.__class__.__name__)

    def healthcheck(self):
        """Return health vitals status report.

        :Returns:
        -------
        (timestamp, status)
            a tuple of
                monotonically increasing timestamp of the last known healthy
                heartbeat and a status with additional health information.

        """
        return self._latest_heartbeat_time, self._latest_health_status

    def _on_healing_already_in_progress(self):
        log.debug(
            "pipeline %s healing thread in progress."
            " Skipping request. "
            "Thread id: %d. ",
            self.name,
            self._healing_thread.ident,
        )

    def heal(self):
        """Nonblocking asynchronous heal function."""
        # register a heartbeat to prevent looping back
        # into heal while healing
        self._heartbeat()
        if self._healing_thread:
            self._on_healing_already_in_progress()
        else:
            log.debug("pipeline %s launching healing thread...", self.name)
            heal_target = self._pipe_elements[0].heal

            def healing_finished():
                log.debug(
                    "pipeline %s healing thread id: %d ended. ",
                    self.name,
                    self._healing_thread.ident,
                )
                self._healing_thread = None
                # let's notify healthchecker that progress is being made
                self._heartbeat()

            # launch healing function in a non-blocking way
            self._healing_thread = HealingThread(
                target=heal_target, on_finished=healing_finished
            )
            self._healing_thread.start()
            log.debug("pipeline %s launched healing thread.", self.name)

    def stop(self):
        """Stop pipeline processing.

        Disconnect from the source and all other external resources.
        """
        log.info("Requesting pipeline elements to stop... %s", self.__class__.__name__)
        if len(self._pipe_elements) > 0:
            self._pipe_elements[0].stop()
        log.info(
            "Completed request to pipeline elements to stop. %s",
            self.__class__.__name__,
        )

Functions

def get_pipelines(pipelines_config, data_dir=None)

Initialize and return pipelines given config parameters.

:Parameters:

pipelines_config : dict Example: daytime_front_door_watch: - source: *src_front_door_cam … - detect_objects: # run ai inference on the input data …

:Returns:

list List of configured pipelines.

Expand source code
def get_pipelines(pipelines_config, data_dir=None):
    """Initialize and return pipelines given config parameters.

    :Parameters:
    ----------
    pipelines_config : dict
        Example:
        daytime_front_door_watch:
        - source: *src_front_door_cam
          ...
        - detect_objects: # run ai inference on the input data
          ...

    :Returns:
    -------
    list
        List of configured pipelines.

    """
    pipelines = []
    if pipelines_config:
        for pname, pdef in pipelines_config.items():
            log.info("loading %s pipeline configuration", pname)
            pipeline_class = Pipeline if PIPELINE_CLASS is None else PIPELINE_CLASS
            pipe = pipeline_class(pname=pname, pconfig=pdef, data_dir=data_dir)
            pipelines.append(pipe)
    else:
        log.warning("No pipelines configured.")
    return pipelines

Classes

class HealingThread (target=None, on_finished=None)

A thread focused on healing a broken pipeline.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.

Expand source code
class HealingThread(threading.Thread):
    """A thread focused on healing a broken pipeline."""

    def __init__(self, target=None, on_finished=None):
        assert target, "Healing target required"
        assert on_finished, "on_finished callback required"
        threading.Thread.__init__(self, daemon=True)
        self._target = target
        self._on_finished = on_finished

    def run(self):
        log.debug("invoking healing target method %r", self._target)
        try:
            self._target()
        except Exception as e:
            log.warning("Error %r while running healing method %r.", e, self._target)
            log.warning(stacktrace())
        log.debug("invoking healing on_finished method %r", self._on_finished)
        try:
            self._on_finished()
        except Exception as e:
            log.warning(
                "Error %r while calling on_finished method %r.", e, self._on_finished
            )
            log.warning(stacktrace())

Ancestors

  • threading.Thread

Methods

def run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Expand source code
def run(self):
    log.debug("invoking healing target method %r", self._target)
    try:
        self._target()
    except Exception as e:
        log.warning("Error %r while running healing method %r.", e, self._target)
        log.warning(stacktrace())
    log.debug("invoking healing on_finished method %r", self._on_finished)
    try:
        self._on_finished()
    except Exception as e:
        log.warning(
            "Error %r while calling on_finished method %r.", e, self._on_finished
        )
        log.warning(stacktrace())
class Pipeline (pname=None, pconfig=None, data_dir=None)

The main Ambianic data processing structure.

Data flow is arranged in independent pipelines.

Init and load pipeline config.

Expand source code
class Pipeline(ManagedService):
    """The main Ambianic data processing structure.

    Data flow is arranged in independent pipelines.
    """

    # valid pipeline operators
    PIPELINE_OPS = {
        "source": AVSourceElement,
        "detect_objects": ObjectDetector,
        "save_detections": SaveDetectionEvents,
        "detect_faces": FaceDetector,
        "detect_falls": FallDetector,
    }

    def _on_unknown_pipe_element(self, name=None):
        log.warning(
            "Pipeline definition has unknown "
            "pipeline element: %s ."
            " Ignoring element and moving forward.",
            name,
        )

    def __init__(self, pname=None, pconfig=None, data_dir=None):
        """Init and load pipeline config."""
        assert pname, "Pipeline name required"
        self.name = pname
        assert pconfig, "Pipeline config required"
        self.config = pconfig
        self.data_dir = data_dir
        self._pipe_elements = []
        self._latest_heartbeat_time = time.monotonic()
        # in the future status may represent a spectrum of health issues
        self._latest_health_status = True
        self._healing_thread = None
        self._context = pipeline_event.PipelineContext(unique_pipeline_name=self.name)
        self._context.data_dir = self.data_dir
        self._event_log = pipeline_event.get_event_log(pipeline_context=self._context)
        self.load_elements()

    def load_elements(self):
        """load pipeline elements based on configuration"""
        self._pipe_elements = []

        log.debug("Pipeline starts with element %r", self.config[0])
        source_element_key = [*self.config[0]][0]
        assert (
            source_element_key == "source"
        ), "Pipeline config must begin with a 'source' element instead of {}".format(
            source_element_key
        )

        for element_def in self.config:
            log.info("Pipeline %s loading next element: %s", self.name, element_def)

            is_valid = self.parse_source_config(element_def)
            if not is_valid:
                self._pipe_elements = []
                break

            is_valid = self.parse_ai_model_config(element_def)
            if not is_valid:
                self._pipe_elements = []
                break

            element_name = [*element_def][0]
            assert element_name
            element_config = element_def[element_name]

            # if dealing with a static reference, pass the whole object
            # eg. { [source]: [source-name] }
            if isinstance(element_config, str):
                element_config = {element_name: element_config}

            element_class = self.PIPELINE_OPS.get(element_name, None)

            if element_class:
                log.info(
                    "Pipeline %s adding element name %s " "with class %s and config %s",
                    self.name,
                    element_name,
                    element_class,
                    element_config,
                )
                element = element_class(
                    **element_config,
                    element_name=element_name,
                    context=self._context,
                    event_log=self._event_log
                )
                self._pipe_elements.append(element)
            else:
                self._on_unknown_pipe_element(name=element_name)

    def parse_ai_model_config(self, element_def: dict):
        """parse AI model configuration"""

        # its one
        ai_element = None
        for element_name in element_def:
            # ai_model: accept just a source_id and take it from sources
            if "ai_model" in element_def[element_name]:
                ai_element = element_def[element_name]
                break

        if ai_element is None:
            return True

        ai_model_id = None
        if isinstance(ai_element["ai_model"], str):
            ai_model_id = ai_element["ai_model"]

        if (
            ai_element["ai_model"] is not None
            and "ai_model_id" in ai_element["ai_model"]
        ):
            ai_model_id = ai_element["ai_model"]["ai_model_id"]

        if ai_model_id is None:
            return True

        root_config = get_root_config()
        ai_model = root_config.ai_models[ai_model_id]
        if ai_model is None:
            log.warning(
                "AI model id %s not found, cannot start pipeline %s",
                ai_model_id,
                self.name,
            )
            return False

        # merge the model config but keep the pipe element specific one
        for key, val in ai_model.items():
            if key not in ai_element:
                ai_element[key] = val

        # track the id
        ai_element["ai_model_id"] = ai_model_id

        return True

    def parse_source_config(self, element_def: dict):
        """parse source configuration"""
        # source: accept just a source_id and take it from sources
        if "source" not in element_def:
            return True

        source_id = None
        if isinstance(element_def["source"], str):
            source_id = element_def["source"]
        if "source_id" in element_def["source"]:
            source_id = element_def["source"]["source_id"]

        if source_id is None:
            return True

        # track the source_id
        root_config = get_root_config()
        source = root_config.sources.get(source_id, None)
        if source is None:
            log.warning(
                "Source id %s not found, cannot start pipeline %s",
                source_id,
                self.name,
            )
            return False

        element_def["source"] = source
        element_def["source"]["source_id"] = source_id

        return True

    def restart(self):
        """Restart a pipeline"""
        self.stop()
        self.reset()
        self.start()
        log.info("Pipeline restarted")

    def reset(self):
        """Reset the pipeline elements"""
        self._pipe_elements = []

    def _heartbeat(self):
        """Set the heartbeat timestamp to time.monotonic()."""
        log.debug("Pipeline %s heartbeat signal.", self.name)
        now = time.monotonic()
        lapse = now - self._latest_heartbeat_time
        log.debug("Pipeline %s heartbeat lapse %f", self.name, lapse)
        self._latest_heartbeat_time = now

    def _on_start_no_elements(self):
        return

    def start(self):
        """Start the pipeline loop.

        Run until the pipeline has input from its configured source
        or until a stop() signal is received.
        """
        if len(self._pipe_elements) == 0:
            self.load_elements()

        log.info("Starting %s main pipeline loop", self.__class__.__name__)
        if not self._pipe_elements:
            return self._on_start_no_elements()

        self._heartbeat()
        # connect pipeline elements as defined by user
        for i in range(1, len(self._pipe_elements)):
            e = self._pipe_elements[i - 1]
            assert isinstance(e, PipeElement)
            e_next = self._pipe_elements[i]
            e.connect_to_next_element(e_next)
        last_element = self._pipe_elements[len(self._pipe_elements) - 1]
        hc = HealthChecker(
            health_status_callback=self._heartbeat, element_name="health_check"
        )
        last_element.connect_to_next_element(hc)
        self._pipe_elements[0].start()
        log.info("Started %s", self.__class__.__name__)

    def healthcheck(self):
        """Return health vitals status report.

        :Returns:
        -------
        (timestamp, status)
            a tuple of
                monotonically increasing timestamp of the last known healthy
                heartbeat and a status with additional health information.

        """
        return self._latest_heartbeat_time, self._latest_health_status

    def _on_healing_already_in_progress(self):
        log.debug(
            "pipeline %s healing thread in progress."
            " Skipping request. "
            "Thread id: %d. ",
            self.name,
            self._healing_thread.ident,
        )

    def heal(self):
        """Nonblocking asynchronous heal function."""
        # register a heartbeat to prevent looping back
        # into heal while healing
        self._heartbeat()
        if self._healing_thread:
            self._on_healing_already_in_progress()
        else:
            log.debug("pipeline %s launching healing thread...", self.name)
            heal_target = self._pipe_elements[0].heal

            def healing_finished():
                log.debug(
                    "pipeline %s healing thread id: %d ended. ",
                    self.name,
                    self._healing_thread.ident,
                )
                self._healing_thread = None
                # let's notify healthchecker that progress is being made
                self._heartbeat()

            # launch healing function in a non-blocking way
            self._healing_thread = HealingThread(
                target=heal_target, on_finished=healing_finished
            )
            self._healing_thread.start()
            log.debug("pipeline %s launched healing thread.", self.name)

    def stop(self):
        """Stop pipeline processing.

        Disconnect from the source and all other external resources.
        """
        log.info("Requesting pipeline elements to stop... %s", self.__class__.__name__)
        if len(self._pipe_elements) > 0:
            self._pipe_elements[0].stop()
        log.info(
            "Completed request to pipeline elements to stop. %s",
            self.__class__.__name__,
        )

Ancestors

Class variables

var PIPELINE_OPS

Methods

def heal(self)

Nonblocking asynchronous heal function.

Expand source code
def heal(self):
    """Nonblocking asynchronous heal function."""
    # register a heartbeat to prevent looping back
    # into heal while healing
    self._heartbeat()
    if self._healing_thread:
        self._on_healing_already_in_progress()
    else:
        log.debug("pipeline %s launching healing thread...", self.name)
        heal_target = self._pipe_elements[0].heal

        def healing_finished():
            log.debug(
                "pipeline %s healing thread id: %d ended. ",
                self.name,
                self._healing_thread.ident,
            )
            self._healing_thread = None
            # let's notify healthchecker that progress is being made
            self._heartbeat()

        # launch healing function in a non-blocking way
        self._healing_thread = HealingThread(
            target=heal_target, on_finished=healing_finished
        )
        self._healing_thread.start()
        log.debug("pipeline %s launched healing thread.", self.name)
def healthcheck(self)

Return health vitals status report.

:Returns:

(timestamp, status) a tuple of monotonically increasing timestamp of the last known healthy heartbeat and a status with additional health information.

Expand source code
def healthcheck(self):
    """Return health vitals status report.

    :Returns:
    -------
    (timestamp, status)
        a tuple of
            monotonically increasing timestamp of the last known healthy
            heartbeat and a status with additional health information.

    """
    return self._latest_heartbeat_time, self._latest_health_status
def load_elements(self)

load pipeline elements based on configuration

Expand source code
def load_elements(self):
    """load pipeline elements based on configuration"""
    self._pipe_elements = []

    log.debug("Pipeline starts with element %r", self.config[0])
    source_element_key = [*self.config[0]][0]
    assert (
        source_element_key == "source"
    ), "Pipeline config must begin with a 'source' element instead of {}".format(
        source_element_key
    )

    for element_def in self.config:
        log.info("Pipeline %s loading next element: %s", self.name, element_def)

        is_valid = self.parse_source_config(element_def)
        if not is_valid:
            self._pipe_elements = []
            break

        is_valid = self.parse_ai_model_config(element_def)
        if not is_valid:
            self._pipe_elements = []
            break

        element_name = [*element_def][0]
        assert element_name
        element_config = element_def[element_name]

        # if dealing with a static reference, pass the whole object
        # eg. { [source]: [source-name] }
        if isinstance(element_config, str):
            element_config = {element_name: element_config}

        element_class = self.PIPELINE_OPS.get(element_name, None)

        if element_class:
            log.info(
                "Pipeline %s adding element name %s " "with class %s and config %s",
                self.name,
                element_name,
                element_class,
                element_config,
            )
            element = element_class(
                **element_config,
                element_name=element_name,
                context=self._context,
                event_log=self._event_log
            )
            self._pipe_elements.append(element)
        else:
            self._on_unknown_pipe_element(name=element_name)
def parse_ai_model_config(self, element_def: dict)

parse AI model configuration

Expand source code
def parse_ai_model_config(self, element_def: dict):
    """parse AI model configuration"""

    # its one
    ai_element = None
    for element_name in element_def:
        # ai_model: accept just a source_id and take it from sources
        if "ai_model" in element_def[element_name]:
            ai_element = element_def[element_name]
            break

    if ai_element is None:
        return True

    ai_model_id = None
    if isinstance(ai_element["ai_model"], str):
        ai_model_id = ai_element["ai_model"]

    if (
        ai_element["ai_model"] is not None
        and "ai_model_id" in ai_element["ai_model"]
    ):
        ai_model_id = ai_element["ai_model"]["ai_model_id"]

    if ai_model_id is None:
        return True

    root_config = get_root_config()
    ai_model = root_config.ai_models[ai_model_id]
    if ai_model is None:
        log.warning(
            "AI model id %s not found, cannot start pipeline %s",
            ai_model_id,
            self.name,
        )
        return False

    # merge the model config but keep the pipe element specific one
    for key, val in ai_model.items():
        if key not in ai_element:
            ai_element[key] = val

    # track the id
    ai_element["ai_model_id"] = ai_model_id

    return True
def parse_source_config(self, element_def: dict)

parse source configuration

Expand source code
def parse_source_config(self, element_def: dict):
    """parse source configuration"""
    # source: accept just a source_id and take it from sources
    if "source" not in element_def:
        return True

    source_id = None
    if isinstance(element_def["source"], str):
        source_id = element_def["source"]
    if "source_id" in element_def["source"]:
        source_id = element_def["source"]["source_id"]

    if source_id is None:
        return True

    # track the source_id
    root_config = get_root_config()
    source = root_config.sources.get(source_id, None)
    if source is None:
        log.warning(
            "Source id %s not found, cannot start pipeline %s",
            source_id,
            self.name,
        )
        return False

    element_def["source"] = source
    element_def["source"]["source_id"] = source_id

    return True
def reset(self)

Reset the pipeline elements

Expand source code
def reset(self):
    """Reset the pipeline elements"""
    self._pipe_elements = []
def restart(self)

Restart a pipeline

Expand source code
def restart(self):
    """Restart a pipeline"""
    self.stop()
    self.reset()
    self.start()
    log.info("Pipeline restarted")
def start(self)

Start the pipeline loop.

Run until the pipeline has input from its configured source or until a stop() signal is received.

Expand source code
def start(self):
    """Start the pipeline loop.

    Run until the pipeline has input from its configured source
    or until a stop() signal is received.
    """
    if len(self._pipe_elements) == 0:
        self.load_elements()

    log.info("Starting %s main pipeline loop", self.__class__.__name__)
    if not self._pipe_elements:
        return self._on_start_no_elements()

    self._heartbeat()
    # connect pipeline elements as defined by user
    for i in range(1, len(self._pipe_elements)):
        e = self._pipe_elements[i - 1]
        assert isinstance(e, PipeElement)
        e_next = self._pipe_elements[i]
        e.connect_to_next_element(e_next)
    last_element = self._pipe_elements[len(self._pipe_elements) - 1]
    hc = HealthChecker(
        health_status_callback=self._heartbeat, element_name="health_check"
    )
    last_element.connect_to_next_element(hc)
    self._pipe_elements[0].start()
    log.info("Started %s", self.__class__.__name__)
def stop(self)

Stop pipeline processing.

Disconnect from the source and all other external resources.

Expand source code
def stop(self):
    """Stop pipeline processing.

    Disconnect from the source and all other external resources.
    """
    log.info("Requesting pipeline elements to stop... %s", self.__class__.__name__)
    if len(self._pipe_elements) > 0:
        self._pipe_elements[0].stop()
    log.info(
        "Completed request to pipeline elements to stop. %s",
        self.__class__.__name__,
    )
class PipelineServer (config)

Thin wrapper around PipelineServer constructs.

Allows controlled start and stop of the web app server in a separate process.

Parameters

config : yaml
reference to the yaml configuration file
Expand source code
class PipelineServer(ManagedService):
    """Thin wrapper around PipelineServer constructs.

    Allows controlled start and stop of the web app server
    in a separate process.

    Parameters
    ----------
    config : yaml
        reference to the yaml configuration file

    """

    def __init__(self, config):
        self.config = config
        self.pipeline_server_job = None

    def start(self, **kwargs):
        log.info("PipelineServer server job starting...")
        f = PipelineServerJob(self.config)
        self.pipeline_server_job = ThreadedJob(f)
        self.pipeline_server_job.start()
        log.info("Pipeline server job started")

    def healthcheck(self):
        return time.monotonic(), True

    def heal(self):
        """Heal the server.

        TODO: Keep an eye for potential scenarios that cause this server to
         become unresponsive.
        """

    def stop(self):
        if self.pipeline_server_job:
            log.info("Pipeline server job stopping...")
            try:
                self.pipeline_server_job.stop()
                self.pipeline_server_job.join()
            except RuntimeError as err:
                log.warning("Failed stopping: %s" % err)
            self.pipeline_server_job = None
            log.info("Pipeline server job stopped.")

Ancestors

Methods

def heal(self)

Heal the server.

TODO: Keep an eye for potential scenarios that cause this server to become unresponsive.

Expand source code
def heal(self):
    """Heal the server.

    TODO: Keep an eye for potential scenarios that cause this server to
     become unresponsive.
    """

Inherited members

class PipelineServerJob (config=None)

Main pipeline interpreter class.

Responsible for loading, running and overseeing the health of all Ambianic pipelines.

Initialize and configure a PipelineServer.

:Parameters:

config : dict Python representation of the yaml configuration file. Example: pipelines: # sequence of piped operations for use on front door cam daytime_front_door_watch: - source: *src_front_door_cam … - detect_objects: # run ai inference on the input data … - save_detections: # save samples from the inference results …

Expand source code
class PipelineServerJob(ManagedService):
    """Main pipeline interpreter class.

    Responsible for loading, running and overseeing the health
    of all Ambianic pipelines.

    """

    MAX_HEARTBEAT_INTERVAL = 40
    TERMINAL_HEALTH_INTERVAL = MAX_HEARTBEAT_INTERVAL * 3

    def __init__(self, config=None):
        """Initialize and configure a PipelineServer.

        :Parameters:
        ----------
        config : dict
            Python representation of the yaml configuration file. Example:
            pipelines:
              # sequence of piped operations for use on front door cam
              daytime_front_door_watch:
                - source: *src_front_door_cam
                  ...
                - detect_objects: # run ai inference on the input data
                  ...
                - save_detections: # save samples from the inference results
                  ...

        """
        self._threaded_jobs = []
        self._pipelines = []
        self._config = None
        self.reset(config)

    def reset(self, config=None):
        self._threaded_jobs = []
        self._pipelines = []
        if config is not None:
            self._config = config
        if self._config:
            pipelines_config = self._config.get("pipelines", None)
            if pipelines_config:
                # get main data dir config and pass
                # on to pipelines to use
                data_dir = self._config.get("data_dir", DEFAULT_DATA_DIR)
                self._pipelines = get_pipelines(pipelines_config, data_dir=data_dir)
                for pp in self._pipelines:
                    pj = ThreadedJob(pp)
                    self._threaded_jobs.append(pj)

    def _on_terminal_pipeline_health(self, pipeline=None, lapse=None):
        log.error(
            "Pipeline %s in terminal condition. "
            "Unable to recover."
            "Latest heartbeat was %f seconds ago. ",
            pipeline.name,
            lapse,
        )

    def _on_pipeline_job_ended(self, threaded_job=None):
        p = threaded_job.job
        log.debug('Pipeline "%s" has ended. ' "Removing from health watch.", p.name)
        self._threaded_jobs.remove(threaded_job)

    def healthcheck(self):
        """Check the health of all managed pipelines.

        Return the oldest heartbeat among all managed pipeline heartbeats.
        Try to heal pipelines that haven't reported a heartbeat and awhile.

        :returns: (timestamp, status) tuple with most outdated heartbeat
            and worst known status among managed pipelines
        """
        oldest_heartbeat = time.monotonic()
        # iterate over a copy of jobs, because
        # we may need to remove dead jobs in the loop
        for j in list(self._threaded_jobs):
            # get the pipeline object out of the threaded job wrapper
            p = j.job
            if j.is_alive():
                latest_heartbeat, status = p.healthcheck()
                now = time.monotonic()
                lapse = now - latest_heartbeat
                if lapse > self.TERMINAL_HEALTH_INTERVAL:
                    self._on_terminal_pipeline_health(p, lapse)
                    # more than a reasonable amount of time has passed
                    # since the pipeline reported a heartbeat.
                    # Let's recycle it
                elif lapse > self.MAX_HEARTBEAT_INTERVAL:
                    log.warning(
                        'Pipeline "%s" is not responsive. '
                        "Latest heartbeat was %f seconds ago. "
                        "Will attempt to heal it.",
                        p.name,
                        lapse,
                    )
                    self.heal_pipeline_job(j)
                if oldest_heartbeat > latest_heartbeat:
                    oldest_heartbeat = latest_heartbeat
            else:
                self._on_pipeline_job_ended(threaded_job=j)
        status = True  # At some point status may carry richer information
        return oldest_heartbeat, status

    def heal(self):
        """Heal the PipelineServer.

        PipelineServer manages its own health as best possible.
        Not much to do here at this time.
        """

    def heal_pipeline_job(self, threaded_job=None):
        assert threaded_job
        pipeline = threaded_job.job
        log.debug("pipline %s healing request...", pipeline.name)
        threaded_job.heal()
        log.debug("pipeline %s healing request completed.", pipeline.name)

    def start(self):
        # Start pipeline interpreter threads
        log.info("pipeline jobs starting...")
        for tj in self._threaded_jobs:
            tj.start()
        log.info("pipeline jobs started")

    def stop(self):
        log.info("pipeline jobs stopping...")
        # Signal pipeline interpreter threads to close
        for tj in self._threaded_jobs:
            tj.stop()
        # Wait for the pipeline interpreter threads to close...
        for tj in self._threaded_jobs:
            tj.join()
        log.info("pipeline jobs stopped.")

Ancestors

Class variables

var MAX_HEARTBEAT_INTERVAL
var TERMINAL_HEALTH_INTERVAL

Methods

def heal(self)

Heal the PipelineServer.

PipelineServer manages its own health as best possible. Not much to do here at this time.

Expand source code
def heal(self):
    """Heal the PipelineServer.

    PipelineServer manages its own health as best possible.
    Not much to do here at this time.
    """
def heal_pipeline_job(self, threaded_job=None)
Expand source code
def heal_pipeline_job(self, threaded_job=None):
    assert threaded_job
    pipeline = threaded_job.job
    log.debug("pipline %s healing request...", pipeline.name)
    threaded_job.heal()
    log.debug("pipeline %s healing request completed.", pipeline.name)
def healthcheck(self)

Check the health of all managed pipelines.

Return the oldest heartbeat among all managed pipeline heartbeats. Try to heal pipelines that haven't reported a heartbeat and awhile.

:returns: (timestamp, status) tuple with most outdated heartbeat and worst known status among managed pipelines

Expand source code
def healthcheck(self):
    """Check the health of all managed pipelines.

    Return the oldest heartbeat among all managed pipeline heartbeats.
    Try to heal pipelines that haven't reported a heartbeat and awhile.

    :returns: (timestamp, status) tuple with most outdated heartbeat
        and worst known status among managed pipelines
    """
    oldest_heartbeat = time.monotonic()
    # iterate over a copy of jobs, because
    # we may need to remove dead jobs in the loop
    for j in list(self._threaded_jobs):
        # get the pipeline object out of the threaded job wrapper
        p = j.job
        if j.is_alive():
            latest_heartbeat, status = p.healthcheck()
            now = time.monotonic()
            lapse = now - latest_heartbeat
            if lapse > self.TERMINAL_HEALTH_INTERVAL:
                self._on_terminal_pipeline_health(p, lapse)
                # more than a reasonable amount of time has passed
                # since the pipeline reported a heartbeat.
                # Let's recycle it
            elif lapse > self.MAX_HEARTBEAT_INTERVAL:
                log.warning(
                    'Pipeline "%s" is not responsive. '
                    "Latest heartbeat was %f seconds ago. "
                    "Will attempt to heal it.",
                    p.name,
                    lapse,
                )
                self.heal_pipeline_job(j)
            if oldest_heartbeat > latest_heartbeat:
                oldest_heartbeat = latest_heartbeat
        else:
            self._on_pipeline_job_ended(threaded_job=j)
    status = True  # At some point status may carry richer information
    return oldest_heartbeat, status
def reset(self, config=None)
Expand source code
def reset(self, config=None):
    self._threaded_jobs = []
    self._pipelines = []
    if config is not None:
        self._config = config
    if self._config:
        pipelines_config = self._config.get("pipelines", None)
        if pipelines_config:
            # get main data dir config and pass
            # on to pipelines to use
            data_dir = self._config.get("data_dir", DEFAULT_DATA_DIR)
            self._pipelines = get_pipelines(pipelines_config, data_dir=data_dir)
            for pp in self._pipelines:
                pj = ThreadedJob(pp)
                self._threaded_jobs.append(pj)

Inherited members