Module ambianic.pipeline

Main module for Ambianic AI pipelines.

Expand source code
"""Main module for Ambianic AI pipelines."""
import logging
import time
from typing import Iterable

from ambianic.pipeline.pipeline_event import PipelineContext
from ambianic.util import ManagedService

# Module-level logger shared by the pipeline elements defined below.
log = logging.getLogger(__name__)

# Pipe lifecycle states: an element is created STOPPED, switches to
# RUNNING in start() and returns to STOPPED in stop().
PIPE_STATE_STOPPED, PIPE_STATE_RUNNING = 0, 10
# Every known lifecycle state.
PIPE_STATES = [PIPE_STATE_RUNNING, PIPE_STATE_STOPPED]


class PipeElement(ManagedService):
    """The basic building block of an Ambianic pipeline.

    Elements are chained with connect_to_next_element(). A sourcing (first)
    element overrides start()/stop() and keeps producing samples until it
    is stopped; every other element is fed via receive_next_sample(), which
    runs the input through process_sample() and forwards each output to
    the next connected element, if any.
    """

    def __init__(
        self,
        element_name=None,
        context: PipelineContext = None,
        event_log: logging.Logger = None,
        **kwargs
    ):
        """Create a PipeElement instance.

        :Parameters:
        ----------
        element_name : str
            Reference name of this element in pipeline definitions.
        context : PipelineContext
            Pipeline execution context used by push_context() and
            pop_context(). May be None, in which case those two methods
            cannot be used.
        event_log : logging.Logger
            Timeline event log exposed via the event_log property.
        **kwargs
            Extra keyword arguments are accepted and ignored by this base
            class.

        """
        super().__init__()
        self._name = element_name
        self._state = PIPE_STATE_STOPPED
        self._next_element = None
        self._latest_heartbeat = time.monotonic()
        self._context = context
        self._timeline_event_log = event_log

    @property
    def name(self) -> str:
        """Return this element's reference name in pipeline definitions."""
        return self._name

    @property
    def context(self) -> PipelineContext:
        """Pipeline execution context.

        :Returns:
        -------
        type: PipelineContext
            pipeline execution context (None if none was given)

        """
        return self._context

    def push_context(self, element_context: dict = None):
        """Push this element information to the context stack.

        Invoke before the element yields its first sample output
        for a given input sample.

        :Parameters:
        ----------
        element_context : dict
            Contextual info about this element. Its "class" key is set to
            this element's class name. Note that a dict passed in by the
            caller is modified in place and pushed as-is.

        """
        if element_context is None:
            # Fresh dict per call; never share a default between calls.
            element_context = {}
        element_context["class"] = self.__class__.__name__
        self._context.push_element_context(element_context)

    def pop_context(self) -> dict:
        """Pop element information from the context stack.

        Invoke after the element yields its last sample output
        for a given input sample.

        :Returns:
        -------
        type: dict
            Element context info.

        """
        return self._context.pop_element_context()

    @property
    def event_log(self) -> logging.Logger:
        """Get timeline event log for the current pipe execution context."""
        return self._timeline_event_log

    @property
    def state(self):
        """Lifecycle state of the pipe element.

        PIPE_STATE_STOPPED initially and after stop(); PIPE_STATE_RUNNING
        after start().
        """
        return self._state

    def start(self):
        """Only sourcing elements (first in a pipeline) need to override.

        It is invoked once when the enclosing pipeline is started. It should
        continue to run until the corresponding stop() method is invoked on the
        same object from a separate pipeline lifecycle manager thread.

        It is recommended for overriding methods to invoke this base method
        via super().start() before proceeding with custom logic.

        """
        self._state = PIPE_STATE_RUNNING

    def heal(self):  # pragma: no cover
        """Override with adequate implementation of a healing procedure.

        heal() is invoked by a lifecycle manager when it is determined that
        the element does not respond within reasonable timeframe.
        This can happen for example if the element depends on external IO
        resources, which become unavailable for an extended period of time.

        The healing procedure should be considered a chance to recover or find
        an alternative way to proceed.

        If heal does not reset the pipe element back to a responsive state,
        it is likely that the lifecycle manager will stop the
        element and its enclosing pipeline.

        The base implementation does nothing.

        """

    def healthcheck(self):
        """Check the health of this element.

        :returns: (timestamp, status) tuple with most recent heartbeat
        timestamp (a time.monotonic() value) and health status code
        ('OK' normally).
        """
        status = "OK"  # At some point status may carry richer information
        return self._latest_heartbeat, status

    def heartbeat(self):
        """Set the heartbeat timestamp to time.monotonic().

        Keeping the heartbeat timestamp current informs
        the lifecycle manager that this element is functioning
        well.

        """
        self._latest_heartbeat = time.monotonic()

    def stop(self):
        """Receive stop signal and act accordingly.

        Subclasses implementing sourcing elements should override this method
        by first invoking their super class implementation and then running
        through steps specific to stopping their ongoing sample processing.

        """
        self._state = PIPE_STATE_STOPPED

    def connect_to_next_element(self, next_element=None):
        """Connect this element to the next element in the pipe.

        Subclasses should not override this method.

        :Parameters:
        ----------
        next_element : PipeElement
            Downstream element receiving this element's output samples.
            Validated with assert statements, which are skipped when
            Python runs with -O.

        """
        assert next_element
        assert isinstance(next_element, PipeElement)
        self._next_element = next_element

    def receive_next_sample(self, **sample):
        """Receive next sample from a connected previous element if applicable.

        All pipeline elements except for the first (sourcing) element
        in the pipeline will depend on this method to feed them with new
        samples to process.

        Subclasses should not override this method.

        :Parameters:
        ----------
        **sample : dict
            A dict of (key, value) pairs that represent the sample.
            It is left to specialized implementations of PipeElement to specify
            their in/out sample formats and enforce compatibility with
            adjacent connected pipe elements.

        """
        self.heartbeat()
        for processed_sample in self.process_sample(**sample):
            next_element = self._next_element
            if next_element:
                if processed_sample:
                    next_element.receive_next_sample(**processed_sample)
                else:
                    # Falsy outputs (None / {}) cannot be **-unpacked.
                    next_element.receive_next_sample()
            # Refresh after every output, including on terminal elements
            # with no next element, so a slow but healthy sink draining a
            # multi-yield generator does not look stalled.
            self.heartbeat()

    def process_sample(self, **sample) -> Iterable[dict]:
        """Override and implement as generator.

        Invoked by receive_next_sample() when the previous element
        (or pipeline source) feeds another data input sample.

        Implementing subclasses should process input samples and yield
        output samples for the next element in the pipeline.

        The base implementation yields the input sample unchanged.

        :Parameters:
        ----------
        **sample : dict
            A dict of (key, value) pairs that represent the sample.
            It is left to specialized implementations of PipeElement to specify
            their in/out sample formats and enforce compatibility with
            adjacent connected pipe elements.

        :Returns:
        ----------
        processed_sample: Iterable[dict]
            Generates processed samples to be passed on
            to the next pipeline element.

        """
        yield sample


class HealthChecker(PipeElement):
    """Monitor overall pipeline throughput health.

    Meant to sit at the end of a pipeline: every sample that reaches it
    triggers the health status callback and is then passed through
    unchanged.
    """

    def __init__(self, health_status_callback=None, **kwargs):
        """Create an instance bound to a health status callback.

        The callback runs every time process_sample() is invoked.

        :Parameters:
        ----------
        health_status_callback : function
            Zero-argument callable expected to measure the overall pipeline
            throughput health. Must be truthy (checked with assert).
        **kwargs
            Forwarded unchanged to PipeElement.__init__().
        """
        super().__init__(**kwargs)
        callback = health_status_callback
        assert callback
        self._health_status_callback = callback

    def process_sample(self, **sample):
        """Invoke the health callback, then yield the sample unchanged."""
        element_kind = type(self).__name__
        log.debug(
            "%s received sample from the connected preceding pipe element.",
            element_kind,
        )
        self._health_status_callback()
        yield sample

Sub-modules

ambianic.pipeline.ai
ambianic.pipeline.avsource
ambianic.pipeline.interpreter

Ambianic pipeline interpreter module.

ambianic.pipeline.pipeline_event

Pipeline event timeline read/write/search functions.

ambianic.pipeline.save_event

Pipeline sample storage elements.

Classes

class HealthChecker (health_status_callback=None, **kwargs)

Monitor overall pipeline throughput health.

Attaches at the end of a pipeline to monitor its health status based on received output samples and their frequency.

Create instance given health status callback.

The health status callback will be invoked each time the process_sample method is invoked.

:Parameters:

health_status_callback : function Method that is expected to measure the overall pipeline throughput health.

Expand source code
class HealthChecker(PipeElement):
    """Monitor overall pipeline throughput health.

    Attaches at the end of a pipeline to monitor its health status
    based on received output samples and their frequency.
    """

    def __init__(self, health_status_callback=None, **kwargs):
        """Create instance given health status callback.

        The health status callback will be invoked each time
        the process_sample method is invoked.

        :Parameters:
        ----------
        health_status_callback : function
            Method that is expected to measure the overall pipeline throughput
            health. It is called with no arguments.
        **kwargs
            Passed through to PipeElement.__init__().
        """
        super().__init__(**kwargs)
        # Reject a missing callback; note that assert statements are
        # skipped when Python runs with -O.
        assert health_status_callback
        self._health_status_callback = health_status_callback

    def process_sample(self, **sample):
        """Call health callback and pass on sample as is."""
        log.debug(
            "%s received sample from the connected " "preceding pipe element.",
            self.__class__.__name__,
        )
        self._health_status_callback()
        yield sample

Ancestors

Methods

def process_sample(self, **sample)

Call health callback and pass on sample as is.

Expand source code
def process_sample(self, **sample):
    """Call health callback and pass on sample as is.

    Logs a debug message, invokes the health status callback with no
    arguments, then yields the received sample unchanged.
    """
    log.debug(
        "%s received sample from the connected " "preceding pipe element.",
        self.__class__.__name__,
    )
    self._health_status_callback()
    yield sample

Inherited members

class PipeElement (element_name=None, context: PipelineContext = None, event_log: logging.Logger = None, **kwargs)

The basic building block of an Ambianic pipeline.

Create a PipeElement instance.

Expand source code
class PipeElement(ManagedService):
    """The basic building block of an Ambianic pipeline.

    Elements are chained with connect_to_next_element(). Non-sourcing
    elements are fed via receive_next_sample(), which runs the input through
    process_sample() and forwards each output to the next connected element.
    """

    def __init__(
        self,
        element_name=None,
        context: PipelineContext = None,
        event_log: logging.Logger = None,
        **kwargs
    ):
        """Create a PipeElement instance.

        Extra keyword arguments are accepted and ignored by this base class.
        """
        super().__init__()
        self._name = element_name
        self._state = PIPE_STATE_STOPPED
        self._next_element = None
        self._latest_heartbeat = time.monotonic()
        self._context = context
        self._timeline_event_log = event_log

    @property
    def name(self) -> str:
        """Return this element's reference name in pipeline definitions."""
        return self._name

    @property
    def context(self) -> PipelineContext:
        """Pipeline execution context.

        :Returns:
        -------
        type: PipelineContext
            pipeline execution context

        """
        return self._context

    def push_context(self, element_context: dict = None):
        """Push this element information to the context stack.

        Invoke before the element yields its first sample output
        for a given input sample.

        :Parameters:
        ----------
        element_context : dict
            Contextual info about this element. Its "class" key is set
            in place, so a dict passed by the caller is modified.

        """
        if element_context is None:
            element_context = {}
        element_context["class"] = self.__class__.__name__
        self._context.push_element_context(element_context)

    def pop_context(self) -> dict:
        """Pop element information from the context stack.

        Invoke after the element yields its last sample output
        for a given input sample.

        :Returns:
        -------
        type: dict
            Element context info.

        """
        return self._context.pop_element_context()

    @property
    def event_log(self) -> logging.Logger:
        """Get timeline event log for the current pipe execution context."""
        return self._timeline_event_log

    @property
    def state(self):
        """Lifecycle state of the pipe element."""
        return self._state

    def start(self):
        """Only sourcing elements (first in a pipeline) need to override.

        It is invoked once when the enclosing pipeline is started. It should
        continue to run until the corresponding stop() method is invoked on the
        same object from a separate pipeline lifecycle manager thread.

        It is recommended for overriding methods to invoke this base method
        via super().start() before proceeding with custom logic.

        """
        self._state = PIPE_STATE_RUNNING

    def heal(self):  # pragma: no cover
        """Override with adequate implementation of a healing procedure.

        heal() is invoked by a lifecycle manager when it is determined that
        the element does not respond within reasonable timeframe.
        This can happen for example if the element depends on external IO
        resources, which become unavailable for an extended period of time.

        The healing procedure should be considered a chance to recover or find
        an alternative way to proceed.

        If heal does not reset the pipe element back to a responsive state,
        it is likely that the lifecycle manager will stop the
        element and its enclosing pipeline.

        """

    def healthcheck(self):
        """Check the health of this element.

        :returns: (timestamp, status) tuple with most recent heartbeat
        timestamp and health status code ('OK' normally).
        """
        status = "OK"  # At some point status may carry richer information
        return self._latest_heartbeat, status

    def heartbeat(self):
        """Set the heartbeat timestamp to time.monotonic().

        Keeping the heartbeat timestamp current informs
        the lifecycle manager that this element is functioning
        well.

        """
        now = time.monotonic()
        self._latest_heartbeat = now

    def stop(self):
        """Receive stop signal and act accordingly.

        Subclasses implementing sourcing elements should override this method
        by first invoking their super class implementation and then running
        through steps specific to stopping their ongoing sample processing.

        """
        self._state = PIPE_STATE_STOPPED

    def connect_to_next_element(self, next_element=None):
        """Connect this element to the next element in the pipe.

        Subclasses should not override this method.

        """
        # assert-based validation; skipped when Python runs with -O.
        assert next_element
        assert isinstance(next_element, PipeElement)
        self._next_element = next_element

    def receive_next_sample(self, **sample):
        """Receive next sample from a connected previous element if applicable.

        All pipeline elements except for the first (sourcing) element
        in the pipeline will depend on this method to feed them with new
        samples to process.

        Subclasses should not override this method.

        :Parameters:
        ----------
        **sample : dict
            A dict of (key, value) pairs that represent the sample.
            It is left to specialized implementations of PipeElement to specify
            their in/out sample formats and enforce compatibility with
            adjacent connected pipe elements.

        """
        self.heartbeat()
        for processed_sample in self.process_sample(**sample):
            if self._next_element:
                if processed_sample:
                    self._next_element.receive_next_sample(**processed_sample)
                else:
                    # Falsy outputs (None / {}) cannot be **-unpacked.
                    self._next_element.receive_next_sample()
                # NOTE: only refreshed when a next element is connected;
                # a terminal element keeps its entry-time heartbeat.
                self.heartbeat()

    def process_sample(self, **sample) -> Iterable[dict]:
        """Override and implement as generator.

        Invoked by receive_next_sample() when the previous element
        (or pipeline source) feeds another data input sample.

        Implementing subclasses should process input samples and yield
        output samples for the next element in the pipeline.

        :Parameters:
        ----------
        **sample : dict
            A dict of (key, value) pairs that represent the sample.
            It is left to specialized implementations of PipeElement to specify
            their in/out sample formats and enforce compatibility with
            adjacent connected pipe elements.

        :Returns:
        ----------
        processed_sample: Iterable[dict]
            Generates processed samples to be passed on
            to the next pipeline element.

        """
        yield sample

Ancestors

Subclasses

Instance variables

var context : PipelineContext

Pipeline execution context.

:Returns:

type: PipelineContext pipeline execution context

Expand source code
@property
def context(self) -> PipelineContext:
    """Pipeline execution context.

    This is the ``context`` object given to the constructor, or ``None``
    if none was given.

    :Returns:
    -------
    type: PipelineContext
        pipeline execution context

    """
    return self._context
var event_log : logging.Logger

Get timeline event log for the current pipe execution context.

Expand source code
@property
def event_log(self) -> logging.Logger:
    """Get timeline event log for the current pipe execution context.

    Returns the ``event_log`` logger given to the constructor, or ``None``
    if none was given.
    """
    return self._timeline_event_log
var name : str

Return this element's reference name in pipeline definitions.

Expand source code
@property
def name(self) -> str:
    """Return this element's reference name in pipeline definitions.

    This is the ``element_name`` given to the constructor (``None`` by
    default).
    """
    return self._name
var state

Lifecycle state of the pipe element.

Expand source code
@property
def state(self):
    """Lifecycle state of the pipe element.

    PIPE_STATE_STOPPED initially and after stop(); PIPE_STATE_RUNNING
    after start().
    """
    return self._state

Methods

def connect_to_next_element(self, next_element=None)

Connect this element to the next element in the pipe.

Subclasses should not override this method.

Expand source code
def connect_to_next_element(self, next_element=None):
    """Connect this element to the next element in the pipe.

    Subclasses should not override this method.

    :Parameters:
    ----------
    next_element : PipeElement
        Downstream element that receives this element's output samples.
        Validated with assert statements, which are skipped when Python
        runs with -O.

    """
    assert next_element
    assert isinstance(next_element, PipeElement)
    self._next_element = next_element
def heal(self)

Override with adequate implementation of a healing procedure.

heal() is invoked by a lifecycle manager when it is determined that the element does not respond within reasonable timeframe. This can happen for example if the element depends on external IO resources, which become unavailable for an extended period of time.

The healing procedure should be considered a chance to recover or find an alternative way to proceed.

If heal does not reset the pipe element back to a responsive state, it is likely that the lifecycle manager will stop the element and its enclosing pipeline.

Expand source code
def heal(self):  # pragma: no cover
    """Override with adequate implementation of a healing procedure.

    heal() is invoked by a lifecycle manager when it is determined that
    the element does not respond within reasonable timeframe.
    This can happen for example if the element depends on external IO
    resources, which become unavailable for an extended period of time.

    The healing procedure should be considered a chance to recover or find
    an alternative way to proceed.

    If heal does not reset the pipe element back to a responsive state,
    it is likely that the lifecycle manager will stop the
    element and its enclosing pipeline.

    The base implementation does nothing.

    """
def healthcheck(self)

Check the health of this element.

:returns: (timestamp, status) tuple with most recent heartbeat timestamp and health status code ('OK' normally).

Expand source code
def healthcheck(self):
    """Check the health of this element.

    :returns: (timestamp, status) tuple with most recent heartbeat
    timestamp (a time.monotonic() value) and health status code
    ('OK' normally).
    """
    status = "OK"  # At some point status may carry richer information
    return self._latest_heartbeat, status
def heartbeat(self)

Set the heartbeat timestamp to time.monotonic().

Keeping the heartbeat timestamp current informs the lifecycle manager that this element is functioning well.

Expand source code
def heartbeat(self):
    """Set the heartbeat timestamp to time.monotonic().

    Keeping the heartbeat timestamp current informs
    the lifecycle manager that this element is functioning
    well. The stored value is what healthcheck() reports.

    """
    # Monotonic clock: unaffected by wall-clock adjustments.
    now = time.monotonic()
    self._latest_heartbeat = now
def pop_context(self) ‑> dict

Pop element information from the context stack.

Invoke after the element yields its last sample output for a given input sample.

:Returns:

type: dict Element context info.

Expand source code
def pop_context(self) -> dict:
    """Pop element information from the context stack.

    Invoke after the element yields its last sample output
    for a given input sample. Counterpart of push_context(); delegates to
    the pipeline context's pop_element_context().

    :Returns:
    -------
    type: dict
        Element context info.

    """
    return self._context.pop_element_context()
def process_sample(self, **sample) ‑> Iterable[dict]

Override and implement as generator.

Invoked by receive_next_sample() when the previous element (or pipeline source) feeds another data input sample.

Implementing subclasses should process input samples and yield output samples for the next element in the pipeline.

:Parameters:

**sample : dict A dict of (key, value) pairs that represent the sample. It is left to specialized implementations of PipeElement to specify their in/out sample formats and enforce compatibility with adjacent connected pipe elements.

:Returns:

processed_sample: Iterable[dict] Generates processed samples to be passed on to the next pipeline element.

Expand source code
def process_sample(self, **sample) -> Iterable[dict]:
    """Override and implement as generator.

    Invoked by receive_next_sample() when the previous element
    (or pipeline source) feeds another data input sample.

    Implementing subclasses should process input samples and yield
    output samples for the next element in the pipeline.

    The base implementation yields the input sample unchanged.

    :Parameters:
    ----------
    **sample : dict
        A dict of (key, value) pairs that represent the sample.
        It is left to specialized implementations of PipeElement to specify
        their in/out sample formats and enforce compatibility with
        adjacent connected pipe elements.

    :Returns:
    ----------
    processed_sample: Iterable[dict]
        Generates processed samples to be passed on
        to the next pipeline element.

    """
    yield sample
def push_context(self, element_context: dict = None)

Push this element information to the context stack.

Invoke before the element yields its first sample output for a given input sample.

:Parameters:

element_context : dict Contextual info about this element.

Expand source code
def push_context(self, element_context: dict = None):
    """Push this element information to the context stack.

    Invoke before the element yields its first sample output
    for a given input sample.

    :Parameters:
    ----------
    element_context : dict
        Contextual info about this element. Its "class" key is set to
        this element's class name in place, so a dict passed by the
        caller is modified.

    """
    if element_context is None:
        # Fresh dict per call instead of a shared mutable default.
        element_context = {}
    element_context["class"] = self.__class__.__name__
    self._context.push_element_context(element_context)
def receive_next_sample(self, **sample)

Receive next sample from a connected previous element if applicable.

All pipeline elements except for the first (sourcing) element in the pipeline will depend on this method to feed them with new samples to process.

Subclasses should not override this method.

:Parameters:

**sample : dict A dict of (key, value) pairs that represent the sample. It is left to specialized implementations of PipeElement to specify their in/out sample formats and enforce compatibility with adjacent connected pipe elements.

Expand source code
def receive_next_sample(self, **sample):
    """Receive next sample from a connected previous element if applicable.

    All pipeline elements except for the first (sourcing) element
    in the pipeline will depend on this method to feed them with new
    samples to process.

    Subclasses should not override this method.

    :Parameters:
    ----------
    **sample : dict
        A dict of (key, value) pairs that represent the sample.
        It is left to specialized implementations of PipeElement to specify
        their in/out sample formats and enforce compatibility with
        adjacent connected pipe elements.

    """
    self.heartbeat()
    for processed_sample in self.process_sample(**sample):
        if self._next_element:
            if processed_sample:
                self._next_element.receive_next_sample(**processed_sample)
            else:
                # Falsy outputs (None / {}) cannot be **-unpacked.
                self._next_element.receive_next_sample()
            # NOTE: only refreshed when a next element is connected.
            self.heartbeat()
def start(self)

Only sourcing elements (first in a pipeline) need to override.

It is invoked once when the enclosing pipeline is started. It should continue to run until the corresponding stop() method is invoked on the same object from a separate pipeline lifecycle manager thread.

It is recommended for overriding methods to invoke this base method via super().start() before proceeding with custom logic.

Expand source code
def start(self):
    """Only sourcing elements (first in a pipeline) need to override.

    It is invoked once when the enclosing pipeline is started. It should
    continue to run until the corresponding stop() method is invoked on the
    same object from a separate pipeline lifecycle manager thread.

    It is recommended for overriding methods to invoke this base method
    via super().start() before proceeding with custom logic.

    The base implementation only marks the element as running.

    """
    self._state = PIPE_STATE_RUNNING
def stop(self)

Receive stop signal and act accordingly.

Subclasses implementing sourcing elements should override this method by first invoking their super class implementation and then running through steps specific to stopping their ongoing sample processing.

Expand source code
def stop(self):
    """Receive stop signal and act accordingly.

    Subclasses implementing sourcing elements should override this method
    by first invoking their super class implementation and then running
    through steps specific to stopping their ongoing sample processing.

    The base implementation only marks the element as stopped.

    """
    self._state = PIPE_STATE_STOPPED